// uni_store/storage/manager.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
5use crate::lancedb::LanceDbStore;
6use crate::runtime::WorkingGraph;
7use crate::runtime::context::QueryContext;
8use crate::runtime::l0::L0Buffer;
9use crate::storage::adjacency::AdjacencyDataset;
10use crate::storage::compaction::Compactor;
11use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
12use crate::storage::direction::Direction;
13use crate::storage::edge::EdgeDataset;
14use crate::storage::index::UidIndex;
15use crate::storage::inverted_index::InvertedIndex;
16use crate::storage::main_edge::MainEdgeDataset;
17use crate::storage::main_vertex::MainVertexDataset;
18use crate::storage::vertex::VertexDataset;
19use anyhow::{Result, anyhow};
20use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
21use dashmap::DashMap;
22use futures::TryStreamExt;
23use lancedb::query::{ExecutableQuery, QueryBase, Select};
24use object_store::ObjectStore;
25use object_store::local::LocalFileSystem;
26use parking_lot::RwLock;
27use std::collections::{HashMap, HashSet};
28use std::sync::{Arc, Mutex};
29use std::time::{Duration, SystemTime, UNIX_EPOCH};
30use tracing::warn;
31use uni_common::config::UniConfig;
32use uni_common::core::id::{Eid, UniId, Vid};
33use uni_common::core::schema::{DistanceMetric, IndexDefinition, SchemaManager};
34use uni_common::sync::acquire_mutex;
35
36use crate::snapshot::manager::SnapshotManager;
37use crate::storage::IndexManager;
38use crate::storage::adjacency_manager::AdjacencyManager;
39use crate::storage::resilient_store::ResilientObjectStore;
40
41use uni_common::core::snapshot::SnapshotManifest;
42
43use uni_common::graph::simple_graph::Direction as GraphDirection;
44
/// Edge state during subgraph loading - tracks version and deletion status.
struct EdgeState {
    // Vertex on the far side of the edge.
    neighbor: Vid,
    // Version associated with the most recent op observed for this edge
    // (presumably an MVCC `_version` — confirm in the load path below).
    version: u64,
    // True when the latest observed op marks the edge as deleted.
    deleted: bool,
}
51
pub struct StorageManager {
    /// Root URI this manager was opened from (local path or object-store URL).
    base_uri: String,
    /// Object store for snapshot/adjacency data, wrapped in ResilientObjectStore.
    store: Arc<dyn ObjectStore>,
    /// Shared graph schema (labels, edge types, indexes).
    schema_manager: Arc<SchemaManager>,
    /// Manages snapshot manifests on the object store.
    snapshot_manager: Arc<SnapshotManager>,
    /// Dual-CSR adjacency cache manager.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Cache of opened LanceDB tables by label name for vector search performance
    table_cache: DashMap<String, lancedb::Table>,
    pub config: UniConfig,
    /// Shared compaction bookkeeping; guarded by CompactionGuard during runs.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Optional pinned snapshot for time-travel
    pinned_snapshot: Option<SnapshotManifest>,
    /// LanceDB store for DataFusion-powered queries.
    lancedb_store: Arc<LanceDbStore>,
    /// In-memory VID-to-labels index for O(1) lookups (optional, configurable)
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
69
/// Helper to manage compaction_in_progress flag
///
/// Constructed via `CompactionGuard::new`, which sets the flag; the Drop impl
/// clears it, so the flag cannot leak "stuck on" across early returns.
struct CompactionGuard {
    // Shared status cell whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
74
75impl CompactionGuard {
76    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
77        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
78        if s.compaction_in_progress {
79            return None;
80        }
81        s.compaction_in_progress = true;
82        Some(Self {
83            status: status.clone(),
84        })
85    }
86}
87
88impl Drop for CompactionGuard {
89    fn drop(&mut self) {
90        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
91        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
92        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
93            Ok(mut s) => {
94                s.compaction_in_progress = false;
95                s.last_compaction = Some(std::time::SystemTime::now());
96            }
97            Err(e) => {
98                // Lock is poisoned but we're in Drop - cannot panic.
99                // Log the error and continue. System state may be inconsistent but at least
100                // we don't abort the process.
101                log::error!(
102                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
103                     Compaction status may be inconsistent. Issue #18/#150",
104                    e
105                );
106            }
107        }
108    }
109}
110
111impl StorageManager {
112    /// Create a new StorageManager with LanceDB integration.
113    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
114        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
115    }
116
117    /// Create a new StorageManager with custom cache size.
118    pub async fn new_with_cache(
119        base_uri: &str,
120        schema_manager: Arc<SchemaManager>,
121        adjacency_cache_size: usize,
122    ) -> Result<Self> {
123        let config = UniConfig {
124            cache_size: adjacency_cache_size,
125            ..Default::default()
126        };
127        Self::new_with_config(base_uri, schema_manager, config).await
128    }
129
130    /// Create a new StorageManager with custom configuration.
131    pub async fn new_with_config(
132        base_uri: &str,
133        schema_manager: Arc<SchemaManager>,
134        config: UniConfig,
135    ) -> Result<Self> {
136        let store = Self::build_store_from_uri(base_uri)?;
137        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
138    }
139
140    /// Create a new StorageManager using an already-constructed object store.
141    ///
142    /// This is used by higher layers that need explicit store configuration
143    /// (for example custom S3 endpoints in hybrid/cloud modes).
144    pub async fn new_with_store_and_config(
145        base_uri: &str,
146        store: Arc<dyn ObjectStore>,
147        schema_manager: Arc<SchemaManager>,
148        config: UniConfig,
149    ) -> Result<Self> {
150        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
151            .await
152    }
153
154    /// Create a new StorageManager using an already-constructed object store
155    /// and explicit LanceDB storage options.
156    pub async fn new_with_store_and_storage_options(
157        base_uri: &str,
158        store: Arc<dyn ObjectStore>,
159        schema_manager: Arc<SchemaManager>,
160        config: UniConfig,
161        lancedb_storage_options: Option<HashMap<String, String>>,
162    ) -> Result<Self> {
163        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
164            store,
165            config.object_store.clone(),
166        ));
167
168        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
169
170        // Connect to LanceDB
171        let lancedb_store =
172            LanceDbStore::connect_with_storage_options(base_uri, lancedb_storage_options).await?;
173
174        // Perform crash recovery for all known table patterns
175        Self::recover_all_staging_tables(&lancedb_store, &schema_manager).await?;
176
177        let mut sm = Self {
178            base_uri: base_uri.to_string(),
179            store: resilient_store,
180            schema_manager,
181            snapshot_manager,
182            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
183            table_cache: DashMap::new(),
184            config,
185            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
186            pinned_snapshot: None,
187            lancedb_store: Arc::new(lancedb_store),
188            vid_labels_index: None,
189        };
190
191        // Rebuild VidLabelsIndex if enabled
192        if sm.config.enable_vid_labels_index
193            && let Err(e) = sm.rebuild_vid_labels_index().await
194        {
195            warn!(
196                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to LanceDB queries.",
197                e
198            );
199        }
200
201        Ok(sm)
202    }
203
204    /// Recover all staging tables for known table patterns.
205    ///
206    /// This runs on startup to handle crash recovery. It checks for staging tables
207    /// for all vertex labels, adjacency tables, delta tables, and main tables.
208    async fn recover_all_staging_tables(
209        lancedb_store: &LanceDbStore,
210        schema_manager: &SchemaManager,
211    ) -> Result<()> {
212        let schema = schema_manager.schema();
213
214        // Recover main vertex and edge tables
215        lancedb_store
216            .recover_staging(LanceDbStore::main_vertex_table_name())
217            .await?;
218        lancedb_store
219            .recover_staging(LanceDbStore::main_edge_table_name())
220            .await?;
221
222        // Recover per-label vertex tables
223        for label in schema.labels.keys() {
224            let table_name = LanceDbStore::vertex_table_name(label);
225            lancedb_store.recover_staging(&table_name).await?;
226        }
227
228        // Recover adjacency and delta tables for each edge type and direction
229        for edge_type in schema.edge_types.keys() {
230            for direction in &["fwd", "bwd"] {
231                // Recover delta tables
232                let delta_table_name = LanceDbStore::delta_table_name(edge_type, direction);
233                lancedb_store.recover_staging(&delta_table_name).await?;
234
235                // Recover adjacency tables for each label
236                for _label in schema.labels.keys() {
237                    let adj_table_name = LanceDbStore::adjacency_table_name(edge_type, direction);
238                    lancedb_store.recover_staging(&adj_table_name).await?;
239                }
240            }
241        }
242
243        Ok(())
244    }
245
246    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
247        if base_uri.contains("://") {
248            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
249            let (store, _path) = object_store::parse_url(&parsed)
250                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
251            Ok(Arc::from(store))
252        } else {
253            // If local path, ensure it exists.
254            std::fs::create_dir_all(base_uri)?;
255            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
256        }
257    }
258
259    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
260        Self {
261            base_uri: self.base_uri.clone(),
262            store: self.store.clone(),
263            schema_manager: self.schema_manager.clone(),
264            snapshot_manager: self.snapshot_manager.clone(),
265            // Separate AdjacencyManager for snapshot isolation (Issue #73):
266            // warm() will load only edges visible at the snapshot's HWM.
267            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
268            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
269            table_cache: DashMap::new(),
270            config: self.config.clone(),
271            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
272            pinned_snapshot: Some(snapshot),
273            lancedb_store: self.lancedb_store.clone(),
274            vid_labels_index: self.vid_labels_index.clone(),
275        }
276    }
277
278    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
279        let schema = self.schema_manager.schema();
280        let name = schema.edge_type_name_by_id(edge_type_id)?;
281        self.pinned_snapshot
282            .as_ref()
283            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
284    }
285
286    /// Returns the version_high_water_mark from the pinned snapshot if present.
287    ///
288    /// This is used for time-travel queries to filter data by version.
289    /// When a snapshot is pinned, only rows with `_version <= version_high_water_mark`
290    /// should be considered visible.
291    pub fn version_high_water_mark(&self) -> Option<u64> {
292        self.pinned_snapshot
293            .as_ref()
294            .map(|s| s.version_high_water_mark)
295    }
296
297    /// Apply version filtering to a base filter expression.
298    ///
299    /// If a snapshot is pinned, wraps `base_filter` with an additional
300    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
301    pub fn apply_version_filter(&self, base_filter: String) -> String {
302        if let Some(hwm) = self.version_high_water_mark() {
303            format!("({}) AND (_version <= {})", base_filter, hwm)
304        } else {
305            base_filter
306        }
307    }
308
309    /// Build a filter expression that excludes soft-deleted rows and optionally
310    /// includes a user-provided filter.
311    fn build_active_filter(user_filter: Option<&str>) -> String {
312        match user_filter {
313            Some(expr) => format!("({}) AND (_deleted = false)", expr),
314            None => "_deleted = false".to_string(),
315        }
316    }
317
318    pub fn store(&self) -> Arc<dyn ObjectStore> {
319        self.store.clone()
320    }
321
322    /// Get current compaction status.
323    ///
324    /// # Errors
325    ///
326    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
327    pub fn compaction_status(
328        &self,
329    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
330        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
331        Ok(guard.clone())
332    }
333
334    pub async fn compact(&self) -> Result<CompactionStats> {
335        // LanceDB handles compaction internally via optimize()
336        // For now, call optimize on vertex tables
337        let start = std::time::Instant::now();
338        let schema = self.schema_manager.schema();
339        let mut files_compacted = 0;
340
341        for label in schema.labels.keys() {
342            let table_name = LanceDbStore::vertex_table_name(label);
343            if self.lancedb_store.table_exists(&table_name).await? {
344                let table = self.lancedb_store.open_table(&table_name).await?;
345                table.optimize(lancedb::table::OptimizeAction::All).await?;
346                files_compacted += 1;
347                self.invalidate_table_cache(label);
348            }
349        }
350
351        Ok(CompactionStats {
352            files_compacted,
353            bytes_before: 0,
354            bytes_after: 0,
355            duration: start.elapsed(),
356            crdt_merges: 0,
357        })
358    }
359
360    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
361        let _guard = CompactionGuard::new(self.compaction_status.clone())
362            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
363
364        let start = std::time::Instant::now();
365        let table_name = LanceDbStore::vertex_table_name(label);
366
367        if self.lancedb_store.table_exists(&table_name).await? {
368            let table = self.lancedb_store.open_table(&table_name).await?;
369            table.optimize(lancedb::table::OptimizeAction::All).await?;
370            self.invalidate_table_cache(label);
371        }
372
373        Ok(CompactionStats {
374            files_compacted: 1,
375            bytes_before: 0,
376            bytes_after: 0,
377            duration: start.elapsed(),
378            crdt_merges: 0,
379        })
380    }
381
382    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
383        let _guard = CompactionGuard::new(self.compaction_status.clone())
384            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
385
386        let start = std::time::Instant::now();
387        let mut files_compacted = 0;
388
389        for dir in ["fwd", "bwd"] {
390            let table_name = LanceDbStore::delta_table_name(edge_type, dir);
391            if self.lancedb_store.table_exists(&table_name).await? {
392                let table = self.lancedb_store.open_table(&table_name).await?;
393                table.optimize(lancedb::table::OptimizeAction::All).await?;
394                files_compacted += 1;
395            }
396        }
397
398        Ok(CompactionStats {
399            files_compacted,
400            bytes_before: 0,
401            bytes_after: 0,
402            duration: start.elapsed(),
403            crdt_merges: 0,
404        })
405    }
406
407    pub async fn wait_for_compaction(&self) -> Result<()> {
408        loop {
409            let in_progress = {
410                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
411            };
412            if !in_progress {
413                return Ok(());
414            }
415            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
416        }
417    }
418
    /// Spawn the periodic background-compaction task.
    ///
    /// When compaction is disabled in config this returns an already-finished
    /// no-op task, so callers can await the handle uniformly. Otherwise the
    /// task ticks at `compaction.check_interval`, refreshes the L1 stats, and
    /// runs at most one compaction per tick. A message on `shutdown_rx` stops
    /// the loop after waiting for any in-flight compaction to complete.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Refresh l1_runs / l1_size_bytes / oldest_l1_age
                        // before deciding whether to compact.
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            // Failures are logged and the loop keeps running.
                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Drain any in-flight compaction so the guard flag is
                        // left consistent before the task exits.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
