// File: uni_store/storage/manager.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
5use crate::lancedb::LanceDbStore;
6use crate::runtime::WorkingGraph;
7use crate::runtime::context::QueryContext;
8use crate::runtime::l0::L0Buffer;
9use crate::storage::adjacency::AdjacencyDataset;
10use crate::storage::delta::{DeltaDataset, Op};
11use crate::storage::edge::EdgeDataset;
12use crate::storage::index::UidIndex;
13use crate::storage::inverted_index::InvertedIndex;
14use crate::storage::main_edge::MainEdgeDataset;
15use crate::storage::main_vertex::MainVertexDataset;
16use crate::storage::vertex::VertexDataset;
17use anyhow::{Result, anyhow};
18use arrow_array::{Array, Float32Array, UInt64Array};
19use dashmap::DashMap;
20use futures::TryStreamExt;
21use lancedb::query::{ExecutableQuery, QueryBase};
22use object_store::ObjectStore;
23use object_store::local::LocalFileSystem;
24use parking_lot::RwLock;
25use std::collections::{HashMap, HashSet};
26use std::sync::{Arc, Mutex};
27use tracing::warn;
28use uni_common::config::UniConfig;
29use uni_common::core::id::{Eid, UniId, Vid};
30use uni_common::core::schema::{DistanceMetric, IndexDefinition, SchemaManager};
31use uni_common::sync::acquire_mutex;
32
33use crate::snapshot::manager::SnapshotManager;
34use crate::storage::IndexManager;
35use crate::storage::adjacency_manager::AdjacencyManager;
36use crate::storage::resilient_store::ResilientObjectStore;
37
38use uni_common::core::snapshot::SnapshotManifest;
39
40use uni_common::graph::simple_graph::Direction as GraphDirection;
41
/// Edge state during subgraph loading - tracks version and deletion status.
///
/// Used to resolve the latest visible state of an edge when multiple
/// versions are encountered during a load.
struct EdgeState {
    /// Vertex on the far side of the edge.
    neighbor: Vid,
    /// Version at which this edge state was recorded (presumably monotonic —
    /// confirm against the delta-merge logic that consumes it).
    version: u64,
    /// True when the edge is (soft-)deleted at `version`.
    deleted: bool,
}
48
/// Central storage coordinator: owns the object store, the LanceDB
/// connection, snapshot and adjacency managers, and assorted caches.
pub struct StorageManager {
    /// Root URI (local path or object-store URL) all datasets live under.
    base_uri: String,
    /// Object store handle (wrapped in `ResilientObjectStore` at construction).
    store: Arc<dyn ObjectStore>,
    /// Shared graph schema (labels, edge types).
    schema_manager: Arc<SchemaManager>,
    /// Snapshot management built over `store`.
    snapshot_manager: Arc<SnapshotManager>,
    /// Dual-CSR adjacency cache; sized by `config.cache_size`.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Cache of opened LanceDB tables by label name for vector search performance
    table_cache: DashMap<String, lancedb::Table>,
    /// Runtime configuration (cache sizes, compaction thresholds, feature flags).
    pub config: UniConfig,
    /// Shared compaction bookkeeping; guarded by a std `Mutex` (may poison).
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Optional pinned snapshot for time-travel
    pinned_snapshot: Option<SnapshotManifest>,
    /// LanceDB store for DataFusion-powered queries.
    lancedb_store: Arc<LanceDbStore>,
    /// In-memory VID-to-labels index for O(1) lookups (optional, configurable)
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
66
/// Helper to manage compaction_in_progress flag
///
/// RAII-style: `new()` atomically claims the flag (failing if already set)
/// and `Drop` clears it, so the flag cannot leak on early return or error.
struct CompactionGuard {
    /// Shared status whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
71
72impl CompactionGuard {
73    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
74        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
75        if s.compaction_in_progress {
76            return None;
77        }
78        s.compaction_in_progress = true;
79        Some(Self {
80            status: status.clone(),
81        })
82    }
83}
84
85impl Drop for CompactionGuard {
86    fn drop(&mut self) {
87        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
88        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
89        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
90            Ok(mut s) => {
91                s.compaction_in_progress = false;
92                s.last_compaction = Some(std::time::SystemTime::now());
93            }
94            Err(e) => {
95                // Lock is poisoned but we're in Drop - cannot panic.
96                // Log the error and continue. System state may be inconsistent but at least
97                // we don't abort the process.
98                log::error!(
99                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
100                     Compaction status may be inconsistent. Issue #18/#150",
101                    e
102                );
103            }
104        }
105    }
106}
107
108impl StorageManager {
    /// Create a new StorageManager with LanceDB integration.
    ///
    /// Convenience wrapper around [`Self::new_with_config`] using the
    /// default [`UniConfig`].
    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
    }
113
    /// Create a new StorageManager with custom cache size.
    ///
    /// `adjacency_cache_size` overrides `UniConfig::cache_size`; every other
    /// setting keeps its default value.
    pub async fn new_with_cache(
        base_uri: &str,
        schema_manager: Arc<SchemaManager>,
        adjacency_cache_size: usize,
    ) -> Result<Self> {
        let config = UniConfig {
            cache_size: adjacency_cache_size,
            ..Default::default()
        };
        Self::new_with_config(base_uri, schema_manager, config).await
    }
