Skip to main content

uni_store/storage/
manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Edge state during subgraph loading - tracks version and deletion status.
struct EdgeState {
    /// Vertex on the other end of the edge (presumably relative to the
    /// vertex being expanded — confirm at the use site).
    neighbor: Vid,
    /// Version associated with this edge state.
    version: u64,
    /// Whether the edge is marked as deleted.
    deleted: bool,
}
60
/// Central storage handle: bundles the object store, schema, snapshot
/// manager, adjacency manager and pluggable backend for one storage root.
///
/// Derived views are produced by `pinned` (time-travel) and `at_fork` /
/// `at_fork_with_schema` (fork-scoped reads). Each derived view gets its
/// own `AdjacencyManager`, compaction status and flush counter.
pub struct StorageManager {
    /// Base URI (local directory or remote prefix) under which dataset
    /// directories live.
    base_uri: String,
    /// Object store handle. `new_with_backend` wraps the caller-supplied
    /// store in a `ResilientObjectStore` before storing it here.
    store: Arc<dyn ObjectStore>,
    /// Schema source used to enumerate labels and edge types.
    schema_manager: Arc<SchemaManager>,
    /// Snapshot manager built on top of `store`.
    snapshot_manager: Arc<SnapshotManager>,
    /// Adjacency manager; never shared between the live manager and its
    /// pinned / forked views.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Runtime configuration (cache size, compaction thresholds, ...).
    pub config: UniConfig,
    /// Shared compaction bookkeeping; `CompactionGuard` toggles its
    /// `compaction_in_progress` flag.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Counter of in-flight `flush_to_l1` operations. Compaction skips
    /// delta-clear when this is non-zero to avoid wiping rows a flush is
    /// about to append. Counter (not bool) so multiple async flushes can
    /// be in flight concurrently.
    pub flush_in_progress: std::sync::atomic::AtomicUsize,
    /// Optional pinned snapshot for time-travel
    pinned_snapshot: Option<SnapshotManifest>,
    /// Optional fork scope for branch-aware reads (Phase 1 read-only).
    ///
    /// Forking a manager that already has `pinned_snapshot` set is
    /// unsupported (`at_fork_with_schema` debug-asserts against it).
    /// The reverse direction is allowed since Phase 4a: `pinned` on a
    /// fork-scoped manager keeps this scope and adds the snapshot, so
    /// the resulting manager has both fields set.
    fork_scope: Option<Arc<crate::fork::ForkScope>>,
    /// Pluggable storage backend.
    backend: Arc<dyn StorageBackend>,
    /// In-memory VID-to-labels index for O(1) lookups (optional, configurable)
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
89
/// RAII counter increment for `StorageManager.flush_in_progress`.
///
/// Acquired during the rotate phase of a flush (see
/// `runtime::writer::flush_l0_rotate`) and dropped when the full
/// rotate/stream/finalize pipeline completes. Compaction's delta-clear
/// gate skips while this counter is non-zero, so the counter must
/// reflect "flush has started, has not completed" — including any
/// async stream phase running on a spawned task.
pub struct FlushInProgressGuard {
    /// Manager whose `flush_in_progress` counter was incremented by
    /// `new`; held so `Drop` can decrement the same counter.
    storage: Arc<StorageManager>,
}
101
102impl FlushInProgressGuard {
103    pub fn new(storage: &Arc<StorageManager>) -> Self {
104        storage
105            .flush_in_progress
106            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
107        Self {
108            storage: storage.clone(),
109        }
110    }
111}
112
113impl Drop for FlushInProgressGuard {
114    fn drop(&mut self) {
115        // M-PANIC-IS-STOP: must not panic in Drop. Atomic op cannot fail.
116        self.storage
117            .flush_in_progress
118            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
119    }
120}
121
122/// Whether a Lance error represents a commit conflict that retrying may
123/// resolve. These fire under async-flush when ≥2 streams concurrently try
124/// to create the same table OR when an Append races with a still-in-progress
125/// Overwrite (create_table). See the Lance commit-conflict-resolver in
126/// `lance-3.0.1/src/io/commit/conflict_resolver.rs`.
127fn is_lance_conflict(err: &anyhow::Error) -> bool {
128    let msg = err.to_string();
129    msg.contains("Incompatible transaction") || msg.contains("conflict")
130}
131
132/// Runs `op` with exponential-backoff retry on Lance commit conflicts.
133/// Up to 10 attempts (~10s worst case); backoff is 1ms, 2ms, 4ms, ...,
134/// 512ms. Non-conflict errors return immediately. `op` is re-invoked each
135/// attempt so it can re-check table existence and adjust strategy.
136async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
137where
138    F: FnMut() -> Fut,
139    Fut: std::future::Future<Output = anyhow::Result<()>>,
140{
141    for attempt in 0u32..10 {
142        match op().await {
143            Ok(()) => return Ok(()),
144            Err(e) => {
145                if !is_lance_conflict(&e) || attempt == 9 {
146                    return Err(e);
147                }
148                let backoff_ms = 1u64 << attempt;
149                tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
150            }
151        }
152    }
153    unreachable!("retry loop exits via Ok or Err")
154}
155
156/// MergeInsert sibling of `write_batch_with_lance_conflict_retry`.
157///
158/// Source `batch` must contain the join columns in `on` plus any
159/// columns to update. Matched rows have `WhenMatched::UpdateAll`
160/// applied; unmatched source rows are dropped (partial writes never
161/// INSERT). Returns an error if the target table does not exist.
162/// Retries on Lance commit conflicts via `retry_on_lance_conflict`.
163/// RecordBatch clones are cheap (column data is Arc'd).
164pub async fn merge_insert_batch_with_lance_conflict_retry(
165    backend: &dyn crate::backend::StorageBackend,
166    table_name: &str,
167    batch: arrow_array::RecordBatch,
168    on: &[&str],
169) -> anyhow::Result<()> {
170    retry_on_lance_conflict(|| async {
171        let exists = backend.table_exists(table_name).await?;
172        if !exists {
173            anyhow::bail!(
174                "merge_insert target table '{}' does not exist (partial writes \
175                 require the row to already be present; CREATE goes through Append)",
176                table_name
177            );
178        }
179        backend
180            .merge_insert(table_name, on, vec![batch.clone()])
181            .await
182    })
183    .await
184}
185
186/// Race-safe write: creates the table if missing, otherwise appends.
187/// Each attempt re-checks `table_exists` and adjusts strategy: Append if
188/// now-exists, Create if still-missing. Retries on Lance commit conflicts
189/// via `retry_on_lance_conflict`.
190///
191/// Used by every dataset's `write_batch` helper to absorb the Lance
192/// commit-conflict-resolver behavior. RecordBatch clones are cheap
193/// (column data is Arc'd).
194pub async fn write_batch_with_lance_conflict_retry(
195    backend: &dyn crate::backend::StorageBackend,
196    table_name: &str,
197    batch: arrow_array::RecordBatch,
198) -> anyhow::Result<()> {
199    use crate::backend::types::WriteMode;
200    retry_on_lance_conflict(|| async {
201        let exists = backend.table_exists(table_name).await?;
202        if exists {
203            backend
204                .write(table_name, vec![batch.clone()], WriteMode::Append)
205                .await
206        } else {
207            backend.create_table(table_name, vec![batch.clone()]).await
208        }
209    })
210    .await
211}
212
/// Helper to manage compaction_in_progress flag
///
/// `new` sets the flag (or refuses if it is already set); dropping the
/// guard clears it and records the completion time.
struct CompactionGuard {
    /// Shared status whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
217
218impl CompactionGuard {
219    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
220        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
221        if s.compaction_in_progress {
222            return None;
223        }
224        s.compaction_in_progress = true;
225        Some(Self {
226            status: status.clone(),
227        })
228    }
229}
230
231impl Drop for CompactionGuard {
232    fn drop(&mut self) {
233        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
234        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
235        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
236            Ok(mut s) => {
237                s.compaction_in_progress = false;
238                s.last_compaction = Some(std::time::SystemTime::now());
239            }
240            Err(e) => {
241                // Lock is poisoned but we're in Drop - cannot panic.
242                // Log the error and continue. System state may be inconsistent but at least
243                // we don't abort the process.
244                log::error!(
245                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
246                     Compaction status may be inconsistent. Issue #18/#150",
247                    e
248                );
249            }
250        }
251    }
252}
253
254impl StorageManager {
255    /// Create a new StorageManager with a pre-configured backend.
256    pub async fn new_with_backend(
257        base_uri: &str,
258        store: Arc<dyn ObjectStore>,
259        backend: Arc<dyn StorageBackend>,
260        schema_manager: Arc<SchemaManager>,
261        config: UniConfig,
262    ) -> Result<Self> {
263        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
264            store,
265            config.object_store.clone(),
266        ));
267
268        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
269
270        // Perform crash recovery for all known table patterns
271        Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
272
273        let mut sm = Self {
274            base_uri: base_uri.to_string(),
275            store: resilient_store,
276            schema_manager,
277            snapshot_manager,
278            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
279            config,
280            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
281            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
282            pinned_snapshot: None,
283            fork_scope: None,
284            backend,
285            vid_labels_index: None,
286        };
287
288        // Rebuild VidLabelsIndex if enabled
289        if sm.config.enable_vid_labels_index
290            && let Err(e) = sm.rebuild_vid_labels_index().await
291        {
292            warn!(
293                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
294                e
295            );
296        }
297
298        Ok(sm)
299    }
300
    /// Create a new StorageManager with LanceDB integration.
    ///
    /// Convenience wrapper around `new_with_config` that uses
    /// `UniConfig::default()`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `new_with_config`.
    #[cfg(feature = "lance-backend")]
    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
    }