454
    /// Refresh the L1 delta-table statistics in `compaction_status`.
    ///
    /// Scans every edge delta table (both directions), counting non-empty
    /// tables and their rows, and tracking the oldest `_created_at` timestamp
    /// so `pick_compaction_task` can apply run-count / size / age thresholds.
    /// Tables that fail to open or query are skipped (best-effort stats).
    ///
    /// # Errors
    ///
    /// Returns an error only if the compaction status lock is poisoned.
    async fn update_compaction_status(&self) -> Result<()> {
        let schema = self.schema_manager.schema();
        let mut total_tables = 0;
        let mut total_rows: usize = 0;
        let mut oldest_ts: Option<i64> = None;

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let table_name = LanceDbStore::delta_table_name(name, dir);
                // Missing/unopenable tables contribute nothing to the stats.
                let Ok(table) = self.lancedb_store.open_table(&table_name).await else {
                    continue;
                };
                let row_count = table.count_rows(None).await.unwrap_or(0);
                if row_count == 0 {
                    continue;
                }
                total_tables += 1;
                total_rows += row_count;

                // Query oldest _created_at for age tracking
                let Ok(stream) = table
                    .query()
                    .select(Select::Columns(vec!["_created_at".to_string()]))
                    .execute()
                    .await
                else {
                    continue;
                };
                let Ok(batches) = stream.try_collect::<Vec<_>>().await else {
                    continue;
                };
                for batch in batches {
                    let Some(col) = batch
                        .column_by_name("_created_at")
                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
                    else {
                        continue;
                    };
                    // Track the minimum non-null timestamp across all rows.
                    for i in 0..col.len() {
                        if !col.is_null(i) {
                            let ts = col.value(i);
                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
                        }
                    }
                }
            }
        }

        // NOTE(review): the `ts as u64` cast assumes non-negative timestamps;
        // a negative nanosecond value would wrap — confirm `_created_at` can
        // never be pre-epoch.
        let oldest_l1_age = oldest_ts
            .and_then(|ts| {
                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
                SystemTime::now().duration_since(created).ok()
            })
            .unwrap_or(Duration::ZERO);

        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
        status.l1_runs = total_tables;
        // Size is an estimate: row count times a per-entry size constant.
        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
        status.oldest_l1_age = oldest_l1_age;
        Ok(())
    }