126
    /// Create a new StorageManager with custom configuration.
    ///
    /// Builds the object store from `base_uri` (scheme URL or local path)
    /// and delegates to [`Self::new_with_store_and_config`].
    pub async fn new_with_config(
        base_uri: &str,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        let store = Self::build_store_from_uri(base_uri)?;
        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
    }
136
    /// Create a new StorageManager using an already-constructed object store.
    ///
    /// This is used by higher layers that need explicit store configuration
    /// (for example custom S3 endpoints in hybrid/cloud modes).
    ///
    /// Delegates with `None` LanceDB storage options; see
    /// [`Self::new_with_store_and_storage_options`].
    pub async fn new_with_store_and_config(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
            .await
    }
150
151    /// Create a new StorageManager using an already-constructed object store
152    /// and explicit LanceDB storage options.
153    pub async fn new_with_store_and_storage_options(
154        base_uri: &str,
155        store: Arc<dyn ObjectStore>,
156        schema_manager: Arc<SchemaManager>,
157        config: UniConfig,
158        lancedb_storage_options: Option<HashMap<String, String>>,
159    ) -> Result<Self> {
160        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
161            store,
162            config.object_store.clone(),
163        ));
164
165        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
166
167        // Connect to LanceDB
168        let lancedb_store =
169            LanceDbStore::connect_with_storage_options(base_uri, lancedb_storage_options).await?;
170
171        // Perform crash recovery for all known table patterns
172        Self::recover_all_staging_tables(&lancedb_store, &schema_manager).await?;
173
174        let mut sm = Self {
175            base_uri: base_uri.to_string(),
176            store: resilient_store,
177            schema_manager,
178            snapshot_manager,
179            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
180            table_cache: DashMap::new(),
181            config,
182            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
183            pinned_snapshot: None,
184            lancedb_store: Arc::new(lancedb_store),
185            vid_labels_index: None,
186        };
187
188        // Rebuild VidLabelsIndex if enabled
189        if sm.config.enable_vid_labels_index
190            && let Err(e) = sm.rebuild_vid_labels_index().await
191        {
192            warn!(
193                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to LanceDB queries.",
194                e
195            );
196        }
197
198        Ok(sm)
199    }
200
201    /// Recover all staging tables for known table patterns.
202    ///
203    /// This runs on startup to handle crash recovery. It checks for staging tables
204    /// for all vertex labels, adjacency tables, delta tables, and main tables.
205    async fn recover_all_staging_tables(
206        lancedb_store: &LanceDbStore,
207        schema_manager: &SchemaManager,
208    ) -> Result<()> {
209        let schema = schema_manager.schema();
210
211        // Recover main vertex and edge tables
212        lancedb_store
213            .recover_staging(LanceDbStore::main_vertex_table_name())
214            .await?;
215        lancedb_store
216            .recover_staging(LanceDbStore::main_edge_table_name())
217            .await?;
218
219        // Recover per-label vertex tables
220        for label in schema.labels.keys() {
221            let table_name = LanceDbStore::vertex_table_name(label);
222            lancedb_store.recover_staging(&table_name).await?;
223        }
224
225        // Recover adjacency and delta tables for each edge type and direction
226        for edge_type in schema.edge_types.keys() {
227            for direction in &["fwd", "bwd"] {
228                // Recover delta tables
229                let delta_table_name = LanceDbStore::delta_table_name(edge_type, direction);
230                lancedb_store.recover_staging(&delta_table_name).await?;
231
232                // Recover adjacency tables for each label
233                for _label in schema.labels.keys() {
234                    let adj_table_name = LanceDbStore::adjacency_table_name(edge_type, direction);
235                    lancedb_store.recover_staging(&adj_table_name).await?;
236                }
237            }
238        }
239
240        Ok(())
241    }
242
243    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
244        if base_uri.contains("://") {
245            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
246            let (store, _path) = object_store::parse_url(&parsed)
247                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
248            Ok(Arc::from(store))
249        } else {
250            // If local path, ensure it exists.
251            std::fs::create_dir_all(base_uri)?;
252            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
253        }
254    }
255
    /// Return a copy of this manager pinned to `snapshot` for time-travel reads.
    ///
    /// Shares the store, schema, snapshot manager, LanceDB handle, and
    /// vid-labels index with `self`, but gets a fresh table cache and
    /// compaction status of its own.
    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
        Self {
            base_uri: self.base_uri.clone(),
            store: self.store.clone(),
            schema_manager: self.schema_manager.clone(),
            snapshot_manager: self.snapshot_manager.clone(),
            // Separate AdjacencyManager for snapshot isolation (Issue #73):
            // warm() will load only edges visible at the snapshot's HWM.
            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
            table_cache: DashMap::new(),
            config: self.config.clone(),
            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
            pinned_snapshot: Some(snapshot),
            lancedb_store: self.lancedb_store.clone(),
            vid_labels_index: self.vid_labels_index.clone(),
        }
    }