306
307    /// Create a new StorageManager with custom cache size.
308    #[cfg(feature = "lance-backend")]
309    pub async fn new_with_cache(
310        base_uri: &str,
311        schema_manager: Arc<SchemaManager>,
312        adjacency_cache_size: usize,
313    ) -> Result<Self> {
314        let config = UniConfig {
315            cache_size: adjacency_cache_size,
316            ..Default::default()
317        };
318        Self::new_with_config(base_uri, schema_manager, config).await
319    }
320
    /// Create a new StorageManager with custom configuration.
    ///
    /// The object store is derived from `base_uri` via
    /// `build_store_from_uri` and then handed to
    /// `new_with_store_and_config`.
    ///
    /// # Errors
    ///
    /// Fails if the store cannot be built from `base_uri` or if the
    /// downstream constructor fails.
    #[cfg(feature = "lance-backend")]
    pub async fn new_with_config(
        base_uri: &str,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        let store = Self::build_store_from_uri(base_uri)?;
        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
    }
331
    /// Create a new StorageManager using an already-constructed object store.
    ///
    /// Delegates to `new_with_store_and_storage_options` with no LanceDB
    /// storage options (`None`).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the delegate.
    #[cfg(feature = "lance-backend")]
    pub async fn new_with_store_and_config(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
            .await
    }
343
344    /// Create a new StorageManager with LanceDB storage options.
345    #[cfg(feature = "lance-backend")]
346    pub async fn new_with_store_and_storage_options(
347        base_uri: &str,
348        store: Arc<dyn ObjectStore>,
349        schema_manager: Arc<SchemaManager>,
350        config: UniConfig,
351        lancedb_storage_options: Option<HashMap<String, String>>,
352    ) -> Result<Self> {
353        let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
354        Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
355    }
356
357    /// Recover all staging tables for known table patterns.
358    ///
359    /// This runs on startup to handle crash recovery. It checks for staging tables
360    /// for all vertex labels, adjacency tables, delta tables, and main tables.
361    async fn recover_all_staging_tables(
362        backend: &dyn StorageBackend,
363        schema_manager: &SchemaManager,
364    ) -> Result<()> {
365        let schema = schema_manager.schema();
366
367        // Recover main vertex and edge tables
368        backend
369            .recover_staging(table_names::main_vertex_table_name())
370            .await?;
371        backend
372            .recover_staging(table_names::main_edge_table_name())
373            .await?;
374
375        // Recover per-label vertex tables
376        for label in schema.labels.keys() {
377            let name = table_names::vertex_table_name(label);
378            backend.recover_staging(&name).await?;
379        }
380
381        // Recover adjacency and delta tables for each edge type and direction
382        for edge_type in schema.edge_types.keys() {
383            for direction in &["fwd", "bwd"] {
384                // Recover delta tables
385                let delta_name = table_names::delta_table_name(edge_type, direction);
386                backend.recover_staging(&delta_name).await?;
387
388                // Recover adjacency tables for each label
389                for _label in schema.labels.keys() {
390                    let adj_name = table_names::adjacency_table_name(edge_type, direction);
391                    backend.recover_staging(&adj_name).await?;
392                }
393            }
394        }
395
396        Ok(())
397    }
398
399    #[cfg(feature = "lance-backend")]
400    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
401        if base_uri.contains("://") {
402            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
403            let (store, _path) = object_store::parse_url(&parsed)
404                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
405            Ok(Arc::from(store))
406        } else {
407            // If local path, ensure it exists.
408            std::fs::create_dir_all(base_uri)?;
409            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
410        }
411    }
412
413    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
414        // Phase 4a: pinning a forked session is now supported. The
415        // resulting StorageManager keeps `fork_scope` so reads continue
416        // to route through the fork's Lance branches via `base_paths`,
417        // and adds `pinned_snapshot` so writers / writers' read views
418        // resolve at the snapshot's HWM. Writes are gated separately by
419        // the session-level `is_pinned` check (`Session::tx` rejects
420        // them via `UniError::ReadOnly`).
421        Self {
422            base_uri: self.base_uri.clone(),
423            store: self.store.clone(),
424            schema_manager: self.schema_manager.clone(),
425            snapshot_manager: self.snapshot_manager.clone(),
426            // Separate AdjacencyManager for snapshot isolation (Issue #73):
427            // warm() will load only edges visible at the snapshot's HWM.
428            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
429            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
430            config: self.config.clone(),
431            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
432            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
433            pinned_snapshot: Some(snapshot),
434            fork_scope: self.fork_scope.clone(),
435            backend: self.backend.clone(),
436            vid_labels_index: self.vid_labels_index.clone(),
437        }
438    }
439
    /// Construct a fork-scoped clone of this `StorageManager`.
    ///
    /// All reads through dataset factories *and* through `backend()`
    /// on the returned manager route through the fork's Lance branches
    /// via `base_paths`. The `AdjacencyManager` is fresh (per Issue
    /// #73 reasoning — same as `pinned`) to prevent primary's CSR from
    /// leaking into the fork. Forking requires an unpinned manager:
    /// `at_fork_with_schema` debug-asserts that `pinned_snapshot` is
    /// `None`, and the returned manager has no pinned snapshot.
    ///
    /// The backend is wrapped in [`crate::backend::branched::BranchedBackend`]
    /// so that every `ScanRequest` constructed *anywhere* (PropertyManager,
    /// MainVertexDataset static methods, etc.) automatically picks up
    /// the fork's branch for tables the fork has branched. Untracked
    /// tables fall back to primary, matching Phase 1 read semantics.
    ///
    /// This variant reuses this manager's own `schema_manager`; see
    /// `at_fork_with_schema` to supply a merged schema instead.
    pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
        self.at_fork_with_schema(scope, self.schema_manager.clone())
    }
457
458    /// Variant of [`Self::at_fork`] that uses an explicit
459    /// `merged_schema` for the fork's storage rather than primary's
460    /// schema_manager. Used by `UniInner::at_fork` so that the
461    /// fork-side strict-schema checks (in `uni-query` / `uni-store`'s
462    /// writer) see fork-local labels and edge types added through
463    /// `Session::fork_schema()`. Without this, those checks would
464    /// route through primary's schema and reject fork-local labels.
465    pub fn at_fork_with_schema(
466        &self,
467        scope: Arc<crate::fork::ForkScope>,
468        merged_schema: Arc<SchemaManager>,
469    ) -> Self {
470        debug_assert!(
471            self.pinned_snapshot.is_none(),
472            "forking a pinned StorageManager is unsupported in Phase 1"
473        );
474        let branched_backend: Arc<dyn StorageBackend> = Arc::new(
475            crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
476        );
477        Self {
478            base_uri: self.base_uri.clone(),
479            store: self.store.clone(),
480            schema_manager: merged_schema,
481            snapshot_manager: self.snapshot_manager.clone(),
482            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
483            config: self.config.clone(),
484            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
485            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
486            pinned_snapshot: None,
487            fork_scope: Some(scope),
488            backend: branched_backend,
489            vid_labels_index: self.vid_labels_index.clone(),
490        }
491    }
492
    /// Borrow the active fork scope, if any.
    ///
    /// Returns `None` for managers that were not created through
    /// `at_fork` / `at_fork_with_schema` (or derived from one via
    /// `pinned`).
    pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
        self.fork_scope.as_ref()
    }