516
517    fn pick_compaction_task(&self) -> Option<CompactionTask> {
518        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
519
520        if status.l1_runs >= self.config.compaction.max_l1_runs {
521            return Some(CompactionTask::ByRunCount);
522        }
523        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
524            return Some(CompactionTask::BySize);
525        }
526        if status.oldest_l1_age >= self.config.compaction.max_l1_age
527            && status.oldest_l1_age > Duration::ZERO
528        {
529            return Some(CompactionTask::ByAge);
530        }
531
532        None
533    }
534
535    /// Open a table by name and run `optimize(All)`, returning `true` on success.
536    async fn optimize_table(store: &LanceDbStore, table_name: &str) -> bool {
537        let Ok(table) = store.open_table(table_name).await else {
538            return false;
539        };
540        if let Err(e) = table.optimize(lancedb::table::OptimizeAction::All).await {
541            log::warn!("Failed to optimize table {}: {}", table_name, e);
542            return false;
543        }
544        true
545    }
546
    /// Run a full compaction pass: semantic compaction (tier 2) followed by
    /// Lance optimize (tier 3) over every delta, adjacency, per-label vertex,
    /// and main table.
    ///
    /// Takes the `CompactionGuard` for the whole pass; a second concurrent
    /// call fails with "Compaction already in progress". A semantic-compaction
    /// failure is logged and the Lance optimize tier still runs. `_task`
    /// records why the pass was triggered but does not change the work done.
    async fn execute_compaction(this: Arc<Self>, _task: CompactionTask) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let _guard = CompactionGuard::new(this.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = this.schema_manager.schema();
        let mut files_compacted = 0;

        // ── Tier 2: Semantic compaction ──
        // Dedup vertices, merge CRDTs, consolidate L1→L2 deltas, clean tombstones
        let compactor = Compactor::new(Arc::clone(&this));
        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
            log::error!(
                "Semantic compaction failed (continuing with Lance optimize): {}",
                e
            );
            Vec::new()
        });

        // Re-warm adjacency CSR after semantic compaction
        // (compacted edge data invalidates the in-memory CSR).
        let am = this.adjacency_manager();
        for info in &compaction_results {
            let direction = match info.direction.as_str() {
                "fwd" => Direction::Outgoing,
                "bwd" => Direction::Incoming,
                _ => continue,
            };
            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
                && let Err(e) = am.warm(&this, etid, direction, None).await
            {
                // Non-fatal: the CSR can be re-warmed lazily on next access.
                log::warn!(
                    "Failed to re-warm adjacency for {}/{}: {}",
                    info.edge_type,
                    info.direction,
                    e
                );
            }
        }

        // ── Tier 3: Lance optimize ──
        let store = &this.lancedb_store;

        // Optimize edge delta and adjacency tables
        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let delta = LanceDbStore::delta_table_name(name, dir);
                if Self::optimize_table(store, &delta).await {
                    files_compacted += 1;
                }
                let adj = LanceDbStore::adjacency_table_name(name, dir);
                if Self::optimize_table(store, &adj).await {
                    files_compacted += 1;
                }
            }
        }

        // Optimize vertex tables
        for label in schema.labels.keys() {
            let table_name = LanceDbStore::vertex_table_name(label);
            if Self::optimize_table(store, &table_name).await {
                files_compacted += 1;
                // Cached table handles may now point at stale versions.
                this.invalidate_table_cache(label);
            }
        }

        // Optimize main vertex and edge tables
        for table_name in [
            LanceDbStore::main_vertex_table_name(),
            LanceDbStore::main_edge_table_name(),
        ] {
            if Self::optimize_table(store, table_name).await {
                files_compacted += 1;
            }
        }

        {
            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
        }

        // NOTE(review): crdt_merges is reported as 0 even though tier 2 may
        // have merged CRDTs — confirm whether `compaction_results` should
        // feed this counter.
        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