274
275    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
276        let schema = self.schema_manager.schema();
277        let name = schema.edge_type_name_by_id(edge_type_id)?;
278        self.pinned_snapshot
279            .as_ref()
280            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
281    }
282
    /// Returns the version_high_water_mark from the pinned snapshot if present.
    ///
    /// This is used for time-travel queries to filter data by version.
    /// When a snapshot is pinned, only rows with `_version <= version_high_water_mark`
    /// should be considered visible. Returns `None` on an unpinned (live)
    /// manager.
    pub fn version_high_water_mark(&self) -> Option<u64> {
        self.pinned_snapshot
            .as_ref()
            .map(|s| s.version_high_water_mark)
    }
293
294    /// Apply version filtering to a base filter expression.
295    ///
296    /// If a snapshot is pinned, wraps `base_filter` with an additional
297    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
298    pub fn apply_version_filter(&self, base_filter: String) -> String {
299        if let Some(hwm) = self.version_high_water_mark() {
300            format!("({}) AND (_version <= {})", base_filter, hwm)
301        } else {
302            base_filter
303        }
304    }
305
306    /// Build a filter expression that excludes soft-deleted rows and optionally
307    /// includes a user-provided filter.
308    fn build_active_filter(user_filter: Option<&str>) -> String {
309        match user_filter {
310            Some(expr) => format!("({}) AND (_deleted = false)", expr),
311            None => "_deleted = false".to_string(),
312        }
313    }
314
    /// Clone of the underlying (resilience-wrapped) object store handle.
    pub fn store(&self) -> Arc<dyn ObjectStore> {
        self.store.clone()
    }

    /// Get current compaction status.
    ///
    /// Returns a point-in-time copy of the shared status.
    ///
    /// # Errors
    ///
    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
    pub fn compaction_status(
        &self,
    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
        Ok(guard.clone())
    }
330
331    pub async fn compact(&self) -> Result<CompactionStats> {
332        // LanceDB handles compaction internally via optimize()
333        // For now, call optimize on vertex tables
334        let start = std::time::Instant::now();
335        let schema = self.schema_manager.schema();
336        let mut files_compacted = 0;
337
338        for label in schema.labels.keys() {
339            let table_name = LanceDbStore::vertex_table_name(label);
340            if self.lancedb_store.table_exists(&table_name).await? {
341                let table = self.lancedb_store.open_table(&table_name).await?;
342                table.optimize(lancedb::table::OptimizeAction::All).await?;
343                files_compacted += 1;
344                self.invalidate_table_cache(label);
345            }
346        }
347
348        Ok(CompactionStats {
349            files_compacted,
350            bytes_before: 0,
351            bytes_after: 0,
352            duration: start.elapsed(),
353            crdt_merges: 0,
354        })
355    }
356
357    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
358        let _guard = CompactionGuard::new(self.compaction_status.clone())
359            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
360
361        let start = std::time::Instant::now();
362        let table_name = LanceDbStore::vertex_table_name(label);
363
364        if self.lancedb_store.table_exists(&table_name).await? {
365            let table = self.lancedb_store.open_table(&table_name).await?;
366            table.optimize(lancedb::table::OptimizeAction::All).await?;
367            self.invalidate_table_cache(label);
368        }
369
370        Ok(CompactionStats {
371            files_compacted: 1,
372            bytes_before: 0,
373            bytes_after: 0,
374            duration: start.elapsed(),
375            crdt_merges: 0,
376        })
377    }
378
379    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
380        let _guard = CompactionGuard::new(self.compaction_status.clone())
381            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
382
383        let start = std::time::Instant::now();
384        let mut files_compacted = 0;
385
386        for dir in ["fwd", "bwd"] {
387            let table_name = LanceDbStore::delta_table_name(edge_type, dir);
388            if self.lancedb_store.table_exists(&table_name).await? {
389                let table = self.lancedb_store.open_table(&table_name).await?;
390                table.optimize(lancedb::table::OptimizeAction::All).await?;
391                files_compacted += 1;
392            }
393        }
394
395        Ok(CompactionStats {
396            files_compacted,
397            bytes_before: 0,
398            bytes_after: 0,
399            duration: start.elapsed(),
400            crdt_merges: 0,
401        })
402    }
403
404    pub async fn wait_for_compaction(&self) -> Result<()> {
405        loop {
406            let in_progress = {
407                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
408            };
409            if !in_progress {
410                return Ok(());
411            }
412            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
413        }
414    }
415
    /// Spawn the periodic background-compaction task.
    ///
    /// Every `config.compaction.check_interval` the task refreshes the
    /// compaction metrics and, if a threshold is exceeded, runs a
    /// compaction. On a shutdown signal it waits for any in-flight
    /// compaction before exiting. When compaction is disabled in config,
    /// a no-op task is returned so callers can always join the handle.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            // Disabled: return an already-finished task.
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Refresh metrics first; skip this tick on failure.
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            if let Err(e) = self.execute_compaction(task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Drain: don't exit mid-compaction.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
451
452    async fn update_compaction_status(&self) -> Result<()> {
453        let schema = self.schema_manager.schema();
454        let mut total_tables = 0;
455
456        // Count edge delta tables
457        for name in schema.edge_types.keys() {
458            for dir in ["fwd", "bwd"] {
459                let table_name = LanceDbStore::delta_table_name(name, dir);
460                if self.lancedb_store.table_exists(&table_name).await? {
461                    total_tables += 1;
462                }
463            }
464        }
465
466        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
467        status.l1_runs = total_tables;
468        status.l1_size_bytes = 0; // LanceDB doesn't expose size easily
469        Ok(())
470    }
471
472    fn pick_compaction_task(&self) -> Option<CompactionTask> {
473        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
474
475        if status.l1_runs >= self.config.compaction.max_l1_runs {
476            return Some(CompactionTask::ByRunCount);
477        }
478        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
479            return Some(CompactionTask::BySize);
480        }
481        // TODO: Age check
482
483        None
484    }
485
    /// Run one full compaction pass: optimize every edge-delta table and
    /// every per-label vertex table.
    ///
    /// `_task` is currently ignored — all tables are optimized regardless
    /// of which threshold triggered the run.
    ///
    /// # Errors
    ///
    /// Fails if another compaction is already in progress, or if opening /
    /// optimizing any table fails (the guard still clears the flag on drop).
    async fn execute_compaction(&self, _task: CompactionTask) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        // Use guard for automatic flag management
        let _guard = CompactionGuard::new(self.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = self.schema_manager.schema();
        let mut files_compacted = 0;

        // Optimize all edge delta tables
        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let table_name = LanceDbStore::delta_table_name(name, dir);
                if self.lancedb_store.table_exists(&table_name).await? {
                    let table = self.lancedb_store.open_table(&table_name).await?;
                    table.optimize(lancedb::table::OptimizeAction::All).await?;
                    files_compacted += 1;
                }
            }
        }

        // Optimize vertex tables
        for label in schema.labels.keys() {
            let table_name = LanceDbStore::vertex_table_name(label);
            if self.lancedb_store.table_exists(&table_name).await? {
                let table = self.lancedb_store.open_table(&table_name).await?;
                table.optimize(lancedb::table::OptimizeAction::All).await?;
                files_compacted += 1;
                // Cached table handles are stale after optimize.
                self.invalidate_table_cache(label);
            }
        }

        // Bump the counter in a short scope so the lock is not held longer
        // than needed.
        {
            let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
531
    /// Get or open a cached table for a label
    ///
    /// NOTE(review): check-then-insert is not atomic across the `.await`,
    /// so two concurrent callers may both open the table; the last insert
    /// wins. This looks benign (both handles point at the same table) —
    /// confirm if table handles hold exclusive resources.
    pub async fn get_cached_table(&self, label: &str) -> Result<lancedb::Table> {
        // Check cache first
        if let Some(table) = self.table_cache.get(label) {
            return Ok(table.clone());
        }

        // Open and cache
        let table_name = LanceDbStore::vertex_table_name(label);
        let table = self.lancedb_store.open_table(&table_name).await?;

        self.table_cache.insert(label.to_string(), table.clone());
        Ok(table)
    }
546
    /// Invalidate cached table (call after writes)
    pub fn invalidate_table_cache(&self, label: &str) {
        self.table_cache.remove(label);
    }

    /// Clear all cached tables
    ///
    /// Handles are re-opened lazily by [`Self::get_cached_table`].
    pub fn clear_table_cache(&self) {
        self.table_cache.clear();
    }
556
    /// Root URI this manager was created with.
    pub fn base_path(&self) -> &str {
        &self.base_uri
    }

    /// Borrow the shared schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }

    /// Clone the schema manager handle.
    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
        self.schema_manager.clone()
    }

    /// Get the adjacency manager for the dual-CSR architecture.
    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
        Arc::clone(&self.adjacency_manager)
    }
