Skip to main content

uni_store/storage/
manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Edge state during subgraph loading - tracks version and deletion status.
///
/// NOTE(review): the loader that populates this type is not visible here;
/// the per-field notes below are inferred from the field names — confirm.
struct EdgeState {
    /// Vertex on the other end of the edge (presumably relative to the
    /// vertex being expanded — confirm).
    neighbor: Vid,
    /// Version associated with this edge state; presumably used to keep
    /// only the newest state per edge.
    version: u64,
    /// Whether this state marks the edge as deleted (presumably a
    /// tombstone that hides older versions — confirm).
    deleted: bool,
}
60
/// Entry point to a database's persistent storage.
///
/// Bundles the object store, the schema / snapshot / adjacency managers and
/// the pluggable [`StorageBackend`]. It also holds the read-scoping state
/// (manifest pin, row-version pin, fork scope) that decides which rows its
/// scans can see. Scoped variants are produced by `pinned`,
/// `pinned_at_version`, `at_fork` and `at_fork_with_schema`.
pub struct StorageManager {
    /// Root location of the database. Values containing `://` are treated
    /// as URLs (remote object stores); anything else as a local path.
    base_uri: String,
    /// Object store for this database. `new_with_backend` wraps the store
    /// it is given in a `ResilientObjectStore` before storing it here.
    store: Arc<dyn ObjectStore>,
    /// Schema (vertex labels and edge types) used to enumerate tables.
    schema_manager: Arc<SchemaManager>,
    /// Snapshot manager built over `store`.
    snapshot_manager: Arc<SnapshotManager>,
    /// Adjacency (CSR) manager. Shared with `pinned_at_version` clones;
    /// `pinned` and fork-scoped clones get a fresh one with the same budget.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Configuration this manager was built with.
    pub config: UniConfig,
    /// Shared compaction state. `CompactionGuard` sets and clears its
    /// `compaction_in_progress` flag; scoped clones start with a fresh one.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Counter of in-flight `flush_to_l1` operations. Compaction skips
    /// delta-clear when this is non-zero to avoid wiping rows a flush is
    /// about to append. Counter (not bool) so multiple async flushes can
    /// be in flight concurrently.
    pub flush_in_progress: std::sync::atomic::AtomicUsize,
    /// Optional pinned snapshot for time-travel
    pinned_snapshot: Option<SnapshotManifest>,
    /// Optional row-version pin for transaction snapshot reads (C2).
    ///
    /// When set, L1 scans filter to `_version <= hwm` exactly like a pinned
    /// snapshot, but WITHOUT a manifest: a read-write transaction pins the
    /// version counter observed at begin (`SnapshotView.started_at_version`)
    /// so an L0→L1 flush completing mid-transaction cannot leak
    /// post-snapshot rows into its scans. Mutually exclusive with
    /// `pinned_snapshot`.
    pinned_version_hwm: Option<u64>,
    /// Optional fork scope for branch-aware reads (Phase 1 read-only).
    ///
    /// Forking a snapshot-pinned manager is unsupported:
    /// `at_fork_with_schema` debug-asserts that `pinned_snapshot` is unset.
    /// The reverse is allowed. Since Phase 4a, `pinned` keeps the existing
    /// `fork_scope` on the pinned clone, so a pinned view of a forked
    /// manager carries both this field and `pinned_snapshot`.
    fork_scope: Option<Arc<crate::fork::ForkScope>>,
    /// Pluggable storage backend. Fork-scoped clones wrap it in
    /// `BranchedBackend`.
    backend: Arc<dyn StorageBackend>,
    /// In-memory VID-to-labels index for O(1) lookups (optional, configurable).
    /// The same index is shared (via `Arc`) by pinned and fork-scoped clones.
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
98
99/// RAII counter increment for `StorageManager.flush_in_progress`.
100///
101/// Acquired during the rotate phase of a flush (see
102/// `runtime::writer::flush_l0_rotate`) and dropped when the full
103/// rotate/stream/finalize pipeline completes. Compaction's delta-clear
104/// gate skips while this counter is non-zero, so the counter must
105/// reflect "flush has started, has not completed" — including any
106/// async stream phase running on a spawned task.
107pub struct FlushInProgressGuard {
108    storage: Arc<StorageManager>,
109}
110
111impl FlushInProgressGuard {
112    pub fn new(storage: &Arc<StorageManager>) -> Self {
113        storage
114            .flush_in_progress
115            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
116        Self {
117            storage: storage.clone(),
118        }
119    }
120}
121
122impl Drop for FlushInProgressGuard {
123    fn drop(&mut self) {
124        // M-PANIC-IS-STOP: must not panic in Drop. Atomic op cannot fail.
125        self.storage
126            .flush_in_progress
127            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
128    }
129}
130
131/// Whether a Lance error represents a commit conflict that retrying may
132/// resolve. These fire under async-flush when ≥2 streams concurrently try
133/// to create the same table OR when an Append races with a still-in-progress
134/// Overwrite (create_table). See the Lance commit-conflict-resolver in
135/// `lance-3.0.1/src/io/commit/conflict_resolver.rs`.
136fn is_lance_conflict(err: &anyhow::Error) -> bool {
137    let msg = err.to_string();
138    msg.contains("Incompatible transaction") || msg.contains("conflict")
139}
140
141/// Runs `op` with exponential-backoff retry on Lance commit conflicts.
142/// Up to 10 attempts (~10s worst case); backoff is 1ms, 2ms, 4ms, ...,
143/// 512ms. Non-conflict errors return immediately. `op` is re-invoked each
144/// attempt so it can re-check table existence and adjust strategy.
145async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
146where
147    F: FnMut() -> Fut,
148    Fut: std::future::Future<Output = anyhow::Result<()>>,
149{
150    for attempt in 0u32..10 {
151        match op().await {
152            Ok(()) => return Ok(()),
153            Err(e) => {
154                if !is_lance_conflict(&e) || attempt == 9 {
155                    return Err(e);
156                }
157                let backoff_ms = 1u64 << attempt;
158                tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
159            }
160        }
161    }
162    unreachable!("retry loop exits via Ok or Err")
163}
164
165/// MergeInsert sibling of `write_batch_with_lance_conflict_retry`.
166///
167/// Source `batch` must contain the join columns in `on` plus any
168/// columns to update. Matched rows have `WhenMatched::UpdateAll`
169/// applied; unmatched source rows are dropped (partial writes never
170/// INSERT). Returns an error if the target table does not exist.
171/// Retries on Lance commit conflicts via `retry_on_lance_conflict`.
172/// RecordBatch clones are cheap (column data is Arc'd).
173pub async fn merge_insert_batch_with_lance_conflict_retry(
174    backend: &dyn crate::backend::StorageBackend,
175    table_name: &str,
176    batch: arrow_array::RecordBatch,
177    on: &[&str],
178) -> anyhow::Result<()> {
179    retry_on_lance_conflict(|| async {
180        let exists = backend.table_exists(table_name).await?;
181        if !exists {
182            anyhow::bail!(
183                "merge_insert target table '{}' does not exist (partial writes \
184                 require the row to already be present; CREATE goes through Append)",
185                table_name
186            );
187        }
188        backend
189            .merge_insert(table_name, on, vec![batch.clone()])
190            .await
191    })
192    .await
193}
194
195/// Race-safe write: creates the table if missing, otherwise appends.
196/// Each attempt re-checks `table_exists` and adjusts strategy: Append if
197/// now-exists, Create if still-missing. Retries on Lance commit conflicts
198/// via `retry_on_lance_conflict`.
199///
200/// Used by every dataset's `write_batch` helper to absorb the Lance
201/// commit-conflict-resolver behavior. RecordBatch clones are cheap
202/// (column data is Arc'd).
203pub async fn write_batch_with_lance_conflict_retry(
204    backend: &dyn crate::backend::StorageBackend,
205    table_name: &str,
206    batch: arrow_array::RecordBatch,
207) -> anyhow::Result<()> {
208    use crate::backend::types::WriteMode;
209    retry_on_lance_conflict(|| async {
210        let exists = backend.table_exists(table_name).await?;
211        if exists {
212            backend
213                .write(table_name, vec![batch.clone()], WriteMode::Append)
214                .await
215        } else {
216            backend.create_table(table_name, vec![batch.clone()]).await
217        }
218    })
219    .await
220}
221
/// RAII holder of the shared `compaction_in_progress` flag.
///
/// `CompactionGuard::new` sets the flag, or returns `None` if it is already
/// set. Dropping the guard clears the flag and stamps `last_compaction`.
struct CompactionGuard {
    /// Shared compaction status whose flag this guard holds.
    status: Arc<Mutex<CompactionStatus>>,
}
226
227impl CompactionGuard {
228    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
229        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
230        if s.compaction_in_progress {
231            return None;
232        }
233        s.compaction_in_progress = true;
234        Some(Self {
235            status: status.clone(),
236        })
237    }
238}
239
240impl Drop for CompactionGuard {
241    fn drop(&mut self) {
242        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
243        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
244        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
245            Ok(mut s) => {
246                s.compaction_in_progress = false;
247                s.last_compaction = Some(std::time::SystemTime::now());
248            }
249            Err(e) => {
250                // Lock is poisoned but we're in Drop - cannot panic.
251                // Log the error and continue. System state may be inconsistent but at least
252                // we don't abort the process.
253                log::error!(
254                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
255                     Compaction status may be inconsistent. Issue #18/#150",
256                    e
257                );
258            }
259        }
260    }
261}
262
263impl StorageManager {
264    /// Create a new StorageManager with a pre-configured backend.
265    pub async fn new_with_backend(
266        base_uri: &str,
267        store: Arc<dyn ObjectStore>,
268        backend: Arc<dyn StorageBackend>,
269        schema_manager: Arc<SchemaManager>,
270        config: UniConfig,
271    ) -> Result<Self> {
272        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
273            store,
274            config.object_store.clone(),
275        ));
276
277        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
278
279        // Perform crash recovery for all known table patterns
280        Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
281
282        let mut sm = Self {
283            base_uri: base_uri.to_string(),
284            store: resilient_store,
285            schema_manager,
286            snapshot_manager,
287            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
288            config,
289            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
290            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
291            pinned_snapshot: None,
292            pinned_version_hwm: None,
293            fork_scope: None,
294            backend,
295            vid_labels_index: None,
296        };
297
298        // Rebuild VidLabelsIndex if enabled
299        if sm.config.enable_vid_labels_index
300            && let Err(e) = sm.rebuild_vid_labels_index().await
301        {
302            warn!(
303                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
304                e
305            );
306        }
307
308        Ok(sm)
309    }
310
311    /// Create a new StorageManager with LanceDB integration.
312    #[cfg(feature = "lance-backend")]
313    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
314        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
315    }
316
317    /// Create a new StorageManager with custom cache size.
318    #[cfg(feature = "lance-backend")]
319    pub async fn new_with_cache(
320        base_uri: &str,
321        schema_manager: Arc<SchemaManager>,
322        adjacency_cache_size: usize,
323    ) -> Result<Self> {
324        let config = UniConfig {
325            cache_size: adjacency_cache_size,
326            ..Default::default()
327        };
328        Self::new_with_config(base_uri, schema_manager, config).await
329    }
330
331    /// Create a new StorageManager with custom configuration.
332    #[cfg(feature = "lance-backend")]
333    pub async fn new_with_config(
334        base_uri: &str,
335        schema_manager: Arc<SchemaManager>,
336        config: UniConfig,
337    ) -> Result<Self> {
338        let store = Self::build_store_from_uri(base_uri)?;
339        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
340    }
341
342    /// Create a new StorageManager using an already-constructed object store.
343    #[cfg(feature = "lance-backend")]
344    pub async fn new_with_store_and_config(
345        base_uri: &str,
346        store: Arc<dyn ObjectStore>,
347        schema_manager: Arc<SchemaManager>,
348        config: UniConfig,
349    ) -> Result<Self> {
350        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
351            .await
352    }
353
354    /// Create a new StorageManager with LanceDB storage options.
355    #[cfg(feature = "lance-backend")]
356    pub async fn new_with_store_and_storage_options(
357        base_uri: &str,
358        store: Arc<dyn ObjectStore>,
359        schema_manager: Arc<SchemaManager>,
360        config: UniConfig,
361        lancedb_storage_options: Option<HashMap<String, String>>,
362    ) -> Result<Self> {
363        let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
364        Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
365    }
366
367    /// Recover all staging tables for known table patterns.
368    ///
369    /// This runs on startup to handle crash recovery. It checks for staging tables
370    /// for all vertex labels, adjacency tables, delta tables, and main tables.
371    async fn recover_all_staging_tables(
372        backend: &dyn StorageBackend,
373        schema_manager: &SchemaManager,
374    ) -> Result<()> {
375        let schema = schema_manager.schema();
376
377        // Recover main vertex and edge tables
378        backend
379            .recover_staging(table_names::main_vertex_table_name())
380            .await?;
381        backend
382            .recover_staging(table_names::main_edge_table_name())
383            .await?;
384
385        // Recover per-label vertex tables
386        for label in schema.labels.keys() {
387            let name = table_names::vertex_table_name(label);
388            backend.recover_staging(&name).await?;
389        }
390
391        // Recover adjacency and delta tables for each edge type and direction
392        for edge_type in schema.edge_types.keys() {
393            for direction in &["fwd", "bwd"] {
394                // Recover delta tables
395                let delta_name = table_names::delta_table_name(edge_type, direction);
396                backend.recover_staging(&delta_name).await?;
397
398                // Recover adjacency tables for each label
399                for _label in schema.labels.keys() {
400                    let adj_name = table_names::adjacency_table_name(edge_type, direction);
401                    backend.recover_staging(&adj_name).await?;
402                }
403            }
404        }
405
406        Ok(())
407    }
408
409    #[cfg(feature = "lance-backend")]
410    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
411        if base_uri.contains("://") {
412            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
413            let (store, _path) = object_store::parse_url(&parsed)
414                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
415            Ok(Arc::from(store))
416        } else {
417            // If local path, ensure it exists.
418            std::fs::create_dir_all(base_uri)?;
419            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
420        }
421    }
422
423    /// Filesystem root backing this manager's object store, when the store
424    /// is a local filesystem (the non-`://` branch of
425    /// `build_store_from_uri`). Used to fsync WAL segments after PUT —
426    /// `object_store::LocalFileSystem` does not fsync on its own. `None`
427    /// for remote/URL-based stores.
428    pub fn local_fs_root(&self) -> Option<std::path::PathBuf> {
429        if self.base_uri.contains("://") {
430            None
431        } else {
432            Some(std::path::PathBuf::from(&self.base_uri))
433        }
434    }
435
436    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
437        // Phase 4a: pinning a forked session is now supported. The
438        // resulting StorageManager keeps `fork_scope` so reads continue
439        // to route through the fork's Lance branches via `base_paths`,
440        // and adds `pinned_snapshot` so writers / writers' read views
441        // resolve at the snapshot's HWM. Writes are gated separately by
442        // the session-level `is_pinned` check (`Session::tx` rejects
443        // them via `UniError::ReadOnly`).
444        Self {
445            base_uri: self.base_uri.clone(),
446            store: self.store.clone(),
447            schema_manager: self.schema_manager.clone(),
448            snapshot_manager: self.snapshot_manager.clone(),
449            // Separate AdjacencyManager for snapshot isolation (Issue #73):
450            // warm() will load only edges visible at the snapshot's HWM.
451            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
452            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
453            config: self.config.clone(),
454            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
455            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
456            pinned_snapshot: Some(snapshot),
457            pinned_version_hwm: None,
458            fork_scope: self.fork_scope.clone(),
459            backend: self.backend.clone(),
460            vid_labels_index: self.vid_labels_index.clone(),
461        }
462    }
463
464    /// Construct a clone of this `StorageManager` pinned to a row-version
465    /// high-water mark (C2: transaction-level L1 pinning).
466    ///
467    /// Unlike [`Self::pinned`], this needs no `SnapshotManifest`: scans
468    /// filter to `_version <= hwm` via [`Self::version_high_water_mark`].
469    /// A read-write transaction builds one of these at begin with
470    /// `SnapshotView.started_at_version`, so an L0→L1 flush completing
471    /// mid-transaction cannot leak post-snapshot rows into its L1 scans
472    /// (the L0 tier is pinned separately by the `SnapshotView`).
473    ///
474    /// Unlike [`Self::pinned`], the live `AdjacencyManager` is SHARED, not
475    /// fresh: commits replay their edges into the live manager's overlay,
476    /// which is the traversal path's only source for L0-resident edges — a
477    /// fresh manager would make every unflushed edge invisible to the
478    /// transaction. The cost is that the edge tier is not version-pinned
479    /// (post-snapshot edges remain visible to traversals, exactly as before
480    /// C2); edge reads are recorded in the OCC read-set, so a conflicting
481    /// read-modify-write still aborts at commit.
482    pub fn pinned_at_version(&self, hwm: u64) -> Self {
483        Self {
484            base_uri: self.base_uri.clone(),
485            store: self.store.clone(),
486            schema_manager: self.schema_manager.clone(),
487            snapshot_manager: self.snapshot_manager.clone(),
488            adjacency_manager: self.adjacency_manager.clone(),
489            config: self.config.clone(),
490            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
491            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
492            pinned_snapshot: None,
493            pinned_version_hwm: Some(hwm),
494            fork_scope: self.fork_scope.clone(),
495            backend: self.backend.clone(),
496            vid_labels_index: self.vid_labels_index.clone(),
497        }
498    }
499
500    /// Construct a fork-scoped clone of this `StorageManager`.
501    ///
502    /// All reads through dataset factories *and* through `backend()`
503    /// on the returned manager route through the fork's Lance branches
504    /// via `base_paths`. The `AdjacencyManager` is fresh (per Issue
505    /// #73 reasoning — same as `pinned`) to prevent primary's CSR from
506    /// leaking into the fork. `fork_scope` and `pinned_snapshot` are
507    /// mutually exclusive.
508    ///
509    /// The backend is wrapped in [`crate::backend::branched::BranchedBackend`]
510    /// so that every `ScanRequest` constructed *anywhere* (PropertyManager,
511    /// MainVertexDataset static methods, etc.) automatically picks up
512    /// the fork's branch for tables the fork has branched. Untracked
513    /// tables fall back to primary, matching Phase 1 read semantics.
514    pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
515        self.at_fork_with_schema(scope, self.schema_manager.clone())
516    }
517
518    /// Variant of [`Self::at_fork`] that uses an explicit
519    /// `merged_schema` for the fork's storage rather than primary's
520    /// schema_manager. Used by `UniInner::at_fork` so that the
521    /// fork-side strict-schema checks (in `uni-query` / `uni-store`'s
522    /// writer) see fork-local labels and edge types added through
523    /// `Session::fork_schema()`. Without this, those checks would
524    /// route through primary's schema and reject fork-local labels.
525    pub fn at_fork_with_schema(
526        &self,
527        scope: Arc<crate::fork::ForkScope>,
528        merged_schema: Arc<SchemaManager>,
529    ) -> Self {
530        debug_assert!(
531            self.pinned_snapshot.is_none(),
532            "forking a pinned StorageManager is unsupported in Phase 1"
533        );
534        let branched_backend: Arc<dyn StorageBackend> = Arc::new(
535            crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
536        );
537        Self {
538            base_uri: self.base_uri.clone(),
539            store: self.store.clone(),
540            schema_manager: merged_schema,
541            snapshot_manager: self.snapshot_manager.clone(),
542            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
543            config: self.config.clone(),
544            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
545            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
546            pinned_snapshot: None,
547            pinned_version_hwm: None,
548            fork_scope: Some(scope),
549            backend: branched_backend,
550            vid_labels_index: self.vid_labels_index.clone(),
551        }
552    }
553
554    /// Borrow the active fork scope, if any.
555    pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
556        self.fork_scope.as_ref()
557    }
558
559    /// Phase 5a: query whether a fork-local index exists for the
560    /// `(label, column)` pair on the active fork scope. Returns
561    /// `None` outside a fork or when no fork-local build has
562    /// completed for that pair.
563    ///
564    /// The planner consults this to decide whether to emit
565    /// `FusedIndexScan` (returns `Some`) or fall back to the
566    /// inherited primary index via `base_paths` (returns `None`).
567    /// The lookup is a `DashMap::get` on `ForkScope` — O(1) and
568    /// safe to call per query without caching above this layer.
569    #[must_use]
570    pub fn fork_index_exists(
571        &self,
572        label: &str,
573        column: &str,
574    ) -> Option<crate::fork::ForkLocalIndexKind> {
575        self.fork_scope
576            .as_ref()
577            .and_then(|s| s.fork_local_index(label, column))
578    }
579
580    /// Base URI for this storage manager (the directory or remote
581    /// prefix under which dataset directories live).
582    pub fn base_uri(&self) -> &str {
583        &self.base_uri
584    }
585
586    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
587        let schema = self.schema_manager.schema();
588        let name = schema.edge_type_name_by_id(edge_type_id)?;
589        self.pinned_snapshot
590            .as_ref()
591            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
592            // The flush path stamps `lance_version: 0` ("LanceDB tables don't
593            // expose Lance version directly") — 0 is a stub sentinel, not a
594            // real dataset version. Returning it would route adjacency reads
595            // through `checkout_version(0)` (the empty initial version).
596            .filter(|v| *v != 0)
597    }
598
599    /// Returns the version high-water mark from the pinned snapshot or the
600    /// transaction-level version pin, if present.
601    ///
602    /// Used by the SCAN tier (vertex tables, property reads) to filter data
603    /// by version: when set, only rows with
604    /// `_version <= version_high_water_mark` are visible. The edge/adjacency
605    /// path must use [`Self::snapshot_version_hwm`] instead.
606    pub fn version_high_water_mark(&self) -> Option<u64> {
607        self.pinned_snapshot
608            .as_ref()
609            .map(|s| s.version_high_water_mark)
610            .or(self.pinned_version_hwm)
611    }
612
613    /// Version high-water mark from a manifest-pinned (time-travel) snapshot
614    /// ONLY — never from a transaction-level version pin.
615    ///
616    /// The edge/adjacency read path switches to version-filtered CSR reads
617    /// and skips the L0 overlays when a hwm is present. That is correct for
618    /// time-travel (a snapshot is flushed state, with its own fresh
619    /// `AdjacencyManager`), but a transaction pin shares the LIVE adjacency
620    /// manager and needs live CSR + L0 overlays + its tx-L0 — filtering
621    /// there would hide unflushed edges and poison the shared warm cache.
622    /// The edge tier is deliberately not version-pinned for transactions
623    /// (see [`Self::pinned_at_version`]).
624    pub fn snapshot_version_hwm(&self) -> Option<u64> {
625        self.pinned_snapshot
626            .as_ref()
627            .map(|s| s.version_high_water_mark)
628    }
629
630    /// Apply version filtering to a base filter expression.
631    ///
632    /// If a snapshot is pinned, wraps `base_filter` with an additional
633    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
634    pub fn apply_version_filter(&self, base_filter: String) -> String {
635        if let Some(hwm) = self.version_high_water_mark() {
636            format!("({}) AND (_version <= {})", base_filter, hwm)
637        } else {
638            base_filter
639        }
640    }
641
642    /// Build a filter expression that excludes soft-deleted rows and optionally
643    /// includes a user-provided filter.
644    fn build_active_filter(user_filter: Option<&str>) -> String {
645        match user_filter {
646            Some(expr) => format!("({}) AND (_deleted = false)", expr),
647            None => "_deleted = false".to_string(),
648        }
649    }
650
651    pub fn store(&self) -> Arc<dyn ObjectStore> {
652        self.store.clone()
653    }
654
655    /// Get current compaction status.
656    ///
657    /// # Errors
658    ///
659    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
660    pub fn compaction_status(
661        &self,
662    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
663        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
664        Ok(guard.clone())
665    }
666
667    pub async fn compact(&self) -> Result<CompactionStats> {
668        // Backend handles compaction internally via optimize_table()
669        let start = std::time::Instant::now();
670        let schema = self.schema_manager.schema();
671        let mut files_compacted = 0;
672
673        for label in schema.labels.keys() {
674            let name = table_names::vertex_table_name(label);
675            if self.backend.table_exists(&name).await? {
676                self.backend.optimize_table(&name).await?;
677                files_compacted += 1;
678                self.backend.invalidate_cache(&name);
679            }
680        }
681
682        Ok(CompactionStats {
683            files_compacted,
684            bytes_before: 0,
685            bytes_after: 0,
686            duration: start.elapsed(),
687            crdt_merges: 0,
688        })
689    }
690
691    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
692        let _guard = CompactionGuard::new(self.compaction_status.clone())
693            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
694
695        let start = std::time::Instant::now();
696        let name = table_names::vertex_table_name(label);
697
698        if self.backend.table_exists(&name).await? {
699            self.backend.optimize_table(&name).await?;
700            self.backend.invalidate_cache(&name);
701        }
702
703        Ok(CompactionStats {
704            files_compacted: 1,
705            bytes_before: 0,
706            bytes_after: 0,
707            duration: start.elapsed(),
708            crdt_merges: 0,
709        })
710    }
711
712    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
713        let _guard = CompactionGuard::new(self.compaction_status.clone())
714            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
715
716        let start = std::time::Instant::now();
717        let mut files_compacted = 0;
718
719        for dir in ["fwd", "bwd"] {
720            let name = table_names::delta_table_name(edge_type, dir);
721            if self.backend.table_exists(&name).await? {
722                self.backend.optimize_table(&name).await?;
723                files_compacted += 1;
724            }
725        }
726
727        Ok(CompactionStats {
728            files_compacted,
729            bytes_before: 0,
730            bytes_after: 0,
731            duration: start.elapsed(),
732            crdt_merges: 0,
733        })
734    }
735
736    pub async fn wait_for_compaction(&self) -> Result<()> {
737        loop {
738            let in_progress = {
739                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
740            };
741            if !in_progress {
742                return Ok(());
743            }
744            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
745        }
746    }
747
748    pub fn start_background_compaction(
749        self: Arc<Self>,
750        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
751    ) -> tokio::task::JoinHandle<()> {
752        if !self.config.compaction.enabled {
753            return tokio::spawn(async {});
754        }
755
756        tokio::spawn(async move {
757            // Use interval_at to delay the first tick. tokio::time::interval fires
758            // immediately on the first tick, which can race with queries that run
759            // right after database open. Delaying by the check_interval gives
760            // initial queries time to complete before compaction modifies tables
761            // (optimize(All) can GC index files that concurrent queries depend on).
762            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
763            let mut interval =
764                tokio::time::interval_at(start, self.config.compaction.check_interval);
765
766            loop {
767                tokio::select! {
768                    _ = interval.tick() => {
769                        if let Err(e) = self.update_compaction_status().await {
770                            log::error!("Failed to update compaction status: {}", e);
771                            continue;
772                        }
773
774                        if let Some(task) = self.pick_compaction_task() {
775                            log::info!("Triggering background compaction: {:?}", task);
776                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
777                                log::error!("Compaction failed: {}", e);
778                            }
779                        }
780                    }
781                    _ = shutdown_rx.recv() => {
782                        log::info!("Background compaction shutting down");
783                        let _ = self.wait_for_compaction().await;
784                        break;
785                    }
786                }
787            }
788        })
789    }
790
791    async fn update_compaction_status(&self) -> Result<()> {
792        let schema = self.schema_manager.schema();
793        let backend = self.backend.as_ref();
794        let mut total_rows: usize = 0;
795        let mut oldest_ts: Option<i64> = None;
796
797        for name in schema.edge_types.keys() {
798            for dir in ["fwd", "bwd"] {
799                let tbl_name = table_names::delta_table_name(name, dir);
800                if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
801                    continue;
802                }
803                let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
804                if row_count == 0 {
805                    continue;
806                }
807                total_rows += row_count;
808
809                // Query oldest _created_at for age tracking
810                let request =
811                    ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
812                let Ok(batches) = backend.scan(request).await else {
813                    continue;
814                };
815                for batch in batches {
816                    let Some(col) = batch
817                        .column_by_name("_created_at")
818                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
819                    else {
820                        continue;
821                    };
822                    for i in 0..col.len() {
823                        if !col.is_null(i) {
824                            let ts = col.value(i);
825                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
826                        }
827                    }
828                }
829            }
830        }
831
832        let oldest_l1_age = oldest_ts
833            .and_then(|ts| {
834                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
835                SystemTime::now().duration_since(created).ok()
836            })
837            .unwrap_or(Duration::ZERO);
838
839        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
840        // Note: l1_runs is managed by flush_to_l1 (increment) and execute_compaction
841        // (reset). It counts flush generations, not delta table count.
842        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
843        status.oldest_l1_age = oldest_l1_age;
844        Ok(())
845    }
846
847    fn pick_compaction_task(&self) -> Option<CompactionTask> {
848        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
849
850        if status.l1_runs >= self.config.compaction.max_l1_runs {
851            return Some(CompactionTask::ByRunCount);
852        }
853        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
854            return Some(CompactionTask::BySize);
855        }
856        if status.oldest_l1_age >= self.config.compaction.max_l1_age
857            && status.oldest_l1_age > Duration::ZERO
858        {
859            return Some(CompactionTask::ByAge);
860        }
861
862        None
863    }
864
865    /// Optimize a table via the backend, returning `true` on success.
866    async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
867        if let Err(e) = backend.optimize_table(table_name).await {
868            log::warn!("Failed to optimize table {}: {}", table_name, e);
869            return false;
870        }
871        true
872    }
873
874    /// Trigger L1 compaction asynchronously without blocking the caller.
875    /// Safe to call frequently — CompactionGuard prevents concurrent runs.
876    pub fn trigger_async_compaction(self: &Arc<Self>) {
877        let this = Arc::clone(self);
878        tokio::spawn(async move {
879            if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
880                // "Compaction already in progress" is expected when called frequently
881                log::debug!("Post-flush compaction skipped: {}", e);
882            }
883        });
884    }
885
886    pub(crate) async fn execute_compaction(
887        this: Arc<Self>,
888        _task: CompactionTask,
889    ) -> Result<CompactionStats> {
890        let start = std::time::Instant::now();
891        let _guard = CompactionGuard::new(this.compaction_status.clone())
892            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
893
894        let schema = this.schema_manager.schema();
895        let mut files_compacted = 0;
896
897        // ── Tier 2: Semantic compaction ──
898        // Dedup vertices, merge CRDTs, consolidate L1→L2 deltas, clean tombstones
899        let compactor = Compactor::new(Arc::clone(&this));
900        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
901            log::error!(
902                "Semantic compaction failed (continuing with backend optimize): {}",
903                e
904            );
905            Vec::new()
906        });
907
908        // Re-warm adjacency CSR after semantic compaction
909        let am = this.adjacency_manager();
910        for info in &compaction_results {
911            let direction = match info.direction.as_str() {
912                "fwd" => Direction::Outgoing,
913                "bwd" => Direction::Incoming,
914                _ => continue,
915            };
916            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
917                && let Err(e) = am.warm(&this, etid, direction, None).await
918            {
919                log::warn!(
920                    "Failed to re-warm adjacency for {}/{}: {}",
921                    info.edge_type,
922                    info.direction,
923                    e
924                );
925            }
926        }
927
928        // ── Tier 3: Backend optimize ──
929        let backend = this.backend.as_ref();
930
931        // Optimize edge delta and adjacency tables
932        for name in schema.edge_types.keys() {
933            for dir in ["fwd", "bwd"] {
934                let delta = table_names::delta_table_name(name, dir);
935                if Self::try_optimize_table(backend, &delta).await {
936                    files_compacted += 1;
937                }
938                let adj = table_names::adjacency_table_name(name, dir);
939                if Self::try_optimize_table(backend, &adj).await {
940                    files_compacted += 1;
941                }
942            }
943        }
944
945        // Optimize vertex tables
946        for label in schema.labels.keys() {
947            let tbl = table_names::vertex_table_name(label);
948            if Self::try_optimize_table(backend, &tbl).await {
949                files_compacted += 1;
950                backend.invalidate_cache(&tbl);
951            }
952        }
953
954        // Optimize main vertex and edge tables
955        for tbl in [
956            table_names::main_vertex_table_name(),
957            table_names::main_edge_table_name(),
958        ] {
959            if Self::try_optimize_table(backend, tbl).await {
960                files_compacted += 1;
961            }
962        }
963
964        {
965            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
966            status.total_compactions += 1;
967            status.l1_runs = 0; // Reset flush generation counter
968        }
969
970        Ok(CompactionStats {
971            files_compacted,
972            bytes_before: 0,
973            bytes_after: 0,
974            duration: start.elapsed(),
975            crdt_merges: 0,
976        })
977    }
978
979    /// Open a LanceDB table for a label.
980    ///
981    /// Invalidate cached table state (call after writes).
982    pub fn invalidate_table_cache(&self, label: &str) {
983        let name = table_names::vertex_table_name(label);
984        self.backend.invalidate_cache(&name);
985    }
986
987    pub fn base_path(&self) -> &str {
988        &self.base_uri
989    }
990
991    pub fn schema_manager(&self) -> &SchemaManager {
992        &self.schema_manager
993    }
994
995    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
996        self.schema_manager.clone()
997    }
998
999    /// Returns the backing `Arc<SchemaManager>` by reference.
1000    ///
1001    /// Unlike [`Self::schema_manager`] (which derefs to `&SchemaManager`),
1002    /// this preserves the `Arc`'s pointer identity. A pinned transaction and
1003    /// the live session clone the *same* `schema_manager` `Arc`, while forks
1004    /// hold a distinct one — so this is the correct registry key for the
1005    /// projection store (see `uni-query`'s `projection_store::for_storage`).
1006    #[must_use]
1007    pub fn schema_manager_arc_ref(&self) -> &Arc<SchemaManager> {
1008        &self.schema_manager
1009    }
1010
1011    /// Get the adjacency manager for the dual-CSR architecture.
1012    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
1013        Arc::clone(&self.adjacency_manager)
1014    }
1015
1016    /// Warm the adjacency manager for a specific edge type and direction.
1017    ///
1018    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
1019    /// Called lazily on first access per edge type or at startup.
1020    pub async fn warm_adjacency(
1021        &self,
1022        edge_type_id: u32,
1023        direction: crate::storage::direction::Direction,
1024        version: Option<u64>,
1025    ) -> anyhow::Result<()> {
1026        self.adjacency_manager
1027            .warm(self, edge_type_id, direction, version)
1028            .await
1029    }
1030
1031    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
1032    ///
1033    /// Uses double-checked locking to ensure only one concurrent warm() per
1034    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
1035    pub async fn warm_adjacency_coalesced(
1036        &self,
1037        edge_type_id: u32,
1038        direction: crate::storage::direction::Direction,
1039        version: Option<u64>,
1040    ) -> anyhow::Result<()> {
1041        self.adjacency_manager
1042            .warm_coalesced(self, edge_type_id, direction, version)
1043            .await
1044    }
1045
1046    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
1047    pub fn has_adjacency_csr(
1048        &self,
1049        edge_type_id: u32,
1050        direction: crate::storage::direction::Direction,
1051    ) -> bool {
1052        self.adjacency_manager.has_csr(edge_type_id, direction)
1053    }
1054
1055    /// Get neighbors at a specific version for snapshot queries.
1056    pub fn get_neighbors_at_version(
1057        &self,
1058        vid: uni_common::core::id::Vid,
1059        edge_type: u32,
1060        direction: crate::storage::direction::Direction,
1061        version: u64,
1062    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
1063        self.adjacency_manager
1064            .get_neighbors_at_version(vid, edge_type, direction, version)
1065    }
1066
1067    /// Get the storage backend.
1068    pub fn backend(&self) -> &dyn StorageBackend {
1069        self.backend.as_ref()
1070    }
1071
1072    /// Get the storage backend as an Arc.
1073    pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
1074        self.backend.clone()
1075    }
1076
1077    /// Rebuild the VidLabelsIndex from the main vertex table.
1078    /// This is called on startup if enable_vid_labels_index is true.
1079    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
1080        use crate::storage::vid_labels::VidLabelsIndex;
1081
1082        let backend = self.backend.as_ref();
1083        let vtable = table_names::main_vertex_table_name();
1084
1085        // Check if the table exists (fresh database)
1086        if !backend.table_exists(vtable).await.unwrap_or(false) {
1087            self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
1088            return Ok(());
1089        }
1090
1091        // Scan all non-deleted vertices and collect (VID, labels)
1092        let request = ScanRequest::all(vtable)
1093            .with_filter("_deleted = false")
1094            .with_limit(100_000);
1095        let batches = backend
1096            .scan(request)
1097            .await
1098            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1099
1100        let mut index = VidLabelsIndex::new();
1101        for batch in batches {
1102            let vid_col = batch
1103                .column_by_name("_vid")
1104                .ok_or_else(|| anyhow!("Missing _vid column"))?
1105                .as_any()
1106                .downcast_ref::<UInt64Array>()
1107                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1108
1109            let labels_col = batch
1110                .column_by_name("labels")
1111                .ok_or_else(|| anyhow!("Missing labels column"))?
1112                .as_any()
1113                .downcast_ref::<arrow_array::ListArray>()
1114                .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1115
1116            for row_idx in 0..batch.num_rows() {
1117                let vid = Vid::from(vid_col.value(row_idx));
1118                let labels_array = labels_col.value(row_idx);
1119                let labels_str_array = labels_array
1120                    .as_any()
1121                    .downcast_ref::<arrow_array::StringArray>()
1122                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1123
1124                let labels: Vec<String> = (0..labels_str_array.len())
1125                    .map(|i| labels_str_array.value(i).to_string())
1126                    .collect();
1127
1128                index.insert(vid, labels);
1129            }
1130        }
1131
1132        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
1133        Ok(())
1134    }
1135
1136    /// Get labels for a VID from the in-memory index.
1137    /// Returns None if the index is disabled or the VID is not found.
1138    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1139        self.vid_labels_index.as_ref().and_then(|idx| {
1140            let index = idx.read();
1141            index.get_labels(vid).map(|labels| labels.to_vec())
1142        })
1143    }
1144
1145    /// Update the VID-to-labels mapping in the index.
1146    /// No-op if the index is disabled.
1147    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1148        if let Some(idx) = &self.vid_labels_index {
1149            let mut index = idx.write();
1150            index.insert(vid, labels);
1151        }
1152    }
1153
1154    /// Remove a VID from the labels index.
1155    /// No-op if the index is disabled.
1156    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1157        if let Some(idx) = &self.vid_labels_index {
1158            let mut index = idx.write();
1159            index.remove_vid(vid);
1160        }
1161    }
1162
1163    pub async fn load_subgraph_cached(
1164        &self,
1165        start_vids: &[Vid],
1166        edge_types: &[u32],
1167        max_hops: usize,
1168        direction: GraphDirection,
1169        _l0: Option<Arc<RwLock<L0Buffer>>>,
1170    ) -> Result<WorkingGraph> {
1171        let mut graph = WorkingGraph::new();
1172
1173        let dir = match direction {
1174            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1175            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1176        };
1177
1178        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1179
1180        // Initialize frontier
1181        let mut frontier: Vec<Vid> = start_vids.to_vec();
1182        let mut visited: HashSet<Vid> = HashSet::new();
1183
1184        // Initialize start vids
1185        for &vid in start_vids {
1186            graph.add_vertex(vid);
1187        }
1188
1189        for _hop in 0..max_hops {
1190            let mut next_frontier = HashSet::new();
1191
1192            for &vid in &frontier {
1193                if visited.contains(&vid) {
1194                    continue;
1195                }
1196                visited.insert(vid);
1197                graph.add_vertex(vid);
1198
1199                for &etype_id in edge_types {
1200                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13).
1201                    // Manifest pin only: a tx version pin shares the LIVE adjacency
1202                    // manager — warming it filtered would poison the shared cache.
1203                    let edge_ver = self.snapshot_version_hwm();
1204                    self.adjacency_manager
1205                        .warm_coalesced(self, etype_id, dir, edge_ver)
1206                        .await?;
1207
1208                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
1209                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1210
1211                    for (neighbor_vid, eid) in edges {
1212                        graph.add_vertex(neighbor_vid);
1213                        if !visited.contains(&neighbor_vid) {
1214                            next_frontier.insert(neighbor_vid);
1215                        }
1216
1217                        if neighbor_is_dst {
1218                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
1219                        } else {
1220                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
1221                        }
1222                    }
1223                }
1224            }
1225            frontier = next_frontier.into_iter().collect();
1226
1227            // Early termination: if frontier is empty, no more vertices to explore
1228            if frontier.is_empty() {
1229                break;
1230            }
1231        }
1232
1233        Ok(graph)
1234    }
1235
1236    pub fn snapshot_manager(&self) -> &SnapshotManager {
1237        &self.snapshot_manager
1238    }
1239
1240    pub fn index_manager(&self) -> IndexManager {
1241        IndexManager::new(&self.base_uri, self.schema_manager.clone())
1242    }
1243
1244    // ========================================================================
1245    // Domain-level scan methods — encapsulate LanceDB queries for consumers
1246    // ========================================================================
1247
1248    /// Scan a per-label vertex table. Returns `None` if the table doesn't exist.
1249    ///
1250    /// Internally opens the table, filters requested columns to those that
1251    /// physically exist, and applies the version HWM filter for snapshot isolation.
1252    pub async fn scan_vertex_table(
1253        &self,
1254        label: &str,
1255        columns: &[&str],
1256        additional_filter: Option<&str>,
1257    ) -> Result<Option<arrow_array::RecordBatch>> {
1258        let backend = self.backend();
1259        let table_name = table_names::vertex_table_name(label);
1260
1261        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1262            return Ok(None);
1263        }
1264
1265        // Filter columns to those that exist in the table
1266        let actual_columns =
1267            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1268                let table_field_names: HashSet<&str> = table_schema
1269                    .fields()
1270                    .iter()
1271                    .map(|f| f.name().as_str())
1272                    .collect();
1273                columns
1274                    .iter()
1275                    .copied()
1276                    .filter(|c| table_field_names.contains(c))
1277                    .map(|s| s.to_string())
1278                    .collect::<Vec<_>>()
1279            } else {
1280                return Ok(None);
1281            };
1282
1283        // Build filter with version HWM + optional additional filter
1284        let filter = match (self.version_high_water_mark(), additional_filter) {
1285            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1286            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1287            (None, Some(f)) => Some(f.to_string()),
1288            (None, None) => None,
1289        };
1290
1291        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1292        if let Some(f) = filter {
1293            request = request.with_filter(f);
1294        }
1295
1296        // Fail closed: a scan error (transient I/O, an unparsable filter, a
1297        // corrupt fragment) must propagate, never be silently mapped to
1298        // `Ok(None)`. Callers treat `Ok(None)` as "no rows" — e.g. the MERGE
1299        // fast path would create a duplicate node on a transient failure (review
1300        // bug #3a) — so an error here must surface as an error. A genuinely-
1301        // absent table is already handled above.
1302        let batches = backend.scan(request).await?;
1303        if batches.is_empty() {
1304            Ok(None)
1305        } else {
1306            Ok(Some(arrow::compute::concat_batches(
1307                &batches[0].schema(),
1308                &batches,
1309            )?))
1310        }
1311    }
1312
1313    /// Scan a delta table for an edge type + direction.
1314    /// Returns `None` if the table doesn't exist.
1315    pub async fn scan_delta_table(
1316        &self,
1317        edge_type: &str,
1318        direction: &str,
1319        columns: &[&str],
1320        additional_filter: Option<&str>,
1321    ) -> Result<Option<arrow_array::RecordBatch>> {
1322        // Edge path: manifest pin only. A transaction version pin must NOT
1323        // version-filter edge reads — the edge tier is not version-pinned
1324        // (the live AdjacencyManager + tx-L0 overlay carry unflushed and
1325        // in-transaction edges), so filtering here would hide a relationship
1326        // the same transaction just created (MERGE read-your-writes).
1327        let edge_hwm = self.snapshot_version_hwm();
1328        let backend = self.backend();
1329        let table_name = table_names::delta_table_name(edge_type, direction);
1330
1331        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1332            return Ok(None);
1333        }
1334
1335        // Filter columns to those that exist
1336        let actual_columns =
1337            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1338                let table_field_names: HashSet<&str> = table_schema
1339                    .fields()
1340                    .iter()
1341                    .map(|f| f.name().as_str())
1342                    .collect();
1343                columns
1344                    .iter()
1345                    .copied()
1346                    .filter(|c| table_field_names.contains(c))
1347                    .map(|s| s.to_string())
1348                    .collect::<Vec<_>>()
1349            } else {
1350                return Ok(None);
1351            };
1352
1353        let filter = match (edge_hwm, additional_filter) {
1354            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1355            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1356            (None, Some(f)) => Some(f.to_string()),
1357            (None, None) => None,
1358        };
1359
1360        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1361        if let Some(f) = filter {
1362            request = request.with_filter(f);
1363        }
1364
1365        // Fail closed: a scan error (transient I/O, an unparsable filter, a
1366        // corrupt fragment) must propagate, never be silently mapped to
1367        // `Ok(None)`. Callers treat `Ok(None)` as "no rows" — e.g. the MERGE
1368        // fast path would create a duplicate node on a transient failure (review
1369        // bug #3a) — so an error here must surface as an error. A genuinely-
1370        // absent table is already handled above.
1371        let batches = backend.scan(request).await?;
1372        if batches.is_empty() {
1373            Ok(None)
1374        } else {
1375            Ok(Some(arrow::compute::concat_batches(
1376                &batches[0].schema(),
1377                &batches,
1378            )?))
1379        }
1380    }
1381
1382    /// Scan the unified main vertex table. Returns `None` if table doesn't exist.
1383    ///
1384    /// Applies version HWM filter internally for snapshot isolation, combined
1385    /// with any caller-provided filter (label conditions, etc.).
1386    pub async fn scan_main_vertex_table(
1387        &self,
1388        columns: &[&str],
1389        filter: Option<&str>,
1390    ) -> Result<Option<arrow_array::RecordBatch>> {
1391        let backend = self.backend();
1392        let table_name = table_names::main_vertex_table_name();
1393
1394        if !backend.table_exists(table_name).await.unwrap_or(false) {
1395            return Ok(None);
1396        }
1397
1398        // Combine caller filter with version HWM for snapshot isolation
1399        let full_filter = match (self.version_high_water_mark(), filter) {
1400            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1401            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1402            (None, Some(f)) => Some(f.to_string()),
1403            (None, None) => None,
1404        };
1405
1406        let request = ScanRequest::all(table_name)
1407            .with_columns(columns.iter().map(|s| s.to_string()).collect());
1408        let request = match full_filter.as_deref() {
1409            Some(f) => request.with_filter(f),
1410            None => request,
1411        };
1412
1413        // Fail closed: a scan error (transient I/O, an unparsable filter, a
1414        // corrupt fragment) must propagate, never be silently mapped to
1415        // `Ok(None)`. Callers treat `Ok(None)` as "no rows" — e.g. the MERGE
1416        // fast path would create a duplicate node on a transient failure (review
1417        // bug #3a) — so an error here must surface as an error. A genuinely-
1418        // absent table is already handled above.
1419        let batches = backend.scan(request).await?;
1420        if batches.is_empty() {
1421            Ok(None)
1422        } else {
1423            Ok(Some(arrow::compute::concat_batches(
1424                &batches[0].schema(),
1425                &batches,
1426            )?))
1427        }
1428    }
1429
1430    /// Scan the main edge table as a stream. Returns `None` if table doesn't exist.
1431    pub async fn scan_main_edge_table_stream(
1432        &self,
1433        filter: Option<&str>,
1434    ) -> Result<
1435        Option<
1436            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1437        >,
1438    > {
1439        let backend = self.backend();
1440        let table_name = table_names::main_edge_table_name();
1441
1442        if !backend.table_exists(table_name).await.unwrap_or(false) {
1443            return Ok(None);
1444        }
1445
1446        let mut request = ScanRequest::all(table_name);
1447        if let Some(f) = filter {
1448            request = request.with_filter(f);
1449        }
1450
1451        let stream = backend.scan_stream(request).await?;
1452        Ok(Some(stream))
1453    }
1454
1455    /// Scan a per-label vertex table as a stream. Returns `None` if table doesn't exist.
1456    pub async fn scan_vertex_table_stream(
1457        &self,
1458        label: &str,
1459    ) -> Result<
1460        Option<
1461            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1462        >,
1463    > {
1464        let backend = self.backend();
1465        let table_name = table_names::vertex_table_name(label);
1466
1467        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1468            return Ok(None);
1469        }
1470
1471        let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1472        Ok(Some(stream))
1473    }
1474
1475    /// Find a vertex VID by external ID. Uses pinned snapshot HWM if present.
1476    pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1477        MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1478            .await
1479    }
1480
1481    /// Find labels for a vertex by VID. Uses pinned snapshot HWM if present.
1482    pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1483        MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1484            .await
1485    }
1486
1487    /// Find edges from the main edge table by type names, optionally pushing
1488    /// a bounded endpoint vid set into the scan (review perf #5).
1489    pub async fn find_edges_by_type_names(
1490        &self,
1491        type_names: &[&str],
1492        endpoint_filter: Option<(crate::storage::main_edge::EndpointSide, &[Vid])>,
1493    ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1494        MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names, endpoint_filter).await
1495    }
1496
1497    /// Scan vertex candidates matching a filter. Returns VIDs where `_deleted = false`.
1498    pub async fn scan_vertex_candidates(
1499        &self,
1500        label: &str,
1501        filter: Option<&str>,
1502    ) -> Result<Vec<Vid>> {
1503        let backend = self.backend();
1504        let table_name = table_names::vertex_table_name(label);
1505
1506        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1507            return Ok(Vec::new());
1508        }
1509
1510        let full_filter = match filter {
1511            Some(f) => format!("_deleted = false AND ({})", f),
1512            None => "_deleted = false".to_string(),
1513        };
1514
1515        let request = ScanRequest::all(&table_name)
1516            .with_filter(full_filter)
1517            .with_columns(vec!["_vid".to_string()]);
1518
1519        let batches = backend.scan(request).await?;
1520
1521        let mut vids = Vec::new();
1522        for batch in batches {
1523            let vid_col = batch
1524                .column_by_name("_vid")
1525                .ok_or(anyhow!("Missing _vid"))?
1526                .as_any()
1527                .downcast_ref::<UInt64Array>()
1528                .ok_or(anyhow!("Invalid _vid"))?;
1529            for i in 0..batch.num_rows() {
1530                vids.push(Vid::from(vid_col.value(i)));
1531            }
1532        }
1533        Ok(vids)
1534    }
1535
1536    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1537        let schema = self.schema_manager.schema();
1538        let label_meta = schema
1539            .labels
1540            .get(label)
1541            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1542        let key = format!("vertices_{label}");
1543        match self.fork_branch_for(&key) {
1544            Some(branch) => Ok(VertexDataset::new_branched(
1545                &self.base_uri,
1546                label,
1547                label_meta.id,
1548                branch,
1549            )),
1550            None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1551        }
1552    }
1553
1554    #[cfg(feature = "lance-backend")]
1555    pub fn edge_dataset(
1556        &self,
1557        edge_type: &str,
1558        src_label: &str,
1559        dst_label: &str,
1560    ) -> Result<EdgeDataset> {
1561        let key = format!("edges_{edge_type}");
1562        match self.fork_branch_for(&key) {
1563            Some(branch) => Ok(EdgeDataset::new_branched(
1564                &self.base_uri,
1565                edge_type,
1566                src_label,
1567                dst_label,
1568                branch,
1569            )),
1570            None => Ok(EdgeDataset::new(
1571                &self.base_uri,
1572                edge_type,
1573                src_label,
1574                dst_label,
1575            )),
1576        }
1577    }
1578
1579    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1580        let key = format!("deltas_{edge_type}_{direction}");
1581        match self.fork_branch_for(&key) {
1582            Some(branch) => Ok(DeltaDataset::new_branched(
1583                &self.base_uri,
1584                edge_type,
1585                direction,
1586                branch,
1587            )),
1588            None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1589        }
1590    }
1591
1592    pub fn adjacency_dataset(
1593        &self,
1594        edge_type: &str,
1595        label: &str,
1596        direction: &str,
1597    ) -> Result<AdjacencyDataset> {
1598        let key = format!("adjacency_{direction}_{edge_type}_{label}");
1599        match self.fork_branch_for(&key) {
1600            Some(branch) => Ok(AdjacencyDataset::new_branched(
1601                &self.base_uri,
1602                edge_type,
1603                label,
1604                direction,
1605                branch,
1606            )),
1607            None => Ok(AdjacencyDataset::new(
1608                &self.base_uri,
1609                edge_type,
1610                label,
1611                direction,
1612            )),
1613        }
1614    }
1615
1616    /// Look up the branch name for a dataset under the active fork
1617    /// scope, if any. Returns `None` when not forked, or when the fork
1618    /// hasn't recorded a branch on this dataset yet (Phase 2 territory).
1619    fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1620        self.fork_scope
1621            .as_ref()
1622            .and_then(|s| s.branch_for(dataset_name))
1623    }
1624
1625    /// Get the main vertex dataset for unified vertex storage.
1626    ///
1627    /// The main vertex dataset contains all vertices regardless of label,
1628    /// enabling fast ID-based lookups without knowing the label.
1629    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1630        MainVertexDataset::new(&self.base_uri)
1631    }
1632
1633    /// Get the main edge dataset for unified edge storage.
1634    ///
1635    /// The main edge dataset contains all edges regardless of type,
1636    /// enabling fast ID-based lookups without knowing the edge type.
1637    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1638        MainEdgeDataset::new(&self.base_uri)
1639    }
1640
1641    #[cfg(feature = "lance-backend")]
1642    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
1643        Ok(UidIndex::new(&self.base_uri, label))
1644    }
1645
1646    #[cfg(feature = "lance-backend")]
1647    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1648        let schema = self.schema_manager.schema();
1649        let config = schema
1650            .indexes
1651            .iter()
1652            .find_map(|idx| match idx {
1653                IndexDefinition::Inverted(cfg)
1654                    if cfg.label == label && cfg.property == property =>
1655                {
1656                    Some(cfg.clone())
1657                }
1658                _ => None,
1659            })
1660            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1661
1662        InvertedIndex::new(&self.base_uri, config).await
1663    }
1664
1665    pub async fn vector_search(
1666        &self,
1667        label: &str,
1668        property: &str,
1669        query: &[f32],
1670        k: usize,
1671        filter: Option<&str>,
1672        ctx: Option<&QueryContext>,
1673    ) -> Result<Vec<(Vid, f32)>> {
1674        use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1675
1676        // Look up vector index config to get the correct distance metric.
1677        let schema = self.schema_manager.schema();
1678        let metric = schema
1679            .vector_index_for_property(label, property)
1680            .map(|config| config.metric.clone())
1681            .unwrap_or(DistanceMetric::L2);
1682
1683        let backend = self.backend.as_ref();
1684        let name = table_names::vertex_table_name(label);
1685
1686        let mut results = Vec::new();
1687
1688        // Only search if the table exists
1689        if backend.table_exists(&name).await.unwrap_or(false) {
1690            let backend_metric = match &metric {
1691                DistanceMetric::L2 => BackendMetric::L2,
1692                DistanceMetric::Cosine => BackendMetric::Cosine,
1693                DistanceMetric::Dot => BackendMetric::Dot,
1694                _ => BackendMetric::L2,
1695            };
1696
1697            // Build combined filter: _deleted = false + optional user filter + HWM
1698            let mut filter_parts = vec![Self::build_active_filter(filter)];
1699            if ctx.is_some()
1700                && let Some(hwm) = self.version_high_water_mark()
1701            {
1702                filter_parts.push(format!("_version <= {}", hwm));
1703            }
1704            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1705
1706            let batches = backend
1707                .vector_search(&name, property, query, k, backend_metric, combined_filter)
1708                .await?;
1709
1710            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1711        }
1712
1713        // Merge L0 buffer vertices into results for visibility of unflushed data.
1714        if let Some(qctx) = ctx {
1715            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1716        }
1717
1718        Ok(results)
1719    }
1720
1721    /// Perform a full-text search with BM25 scoring.
1722    ///
1723    /// Returns vertices matching the search query along with their BM25 scores.
1724    /// Results are sorted by score descending (most relevant first).
1725    ///
1726    /// # Arguments
1727    /// * `label` - The label to search within
1728    /// * `property` - The property column to search (must have FTS index)
1729    /// * `query` - The search query text
1730    /// * `k` - Maximum number of results to return
1731    /// * `filter` - Optional Lance filter expression
1732    /// * `ctx` - Optional query context for visibility checks
1733    ///
1734    /// # Returns
1735    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
1736    pub async fn fts_search(
1737        &self,
1738        label: &str,
1739        property: &str,
1740        query: &str,
1741        k: usize,
1742        filter: Option<&str>,
1743        ctx: Option<&QueryContext>,
1744    ) -> Result<Vec<(Vid, f32)>> {
1745        use crate::backend::types::FilterExpr;
1746
1747        let backend = self.backend.as_ref();
1748        let name = table_names::vertex_table_name(label);
1749
1750        let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1751            // Build combined filter: _deleted = false + optional user filter + HWM
1752            let mut filter_parts = vec![Self::build_active_filter(filter)];
1753            if ctx.is_some()
1754                && let Some(hwm) = self.version_high_water_mark()
1755            {
1756                filter_parts.push(format!("_version <= {}", hwm));
1757            }
1758            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1759
1760            let batches = backend
1761                .full_text_search(&name, property, query, k, combined_filter)
1762                .await?;
1763
1764            let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1765            // Results should already be sorted by score from backend, but ensure descending order
1766            fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1767            fts_results
1768        } else {
1769            Vec::new()
1770        };
1771
1772        // Merge L0 buffer vertices for visibility of unflushed data.
1773        if let Some(qctx) = ctx {
1774            merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1775        }
1776
1777        Ok(results)
1778    }
1779
1780    #[cfg(feature = "lance-backend")]
1781    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1782        let index = self.uid_index(label)?;
1783        index.get_vid(uid).await
1784    }
1785
1786    #[cfg(feature = "lance-backend")]
1787    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1788        let index = self.uid_index(label)?;
1789        index.write_mapping(&[(uid, vid)]).await
1790    }
1791
1792    pub async fn load_subgraph(
1793        &self,
1794        start_vids: &[Vid],
1795        edge_types: &[u32],
1796        max_hops: usize,
1797        direction: GraphDirection,
1798        l0: Option<&L0Buffer>,
1799    ) -> Result<WorkingGraph> {
1800        let mut graph = WorkingGraph::new();
1801        let schema = self.schema_manager.schema();
1802
1803        // Build maps for ID lookups
1804        let label_map: HashMap<u16, String> = schema
1805            .labels
1806            .values()
1807            .map(|meta| {
1808                (
1809                    meta.id,
1810                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
1811                )
1812            })
1813            .collect();
1814
1815        let edge_type_map: HashMap<u32, String> = schema
1816            .edge_types
1817            .values()
1818            .map(|meta| {
1819                (
1820                    meta.id,
1821                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1822                )
1823            })
1824            .collect();
1825
1826        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1827
1828        // Initialize frontier
1829        let mut frontier: Vec<Vid> = start_vids.to_vec();
1830        let mut visited: HashSet<Vid> = HashSet::new();
1831
1832        // Add start vertices to graph
1833        for &vid in start_vids {
1834            graph.add_vertex(vid);
1835        }
1836
1837        for _hop in 0..max_hops {
1838            let mut next_frontier = HashSet::new();
1839
1840            for &vid in &frontier {
1841                if visited.contains(&vid) {
1842                    continue;
1843                }
1844                visited.insert(vid);
1845                graph.add_vertex(vid);
1846
1847                // For each edge type we want to traverse
1848                for &etype_id in &target_edge_types {
1849                    let etype_name = edge_type_map
1850                        .get(&etype_id)
1851                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1852
1853                    // Determine directions
1854                    // Storage direction: "fwd" or "bwd".
1855                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
1856                    let (dir_str, neighbor_is_dst) = match direction {
1857                        GraphDirection::Outgoing => ("fwd", true),
1858                        GraphDirection::Incoming => ("bwd", false),
1859                    };
1860
1861                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1862
1863                    // 1. L2: Adjacency (Base)
1864                    // In the new storage model, VIDs don't embed label info.
1865                    // We need to try all labels to find the adjacency data.
1866                    // Edge version from snapshot (reserved for future version filtering)
1867                    let _edge_ver = self
1868                        .pinned_snapshot
1869                        .as_ref()
1870                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1871
1872                    // Try each label until we find adjacency data
1873                    let backend = self.backend();
1874                    for current_src_label in label_map.values() {
1875                        let adj_ds =
1876                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1877                                Ok(ds) => ds,
1878                                Err(_) => continue,
1879                            };
1880                        if let Some((neighbors, eids)) =
1881                            adj_ds.read_adjacency_backend(backend, vid).await?
1882                        {
1883                            for (n, eid) in neighbors.into_iter().zip(eids) {
1884                                edges.insert(
1885                                    eid,
1886                                    EdgeState {
1887                                        neighbor: n,
1888                                        version: 0,
1889                                        deleted: false,
1890                                    },
1891                                );
1892                            }
1893                            break; // Found adjacency data for this vid, no need to try other labels
1894                        }
1895                    }
1896
1897                    // 2. L1: Delta
1898                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1899                    let delta_entries = delta_ds
1900                        .read_deltas(backend, vid, &schema, self.snapshot_version_hwm())
1901                        .await?;
1902                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1903
1904                    // 3. L0: Buffer
1905                    if let Some(l0) = l0 {
1906                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1907                    }
1908
1909                    // Add resulting edges to graph
1910                    Self::add_edges_to_graph(
1911                        &mut graph,
1912                        edges,
1913                        vid,
1914                        etype_id,
1915                        neighbor_is_dst,
1916                        &visited,
1917                        &mut next_frontier,
1918                    );
1919                }
1920            }
1921            frontier = next_frontier.into_iter().collect();
1922
1923            // Early termination: if frontier is empty, no more vertices to explore
1924            if frontier.is_empty() {
1925                break;
1926            }
1927        }
1928
1929        Ok(graph)
1930    }
1931
1932    /// Apply delta entries to edge state map, handling version conflicts.
1933    fn apply_delta_to_edges(
1934        edges: &mut HashMap<Eid, EdgeState>,
1935        delta_entries: Vec<crate::storage::delta::L1Entry>,
1936        neighbor_is_dst: bool,
1937    ) {
1938        for entry in delta_entries {
1939            let neighbor = if neighbor_is_dst {
1940                entry.dst_vid
1941            } else {
1942                entry.src_vid
1943            };
1944            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1945
1946            if entry.version > current_ver {
1947                edges.insert(
1948                    entry.eid,
1949                    EdgeState {
1950                        neighbor,
1951                        version: entry.version,
1952                        deleted: matches!(entry.op, Op::Delete),
1953                    },
1954                );
1955            }
1956        }
1957    }
1958
1959    /// Apply L0 buffer edges and tombstones to edge state map.
1960    fn apply_l0_to_edges(
1961        edges: &mut HashMap<Eid, EdgeState>,
1962        l0: &L0Buffer,
1963        vid: Vid,
1964        etype_id: u32,
1965        direction: GraphDirection,
1966    ) {
1967        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1968        for (neighbor, eid, ver) in l0_neighbors {
1969            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1970            if ver > current_ver {
1971                edges.insert(
1972                    eid,
1973                    EdgeState {
1974                        neighbor,
1975                        version: ver,
1976                        deleted: false,
1977                    },
1978                );
1979            }
1980        }
1981
1982        // Check tombstones in L0
1983        for (eid, state) in edges.iter_mut() {
1984            if l0.is_tombstoned(*eid) {
1985                state.deleted = true;
1986            }
1987        }
1988    }
1989
1990    /// Add non-deleted edges to graph and collect next frontier.
1991    fn add_edges_to_graph(
1992        graph: &mut WorkingGraph,
1993        edges: HashMap<Eid, EdgeState>,
1994        vid: Vid,
1995        etype_id: u32,
1996        neighbor_is_dst: bool,
1997        visited: &HashSet<Vid>,
1998        next_frontier: &mut HashSet<Vid>,
1999    ) {
2000        for (eid, state) in edges {
2001            if state.deleted {
2002                continue;
2003            }
2004            graph.add_vertex(state.neighbor);
2005
2006            if !visited.contains(&state.neighbor) {
2007                next_frontier.insert(state.neighbor);
2008            }
2009
2010            if neighbor_is_dst {
2011                graph.add_edge(vid, state.neighbor, eid, etype_id);
2012            } else {
2013                graph.add_edge(state.neighbor, vid, eid, etype_id);
2014            }
2015        }
2016    }
2017}
2018
2019/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
2020fn extract_vid_score_pairs(
2021    batches: &[arrow_array::RecordBatch],
2022    vid_column: &str,
2023    score_column: &str,
2024) -> Result<Vec<(Vid, f32)>> {
2025    let mut results = Vec::new();
2026    for batch in batches {
2027        let vid_col = batch
2028            .column_by_name(vid_column)
2029            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
2030            .as_any()
2031            .downcast_ref::<UInt64Array>()
2032            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
2033
2034        let score_col = batch
2035            .column_by_name(score_column)
2036            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
2037            .as_any()
2038            .downcast_ref::<Float32Array>()
2039            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
2040
2041        for i in 0..batch.num_rows() {
2042            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
2043        }
2044    }
2045    Ok(results)
2046}
2047
2048/// Extracts a `Vec<f32>` from a JSON property value.
2049///
2050/// Returns `None` if the property is missing, not an array, or contains
2051/// non-numeric elements.
2052fn extract_embedding_from_props(
2053    props: &uni_common::Properties,
2054    property: &str,
2055) -> Option<Vec<f32>> {
2056    let arr = props.get(property)?.as_array()?;
2057    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
2058}
2059
2060/// Merges L0 buffer vertices into LanceDB vector search results.
2061///
2062/// Visits L0 buffers in precedence order (pending flush → main → transaction),
2063/// collects tombstoned VIDs and candidate embeddings, then merges them with the
2064/// existing LanceDB results so that:
2065/// - Tombstoned VIDs are removed (unless re-created in a later L0).
2066/// - VIDs present in both L0 and LanceDB use the L0 distance.
2067/// - New L0-only VIDs are appended.
2068/// - Results are re-sorted by distance ascending and truncated to `k`.
2069fn merge_l0_into_vector_results(
2070    results: &mut Vec<(Vid, f32)>,
2071    ctx: &QueryContext,
2072    label: &str,
2073    property: &str,
2074    query: &[f32],
2075    k: usize,
2076    metric: &DistanceMetric,
2077) {
2078    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
2079    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2080        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2081    buffers.push(Arc::clone(&ctx.l0));
2082    if let Some(ref txn) = ctx.transaction_l0 {
2083        buffers.push(Arc::clone(txn));
2084    }
2085
2086    // Maps VID → distance for L0 candidates (last writer wins).
2087    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2088    // Tombstoned VIDs across all L0 buffers.
2089    let mut tombstoned: HashSet<Vid> = HashSet::new();
2090
2091    for buf_arc in &buffers {
2092        let buf = buf_arc.read();
2093
2094        // Accumulate tombstones.
2095        for &vid in &buf.vertex_tombstones {
2096            tombstoned.insert(vid);
2097        }
2098
2099        // Scan vertices with the target label.
2100        for (&vid, labels) in &buf.vertex_labels {
2101            if !labels.iter().any(|l| l == label) {
2102                continue;
2103            }
2104            if let Some(props) = buf.vertex_properties.get(&vid)
2105                && let Some(emb) = extract_embedding_from_props(props, property)
2106            {
2107                if emb.len() != query.len() {
2108                    continue; // dimension mismatch
2109                }
2110                let dist = metric.compute_distance(&emb, query);
2111                // Last writer wins: later buffer overwrites earlier.
2112                l0_candidates.insert(vid, dist);
2113                // If re-created in a later L0, remove from tombstones.
2114                tombstoned.remove(&vid);
2115            }
2116        }
2117    }
2118
2119    // If no L0 activity affects this search, skip merge.
2120    if l0_candidates.is_empty() && tombstoned.is_empty() {
2121        return;
2122    }
2123
2124    // Remove tombstoned VIDs from LanceDB results.
2125    results.retain(|(vid, _)| !tombstoned.contains(vid));
2126
2127    // Overwrite or append L0 candidates.
2128    for (vid, dist) in &l0_candidates {
2129        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2130            existing.1 = *dist;
2131        } else {
2132            results.push((*vid, *dist));
2133        }
2134    }
2135
2136    // Re-sort by distance ascending.
2137    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2138    results.truncate(k);
2139}
2140
2141/// Computes a simple token-overlap relevance score between a query and text.
2142///
2143/// Returns the fraction of query tokens found in the text (case-insensitive),
2144/// producing a score in [0.0, 1.0]. Sufficient for the small L0 buffer.
2145fn compute_text_relevance(query: &str, text: &str) -> f32 {
2146    let query_tokens: HashSet<String> =
2147        query.split_whitespace().map(|t| t.to_lowercase()).collect();
2148    if query_tokens.is_empty() {
2149        return 0.0;
2150    }
2151    let text_tokens: HashSet<String> = text.split_whitespace().map(|t| t.to_lowercase()).collect();
2152    let hits = query_tokens
2153        .iter()
2154        .filter(|t| text_tokens.contains(t.as_str()))
2155        .count();
2156    hits as f32 / query_tokens.len() as f32
2157}
2158
2159/// Extracts a string slice from a property value.
2160fn extract_text_from_props<'a>(
2161    props: &'a uni_common::Properties,
2162    property: &str,
2163) -> Option<&'a str> {
2164    props.get(property)?.as_str()
2165}
2166
2167/// Merges L0 buffer vertices into LanceDB full-text search results.
2168///
2169/// Follows the same pattern as [`merge_l0_into_vector_results`]: visits L0
2170/// buffers in precedence order, collects tombstoned VIDs and text-match
2171/// candidates, then merges them so that:
2172/// - Tombstoned VIDs are removed (unless re-created in a later L0).
2173/// - VIDs present in both L0 and LanceDB use the L0 score.
2174/// - New L0-only VIDs are appended.
2175/// - Results are re-sorted by score **descending** and truncated to `k`.
2176fn merge_l0_into_fts_results(
2177    results: &mut Vec<(Vid, f32)>,
2178    ctx: &QueryContext,
2179    label: &str,
2180    property: &str,
2181    query: &str,
2182    k: usize,
2183) {
2184    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
2185    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2186        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2187    buffers.push(Arc::clone(&ctx.l0));
2188    if let Some(ref txn) = ctx.transaction_l0 {
2189        buffers.push(Arc::clone(txn));
2190    }
2191
2192    // Maps VID → relevance score for L0 candidates (last writer wins).
2193    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2194    // Tombstoned VIDs across all L0 buffers.
2195    let mut tombstoned: HashSet<Vid> = HashSet::new();
2196
2197    for buf_arc in &buffers {
2198        let buf = buf_arc.read();
2199
2200        // Accumulate tombstones.
2201        for &vid in &buf.vertex_tombstones {
2202            tombstoned.insert(vid);
2203        }
2204
2205        // Scan vertices with the target label.
2206        for (&vid, labels) in &buf.vertex_labels {
2207            if !labels.iter().any(|l| l == label) {
2208                continue;
2209            }
2210            if let Some(props) = buf.vertex_properties.get(&vid)
2211                && let Some(text) = extract_text_from_props(props, property)
2212            {
2213                let score = compute_text_relevance(query, text);
2214                if score > 0.0 {
2215                    // Last writer wins: later buffer overwrites earlier.
2216                    l0_candidates.insert(vid, score);
2217                }
2218                // If re-created in a later L0, remove from tombstones.
2219                tombstoned.remove(&vid);
2220            }
2221        }
2222    }
2223
2224    // If no L0 activity affects this search, skip merge.
2225    if l0_candidates.is_empty() && tombstoned.is_empty() {
2226        return;
2227    }
2228
2229    // Remove tombstoned VIDs from LanceDB results.
2230    results.retain(|(vid, _)| !tombstoned.contains(vid));
2231
2232    // Overwrite or append L0 candidates.
2233    for (vid, score) in &l0_candidates {
2234        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2235            existing.1 = *score;
2236        } else {
2237            results.push((*vid, *score));
2238        }
2239    }
2240
2241    // Re-sort by score descending (higher relevance first).
2242    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2243    results.truncate(k);
2244}