635
636    /// Get or open a cached table for a label
637    pub async fn get_cached_table(&self, label: &str) -> Result<lancedb::Table> {
638        // Check cache first
639        if let Some(table) = self.table_cache.get(label) {
640            return Ok(table.clone());
641        }
642
643        // Open and cache
644        let table_name = LanceDbStore::vertex_table_name(label);
645        let table = self.lancedb_store.open_table(&table_name).await?;
646
647        self.table_cache.insert(label.to_string(), table.clone());
648        Ok(table)
649    }
650
    /// Invalidate cached table (call after writes)
    ///
    /// Removes the label's entry so the next `get_cached_table` re-opens it.
    pub fn invalidate_table_cache(&self, label: &str) {
        self.table_cache.remove(label);
    }

    /// Clear all cached tables
    pub fn clear_table_cache(&self) {
        self.table_cache.clear();
    }

    /// The base URI this manager was opened with (local path or URL).
    pub fn base_path(&self) -> &str {
        &self.base_uri
    }

    /// Borrow the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Get the schema manager as a shared `Arc` handle.
    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
        self.schema_manager.clone()
    }

    /// Get the adjacency manager for the dual-CSR architecture.
    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
        Arc::clone(&self.adjacency_manager)
    }
677
    /// Warm the adjacency manager for a specific edge type and direction.
    ///
    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
    /// Called lazily on first access per edge type or at startup.
    /// `version` is forwarded to `AdjacencyManager::warm` — presumably a
    /// version high-water mark for snapshot reads; confirm there.
    pub async fn warm_adjacency(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm(self, edge_type_id, direction, version)
            .await
    }

    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
    ///
    /// Uses double-checked locking to ensure only one concurrent warm() per
    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
    pub async fn warm_adjacency_coalesced(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm_coalesced(self, edge_type_id, direction, version)
            .await
    }

    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
    pub fn has_adjacency_csr(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
    ) -> bool {
        self.adjacency_manager.has_csr(edge_type_id, direction)
    }

    /// Get neighbors at a specific version for snapshot queries.
    ///
    /// Returns (neighbor VID, edge EID) pairs as resolved by the adjacency
    /// manager for the given version.
    pub fn get_neighbors_at_version(
        &self,
        vid: uni_common::core::id::Vid,
        edge_type: u32,
        direction: crate::storage::direction::Direction,
        version: u64,
    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
        self.adjacency_manager
            .get_neighbors_at_version(vid, edge_type, direction, version)
    }

    /// Get the LanceDB store.
    pub fn lancedb_store(&self) -> &LanceDbStore {
        &self.lancedb_store
    }

    /// Get the LanceDB store as an Arc.
    pub fn lancedb_store_arc(&self) -> Arc<LanceDbStore> {
        self.lancedb_store.clone()
    }