573
    /// Warm the adjacency manager for a specific edge type and direction.
    ///
    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
    /// Called lazily on first access per edge type or at startup.
    /// `version` limits visibility for pinned-snapshot reads; `None` means
    /// the live view.
    pub async fn warm_adjacency(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm(self, edge_type_id, direction, version)
            .await
    }

    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
    ///
    /// Uses double-checked locking to ensure only one concurrent warm() per
    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
    pub async fn warm_adjacency_coalesced(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm_coalesced(self, edge_type_id, direction, version)
            .await
    }
603
    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
    pub fn has_adjacency_csr(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
    ) -> bool {
        self.adjacency_manager.has_csr(edge_type_id, direction)
    }

    /// Get neighbors at a specific version for snapshot queries.
    ///
    /// Delegates to the adjacency manager; returns (neighbor VID, edge id)
    /// pairs visible at `version`.
    pub fn get_neighbors_at_version(
        &self,
        vid: uni_common::core::id::Vid,
        edge_type: u32,
        direction: crate::storage::direction::Direction,
        version: u64,
    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
        self.adjacency_manager
            .get_neighbors_at_version(vid, edge_type, direction, version)
    }
624
    /// Get the LanceDB store.
    pub fn lancedb_store(&self) -> &LanceDbStore {
        &self.lancedb_store
    }

    /// Get the LanceDB store as an Arc.
    ///
    /// Prefer this over [`Self::lancedb_store`] when the handle must
    /// outlive the borrow of `self`.
    pub fn lancedb_store_arc(&self) -> Arc<LanceDbStore> {
        self.lancedb_store.clone()
    }