497
498    /// Phase 5a: query whether a fork-local index exists for the
499    /// `(label, column)` pair on the active fork scope. Returns
500    /// `None` outside a fork or when no fork-local build has
501    /// completed for that pair.
502    ///
503    /// The planner consults this to decide whether to emit
504    /// `FusedIndexScan` (returns `Some`) or fall back to the
505    /// inherited primary index via `base_paths` (returns `None`).
506    /// The lookup is a `DashMap::get` on `ForkScope` — O(1) and
507    /// safe to call per query without caching above this layer.
508    #[must_use]
509    pub fn fork_index_exists(
510        &self,
511        label: &str,
512        column: &str,
513    ) -> Option<crate::fork::ForkLocalIndexKind> {
514        self.fork_scope
515            .as_ref()
516            .and_then(|s| s.fork_local_index(label, column))
517    }
518
    /// Base URI for this storage manager (the directory or remote
    /// prefix under which dataset directories live).
    ///
    /// This is the string passed to the constructor, returned unchanged.
    pub fn base_uri(&self) -> &str {
        &self.base_uri
    }
524
525    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
526        let schema = self.schema_manager.schema();
527        let name = schema.edge_type_name_by_id(edge_type_id)?;
528        self.pinned_snapshot
529            .as_ref()
530            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
531    }
532
533    /// Returns the version_high_water_mark from the pinned snapshot if present.
534    ///
535    /// This is used for time-travel queries to filter data by version.
536    /// When a snapshot is pinned, only rows with `_version <= version_high_water_mark`
537    /// should be considered visible.
538    pub fn version_high_water_mark(&self) -> Option<u64> {
539        self.pinned_snapshot
540            .as_ref()
541            .map(|s| s.version_high_water_mark)
542    }
543
544    /// Apply version filtering to a base filter expression.
545    ///
546    /// If a snapshot is pinned, wraps `base_filter` with an additional
547    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
548    pub fn apply_version_filter(&self, base_filter: String) -> String {
549        if let Some(hwm) = self.version_high_water_mark() {
550            format!("({}) AND (_version <= {})", base_filter, hwm)
551        } else {
552            base_filter
553        }
554    }
555
556    /// Build a filter expression that excludes soft-deleted rows and optionally
557    /// includes a user-provided filter.
558    fn build_active_filter(user_filter: Option<&str>) -> String {
559        match user_filter {
560            Some(expr) => format!("({}) AND (_deleted = false)", expr),
561            None => "_deleted = false".to_string(),
562        }
563    }
564
    /// Returns a new `Arc` handle to the object store held by this
    /// manager (the `ResilientObjectStore` wrapper when constructed via
    /// `new_with_backend`).
    pub fn store(&self) -> Arc<dyn ObjectStore> {
        self.store.clone()
    }