738
739    /// Rebuild the VidLabelsIndex from the main vertex table.
740    /// This is called on startup if enable_vid_labels_index is true.
741    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
742        use crate::lancedb::LanceDbStore;
743        use crate::storage::vid_labels::VidLabelsIndex;
744
745        let lancedb_store = self.lancedb_store();
746
747        // Open the main vertex table
748        let table = match lancedb_store
749            .open_table(LanceDbStore::main_vertex_table_name())
750            .await
751        {
752            Ok(t) => t,
753            Err(_) => {
754                // Table doesn't exist yet (fresh database)
755                self.vid_labels_index =
756                    Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
757                return Ok(());
758            }
759        };
760
761        // Scan all non-deleted vertices and collect (VID, labels)
762        let batches = table
763            .query()
764            .only_if("_deleted = false")
765            .limit(100_000) // Reasonable batch size
766            .execute()
767            .await
768            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?
769            .try_collect::<Vec<_>>()
770            .await
771            .map_err(|e| anyhow!("Failed to collect vertex data: {}", e))?;
772
773        let mut index = VidLabelsIndex::new();
774        for batch in batches {
775            let vid_col = batch
776                .column_by_name("_vid")
777                .ok_or_else(|| anyhow!("Missing _vid column"))?
778                .as_any()
779                .downcast_ref::<UInt64Array>()
780                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
781
782            let labels_col = batch
783                .column_by_name("labels")
784                .ok_or_else(|| anyhow!("Missing labels column"))?
785                .as_any()
786                .downcast_ref::<arrow_array::ListArray>()
787                .ok_or_else(|| anyhow!("Invalid labels column type"))?;
788
789            for row_idx in 0..batch.num_rows() {
790                let vid = Vid::from(vid_col.value(row_idx));
791                let labels_array = labels_col.value(row_idx);
792                let labels_str_array = labels_array
793                    .as_any()
794                    .downcast_ref::<arrow_array::StringArray>()
795                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
796
797                let labels: Vec<String> = (0..labels_str_array.len())
798                    .map(|i| labels_str_array.value(i).to_string())
799                    .collect();
800
801                index.insert(vid, labels);
802            }
803        }
804
805        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
806        Ok(())
807    }
808
809    /// Get labels for a VID from the in-memory index.
810    /// Returns None if the index is disabled or the VID is not found.
811    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
812        self.vid_labels_index.as_ref().and_then(|idx| {
813            let index = idx.read();
814            index.get_labels(vid).map(|labels| labels.to_vec())
815        })
816    }
817
818    /// Update the VID-to-labels mapping in the index.
819    /// No-op if the index is disabled.
820    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
821        if let Some(idx) = &self.vid_labels_index {
822            let mut index = idx.write();
823            index.insert(vid, labels);
824        }
825    }
826
827    /// Remove a VID from the labels index.
828    /// No-op if the index is disabled.
829    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
830        if let Some(idx) = &self.vid_labels_index {
831            let mut index = idx.write();
832            index.remove_vid(vid);
833        }
834    }
835
836    pub async fn load_subgraph_cached(
837        &self,
838        start_vids: &[Vid],
839        edge_types: &[u32],
840        max_hops: usize,
841        direction: GraphDirection,
842        _l0: Option<Arc<RwLock<L0Buffer>>>,
843    ) -> Result<WorkingGraph> {
844        let mut graph = WorkingGraph::new();
845
846        let dir = match direction {
847            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
848            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
849        };
850
851        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
852
853        // Initialize frontier
854        let mut frontier: Vec<Vid> = start_vids.to_vec();
855        let mut visited: HashSet<Vid> = HashSet::new();
856
857        // Initialize start vids
858        for &vid in start_vids {
859            graph.add_vertex(vid);
860        }
861
862        for _hop in 0..max_hops {
863            let mut next_frontier = HashSet::new();
864
865            for &vid in &frontier {
866                if visited.contains(&vid) {
867                    continue;
868                }
869                visited.insert(vid);
870                graph.add_vertex(vid);
871
872                for &etype_id in edge_types {
873                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13)
874                    let edge_ver = self.version_high_water_mark();
875                    self.adjacency_manager
876                        .warm_coalesced(self, etype_id, dir, edge_ver)
877                        .await?;
878
879                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
880                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
881
882                    for (neighbor_vid, eid) in edges {
883                        graph.add_vertex(neighbor_vid);
884                        if !visited.contains(&neighbor_vid) {
885                            next_frontier.insert(neighbor_vid);
886                        }
887
888                        if neighbor_is_dst {
889                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
890                        } else {
891                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
892                        }
893                    }
894                }
895            }
896            frontier = next_frontier.into_iter().collect();
897
898            // Early termination: if frontier is empty, no more vertices to explore
899            if frontier.is_empty() {
900                break;
901            }
902        }
903
904        Ok(graph)
905    }
906
    /// Borrow the snapshot manager owned by this storage manager.
    pub fn snapshot_manager(&self) -> &SnapshotManager {
        &self.snapshot_manager
    }