634
635    /// Rebuild the VidLabelsIndex from the main vertex table.
636    /// This is called on startup if enable_vid_labels_index is true.
637    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
638        use crate::lancedb::LanceDbStore;
639        use crate::storage::vid_labels::VidLabelsIndex;
640
641        let lancedb_store = self.lancedb_store();
642
643        // Open the main vertex table
644        let table = match lancedb_store
645            .open_table(LanceDbStore::main_vertex_table_name())
646            .await
647        {
648            Ok(t) => t,
649            Err(_) => {
650                // Table doesn't exist yet (fresh database)
651                self.vid_labels_index =
652                    Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
653                return Ok(());
654            }
655        };
656
657        // Scan all non-deleted vertices and collect (VID, labels)
658        let batches = table
659            .query()
660            .only_if("_deleted = false")
661            .limit(100_000) // Reasonable batch size
662            .execute()
663            .await
664            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?
665            .try_collect::<Vec<_>>()
666            .await
667            .map_err(|e| anyhow!("Failed to collect vertex data: {}", e))?;
668
669        let mut index = VidLabelsIndex::new();
670        for batch in batches {
671            let vid_col = batch
672                .column_by_name("_vid")
673                .ok_or_else(|| anyhow!("Missing _vid column"))?
674                .as_any()
675                .downcast_ref::<UInt64Array>()
676                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
677
678            let labels_col = batch
679                .column_by_name("labels")
680                .ok_or_else(|| anyhow!("Missing labels column"))?
681                .as_any()
682                .downcast_ref::<arrow_array::ListArray>()
683                .ok_or_else(|| anyhow!("Invalid labels column type"))?;
684
685            for row_idx in 0..batch.num_rows() {
686                let vid = Vid::from(vid_col.value(row_idx));
687                let labels_array = labels_col.value(row_idx);
688                let labels_str_array = labels_array
689                    .as_any()
690                    .downcast_ref::<arrow_array::StringArray>()
691                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
692
693                let labels: Vec<String> = (0..labels_str_array.len())
694                    .map(|i| labels_str_array.value(i).to_string())
695                    .collect();
696
697                index.insert(vid, labels);
698            }
699        }
700
701        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
702        Ok(())
703    }
704
705    /// Get labels for a VID from the in-memory index.
706    /// Returns None if the index is disabled or the VID is not found.
707    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
708        self.vid_labels_index.as_ref().and_then(|idx| {
709            let index = idx.read();
710            index.get_labels(vid).map(|labels| labels.to_vec())
711        })
712    }
713
714    /// Update the VID-to-labels mapping in the index.
715    /// No-op if the index is disabled.
716    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
717        if let Some(idx) = &self.vid_labels_index {
718            let mut index = idx.write();
719            index.insert(vid, labels);
720        }
721    }
722
723    /// Remove a VID from the labels index.
724    /// No-op if the index is disabled.
725    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
726        if let Some(idx) = &self.vid_labels_index {
727            let mut index = idx.write();
728            index.remove_vid(vid);
729        }
730    }
731
732    pub async fn load_subgraph_cached(
733        &self,
734        start_vids: &[Vid],
735        edge_types: &[u32],
736        max_hops: usize,
737        direction: GraphDirection,
738        _l0: Option<Arc<RwLock<L0Buffer>>>,
739    ) -> Result<WorkingGraph> {
740        let mut graph = WorkingGraph::new();
741
742        let dir = match direction {
743            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
744            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
745        };
746
747        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
748
749        // Initialize frontier
750        let mut frontier: Vec<Vid> = start_vids.to_vec();
751        let mut visited: HashSet<Vid> = HashSet::new();
752
753        // Initialize start vids
754        for &vid in start_vids {
755            graph.add_vertex(vid);
756        }
757
758        for _hop in 0..max_hops {
759            let mut next_frontier = HashSet::new();
760
761            for &vid in &frontier {
762                if visited.contains(&vid) {
763                    continue;
764                }
765                visited.insert(vid);
766                graph.add_vertex(vid);
767
768                for &etype_id in edge_types {
769                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13)
770                    let edge_ver = self.version_high_water_mark();
771                    self.adjacency_manager
772                        .warm_coalesced(self, etype_id, dir, edge_ver)
773                        .await?;
774
775                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
776                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
777
778                    for (neighbor_vid, eid) in edges {
779                        graph.add_vertex(neighbor_vid);
780                        if !visited.contains(&neighbor_vid) {
781                            next_frontier.insert(neighbor_vid);
782                        }
783
784                        if neighbor_is_dst {
785                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
786                        } else {
787                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
788                        }
789                    }
790                }
791            }
792            frontier = next_frontier.into_iter().collect();
793
794            // Early termination: if frontier is empty, no more vertices to explore
795            if frontier.is_empty() {
796                break;
797            }
798        }
799
800        Ok(graph)
801    }
802
803    pub fn snapshot_manager(&self) -> &SnapshotManager {
804        &self.snapshot_manager
805    }
806
807    pub fn index_manager(&self) -> IndexManager {
808        IndexManager::new(
809            &self.base_uri,
810            self.schema_manager.clone(),
811            self.lancedb_store.clone(),
812        )
813    }
814
815    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
816        let schema = self.schema_manager.schema();
817        let label_meta = schema
818            .labels
819            .get(label)
820            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
821        Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
822    }
823
824    pub fn edge_dataset(
825        &self,
826        edge_type: &str,
827        src_label: &str,
828        dst_label: &str,
829    ) -> Result<EdgeDataset> {
830        Ok(EdgeDataset::new(
831            &self.base_uri,
832            edge_type,
833            src_label,
834            dst_label,
835        ))
836    }
837
838    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
839        Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
840    }
841
842    pub fn adjacency_dataset(
843        &self,
844        edge_type: &str,
845        label: &str,
846        direction: &str,
847    ) -> Result<AdjacencyDataset> {
848        Ok(AdjacencyDataset::new(
849            &self.base_uri,
850            edge_type,
851            label,
852            direction,
853        ))
854    }
855
856    /// Get the main vertex dataset for unified vertex storage.
857    ///
858    /// The main vertex dataset contains all vertices regardless of label,
859    /// enabling fast ID-based lookups without knowing the label.
860    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
861        MainVertexDataset::new(&self.base_uri)
862    }
863
864    /// Get the main edge dataset for unified edge storage.
865    ///
866    /// The main edge dataset contains all edges regardless of type,
867    /// enabling fast ID-based lookups without knowing the edge type.
868    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
869        MainEdgeDataset::new(&self.base_uri)
870    }
871
872    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
873        Ok(UidIndex::new(&self.base_uri, label))
874    }
875
876    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
877        let schema = self.schema_manager.schema();
878        let config = schema
879            .indexes
880            .iter()
881            .find_map(|idx| match idx {
882                IndexDefinition::Inverted(cfg)
883                    if cfg.label == label && cfg.property == property =>
884                {
885                    Some(cfg.clone())
886                }
887                _ => None,
888            })
889            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
890
891        InvertedIndex::new(&self.base_uri, config).await
892    }
893
    /// Performs a nearest-neighbor vector search over `label.property`.
    ///
    /// The distance metric comes from the schema's vector index config for
    /// this property, defaulting to L2 when none is declared. If the label's
    /// Lance table exists it is queried via LanceDB; otherwise results come
    /// only from the L0 merge step. When a `QueryContext` is supplied,
    /// unflushed L0 vertices are merged in (tombstones removed, L0 distances
    /// winning, output re-sorted ascending and truncated to `k`).
    ///
    /// # Arguments
    /// * `label` - Vertex label whose table is searched
    /// * `property` - Vector column to search against
    /// * `query` - Query embedding
    /// * `k` - Maximum number of results to return
    /// * `filter` - Optional Lance filter expression
    /// * `ctx` - Optional query context (L0 visibility and snapshot pinning)
    ///
    /// # Returns
    /// `(Vid, distance)` pairs as reported by LanceDB's `_distance` column.
    pub async fn vector_search(
        &self,
        label: &str,
        property: &str,
        query: &[f32],
        k: usize,
        filter: Option<&str>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<(Vid, f32)>> {
        // Look up vector index config to get the correct distance metric.
        let schema = self.schema_manager.schema();
        let metric = schema
            .vector_index_for_property(label, property)
            .map(|config| config.metric.clone())
            .unwrap_or(DistanceMetric::L2);

        // Try to open the cached table; if the label has no data yet the Lance
        // table won't exist. In that case fall back to L0-only results.
        let table = self.get_cached_table(label).await.ok();

        let mut results = Vec::new();

        if let Some(table) = table {
            // Map the schema metric onto LanceDB's distance type; unknown
            // metrics fall back to L2.
            let distance_type = match &metric {
                DistanceMetric::L2 => lancedb::DistanceType::L2,
                DistanceMetric::Cosine => lancedb::DistanceType::Cosine,
                DistanceMetric::Dot => lancedb::DistanceType::Dot,
                _ => lancedb::DistanceType::L2,
            };

            // Use LanceDB's vector search API
            let mut query_builder = table
                .vector_search(query.to_vec())
                .map_err(|e| anyhow!("Failed to create vector search: {}", e))?
                .column(property)
                .distance_type(distance_type)
                .limit(k);

            query_builder = query_builder.only_if(Self::build_active_filter(filter));

            // Apply version filtering if snapshot is pinned
            if ctx.is_some()
                && let Some(hwm) = self.version_high_water_mark()
            {
                query_builder = query_builder.only_if(format!("_version <= {}", hwm));
            }

            let batches = query_builder
                .execute()
                .await
                .map_err(|e| anyhow!("Vector search execution failed: {}", e))?
                .try_collect::<Vec<_>>()
                .await
                .map_err(|e| anyhow!("Failed to collect vector search results: {}", e))?;

            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
        }

        // Merge L0 buffer vertices into results for visibility of unflushed data.
        if let Some(qctx) = ctx {
            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
        }

        Ok(results)
    }
959
960    /// Perform a full-text search with BM25 scoring.
961    ///
962    /// Returns vertices matching the search query along with their BM25 scores.
963    /// Results are sorted by score descending (most relevant first).
964    ///
965    /// # Arguments
966    /// * `label` - The label to search within
967    /// * `property` - The property column to search (must have FTS index)
968    /// * `query` - The search query text
969    /// * `k` - Maximum number of results to return
970    /// * `filter` - Optional Lance filter expression
971    /// * `ctx` - Optional query context for visibility checks
972    ///
973    /// # Returns
974    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
975    pub async fn fts_search(
976        &self,
977        label: &str,
978        property: &str,
979        query: &str,
980        k: usize,
981        filter: Option<&str>,
982        ctx: Option<&QueryContext>,
983    ) -> Result<Vec<(Vid, f32)>> {
984        use lance_index::scalar::FullTextSearchQuery;
985        use lance_index::scalar::inverted::query::MatchQuery;
986
987        // Try to open the cached table; if the label has no data yet the Lance
988        // table won't exist. In that case return empty results.
989        let table = match self.get_cached_table(label).await {
990            Ok(t) => t,
991            Err(_) => return Ok(Vec::new()),
992        };
993
994        // Build the FTS query with specific column
995        let match_query =
996            MatchQuery::new(query.to_string()).with_column(Some(property.to_string()));
997        let fts_query = FullTextSearchQuery {
998            query: match_query.into(),
999            limit: Some(k as i64),
1000            wand_factor: None,
1001        };
1002
1003        let mut query_builder = table.query().full_text_search(fts_query).limit(k);
1004
1005        query_builder = query_builder.only_if(Self::build_active_filter(filter));
1006
1007        // Apply version filtering if snapshot is pinned
1008        if ctx.is_some()
1009            && let Some(hwm) = self.version_high_water_mark()
1010        {
1011            query_builder = query_builder.only_if(format!("_version <= {}", hwm));
1012        }
1013
1014        let batches = query_builder
1015            .execute()
1016            .await
1017            .map_err(|e| anyhow!("FTS search execution failed: {}", e))?
1018            .try_collect::<Vec<_>>()
1019            .await
1020            .map_err(|e| anyhow!("Failed to collect FTS search results: {}", e))?;
1021
1022        let mut results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1023
1024        // Results should already be sorted by score from Lance, but ensure descending order
1025        results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1026
1027        Ok(results)
1028    }
1029
1030    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1031        let index = self.uid_index(label)?;
1032        index.get_vid(uid).await
1033    }
1034
1035    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1036        let index = self.uid_index(label)?;
1037        index.write_mapping(&[(uid, vid)]).await
1038    }
1039
    /// Loads a bounded subgraph into memory via BFS from `start_vids`.
    ///
    /// Traverses up to `max_hops` hops along the requested `edge_types`,
    /// reconciling edge visibility across the storage layers in order:
    /// L2 adjacency (base, version 0), L1 deltas (versioned upserts/deletes),
    /// and the optional in-memory L0 buffer. Edges whose winning version is
    /// a delete are excluded from the returned graph.
    ///
    /// # Arguments
    /// * `start_vids` - Seed vertices for the traversal
    /// * `edge_types` - Edge type IDs to follow (others are ignored)
    /// * `max_hops` - Maximum BFS depth
    /// * `direction` - Outgoing reads "fwd" storage, Incoming reads "bwd"
    /// * `l0` - Optional unflushed buffer overlaid on persisted data
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // Build maps for ID lookups
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        // Initialize frontier
        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        // Add start vertices to graph
        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                // For each edge type we want to traverse
                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // Determine directions
                    // Storage direction: "fwd" or "bwd".
                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // Per-edge-id state; higher-versioned layers overwrite.
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    // 1. L2: Adjacency (Base)
                    // In the new storage model, VIDs don't embed label info.
                    // We need to try all labels to find the adjacency data.
                    // Edge version from snapshot (reserved for future version filtering)
                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    // Try each label until we find adjacency data
                    let lancedb_store = self.lancedb_store();
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_lancedb(lancedb_store, vid).await?
                        {
                            // Base edges enter at version 0 so any L1/L0
                            // entry with a real version wins over them.
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            break; // Found adjacency data for this vid, no need to try other labels
                        }
                    }

                    // 2. L1: Delta
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas_lancedb(
                            lancedb_store,
                            vid,
                            &schema,
                            self.version_high_water_mark(),
                        )
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // 3. L0: Buffer
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    // Add resulting edges to graph
                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            // Early termination: if frontier is empty, no more vertices to explore
            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