568
569    /// Get current compaction status.
570    ///
571    /// # Errors
572    ///
573    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
574    pub fn compaction_status(
575        &self,
576    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
577        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
578        Ok(guard.clone())
579    }
580
581    pub async fn compact(&self) -> Result<CompactionStats> {
582        // Backend handles compaction internally via optimize_table()
583        let start = std::time::Instant::now();
584        let schema = self.schema_manager.schema();
585        let mut files_compacted = 0;
586
587        for label in schema.labels.keys() {
588            let name = table_names::vertex_table_name(label);
589            if self.backend.table_exists(&name).await? {
590                self.backend.optimize_table(&name).await?;
591                files_compacted += 1;
592                self.backend.invalidate_cache(&name);
593            }
594        }
595
596        Ok(CompactionStats {
597            files_compacted,
598            bytes_before: 0,
599            bytes_after: 0,
600            duration: start.elapsed(),
601            crdt_merges: 0,
602        })
603    }
604
605    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
606        let _guard = CompactionGuard::new(self.compaction_status.clone())
607            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
608
609        let start = std::time::Instant::now();
610        let name = table_names::vertex_table_name(label);
611
612        if self.backend.table_exists(&name).await? {
613            self.backend.optimize_table(&name).await?;
614            self.backend.invalidate_cache(&name);
615        }
616
617        Ok(CompactionStats {
618            files_compacted: 1,
619            bytes_before: 0,
620            bytes_after: 0,
621            duration: start.elapsed(),
622            crdt_merges: 0,
623        })
624    }
625
626    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
627        let _guard = CompactionGuard::new(self.compaction_status.clone())
628            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
629
630        let start = std::time::Instant::now();
631        let mut files_compacted = 0;
632
633        for dir in ["fwd", "bwd"] {
634            let name = table_names::delta_table_name(edge_type, dir);
635            if self.backend.table_exists(&name).await? {
636                self.backend.optimize_table(&name).await?;
637                files_compacted += 1;
638            }
639        }
640
641        Ok(CompactionStats {
642            files_compacted,
643            bytes_before: 0,
644            bytes_after: 0,
645            duration: start.elapsed(),
646            crdt_merges: 0,
647        })
648    }
649
650    pub async fn wait_for_compaction(&self) -> Result<()> {
651        loop {
652            let in_progress = {
653                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
654            };
655            if !in_progress {
656                return Ok(());
657            }
658            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
659        }
660    }
661
662    pub fn start_background_compaction(
663        self: Arc<Self>,
664        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
665    ) -> tokio::task::JoinHandle<()> {
666        if !self.config.compaction.enabled {
667            return tokio::spawn(async {});
668        }
669
670        tokio::spawn(async move {
671            // Use interval_at to delay the first tick. tokio::time::interval fires
672            // immediately on the first tick, which can race with queries that run
673            // right after database open. Delaying by the check_interval gives
674            // initial queries time to complete before compaction modifies tables
675            // (optimize(All) can GC index files that concurrent queries depend on).
676            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
677            let mut interval =
678                tokio::time::interval_at(start, self.config.compaction.check_interval);
679
680            loop {
681                tokio::select! {
682                    _ = interval.tick() => {
683                        if let Err(e) = self.update_compaction_status().await {
684                            log::error!("Failed to update compaction status: {}", e);
685                            continue;
686                        }
687
688                        if let Some(task) = self.pick_compaction_task() {
689                            log::info!("Triggering background compaction: {:?}", task);
690                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
691                                log::error!("Compaction failed: {}", e);
692                            }
693                        }
694                    }
695                    _ = shutdown_rx.recv() => {
696                        log::info!("Background compaction shutting down");
697                        let _ = self.wait_for_compaction().await;
698                        break;
699                    }
700                }
701            }
702        })
703    }
704
705    async fn update_compaction_status(&self) -> Result<()> {
706        let schema = self.schema_manager.schema();
707        let backend = self.backend.as_ref();
708        let mut total_rows: usize = 0;
709        let mut oldest_ts: Option<i64> = None;
710
711        for name in schema.edge_types.keys() {
712            for dir in ["fwd", "bwd"] {
713                let tbl_name = table_names::delta_table_name(name, dir);
714                if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
715                    continue;
716                }
717                let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
718                if row_count == 0 {
719                    continue;
720                }
721                total_rows += row_count;
722
723                // Query oldest _created_at for age tracking
724                let request =
725                    ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
726                let Ok(batches) = backend.scan(request).await else {
727                    continue;
728                };
729                for batch in batches {
730                    let Some(col) = batch
731                        .column_by_name("_created_at")
732                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
733                    else {
734                        continue;
735                    };
736                    for i in 0..col.len() {
737                        if !col.is_null(i) {
738                            let ts = col.value(i);
739                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
740                        }
741                    }
742                }
743            }
744        }
745
746        let oldest_l1_age = oldest_ts
747            .and_then(|ts| {
748                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
749                SystemTime::now().duration_since(created).ok()
750            })
751            .unwrap_or(Duration::ZERO);
752
753        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
754        // Note: l1_runs is managed by flush_to_l1 (increment) and execute_compaction
755        // (reset). It counts flush generations, not delta table count.
756        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
757        status.oldest_l1_age = oldest_l1_age;
758        Ok(())
759    }
760
761    fn pick_compaction_task(&self) -> Option<CompactionTask> {
762        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
763
764        if status.l1_runs >= self.config.compaction.max_l1_runs {
765            return Some(CompactionTask::ByRunCount);
766        }
767        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
768            return Some(CompactionTask::BySize);
769        }
770        if status.oldest_l1_age >= self.config.compaction.max_l1_age
771            && status.oldest_l1_age > Duration::ZERO
772        {
773            return Some(CompactionTask::ByAge);
774        }
775
776        None
777    }
778
779    /// Optimize a table via the backend, returning `true` on success.
780    async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
781        if let Err(e) = backend.optimize_table(table_name).await {
782            log::warn!("Failed to optimize table {}: {}", table_name, e);
783            return false;
784        }
785        true
786    }
787
788    /// Trigger L1 compaction asynchronously without blocking the caller.
789    /// Safe to call frequently — CompactionGuard prevents concurrent runs.
790    pub fn trigger_async_compaction(self: &Arc<Self>) {
791        let this = Arc::clone(self);
792        tokio::spawn(async move {
793            if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
794                // "Compaction already in progress" is expected when called frequently
795                log::debug!("Post-flush compaction skipped: {}", e);
796            }
797        });
798    }
799
800    pub(crate) async fn execute_compaction(
801        this: Arc<Self>,
802        _task: CompactionTask,
803    ) -> Result<CompactionStats> {
804        let start = std::time::Instant::now();
805        let _guard = CompactionGuard::new(this.compaction_status.clone())
806            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
807
808        let schema = this.schema_manager.schema();
809        let mut files_compacted = 0;
810
811        // ── Tier 2: Semantic compaction ──
812        // Dedup vertices, merge CRDTs, consolidate L1→L2 deltas, clean tombstones
813        let compactor = Compactor::new(Arc::clone(&this));
814        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
815            log::error!(
816                "Semantic compaction failed (continuing with backend optimize): {}",
817                e
818            );
819            Vec::new()
820        });
821
822        // Re-warm adjacency CSR after semantic compaction
823        let am = this.adjacency_manager();
824        for info in &compaction_results {
825            let direction = match info.direction.as_str() {
826                "fwd" => Direction::Outgoing,
827                "bwd" => Direction::Incoming,
828                _ => continue,
829            };
830            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
831                && let Err(e) = am.warm(&this, etid, direction, None).await
832            {
833                log::warn!(
834                    "Failed to re-warm adjacency for {}/{}: {}",
835                    info.edge_type,
836                    info.direction,
837                    e
838                );
839            }
840        }
841
842        // ── Tier 3: Backend optimize ──
843        let backend = this.backend.as_ref();
844
845        // Optimize edge delta and adjacency tables
846        for name in schema.edge_types.keys() {
847            for dir in ["fwd", "bwd"] {
848                let delta = table_names::delta_table_name(name, dir);
849                if Self::try_optimize_table(backend, &delta).await {
850                    files_compacted += 1;
851                }
852                let adj = table_names::adjacency_table_name(name, dir);
853                if Self::try_optimize_table(backend, &adj).await {
854                    files_compacted += 1;
855                }
856            }
857        }
858
859        // Optimize vertex tables
860        for label in schema.labels.keys() {
861            let tbl = table_names::vertex_table_name(label);
862            if Self::try_optimize_table(backend, &tbl).await {
863                files_compacted += 1;
864                backend.invalidate_cache(&tbl);
865            }
866        }
867
868        // Optimize main vertex and edge tables
869        for tbl in [
870            table_names::main_vertex_table_name(),
871            table_names::main_edge_table_name(),
872        ] {
873            if Self::try_optimize_table(backend, tbl).await {
874                files_compacted += 1;
875            }
876        }
877
878        {
879            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
880            status.total_compactions += 1;
881            status.l1_runs = 0; // Reset flush generation counter
882        }
883
884        Ok(CompactionStats {
885            files_compacted,
886            bytes_before: 0,
887            bytes_after: 0,
888            duration: start.elapsed(),
889            crdt_merges: 0,
890        })
891    }
892
893    /// Open a LanceDB table for a label.
894    ///
895    /// Invalidate cached table state (call after writes).
896    pub fn invalidate_table_cache(&self, label: &str) {
897        let name = table_names::vertex_table_name(label);
898        self.backend.invalidate_cache(&name);
899    }
900
901    pub fn base_path(&self) -> &str {
902        &self.base_uri
903    }
904
905    pub fn schema_manager(&self) -> &SchemaManager {
906        &self.schema_manager
907    }
908
909    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
910        self.schema_manager.clone()
911    }
912
913    /// Get the adjacency manager for the dual-CSR architecture.
914    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
915        Arc::clone(&self.adjacency_manager)
916    }
917
918    /// Warm the adjacency manager for a specific edge type and direction.
919    ///
920    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
921    /// Called lazily on first access per edge type or at startup.
922    pub async fn warm_adjacency(
923        &self,
924        edge_type_id: u32,
925        direction: crate::storage::direction::Direction,
926        version: Option<u64>,
927    ) -> anyhow::Result<()> {
928        self.adjacency_manager
929            .warm(self, edge_type_id, direction, version)
930            .await
931    }
932
933    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
934    ///
935    /// Uses double-checked locking to ensure only one concurrent warm() per
936    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
937    pub async fn warm_adjacency_coalesced(
938        &self,
939        edge_type_id: u32,
940        direction: crate::storage::direction::Direction,
941        version: Option<u64>,
942    ) -> anyhow::Result<()> {
943        self.adjacency_manager
944            .warm_coalesced(self, edge_type_id, direction, version)
945            .await
946    }
947
948    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
949    pub fn has_adjacency_csr(
950        &self,
951        edge_type_id: u32,
952        direction: crate::storage::direction::Direction,
953    ) -> bool {
954        self.adjacency_manager.has_csr(edge_type_id, direction)
955    }
956
957    /// Get neighbors at a specific version for snapshot queries.
958    pub fn get_neighbors_at_version(
959        &self,
960        vid: uni_common::core::id::Vid,
961        edge_type: u32,
962        direction: crate::storage::direction::Direction,
963        version: u64,
964    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
965        self.adjacency_manager
966            .get_neighbors_at_version(vid, edge_type, direction, version)
967    }
968
969    /// Get the storage backend.
970    pub fn backend(&self) -> &dyn StorageBackend {
971        self.backend.as_ref()
972    }
973
974    /// Get the storage backend as an Arc.
975    pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
976        self.backend.clone()
977    }
978
979    /// Rebuild the VidLabelsIndex from the main vertex table.
980    /// This is called on startup if enable_vid_labels_index is true.
981    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
982        use crate::storage::vid_labels::VidLabelsIndex;
983
984        let backend = self.backend.as_ref();
985        let vtable = table_names::main_vertex_table_name();
986
987        // Check if the table exists (fresh database)
988        if !backend.table_exists(vtable).await.unwrap_or(false) {
989            self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
990            return Ok(());
991        }
992
993        // Scan all non-deleted vertices and collect (VID, labels)
994        let request = ScanRequest::all(vtable)
995            .with_filter("_deleted = false")
996            .with_limit(100_000);
997        let batches = backend
998            .scan(request)
999            .await
1000            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1001
1002        let mut index = VidLabelsIndex::new();
1003        for batch in batches {
1004            let vid_col = batch
1005                .column_by_name("_vid")
1006                .ok_or_else(|| anyhow!("Missing _vid column"))?
1007                .as_any()
1008                .downcast_ref::<UInt64Array>()
1009                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1010
1011            let labels_col = batch
1012                .column_by_name("labels")
1013                .ok_or_else(|| anyhow!("Missing labels column"))?
1014                .as_any()
1015                .downcast_ref::<arrow_array::ListArray>()
1016                .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1017
1018            for row_idx in 0..batch.num_rows() {
1019                let vid = Vid::from(vid_col.value(row_idx));
1020                let labels_array = labels_col.value(row_idx);
1021                let labels_str_array = labels_array
1022                    .as_any()
1023                    .downcast_ref::<arrow_array::StringArray>()
1024                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1025
1026                let labels: Vec<String> = (0..labels_str_array.len())
1027                    .map(|i| labels_str_array.value(i).to_string())
1028                    .collect();
1029
1030                index.insert(vid, labels);
1031            }
1032        }
1033
1034        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
1035        Ok(())
1036    }
1037
1038    /// Get labels for a VID from the in-memory index.
1039    /// Returns None if the index is disabled or the VID is not found.
1040    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1041        self.vid_labels_index.as_ref().and_then(|idx| {
1042            let index = idx.read();
1043            index.get_labels(vid).map(|labels| labels.to_vec())
1044        })
1045    }
1046
1047    /// Update the VID-to-labels mapping in the index.
1048    /// No-op if the index is disabled.
1049    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1050        if let Some(idx) = &self.vid_labels_index {
1051            let mut index = idx.write();
1052            index.insert(vid, labels);
1053        }
1054    }
1055
1056    /// Remove a VID from the labels index.
1057    /// No-op if the index is disabled.
1058    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1059        if let Some(idx) = &self.vid_labels_index {
1060            let mut index = idx.write();
1061            index.remove_vid(vid);
1062        }
1063    }
1064
1065    pub async fn load_subgraph_cached(
1066        &self,
1067        start_vids: &[Vid],
1068        edge_types: &[u32],
1069        max_hops: usize,
1070        direction: GraphDirection,
1071        _l0: Option<Arc<RwLock<L0Buffer>>>,
1072    ) -> Result<WorkingGraph> {
1073        let mut graph = WorkingGraph::new();
1074
1075        let dir = match direction {
1076            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1077            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1078        };
1079
1080        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1081
1082        // Initialize frontier
1083        let mut frontier: Vec<Vid> = start_vids.to_vec();
1084        let mut visited: HashSet<Vid> = HashSet::new();
1085
1086        // Initialize start vids
1087        for &vid in start_vids {
1088            graph.add_vertex(vid);
1089        }
1090
1091        for _hop in 0..max_hops {
1092            let mut next_frontier = HashSet::new();
1093
1094            for &vid in &frontier {
1095                if visited.contains(&vid) {
1096                    continue;
1097                }
1098                visited.insert(vid);
1099                graph.add_vertex(vid);
1100
1101                for &etype_id in edge_types {
1102                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13)
1103                    let edge_ver = self.version_high_water_mark();
1104                    self.adjacency_manager
1105                        .warm_coalesced(self, etype_id, dir, edge_ver)
1106                        .await?;
1107
1108                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
1109                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1110
1111                    for (neighbor_vid, eid) in edges {
1112                        graph.add_vertex(neighbor_vid);
1113                        if !visited.contains(&neighbor_vid) {
1114                            next_frontier.insert(neighbor_vid);
1115                        }
1116
1117                        if neighbor_is_dst {
1118                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
1119                        } else {
1120                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
1121                        }
1122                    }
1123                }
1124            }
1125            frontier = next_frontier.into_iter().collect();
1126
1127            // Early termination: if frontier is empty, no more vertices to explore
1128            if frontier.is_empty() {
1129                break;
1130            }
1131        }
1132
1133        Ok(graph)
1134    }
1135
1136    pub fn snapshot_manager(&self) -> &SnapshotManager {
1137        &self.snapshot_manager
1138    }
1139
1140    pub fn index_manager(&self) -> IndexManager {
1141        IndexManager::new(&self.base_uri, self.schema_manager.clone())
1142    }
1143
1144    // ========================================================================
1145    // Domain-level scan methods — encapsulate LanceDB queries for consumers
1146    // ========================================================================
1147
1148    /// Scan a per-label vertex table. Returns `None` if the table doesn't exist.
1149    ///
1150    /// Internally opens the table, filters requested columns to those that
1151    /// physically exist, and applies the version HWM filter for snapshot isolation.
1152    pub async fn scan_vertex_table(
1153        &self,
1154        label: &str,
1155        columns: &[&str],
1156        additional_filter: Option<&str>,
1157    ) -> Result<Option<arrow_array::RecordBatch>> {
1158        let backend = self.backend();
1159        let table_name = table_names::vertex_table_name(label);
1160
1161        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1162            return Ok(None);
1163        }
1164
1165        // Filter columns to those that exist in the table
1166        let actual_columns =
1167            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1168                let table_field_names: HashSet<&str> = table_schema
1169                    .fields()
1170                    .iter()
1171                    .map(|f| f.name().as_str())
1172                    .collect();
1173                columns
1174                    .iter()
1175                    .copied()
1176                    .filter(|c| table_field_names.contains(c))
1177                    .map(|s| s.to_string())
1178                    .collect::<Vec<_>>()
1179            } else {
1180                return Ok(None);
1181            };
1182
1183        // Build filter with version HWM + optional additional filter
1184        let filter = match (self.version_high_water_mark(), additional_filter) {
1185            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1186            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1187            (None, Some(f)) => Some(f.to_string()),
1188            (None, None) => None,
1189        };
1190
1191        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1192        if let Some(f) = filter {
1193            request = request.with_filter(f);
1194        }
1195
1196        match backend.scan(request).await {
1197            Ok(batches) => {
1198                if batches.is_empty() {
1199                    Ok(None)
1200                } else {
1201                    Ok(Some(arrow::compute::concat_batches(
1202                        &batches[0].schema(),
1203                        &batches,
1204                    )?))
1205                }
1206            }
1207            Err(_) => Ok(None),
1208        }
1209    }
1210
1211    /// Scan a delta table for an edge type + direction.
1212    /// Returns `None` if the table doesn't exist.
1213    pub async fn scan_delta_table(
1214        &self,
1215        edge_type: &str,
1216        direction: &str,
1217        columns: &[&str],
1218        additional_filter: Option<&str>,
1219    ) -> Result<Option<arrow_array::RecordBatch>> {
1220        let backend = self.backend();
1221        let table_name = table_names::delta_table_name(edge_type, direction);
1222
1223        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1224            return Ok(None);
1225        }
1226
1227        // Filter columns to those that exist
1228        let actual_columns =
1229            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1230                let table_field_names: HashSet<&str> = table_schema
1231                    .fields()
1232                    .iter()
1233                    .map(|f| f.name().as_str())
1234                    .collect();
1235                columns
1236                    .iter()
1237                    .copied()
1238                    .filter(|c| table_field_names.contains(c))
1239                    .map(|s| s.to_string())
1240                    .collect::<Vec<_>>()
1241            } else {
1242                return Ok(None);
1243            };
1244
1245        let filter = match (self.version_high_water_mark(), additional_filter) {
1246            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1247            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1248            (None, Some(f)) => Some(f.to_string()),
1249            (None, None) => None,
1250        };
1251
1252        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1253        if let Some(f) = filter {
1254            request = request.with_filter(f);
1255        }
1256
1257        match backend.scan(request).await {
1258            Ok(batches) => {
1259                if batches.is_empty() {
1260                    Ok(None)
1261                } else {
1262                    Ok(Some(arrow::compute::concat_batches(
1263                        &batches[0].schema(),
1264                        &batches,
1265                    )?))
1266                }
1267            }
1268            Err(_) => Ok(None),
1269        }
1270    }
1271
1272    /// Scan the unified main vertex table. Returns `None` if table doesn't exist.
1273    ///
1274    /// Applies version HWM filter internally for snapshot isolation, combined
1275    /// with any caller-provided filter (label conditions, etc.).
1276    pub async fn scan_main_vertex_table(
1277        &self,
1278        columns: &[&str],
1279        filter: Option<&str>,
1280    ) -> Result<Option<arrow_array::RecordBatch>> {
1281        let backend = self.backend();
1282        let table_name = table_names::main_vertex_table_name();
1283
1284        if !backend.table_exists(table_name).await.unwrap_or(false) {
1285            return Ok(None);
1286        }
1287
1288        // Combine caller filter with version HWM for snapshot isolation
1289        let full_filter = match (self.version_high_water_mark(), filter) {
1290            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1291            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1292            (None, Some(f)) => Some(f.to_string()),
1293            (None, None) => None,
1294        };
1295
1296        let request = ScanRequest::all(table_name)
1297            .with_columns(columns.iter().map(|s| s.to_string()).collect());
1298        let request = match full_filter.as_deref() {
1299            Some(f) => request.with_filter(f),
1300            None => request,
1301        };
1302
1303        match backend.scan(request).await {
1304            Ok(batches) => {
1305                if batches.is_empty() {
1306                    Ok(None)
1307                } else {
1308                    Ok(Some(arrow::compute::concat_batches(
1309                        &batches[0].schema(),
1310                        &batches,
1311                    )?))
1312                }
1313            }
1314            Err(_) => Ok(None),
1315        }
1316    }
1317
1318    /// Scan the main edge table as a stream. Returns `None` if table doesn't exist.
1319    pub async fn scan_main_edge_table_stream(
1320        &self,
1321        filter: Option<&str>,
1322    ) -> Result<
1323        Option<
1324            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1325        >,
1326    > {
1327        let backend = self.backend();
1328        let table_name = table_names::main_edge_table_name();
1329
1330        if !backend.table_exists(table_name).await.unwrap_or(false) {
1331            return Ok(None);
1332        }
1333
1334        let mut request = ScanRequest::all(table_name);
1335        if let Some(f) = filter {
1336            request = request.with_filter(f);
1337        }
1338
1339        let stream = backend.scan_stream(request).await?;
1340        Ok(Some(stream))
1341    }
1342
1343    /// Scan a per-label vertex table as a stream. Returns `None` if table doesn't exist.
1344    pub async fn scan_vertex_table_stream(
1345        &self,
1346        label: &str,
1347    ) -> Result<
1348        Option<
1349            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1350        >,
1351    > {
1352        let backend = self.backend();
1353        let table_name = table_names::vertex_table_name(label);
1354
1355        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1356            return Ok(None);
1357        }
1358
1359        let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1360        Ok(Some(stream))
1361    }
1362
1363    /// Find a vertex VID by external ID. Uses pinned snapshot HWM if present.
1364    pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1365        MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1366            .await
1367    }
1368
1369    /// Find labels for a vertex by VID. Uses pinned snapshot HWM if present.
1370    pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1371        MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1372            .await
1373    }
1374
1375    /// Find edges from the main edge table by type names.
1376    pub async fn find_edges_by_type_names(
1377        &self,
1378        type_names: &[&str],
1379    ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1380        MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names).await
1381    }
1382
1383    /// Scan vertex candidates matching a filter. Returns VIDs where `_deleted = false`.
1384    pub async fn scan_vertex_candidates(
1385        &self,
1386        label: &str,
1387        filter: Option<&str>,
1388    ) -> Result<Vec<Vid>> {
1389        let backend = self.backend();
1390        let table_name = table_names::vertex_table_name(label);
1391
1392        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1393            return Ok(Vec::new());
1394        }
1395
1396        let full_filter = match filter {
1397            Some(f) => format!("_deleted = false AND ({})", f),
1398            None => "_deleted = false".to_string(),
1399        };
1400
1401        let request = ScanRequest::all(&table_name)
1402            .with_filter(full_filter)
1403            .with_columns(vec!["_vid".to_string()]);
1404
1405        let batches = backend.scan(request).await?;
1406
1407        let mut vids = Vec::new();
1408        for batch in batches {
1409            let vid_col = batch
1410                .column_by_name("_vid")
1411                .ok_or(anyhow!("Missing _vid"))?
1412                .as_any()
1413                .downcast_ref::<UInt64Array>()
1414                .ok_or(anyhow!("Invalid _vid"))?;
1415            for i in 0..batch.num_rows() {
1416                vids.push(Vid::from(vid_col.value(i)));
1417            }
1418        }
1419        Ok(vids)
1420    }
1421
1422    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1423        let schema = self.schema_manager.schema();
1424        let label_meta = schema
1425            .labels
1426            .get(label)
1427            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1428        let key = format!("vertices_{label}");
1429        match self.fork_branch_for(&key) {
1430            Some(branch) => Ok(VertexDataset::new_branched(
1431                &self.base_uri,
1432                label,
1433                label_meta.id,
1434                branch,
1435            )),
1436            None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1437        }
1438    }
1439
1440    #[cfg(feature = "lance-backend")]
1441    pub fn edge_dataset(
1442        &self,
1443        edge_type: &str,
1444        src_label: &str,
1445        dst_label: &str,
1446    ) -> Result<EdgeDataset> {
1447        let key = format!("edges_{edge_type}");
1448        match self.fork_branch_for(&key) {
1449            Some(branch) => Ok(EdgeDataset::new_branched(
1450                &self.base_uri,
1451                edge_type,
1452                src_label,
1453                dst_label,
1454                branch,
1455            )),
1456            None => Ok(EdgeDataset::new(
1457                &self.base_uri,
1458                edge_type,
1459                src_label,
1460                dst_label,
1461            )),
1462        }
1463    }
1464
1465    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1466        let key = format!("deltas_{edge_type}_{direction}");
1467        match self.fork_branch_for(&key) {
1468            Some(branch) => Ok(DeltaDataset::new_branched(
1469                &self.base_uri,
1470                edge_type,
1471                direction,
1472                branch,
1473            )),
1474            None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1475        }
1476    }
1477
1478    pub fn adjacency_dataset(
1479        &self,
1480        edge_type: &str,
1481        label: &str,
1482        direction: &str,
1483    ) -> Result<AdjacencyDataset> {
1484        let key = format!("adjacency_{direction}_{edge_type}_{label}");
1485        match self.fork_branch_for(&key) {
1486            Some(branch) => Ok(AdjacencyDataset::new_branched(
1487                &self.base_uri,
1488                edge_type,
1489                label,
1490                direction,
1491                branch,
1492            )),
1493            None => Ok(AdjacencyDataset::new(
1494                &self.base_uri,
1495                edge_type,
1496                label,
1497                direction,
1498            )),
1499        }
1500    }
1501
1502    /// Look up the branch name for a dataset under the active fork
1503    /// scope, if any. Returns `None` when not forked, or when the fork
1504    /// hasn't recorded a branch on this dataset yet (Phase 2 territory).
1505    fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1506        self.fork_scope
1507            .as_ref()
1508            .and_then(|s| s.branch_for(dataset_name))
1509    }
1510
1511    /// Get the main vertex dataset for unified vertex storage.
1512    ///
1513    /// The main vertex dataset contains all vertices regardless of label,
1514    /// enabling fast ID-based lookups without knowing the label.
1515    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1516        MainVertexDataset::new(&self.base_uri)
1517    }
1518
1519    /// Get the main edge dataset for unified edge storage.
1520    ///
1521    /// The main edge dataset contains all edges regardless of type,
1522    /// enabling fast ID-based lookups without knowing the edge type.
1523    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1524        MainEdgeDataset::new(&self.base_uri)
1525    }
1526
1527    #[cfg(feature = "lance-backend")]
1528    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
1529        Ok(UidIndex::new(&self.base_uri, label))
1530    }
1531
1532    #[cfg(feature = "lance-backend")]
1533    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1534        let schema = self.schema_manager.schema();
1535        let config = schema
1536            .indexes
1537            .iter()
1538            .find_map(|idx| match idx {
1539                IndexDefinition::Inverted(cfg)
1540                    if cfg.label == label && cfg.property == property =>
1541                {
1542                    Some(cfg.clone())
1543                }
1544                _ => None,
1545            })
1546            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1547
1548        InvertedIndex::new(&self.base_uri, config).await
1549    }
1550
1551    pub async fn vector_search(
1552        &self,
1553        label: &str,
1554        property: &str,
1555        query: &[f32],
1556        k: usize,
1557        filter: Option<&str>,
1558        ctx: Option<&QueryContext>,
1559    ) -> Result<Vec<(Vid, f32)>> {
1560        use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1561
1562        // Look up vector index config to get the correct distance metric.
1563        let schema = self.schema_manager.schema();
1564        let metric = schema
1565            .vector_index_for_property(label, property)
1566            .map(|config| config.metric.clone())
1567            .unwrap_or(DistanceMetric::L2);
1568
1569        let backend = self.backend.as_ref();
1570        let name = table_names::vertex_table_name(label);
1571
1572        let mut results = Vec::new();
1573
1574        // Only search if the table exists
1575        if backend.table_exists(&name).await.unwrap_or(false) {
1576            let backend_metric = match &metric {
1577                DistanceMetric::L2 => BackendMetric::L2,
1578                DistanceMetric::Cosine => BackendMetric::Cosine,
1579                DistanceMetric::Dot => BackendMetric::Dot,
1580                _ => BackendMetric::L2,
1581            };
1582
1583            // Build combined filter: _deleted = false + optional user filter + HWM
1584            let mut filter_parts = vec![Self::build_active_filter(filter)];
1585            if ctx.is_some()
1586                && let Some(hwm) = self.version_high_water_mark()
1587            {
1588                filter_parts.push(format!("_version <= {}", hwm));
1589            }
1590            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1591
1592            let batches = backend
1593                .vector_search(&name, property, query, k, backend_metric, combined_filter)
1594                .await?;
1595
1596            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1597        }
1598
1599        // Merge L0 buffer vertices into results for visibility of unflushed data.
1600        if let Some(qctx) = ctx {
1601            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1602        }
1603
1604        Ok(results)
1605    }
1606
1607    /// Perform a full-text search with BM25 scoring.
1608    ///
1609    /// Returns vertices matching the search query along with their BM25 scores.
1610    /// Results are sorted by score descending (most relevant first).
1611    ///
1612    /// # Arguments
1613    /// * `label` - The label to search within
1614    /// * `property` - The property column to search (must have FTS index)
1615    /// * `query` - The search query text
1616    /// * `k` - Maximum number of results to return
1617    /// * `filter` - Optional Lance filter expression
1618    /// * `ctx` - Optional query context for visibility checks
1619    ///
1620    /// # Returns
1621    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
1622    pub async fn fts_search(
1623        &self,
1624        label: &str,
1625        property: &str,
1626        query: &str,
1627        k: usize,
1628        filter: Option<&str>,
1629        ctx: Option<&QueryContext>,
1630    ) -> Result<Vec<(Vid, f32)>> {
1631        use crate::backend::types::FilterExpr;
1632
1633        let backend = self.backend.as_ref();
1634        let name = table_names::vertex_table_name(label);
1635
1636        let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1637            // Build combined filter: _deleted = false + optional user filter + HWM
1638            let mut filter_parts = vec![Self::build_active_filter(filter)];
1639            if ctx.is_some()
1640                && let Some(hwm) = self.version_high_water_mark()
1641            {
1642                filter_parts.push(format!("_version <= {}", hwm));
1643            }
1644            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1645
1646            let batches = backend
1647                .full_text_search(&name, property, query, k, combined_filter)
1648                .await?;
1649
1650            let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1651            // Results should already be sorted by score from backend, but ensure descending order
1652            fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1653            fts_results
1654        } else {
1655            Vec::new()
1656        };
1657
1658        // Merge L0 buffer vertices for visibility of unflushed data.
1659        if let Some(qctx) = ctx {
1660            merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1661        }
1662
1663        Ok(results)
1664    }
1665
1666    #[cfg(feature = "lance-backend")]
1667    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1668        let index = self.uid_index(label)?;
1669        index.get_vid(uid).await
1670    }
1671
1672    #[cfg(feature = "lance-backend")]
1673    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1674        let index = self.uid_index(label)?;
1675        index.write_mapping(&[(uid, vid)]).await
1676    }
1677
1678    pub async fn load_subgraph(
1679        &self,
1680        start_vids: &[Vid],
1681        edge_types: &[u32],
1682        max_hops: usize,
1683        direction: GraphDirection,
1684        l0: Option<&L0Buffer>,
1685    ) -> Result<WorkingGraph> {
1686        let mut graph = WorkingGraph::new();
1687        let schema = self.schema_manager.schema();
1688
1689        // Build maps for ID lookups
1690        let label_map: HashMap<u16, String> = schema
1691            .labels
1692            .values()
1693            .map(|meta| {
1694                (
1695                    meta.id,
1696                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
1697                )
1698            })
1699            .collect();
1700
1701        let edge_type_map: HashMap<u32, String> = schema
1702            .edge_types
1703            .values()
1704            .map(|meta| {
1705                (
1706                    meta.id,
1707                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1708                )
1709            })
1710            .collect();
1711
1712        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1713
1714        // Initialize frontier
1715        let mut frontier: Vec<Vid> = start_vids.to_vec();
1716        let mut visited: HashSet<Vid> = HashSet::new();
1717
1718        // Add start vertices to graph
1719        for &vid in start_vids {
1720            graph.add_vertex(vid);
1721        }
1722
1723        for _hop in 0..max_hops {
1724            let mut next_frontier = HashSet::new();
1725
1726            for &vid in &frontier {
1727                if visited.contains(&vid) {
1728                    continue;
1729                }
1730                visited.insert(vid);
1731                graph.add_vertex(vid);
1732
1733                // For each edge type we want to traverse
1734                for &etype_id in &target_edge_types {
1735                    let etype_name = edge_type_map
1736                        .get(&etype_id)
1737                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1738
1739                    // Determine directions
1740                    // Storage direction: "fwd" or "bwd".
1741                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
1742                    let (dir_str, neighbor_is_dst) = match direction {
1743                        GraphDirection::Outgoing => ("fwd", true),
1744                        GraphDirection::Incoming => ("bwd", false),
1745                    };
1746
1747                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1748
1749                    // 1. L2: Adjacency (Base)
1750                    // In the new storage model, VIDs don't embed label info.
1751                    // We need to try all labels to find the adjacency data.
1752                    // Edge version from snapshot (reserved for future version filtering)
1753                    let _edge_ver = self
1754                        .pinned_snapshot
1755                        .as_ref()
1756                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1757
1758                    // Try each label until we find adjacency data
1759                    let backend = self.backend();
1760                    for current_src_label in label_map.values() {
1761                        let adj_ds =
1762                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1763                                Ok(ds) => ds,
1764                                Err(_) => continue,
1765                            };
1766                        if let Some((neighbors, eids)) =
1767                            adj_ds.read_adjacency_backend(backend, vid).await?
1768                        {
1769                            for (n, eid) in neighbors.into_iter().zip(eids) {
1770                                edges.insert(
1771                                    eid,
1772                                    EdgeState {
1773                                        neighbor: n,
1774                                        version: 0,
1775                                        deleted: false,
1776                                    },
1777                                );
1778                            }
1779                            break; // Found adjacency data for this vid, no need to try other labels
1780                        }
1781                    }
1782
1783                    // 2. L1: Delta
1784                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1785                    let delta_entries = delta_ds
1786                        .read_deltas(backend, vid, &schema, self.version_high_water_mark())
1787                        .await?;
1788                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1789
1790                    // 3. L0: Buffer
1791                    if let Some(l0) = l0 {
1792                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1793                    }
1794
1795                    // Add resulting edges to graph
1796                    Self::add_edges_to_graph(
1797                        &mut graph,
1798                        edges,
1799                        vid,
1800                        etype_id,
1801                        neighbor_is_dst,
1802                        &visited,
1803                        &mut next_frontier,
1804                    );
1805                }
1806            }
1807            frontier = next_frontier.into_iter().collect();
1808
1809            // Early termination: if frontier is empty, no more vertices to explore
1810            if frontier.is_empty() {
1811                break;
1812            }
1813        }
1814
1815        Ok(graph)
1816    }
1817
1818    /// Apply delta entries to edge state map, handling version conflicts.
1819    fn apply_delta_to_edges(
1820        edges: &mut HashMap<Eid, EdgeState>,
1821        delta_entries: Vec<crate::storage::delta::L1Entry>,
1822        neighbor_is_dst: bool,
1823    ) {
1824        for entry in delta_entries {
1825            let neighbor = if neighbor_is_dst {
1826                entry.dst_vid
1827            } else {
1828                entry.src_vid
1829            };
1830            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1831
1832            if entry.version > current_ver {
1833                edges.insert(
1834                    entry.eid,
1835                    EdgeState {
1836                        neighbor,
1837                        version: entry.version,
1838                        deleted: matches!(entry.op, Op::Delete),
1839                    },
1840                );
1841            }
1842        }
1843    }
1844
1845    /// Apply L0 buffer edges and tombstones to edge state map.
1846    fn apply_l0_to_edges(
1847        edges: &mut HashMap<Eid, EdgeState>,
1848        l0: &L0Buffer,
1849        vid: Vid,
1850        etype_id: u32,
1851        direction: GraphDirection,
1852    ) {
1853        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1854        for (neighbor, eid, ver) in l0_neighbors {
1855            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1856            if ver > current_ver {
1857                edges.insert(
1858                    eid,
1859                    EdgeState {
1860                        neighbor,
1861                        version: ver,
1862                        deleted: false,
1863                    },
1864                );
1865            }
1866        }
1867
1868        // Check tombstones in L0
1869        for (eid, state) in edges.iter_mut() {
1870            if l0.is_tombstoned(*eid) {
1871                state.deleted = true;
1872            }
1873        }
1874    }
1875
1876    /// Add non-deleted edges to graph and collect next frontier.
1877    fn add_edges_to_graph(
1878        graph: &mut WorkingGraph,
1879        edges: HashMap<Eid, EdgeState>,
1880        vid: Vid,
1881        etype_id: u32,
1882        neighbor_is_dst: bool,
1883        visited: &HashSet<Vid>,
1884        next_frontier: &mut HashSet<Vid>,
1885    ) {
1886        for (eid, state) in edges {
1887            if state.deleted {
1888                continue;
1889            }
1890            graph.add_vertex(state.neighbor);
1891
1892            if !visited.contains(&state.neighbor) {
1893                next_frontier.insert(state.neighbor);
1894            }
1895
1896            if neighbor_is_dst {
1897                graph.add_edge(vid, state.neighbor, eid, etype_id);
1898            } else {
1899                graph.add_edge(state.neighbor, vid, eid, etype_id);
1900            }
1901        }
1902    }
1903}
1904
1905/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
1906fn extract_vid_score_pairs(
1907    batches: &[arrow_array::RecordBatch],
1908    vid_column: &str,
1909    score_column: &str,
1910) -> Result<Vec<(Vid, f32)>> {
1911    let mut results = Vec::new();
1912    for batch in batches {
1913        let vid_col = batch
1914            .column_by_name(vid_column)
1915            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1916            .as_any()
1917            .downcast_ref::<UInt64Array>()
1918            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1919
1920        let score_col = batch
1921            .column_by_name(score_column)
1922            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1923            .as_any()
1924            .downcast_ref::<Float32Array>()
1925            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1926
1927        for i in 0..batch.num_rows() {
1928            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1929        }
1930    }
1931    Ok(results)
1932}
1933
1934/// Extracts a `Vec<f32>` from a JSON property value.
1935///
1936/// Returns `None` if the property is missing, not an array, or contains
1937/// non-numeric elements.
1938fn extract_embedding_from_props(
1939    props: &uni_common::Properties,
1940    property: &str,
1941) -> Option<Vec<f32>> {
1942    let arr = props.get(property)?.as_array()?;
1943    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1944}
1945
1946/// Merges L0 buffer vertices into LanceDB vector search results.
1947///
1948/// Visits L0 buffers in precedence order (pending flush → main → transaction),
1949/// collects tombstoned VIDs and candidate embeddings, then merges them with the
1950/// existing LanceDB results so that:
1951/// - Tombstoned VIDs are removed (unless re-created in a later L0).
1952/// - VIDs present in both L0 and LanceDB use the L0 distance.
1953/// - New L0-only VIDs are appended.
1954/// - Results are re-sorted by distance ascending and truncated to `k`.
1955fn merge_l0_into_vector_results(
1956    results: &mut Vec<(Vid, f32)>,
1957    ctx: &QueryContext,
1958    label: &str,
1959    property: &str,
1960    query: &[f32],
1961    k: usize,
1962    metric: &DistanceMetric,
1963) {
1964    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
1965    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1966        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1967    buffers.push(Arc::clone(&ctx.l0));
1968    if let Some(ref txn) = ctx.transaction_l0 {
1969        buffers.push(Arc::clone(txn));
1970    }
1971
1972    // Maps VID → distance for L0 candidates (last writer wins).
1973    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1974    // Tombstoned VIDs across all L0 buffers.
1975    let mut tombstoned: HashSet<Vid> = HashSet::new();
1976
1977    for buf_arc in &buffers {
1978        let buf = buf_arc.read();
1979
1980        // Accumulate tombstones.
1981        for &vid in &buf.vertex_tombstones {
1982            tombstoned.insert(vid);
1983        }
1984
1985        // Scan vertices with the target label.
1986        for (&vid, labels) in &buf.vertex_labels {
1987            if !labels.iter().any(|l| l == label) {
1988                continue;
1989            }
1990            if let Some(props) = buf.vertex_properties.get(&vid)
1991                && let Some(emb) = extract_embedding_from_props(props, property)
1992            {
1993                if emb.len() != query.len() {
1994                    continue; // dimension mismatch
1995                }
1996                let dist = metric.compute_distance(&emb, query);
1997                // Last writer wins: later buffer overwrites earlier.
1998                l0_candidates.insert(vid, dist);
1999                // If re-created in a later L0, remove from tombstones.
2000                tombstoned.remove(&vid);
2001            }
2002        }
2003    }
2004
2005    // If no L0 activity affects this search, skip merge.
2006    if l0_candidates.is_empty() && tombstoned.is_empty() {
2007        return;
2008    }
2009
2010    // Remove tombstoned VIDs from LanceDB results.
2011    results.retain(|(vid, _)| !tombstoned.contains(vid));
2012
2013    // Overwrite or append L0 candidates.
2014    for (vid, dist) in &l0_candidates {
2015        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2016            existing.1 = *dist;
2017        } else {
2018            results.push((*vid, *dist));
2019        }
2020    }
2021
2022    // Re-sort by distance ascending.
2023    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2024    results.truncate(k);
2025}
2026
/// Scores how much of a query is covered by a piece of text.
///
/// Both inputs are split on whitespace and lowercased into sets of distinct
/// tokens; the score is the share of distinct query tokens that also occur in
/// the text, so it lies in [0.0, 1.0]. A query with no tokens scores 0.0.
/// Tokens are compared verbatim: punctuation is not stripped.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    /// Distinct lowercased whitespace-separated tokens of `input`.
    fn lowercase_tokens(input: &str) -> HashSet<String> {
        input.split_whitespace().map(str::to_lowercase).collect()
    }

    let wanted = lowercase_tokens(query);
    if wanted.is_empty() {
        return 0.0;
    }

    let present = lowercase_tokens(text);
    let matched = wanted.intersection(&present).count();
    matched as f32 / wanted.len() as f32
}
2044
2045/// Extracts a string slice from a property value.
2046fn extract_text_from_props<'a>(
2047    props: &'a uni_common::Properties,
2048    property: &str,
2049) -> Option<&'a str> {
2050    props.get(property)?.as_str()
2051}
2052
2053/// Merges L0 buffer vertices into LanceDB full-text search results.
2054///
2055/// Follows the same pattern as [`merge_l0_into_vector_results`]: visits L0
2056/// buffers in precedence order, collects tombstoned VIDs and text-match
2057/// candidates, then merges them so that:
2058/// - Tombstoned VIDs are removed (unless re-created in a later L0).
2059/// - VIDs present in both L0 and LanceDB use the L0 score.
2060/// - New L0-only VIDs are appended.
2061/// - Results are re-sorted by score **descending** and truncated to `k`.
2062fn merge_l0_into_fts_results(
2063    results: &mut Vec<(Vid, f32)>,
2064    ctx: &QueryContext,
2065    label: &str,
2066    property: &str,
2067    query: &str,
2068    k: usize,
2069) {
2070    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
2071    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2072        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2073    buffers.push(Arc::clone(&ctx.l0));
2074    if let Some(ref txn) = ctx.transaction_l0 {
2075        buffers.push(Arc::clone(txn));
2076    }
2077
2078    // Maps VID → relevance score for L0 candidates (last writer wins).
2079    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2080    // Tombstoned VIDs across all L0 buffers.
2081    let mut tombstoned: HashSet<Vid> = HashSet::new();
2082
2083    for buf_arc in &buffers {
2084        let buf = buf_arc.read();
2085
2086        // Accumulate tombstones.
2087        for &vid in &buf.vertex_tombstones {
2088            tombstoned.insert(vid);
2089        }
2090
2091        // Scan vertices with the target label.
2092        for (&vid, labels) in &buf.vertex_labels {
2093            if !labels.iter().any(|l| l == label) {
2094                continue;
2095            }
2096            if let Some(props) = buf.vertex_properties.get(&vid)
2097                && let Some(text) = extract_text_from_props(props, property)
2098            {
2099                let score = compute_text_relevance(query, text);
2100                if score > 0.0 {
2101                    // Last writer wins: later buffer overwrites earlier.
2102                    l0_candidates.insert(vid, score);
2103                }
2104                // If re-created in a later L0, remove from tombstones.
2105                tombstoned.remove(&vid);
2106            }
2107        }
2108    }
2109
2110    // If no L0 activity affects this search, skip merge.
2111    if l0_candidates.is_empty() && tombstoned.is_empty() {
2112        return;
2113    }
2114
2115    // Remove tombstoned VIDs from LanceDB results.
2116    results.retain(|(vid, _)| !tombstoned.contains(vid));
2117
2118    // Overwrite or append L0 candidates.
2119    for (vid, score) in &l0_candidates {
2120        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2121            existing.1 = *score;
2122        } else {
2123            results.push((*vid, *score));
2124        }
2125    }
2126
2127    // Re-sort by score descending (higher relevance first).
2128    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2129    results.truncate(k);
2130}