910
    /// Build a fresh `IndexManager` bound to this store's base URI, schema
    /// manager, and LanceDB handle. Constructed on demand, not cached.
    pub fn index_manager(&self) -> IndexManager {
        IndexManager::new(
            &self.base_uri,
            self.schema_manager.clone(),
            self.lancedb_store.clone(),
        )
    }
918
919    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
920        let schema = self.schema_manager.schema();
921        let label_meta = schema
922            .labels
923            .get(label)
924            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
925        Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
926    }
927
    /// Open the edge dataset for `edge_type` connecting `src_label` to
    /// `dst_label`.
    ///
    /// Currently infallible; the `Result` return keeps the signature
    /// consistent with the other dataset accessors.
    pub fn edge_dataset(
        &self,
        edge_type: &str,
        src_label: &str,
        dst_label: &str,
    ) -> Result<EdgeDataset> {
        Ok(EdgeDataset::new(
            &self.base_uri,
            edge_type,
            src_label,
            dst_label,
        ))
    }
941
    /// Open the L1 delta dataset for `edge_type` in the given storage
    /// direction ("fwd" or "bwd" — see `load_subgraph`). Infallible today;
    /// `Result` kept for signature stability.
    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
        Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
    }
945
    /// Open the L2 adjacency dataset for `edge_type`, scoped to a source
    /// `label` and a storage direction ("fwd" or "bwd"). Infallible today;
    /// `Result` kept for signature stability.
    pub fn adjacency_dataset(
        &self,
        edge_type: &str,
        label: &str,
        direction: &str,
    ) -> Result<AdjacencyDataset> {
        Ok(AdjacencyDataset::new(
            &self.base_uri,
            edge_type,
            label,
            direction,
        ))
    }
959
    /// Get the main vertex dataset for unified vertex storage.
    ///
    /// The main vertex dataset contains all vertices regardless of label,
    /// enabling fast ID-based lookups without knowing the label.
    /// Constructed on demand from the store's base URI, not cached.
    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
        MainVertexDataset::new(&self.base_uri)
    }
967
    /// Get the main edge dataset for unified edge storage.
    ///
    /// The main edge dataset contains all edges regardless of type,
    /// enabling fast ID-based lookups without knowing the edge type.
    /// Constructed on demand from the store's base URI, not cached.
    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
        MainEdgeDataset::new(&self.base_uri)
    }
975
    /// Open the per-label UID→VID index. Infallible today; `Result` kept
    /// for signature stability with the other accessors.
    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
        Ok(UidIndex::new(&self.base_uri, label))
    }
979
980    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
981        let schema = self.schema_manager.schema();
982        let config = schema
983            .indexes
984            .iter()
985            .find_map(|idx| match idx {
986                IndexDefinition::Inverted(cfg)
987                    if cfg.label == label && cfg.property == property =>
988                {
989                    Some(cfg.clone())
990                }
991                _ => None,
992            })
993            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
994
995        InvertedIndex::new(&self.base_uri, config).await
996    }
997
    /// Run an approximate-nearest-neighbor search over `label.property`.
    ///
    /// Returns up to `k` `(Vid, distance)` pairs. When `ctx` is supplied,
    /// unflushed L0 buffer vertices are merged into the results, and — if a
    /// version high-water mark exists — rows above it are filtered out.
    ///
    /// # Arguments
    /// * `label` - Vertex label whose cached table is searched
    /// * `property` - Vector column to search
    /// * `query` - Query embedding (dimension must match the column)
    /// * `k` - Maximum number of results to return
    /// * `filter` - Optional Lance filter expression
    /// * `ctx` - Optional query context enabling L0 merge / version pinning
    pub async fn vector_search(
        &self,
        label: &str,
        property: &str,
        query: &[f32],
        k: usize,
        filter: Option<&str>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<(Vid, f32)>> {
        // Look up vector index config to get the correct distance metric.
        // Falls back to L2 when no vector index is declared for the column.
        let schema = self.schema_manager.schema();
        let metric = schema
            .vector_index_for_property(label, property)
            .map(|config| config.metric.clone())
            .unwrap_or(DistanceMetric::L2);

        // Try to open the cached table; if the label has no data yet the Lance
        // table won't exist. In that case fall back to L0-only results.
        let table = self.get_cached_table(label).await.ok();

        let mut results = Vec::new();

        if let Some(table) = table {
            // Unrecognized metrics fall back to L2 via the catch-all arm.
            let distance_type = match &metric {
                DistanceMetric::L2 => lancedb::DistanceType::L2,
                DistanceMetric::Cosine => lancedb::DistanceType::Cosine,
                DistanceMetric::Dot => lancedb::DistanceType::Dot,
                _ => lancedb::DistanceType::L2,
            };

            // Use LanceDB's vector search API
            let mut query_builder = table
                .vector_search(query.to_vec())
                .map_err(|e| anyhow!("Failed to create vector search: {}", e))?
                .column(property)
                .distance_type(distance_type)
                .limit(k);

            // Combine the caller's filter with the store's active-row
            // predicate (see `build_active_filter`).
            query_builder = query_builder.only_if(Self::build_active_filter(filter));

            // Apply version filtering if snapshot is pinned
            if ctx.is_some()
                && let Some(hwm) = self.version_high_water_mark()
            {
                query_builder = query_builder.only_if(format!("_version <= {}", hwm));
            }

            let batches = query_builder
                .execute()
                .await
                .map_err(|e| anyhow!("Vector search execution failed: {}", e))?
                .try_collect::<Vec<_>>()
                .await
                .map_err(|e| anyhow!("Failed to collect vector search results: {}", e))?;

            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
        }

        // Merge L0 buffer vertices into results for visibility of unflushed data.
        if let Some(qctx) = ctx {
            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
        }

        Ok(results)
    }
1063
    /// Perform a full-text search with BM25 scoring.
    ///
    /// Returns vertices matching the search query along with their BM25 scores.
    /// Results are sorted by score descending (most relevant first).
    ///
    /// Unlike `vector_search`, unflushed L0 buffer data is NOT merged here;
    /// `ctx` is only consulted for snapshot version filtering.
    ///
    /// # Arguments
    /// * `label` - The label to search within
    /// * `property` - The property column to search (must have FTS index)
    /// * `query` - The search query text
    /// * `k` - Maximum number of results to return
    /// * `filter` - Optional Lance filter expression
    /// * `ctx` - Optional query context for visibility checks
    ///
    /// # Returns
    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
    pub async fn fts_search(
        &self,
        label: &str,
        property: &str,
        query: &str,
        k: usize,
        filter: Option<&str>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<(Vid, f32)>> {
        use lance_index::scalar::FullTextSearchQuery;
        use lance_index::scalar::inverted::query::MatchQuery;

        // Try to open the cached table; if the label has no data yet the Lance
        // table won't exist. In that case return empty results.
        let table = match self.get_cached_table(label).await {
            Ok(t) => t,
            Err(_) => return Ok(Vec::new()),
        };

        // Build the FTS query with specific column
        let match_query =
            MatchQuery::new(query.to_string()).with_column(Some(property.to_string()));
        let fts_query = FullTextSearchQuery {
            query: match_query.into(),
            limit: Some(k as i64),
            wand_factor: None,
        };

        // `k` is applied both inside the FTS query and on the builder.
        let mut query_builder = table.query().full_text_search(fts_query).limit(k);

        // Combine the caller's filter with the store's active-row predicate.
        query_builder = query_builder.only_if(Self::build_active_filter(filter));

        // Apply version filtering if snapshot is pinned
        if ctx.is_some()
            && let Some(hwm) = self.version_high_water_mark()
        {
            query_builder = query_builder.only_if(format!("_version <= {}", hwm));
        }

        let batches = query_builder
            .execute()
            .await
            .map_err(|e| anyhow!("FTS search execution failed: {}", e))?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| anyhow!("Failed to collect FTS search results: {}", e))?;

        let mut results = extract_vid_score_pairs(&batches, "_vid", "_score")?;

        // Results should already be sorted by score from Lance, but ensure descending order
        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        Ok(results)
    }
1133
    /// Resolve a vertex's internal VID from its external UID within `label`.
    /// Returns `Ok(None)` when no mapping exists.
    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
        let index = self.uid_index(label)?;
        index.get_vid(uid).await
    }