1184
1185    /// Apply delta entries to edge state map, handling version conflicts.
1186    fn apply_delta_to_edges(
1187        edges: &mut HashMap<Eid, EdgeState>,
1188        delta_entries: Vec<crate::storage::delta::L1Entry>,
1189        neighbor_is_dst: bool,
1190    ) {
1191        for entry in delta_entries {
1192            let neighbor = if neighbor_is_dst {
1193                entry.dst_vid
1194            } else {
1195                entry.src_vid
1196            };
1197            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1198
1199            if entry.version > current_ver {
1200                edges.insert(
1201                    entry.eid,
1202                    EdgeState {
1203                        neighbor,
1204                        version: entry.version,
1205                        deleted: matches!(entry.op, Op::Delete),
1206                    },
1207                );
1208            }
1209        }
1210    }
1211
1212    /// Apply L0 buffer edges and tombstones to edge state map.
1213    fn apply_l0_to_edges(
1214        edges: &mut HashMap<Eid, EdgeState>,
1215        l0: &L0Buffer,
1216        vid: Vid,
1217        etype_id: u32,
1218        direction: GraphDirection,
1219    ) {
1220        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1221        for (neighbor, eid, ver) in l0_neighbors {
1222            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1223            if ver > current_ver {
1224                edges.insert(
1225                    eid,
1226                    EdgeState {
1227                        neighbor,
1228                        version: ver,
1229                        deleted: false,
1230                    },
1231                );
1232            }
1233        }
1234
1235        // Check tombstones in L0
1236        for (eid, state) in edges.iter_mut() {
1237            if l0.is_tombstoned(*eid) {
1238                state.deleted = true;
1239            }
1240        }
1241    }
1242
1243    /// Add non-deleted edges to graph and collect next frontier.
1244    fn add_edges_to_graph(
1245        graph: &mut WorkingGraph,
1246        edges: HashMap<Eid, EdgeState>,
1247        vid: Vid,
1248        etype_id: u32,
1249        neighbor_is_dst: bool,
1250        visited: &HashSet<Vid>,
1251        next_frontier: &mut HashSet<Vid>,
1252    ) {
1253        for (eid, state) in edges {
1254            if state.deleted {
1255                continue;
1256            }
1257            graph.add_vertex(state.neighbor);
1258
1259            if !visited.contains(&state.neighbor) {
1260                next_frontier.insert(state.neighbor);
1261            }
1262
1263            if neighbor_is_dst {
1264                graph.add_edge(vid, state.neighbor, eid, etype_id);
1265            } else {
1266                graph.add_edge(state.neighbor, vid, eid, etype_id);
1267            }
1268        }
1269    }
1270}
1271
1272/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
1273fn extract_vid_score_pairs(
1274    batches: &[arrow_array::RecordBatch],
1275    vid_column: &str,
1276    score_column: &str,
1277) -> Result<Vec<(Vid, f32)>> {
1278    let mut results = Vec::new();
1279    for batch in batches {
1280        let vid_col = batch
1281            .column_by_name(vid_column)
1282            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1283            .as_any()
1284            .downcast_ref::<UInt64Array>()
1285            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1286
1287        let score_col = batch
1288            .column_by_name(score_column)
1289            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1290            .as_any()
1291            .downcast_ref::<Float32Array>()
1292            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1293
1294        for i in 0..batch.num_rows() {
1295            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1296        }
1297    }
1298    Ok(results)
1299}
1300
1301/// Extracts a `Vec<f32>` from a JSON property value.
1302///
1303/// Returns `None` if the property is missing, not an array, or contains
1304/// non-numeric elements.
1305fn extract_embedding_from_props(
1306    props: &uni_common::Properties,
1307    property: &str,
1308) -> Option<Vec<f32>> {
1309    let arr = props.get(property)?.as_array()?;
1310    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1311}
1312
/// Merges L0 buffer vertices into LanceDB vector search results.
///
/// Visits L0 buffers in precedence order (pending flush → main → transaction),
/// collects tombstoned VIDs and candidate embeddings, then merges them with the
/// existing LanceDB results so that:
/// - Tombstoned VIDs are removed (unless re-created in a later L0).
/// - VIDs present in both L0 and LanceDB use the L0 distance.
/// - New L0-only VIDs are appended.
/// - Results are re-sorted by distance ascending and truncated to `k`.
fn merge_l0_into_vector_results(
    results: &mut Vec<(Vid, f32)>,
    ctx: &QueryContext,
    label: &str,
    property: &str,
    query: &[f32],
    k: usize,
    metric: &DistanceMetric,
) {
    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
    buffers.push(Arc::clone(&ctx.l0));
    if let Some(ref txn) = ctx.transaction_l0 {
        buffers.push(Arc::clone(txn));
    }

    // Maps VID → distance for L0 candidates (last writer wins).
    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
    // Tombstoned VIDs across all L0 buffers.
    let mut tombstoned: HashSet<Vid> = HashSet::new();

    for buf_arc in &buffers {
        let buf = buf_arc.read();

        // Accumulate tombstones.
        for &vid in &buf.vertex_tombstones {
            tombstoned.insert(vid);
        }

        // Scan vertices with the target label.
        for (&vid, labels) in &buf.vertex_labels {
            if !labels.iter().any(|l| l == label) {
                continue;
            }
            if let Some(props) = buf.vertex_properties.get(&vid)
                && let Some(emb) = extract_embedding_from_props(props, property)
            {
                if emb.len() != query.len() {
                    continue; // dimension mismatch
                }
                let dist = metric.compute_distance(&emb, query);
                // Last writer wins: later buffer overwrites earlier.
                l0_candidates.insert(vid, dist);
                // If re-created in a later L0, remove from tombstones.
                // NOTE(review): within a single buffer, tombstones are read
                // before vertices, so a vertex both tombstoned and present in
                // the SAME buffer survives — confirm that is the intended
                // precedence.
                tombstoned.remove(&vid);
            }
        }
    }

    // If no L0 activity affects this search, skip merge.
    if l0_candidates.is_empty() && tombstoned.is_empty() {
        return;
    }

    // Remove tombstoned VIDs from LanceDB results.
    results.retain(|(vid, _)| !tombstoned.contains(vid));

    // Overwrite or append L0 candidates.
    // (Linear scan per candidate; acceptable while `results` is bounded by k.)
    for (vid, dist) in &l0_candidates {
        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
            existing.1 = *dist;
        } else {
            results.push((*vid, *dist));
        }
    }

    // Re-sort by distance ascending.
    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
    results.truncate(k);
}