1138
    /// Persist a UID→VID mapping for a newly inserted vertex of `label`.
    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
        let index = self.uid_index(label)?;
        index.write_mapping(&[(uid, vid)]).await
    }
1143
    /// Load a bounded subgraph via BFS from `start_vids`, merging all three
    /// storage layers per expanded vertex: L2 adjacency (base), L1 deltas,
    /// and the optional in-memory L0 buffer. Conflicts between layers are
    /// resolved by edge version (newer wins) in the `apply_*` helpers.
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // Build maps for ID lookups.
        // NOTE(review): `label_name_by_id(...).unwrap()` assumes the schema's
        // id↔name tables are internally consistent — would panic otherwise.
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        // Same consistency assumption for edge type names.
        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        // De-duplicate the requested edge types.
        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        // Initialize frontier
        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        // Add start vertices to graph
        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                // For each edge type we want to traverse
                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // Determine directions
                    // Storage direction: "fwd" or "bwd".
                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // Per-edge merged state across the three layers.
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    // 1. L2: Adjacency (Base)
                    // In the new storage model, VIDs don't embed label info.
                    // We need to try all labels to find the adjacency data.
                    // Edge version from snapshot (reserved for future version filtering)
                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    // Try each label until we find adjacency data
                    let lancedb_store = self.lancedb_store();
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_lancedb(lancedb_store, vid).await?
                        {
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        // Base edges enter at version 0 so any
                                        // L1/L0 entry supersedes them.
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            break; // Found adjacency data for this vid, no need to try other labels
                        }
                    }

                    // 2. L1: Delta
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas_lancedb(
                            lancedb_store,
                            vid,
                            &schema,
                            self.version_high_water_mark(),
                        )
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // 3. L0: Buffer (unflushed edges and tombstones)
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    // Add resulting edges to graph
                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            // Early termination: if frontier is empty, no more vertices to explore
            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
1288
1289    /// Apply delta entries to edge state map, handling version conflicts.
1290    fn apply_delta_to_edges(
1291        edges: &mut HashMap<Eid, EdgeState>,
1292        delta_entries: Vec<crate::storage::delta::L1Entry>,
1293        neighbor_is_dst: bool,
1294    ) {
1295        for entry in delta_entries {
1296            let neighbor = if neighbor_is_dst {
1297                entry.dst_vid
1298            } else {
1299                entry.src_vid
1300            };
1301            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1302
1303            if entry.version > current_ver {
1304                edges.insert(
1305                    entry.eid,
1306                    EdgeState {
1307                        neighbor,
1308                        version: entry.version,
1309                        deleted: matches!(entry.op, Op::Delete),
1310                    },
1311                );
1312            }
1313        }
1314    }
1315
1316    /// Apply L0 buffer edges and tombstones to edge state map.
1317    fn apply_l0_to_edges(
1318        edges: &mut HashMap<Eid, EdgeState>,
1319        l0: &L0Buffer,
1320        vid: Vid,
1321        etype_id: u32,
1322        direction: GraphDirection,
1323    ) {
1324        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1325        for (neighbor, eid, ver) in l0_neighbors {
1326            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1327            if ver > current_ver {
1328                edges.insert(
1329                    eid,
1330                    EdgeState {
1331                        neighbor,
1332                        version: ver,
1333                        deleted: false,
1334                    },
1335                );
1336            }
1337        }
1338
1339        // Check tombstones in L0
1340        for (eid, state) in edges.iter_mut() {
1341            if l0.is_tombstoned(*eid) {
1342                state.deleted = true;
1343            }
1344        }
1345    }
1346
1347    /// Add non-deleted edges to graph and collect next frontier.
1348    fn add_edges_to_graph(
1349        graph: &mut WorkingGraph,
1350        edges: HashMap<Eid, EdgeState>,
1351        vid: Vid,
1352        etype_id: u32,
1353        neighbor_is_dst: bool,
1354        visited: &HashSet<Vid>,
1355        next_frontier: &mut HashSet<Vid>,
1356    ) {
1357        for (eid, state) in edges {
1358            if state.deleted {
1359                continue;
1360            }
1361            graph.add_vertex(state.neighbor);
1362
1363            if !visited.contains(&state.neighbor) {
1364                next_frontier.insert(state.neighbor);
1365            }
1366
1367            if neighbor_is_dst {
1368                graph.add_edge(vid, state.neighbor, eid, etype_id);
1369            } else {
1370                graph.add_edge(state.neighbor, vid, eid, etype_id);
1371            }
1372        }
1373    }
1374}
1375
1376/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
1377fn extract_vid_score_pairs(
1378    batches: &[arrow_array::RecordBatch],
1379    vid_column: &str,
1380    score_column: &str,
1381) -> Result<Vec<(Vid, f32)>> {
1382    let mut results = Vec::new();
1383    for batch in batches {
1384        let vid_col = batch
1385            .column_by_name(vid_column)
1386            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1387            .as_any()
1388            .downcast_ref::<UInt64Array>()
1389            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1390
1391        let score_col = batch
1392            .column_by_name(score_column)
1393            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1394            .as_any()
1395            .downcast_ref::<Float32Array>()
1396            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1397
1398        for i in 0..batch.num_rows() {
1399            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1400        }
1401    }
1402    Ok(results)
1403}
1404
1405/// Extracts a `Vec<f32>` from a JSON property value.
1406///
1407/// Returns `None` if the property is missing, not an array, or contains
1408/// non-numeric elements.
1409fn extract_embedding_from_props(
1410    props: &uni_common::Properties,
1411    property: &str,
1412) -> Option<Vec<f32>> {
1413    let arr = props.get(property)?.as_array()?;
1414    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1415}
1416
/// Merges L0 buffer vertices into LanceDB vector search results.
///
/// Visits L0 buffers in precedence order (pending flush → main → transaction),
/// collects tombstoned VIDs and candidate embeddings, then merges them with the
/// existing LanceDB results so that:
/// - Tombstoned VIDs are removed (unless re-created in a later L0).
/// - VIDs present in both L0 and LanceDB use the L0 distance.
/// - New L0-only VIDs are appended.
/// - Results are re-sorted by distance ascending and truncated to `k`.
fn merge_l0_into_vector_results(
    results: &mut Vec<(Vid, f32)>,
    ctx: &QueryContext,
    label: &str,
    property: &str,
    query: &[f32],
    k: usize,
    metric: &DistanceMetric,
) {
    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
    buffers.push(Arc::clone(&ctx.l0));
    if let Some(ref txn) = ctx.transaction_l0 {
        buffers.push(Arc::clone(txn));
    }

    // Maps VID → distance for L0 candidates (last writer wins).
    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
    // Tombstoned VIDs across all L0 buffers.
    let mut tombstoned: HashSet<Vid> = HashSet::new();

    for buf_arc in &buffers {
        let buf = buf_arc.read();

        // Accumulate tombstones.
        for &vid in &buf.vertex_tombstones {
            tombstoned.insert(vid);
        }

        // Scan vertices with the target label.
        for (&vid, labels) in &buf.vertex_labels {
            if !labels.iter().any(|l| l == label) {
                continue;
            }
            if let Some(props) = buf.vertex_properties.get(&vid)
                && let Some(emb) = extract_embedding_from_props(props, property)
            {
                if emb.len() != query.len() {
                    continue; // dimension mismatch
                }
                let dist = metric.compute_distance(&emb, query);
                // Last writer wins: later buffer overwrites earlier.
                l0_candidates.insert(vid, dist);
                // If re-created in a later L0, remove from tombstones.
                // NOTE(review): within a single buffer, tombstones are read
                // before the vertex scan, so a VID both tombstoned and present
                // in the SAME buffer ends up un-tombstoned — confirm this
                // matches the intended last-writer-wins semantics.
                tombstoned.remove(&vid);
            }
        }
    }

    // If no L0 activity affects this search, skip merge.
    if l0_candidates.is_empty() && tombstoned.is_empty() {
        return;
    }

    // Remove tombstoned VIDs from LanceDB results.
    results.retain(|(vid, _)| !tombstoned.contains(vid));

    // Overwrite or append L0 candidates.
    // Linear scan is acceptable: `results` holds at most `k` entries.
    for (vid, dist) in &l0_candidates {
        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
            existing.1 = *dist;
        } else {
            results.push((*vid, *dist));
        }
    }

    // Re-sort by distance ascending.
    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
    results.truncate(k);
}