Skip to main content

uni_store/storage/
manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
54/// Edge state during subgraph loading - tracks version and deletion status.
55struct EdgeState {
56    neighbor: Vid,
57    version: u64,
58    deleted: bool,
59}
60
61pub struct StorageManager {
62    base_uri: String,
63    store: Arc<dyn ObjectStore>,
64    schema_manager: Arc<SchemaManager>,
65    snapshot_manager: Arc<SnapshotManager>,
66    adjacency_manager: Arc<AdjacencyManager>,
67    pub config: UniConfig,
68    pub compaction_status: Arc<Mutex<CompactionStatus>>,
69    /// Counter of in-flight `flush_to_l1` operations. Compaction skips
70    /// delta-clear when this is non-zero to avoid wiping rows a flush is
71    /// about to append. Counter (not bool) so multiple async flushes can
72    /// be in flight concurrently.
73    pub flush_in_progress: std::sync::atomic::AtomicUsize,
74    /// Optional pinned snapshot for time-travel
75    pinned_snapshot: Option<SnapshotManifest>,
76    /// Optional row-version pin for transaction snapshot reads (C2).
77    ///
78    /// When set, L1 scans filter to `_version <= hwm` exactly like a pinned
79    /// snapshot, but WITHOUT a manifest: a read-write transaction pins the
80    /// version counter observed at begin (`SnapshotView.started_at_version`)
81    /// so an L0→L1 flush completing mid-transaction cannot leak
82    /// post-snapshot rows into its scans. Mutually exclusive with
83    /// `pinned_snapshot`.
84    pinned_version_hwm: Option<u64>,
85    /// Optional fork scope for branch-aware reads (Phase 1 read-only).
86    ///
87    /// Mutually exclusive with `pinned_snapshot`: a single
88    /// `StorageManager` is either pinned to a snapshot or scoped to a
89    /// fork, never both. Phase 4's `pin_to_version` on a forked session
90    /// builds a separate combined manager out of band; Phase 1 forbids
91    /// mixing.
92    fork_scope: Option<Arc<crate::fork::ForkScope>>,
93    /// Pluggable storage backend.
94    backend: Arc<dyn StorageBackend>,
95    /// In-memory VID-to-labels index for O(1) label lookups.
96    ///
97    /// Always present: populated at startup via [`Self::rebuild_vid_labels_index`]
98    /// and kept current at flush time. Traversal-time label predicates
99    /// (`MATCH (a)-[r]->(b:B)`) read it to resolve labels for vertices that
100    /// have aged out of L0 into Lance storage — notably on forks, whose data
101    /// is flushed to Lance before branching.
102    vid_labels_index: Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>,
103}
104
105/// RAII counter increment for `StorageManager.flush_in_progress`.
106///
107/// Acquired during the rotate phase of a flush (see
108/// `runtime::writer::flush_l0_rotate`) and dropped when the full
109/// rotate/stream/finalize pipeline completes. Compaction's delta-clear
110/// gate skips while this counter is non-zero, so the counter must
111/// reflect "flush has started, has not completed" — including any
112/// async stream phase running on a spawned task.
113pub struct FlushInProgressGuard {
114    storage: Arc<StorageManager>,
115}
116
117impl FlushInProgressGuard {
118    pub fn new(storage: &Arc<StorageManager>) -> Self {
119        storage
120            .flush_in_progress
121            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
122        Self {
123            storage: storage.clone(),
124        }
125    }
126}
127
128impl Drop for FlushInProgressGuard {
129    fn drop(&mut self) {
130        // M-PANIC-IS-STOP: must not panic in Drop. Atomic op cannot fail.
131        self.storage
132            .flush_in_progress
133            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
134    }
135}
136
137/// Whether a Lance error represents a commit conflict that retrying may
138/// resolve. These fire under async-flush when ≥2 streams concurrently try
139/// to create the same table OR when an Append races with a still-in-progress
140/// Overwrite (create_table). See the Lance commit-conflict-resolver in
141/// `lance-3.0.1/src/io/commit/conflict_resolver.rs`.
142fn is_lance_conflict(err: &anyhow::Error) -> bool {
143    let msg = err.to_string();
144    msg.contains("Incompatible transaction") || msg.contains("conflict")
145}
146
147/// Runs `op` with exponential-backoff retry on Lance commit conflicts.
148/// Up to 10 attempts (~10s worst case); backoff is 1ms, 2ms, 4ms, ...,
149/// 512ms. Non-conflict errors return immediately. `op` is re-invoked each
150/// attempt so it can re-check table existence and adjust strategy.
151async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
152where
153    F: FnMut() -> Fut,
154    Fut: std::future::Future<Output = anyhow::Result<()>>,
155{
156    for attempt in 0u32..10 {
157        match op().await {
158            Ok(()) => return Ok(()),
159            Err(e) => {
160                if !is_lance_conflict(&e) || attempt == 9 {
161                    return Err(e);
162                }
163                let backoff_ms = 1u64 << attempt;
164                tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
165            }
166        }
167    }
168    unreachable!("retry loop exits via Ok or Err")
169}
170
171/// MergeInsert sibling of `write_batch_with_lance_conflict_retry`.
172///
173/// Source `batch` must contain the join columns in `on` plus any
174/// columns to update. Matched rows have `WhenMatched::UpdateAll`
175/// applied; unmatched source rows are dropped (partial writes never
176/// INSERT). Returns an error if the target table does not exist.
177/// Retries on Lance commit conflicts via `retry_on_lance_conflict`.
178/// RecordBatch clones are cheap (column data is Arc'd).
179pub async fn merge_insert_batch_with_lance_conflict_retry(
180    backend: &dyn crate::backend::StorageBackend,
181    table_name: &str,
182    batch: arrow_array::RecordBatch,
183    on: &[&str],
184) -> anyhow::Result<()> {
185    retry_on_lance_conflict(|| async {
186        let exists = backend.table_exists(table_name).await?;
187        if !exists {
188            anyhow::bail!(
189                "merge_insert target table '{}' does not exist (partial writes \
190                 require the row to already be present; CREATE goes through Append)",
191                table_name
192            );
193        }
194        backend
195            .merge_insert(table_name, on, vec![batch.clone()])
196            .await
197    })
198    .await
199}
200
201/// Race-safe write: creates the table if missing, otherwise appends.
202/// Each attempt re-checks `table_exists` and adjusts strategy: Append if
203/// now-exists, Create if still-missing. Retries on Lance commit conflicts
204/// via `retry_on_lance_conflict`.
205///
206/// Used by every dataset's `write_batch` helper to absorb the Lance
207/// commit-conflict-resolver behavior. RecordBatch clones are cheap
208/// (column data is Arc'd).
209pub async fn write_batch_with_lance_conflict_retry(
210    backend: &dyn crate::backend::StorageBackend,
211    table_name: &str,
212    batch: arrow_array::RecordBatch,
213) -> anyhow::Result<()> {
214    use crate::backend::types::WriteMode;
215    retry_on_lance_conflict(|| async {
216        let exists = backend.table_exists(table_name).await?;
217        if exists {
218            backend
219                .write(table_name, vec![batch.clone()], WriteMode::Append)
220                .await
221        } else {
222            backend.create_table(table_name, vec![batch.clone()]).await
223        }
224    })
225    .await
226}
227
228/// Helper to manage compaction_in_progress flag
229struct CompactionGuard {
230    status: Arc<Mutex<CompactionStatus>>,
231}
232
233impl CompactionGuard {
234    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
235        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
236        if s.compaction_in_progress {
237            return None;
238        }
239        s.compaction_in_progress = true;
240        Some(Self {
241            status: status.clone(),
242        })
243    }
244}
245
246impl Drop for CompactionGuard {
247    fn drop(&mut self) {
248        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
249        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
250        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
251            Ok(mut s) => {
252                s.compaction_in_progress = false;
253                s.last_compaction = Some(std::time::SystemTime::now());
254            }
255            Err(e) => {
256                // Lock is poisoned but we're in Drop - cannot panic.
257                // Log the error and continue. System state may be inconsistent but at least
258                // we don't abort the process.
259                log::error!(
260                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
261                     Compaction status may be inconsistent. Issue #18/#150",
262                    e
263                );
264            }
265        }
266    }
267}
268
269impl StorageManager {
270    /// Create a new StorageManager with a pre-configured backend.
271    pub async fn new_with_backend(
272        base_uri: &str,
273        store: Arc<dyn ObjectStore>,
274        backend: Arc<dyn StorageBackend>,
275        schema_manager: Arc<SchemaManager>,
276        config: UniConfig,
277    ) -> Result<Self> {
278        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
279            store,
280            config.object_store.clone(),
281        ));
282
283        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
284
285        // Perform crash recovery for all known table patterns
286        Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
287
288        let mut sm = Self {
289            base_uri: base_uri.to_string(),
290            store: resilient_store,
291            schema_manager,
292            snapshot_manager,
293            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
294            config,
295            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
296            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
297            pinned_snapshot: None,
298            pinned_version_hwm: None,
299            fork_scope: None,
300            backend,
301            vid_labels_index: Arc::new(parking_lot::RwLock::new(
302                crate::storage::vid_labels::VidLabelsIndex::new(),
303            )),
304        };
305
306        // Rebuild VidLabelsIndex from persisted vertices. A failure leaves the
307        // empty index in place; flush-time updates then repopulate it
308        // incrementally, so reads degrade rather than break.
309        if let Err(e) = sm.rebuild_vid_labels_index().await {
310            warn!(
311                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
312                e
313            );
314        }
315
316        Ok(sm)
317    }
318
319    /// Create a new StorageManager with LanceDB integration.
320    #[cfg(feature = "lance-backend")]
321    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
322        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
323    }
324
325    /// Create a new StorageManager with custom cache size.
326    #[cfg(feature = "lance-backend")]
327    pub async fn new_with_cache(
328        base_uri: &str,
329        schema_manager: Arc<SchemaManager>,
330        adjacency_cache_size: usize,
331    ) -> Result<Self> {
332        let config = UniConfig {
333            cache_size: adjacency_cache_size,
334            ..Default::default()
335        };
336        Self::new_with_config(base_uri, schema_manager, config).await
337    }
338
339    /// Create a new StorageManager with custom configuration.
340    #[cfg(feature = "lance-backend")]
341    pub async fn new_with_config(
342        base_uri: &str,
343        schema_manager: Arc<SchemaManager>,
344        config: UniConfig,
345    ) -> Result<Self> {
346        let store = Self::build_store_from_uri(base_uri)?;
347        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
348    }
349
350    /// Create a new StorageManager using an already-constructed object store.
351    #[cfg(feature = "lance-backend")]
352    pub async fn new_with_store_and_config(
353        base_uri: &str,
354        store: Arc<dyn ObjectStore>,
355        schema_manager: Arc<SchemaManager>,
356        config: UniConfig,
357    ) -> Result<Self> {
358        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
359            .await
360    }
361
362    /// Create a new StorageManager with LanceDB storage options.
363    #[cfg(feature = "lance-backend")]
364    pub async fn new_with_store_and_storage_options(
365        base_uri: &str,
366        store: Arc<dyn ObjectStore>,
367        schema_manager: Arc<SchemaManager>,
368        config: UniConfig,
369        lancedb_storage_options: Option<HashMap<String, String>>,
370    ) -> Result<Self> {
371        let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
372        Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
373    }
374
375    /// Recover all staging tables for known table patterns.
376    ///
377    /// This runs on startup to handle crash recovery. It checks for staging tables
378    /// for all vertex labels, adjacency tables, delta tables, and main tables.
379    async fn recover_all_staging_tables(
380        backend: &dyn StorageBackend,
381        schema_manager: &SchemaManager,
382    ) -> Result<()> {
383        let schema = schema_manager.schema();
384
385        // Recover main vertex and edge tables
386        backend
387            .recover_staging(table_names::main_vertex_table_name())
388            .await?;
389        backend
390            .recover_staging(table_names::main_edge_table_name())
391            .await?;
392
393        // Recover per-label vertex tables
394        for label in schema.labels.keys() {
395            let name = table_names::vertex_table_name(label);
396            backend.recover_staging(&name).await?;
397        }
398
399        // Recover adjacency and delta tables for each edge type and direction
400        for edge_type in schema.edge_types.keys() {
401            for direction in &["fwd", "bwd"] {
402                // Recover delta tables
403                let delta_name = table_names::delta_table_name(edge_type, direction);
404                backend.recover_staging(&delta_name).await?;
405
406                // Recover adjacency tables for each label
407                for _label in schema.labels.keys() {
408                    let adj_name = table_names::adjacency_table_name(edge_type, direction);
409                    backend.recover_staging(&adj_name).await?;
410                }
411            }
412        }
413
414        Ok(())
415    }
416
417    #[cfg(feature = "lance-backend")]
418    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
419        if base_uri.contains("://") {
420            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
421            let (store, _path) = object_store::parse_url(&parsed)
422                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
423            Ok(Arc::from(store))
424        } else {
425            // If local path, ensure it exists.
426            std::fs::create_dir_all(base_uri)?;
427            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
428        }
429    }
430
431    /// Filesystem root backing this manager's object store, when the store
432    /// is a local filesystem (the non-`://` branch of
433    /// `build_store_from_uri`). Used to fsync WAL segments after PUT —
434    /// `object_store::LocalFileSystem` does not fsync on its own. `None`
435    /// for remote/URL-based stores.
436    pub fn local_fs_root(&self) -> Option<std::path::PathBuf> {
437        if self.base_uri.contains("://") {
438            None
439        } else {
440            Some(std::path::PathBuf::from(&self.base_uri))
441        }
442    }
443
444    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
445        // Phase 4a: pinning a forked session is now supported. The
446        // resulting StorageManager keeps `fork_scope` so reads continue
447        // to route through the fork's Lance branches via `base_paths`,
448        // and adds `pinned_snapshot` so writers / writers' read views
449        // resolve at the snapshot's HWM. Writes are gated separately by
450        // the session-level `is_pinned` check (`Session::tx` rejects
451        // them via `UniError::ReadOnly`).
452        Self {
453            base_uri: self.base_uri.clone(),
454            store: self.store.clone(),
455            schema_manager: self.schema_manager.clone(),
456            snapshot_manager: self.snapshot_manager.clone(),
457            // Separate AdjacencyManager for snapshot isolation (Issue #73):
458            // warm() will load only edges visible at the snapshot's HWM.
459            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
460            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
461            config: self.config.clone(),
462            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
463            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
464            pinned_snapshot: Some(snapshot),
465            pinned_version_hwm: None,
466            fork_scope: self.fork_scope.clone(),
467            backend: self.backend.clone(),
468            // Deep-copy, not Arc-clone: a fork/pin must get its OWN label index
469            // so its flushes/relabels don't mutate the parent's (review H1/L2),
470            // mirroring the fresh `adjacency_manager` above. `VidLabelsIndex`
471            // derives `Clone`; the snapshot is taken after flush-before-branch so
472            // inherited labels (#99) are preserved.
473            vid_labels_index: Arc::new(parking_lot::RwLock::new(
474                self.vid_labels_index.read().clone(),
475            )),
476        }
477    }
478
479    /// Construct a clone of this `StorageManager` pinned to a row-version
480    /// high-water mark (C2: transaction-level L1 pinning).
481    ///
482    /// Unlike [`Self::pinned`], this needs no `SnapshotManifest`: scans
483    /// filter to `_version <= hwm` via [`Self::version_high_water_mark`].
484    /// A read-write transaction builds one of these at begin with
485    /// `SnapshotView.started_at_version`, so an L0→L1 flush completing
486    /// mid-transaction cannot leak post-snapshot rows into its L1 scans
487    /// (the L0 tier is pinned separately by the `SnapshotView`).
488    ///
489    /// Unlike [`Self::pinned`], the live `AdjacencyManager` is SHARED, not
490    /// fresh: commits replay their edges into the live manager's overlay,
491    /// which is the traversal path's only source for L0-resident edges — a
492    /// fresh manager would make every unflushed edge invisible to the
493    /// transaction. The cost is that the edge tier is not version-pinned
494    /// (post-snapshot edges remain visible to traversals, exactly as before
495    /// C2); edge reads are recorded in the OCC read-set, so a conflicting
496    /// read-modify-write still aborts at commit.
497    pub fn pinned_at_version(&self, hwm: u64) -> Self {
498        Self {
499            base_uri: self.base_uri.clone(),
500            store: self.store.clone(),
501            schema_manager: self.schema_manager.clone(),
502            snapshot_manager: self.snapshot_manager.clone(),
503            adjacency_manager: self.adjacency_manager.clone(),
504            config: self.config.clone(),
505            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
506            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
507            pinned_snapshot: None,
508            pinned_version_hwm: Some(hwm),
509            fork_scope: self.fork_scope.clone(),
510            backend: self.backend.clone(),
511            // Deep-copy, not Arc-clone: a fork/pin must get its OWN label index
512            // so its flushes/relabels don't mutate the parent's (review H1/L2),
513            // mirroring the fresh `adjacency_manager` above. `VidLabelsIndex`
514            // derives `Clone`; the snapshot is taken after flush-before-branch so
515            // inherited labels (#99) are preserved.
516            vid_labels_index: Arc::new(parking_lot::RwLock::new(
517                self.vid_labels_index.read().clone(),
518            )),
519        }
520    }
521
522    /// Construct a fork-scoped clone of this `StorageManager`.
523    ///
524    /// All reads through dataset factories *and* through `backend()`
525    /// on the returned manager route through the fork's Lance branches
526    /// via `base_paths`. The `AdjacencyManager` is fresh (per Issue
527    /// #73 reasoning — same as `pinned`) to prevent primary's CSR from
528    /// leaking into the fork. `fork_scope` and `pinned_snapshot` are
529    /// mutually exclusive.
530    ///
531    /// The backend is wrapped in [`crate::backend::branched::BranchedBackend`]
532    /// so that every `ScanRequest` constructed *anywhere* (PropertyManager,
533    /// MainVertexDataset static methods, etc.) automatically picks up
534    /// the fork's branch for tables the fork has branched. Untracked
535    /// tables fall back to primary, matching Phase 1 read semantics.
536    pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
537        self.at_fork_with_schema(scope, self.schema_manager.clone())
538    }
539
540    /// Variant of [`Self::at_fork`] that uses an explicit
541    /// `merged_schema` for the fork's storage rather than primary's
542    /// schema_manager. Used by `UniInner::at_fork` so that the
543    /// fork-side strict-schema checks (in `uni-query` / `uni-store`'s
544    /// writer) see fork-local labels and edge types added through
545    /// `Session::fork_schema()`. Without this, those checks would
546    /// route through primary's schema and reject fork-local labels.
547    pub fn at_fork_with_schema(
548        &self,
549        scope: Arc<crate::fork::ForkScope>,
550        merged_schema: Arc<SchemaManager>,
551    ) -> Self {
552        debug_assert!(
553            self.pinned_snapshot.is_none(),
554            "forking a pinned StorageManager is unsupported in Phase 1"
555        );
556        let branched_backend: Arc<dyn StorageBackend> = Arc::new(
557            crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
558        );
559        // Fork-scoped snapshot manager: a fork's flush publishes its manifest +
560        // `latest` pointer under `catalog/forks/{fork_id}/`, never the primary's
561        // global `catalog/latest` (review C1). Uses the raw object store, since
562        // catalog metadata is not branched.
563        let snapshot_manager = Arc::new(SnapshotManager::new_for_fork(
564            self.store.clone(),
565            scope.fork_id(),
566        ));
567        Self {
568            base_uri: self.base_uri.clone(),
569            store: self.store.clone(),
570            schema_manager: merged_schema,
571            snapshot_manager,
572            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
573            config: self.config.clone(),
574            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
575            flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
576            pinned_snapshot: None,
577            pinned_version_hwm: None,
578            fork_scope: Some(scope),
579            backend: branched_backend,
580            // Deep-copy, not Arc-clone: a fork/pin must get its OWN label index
581            // so its flushes/relabels don't mutate the parent's (review H1/L2),
582            // mirroring the fresh `adjacency_manager` above. `VidLabelsIndex`
583            // derives `Clone`; the snapshot is taken after flush-before-branch so
584            // inherited labels (#99) are preserved.
585            vid_labels_index: Arc::new(parking_lot::RwLock::new(
586                self.vid_labels_index.read().clone(),
587            )),
588        }
589    }
590
591    /// Borrow the active fork scope, if any.
592    pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
593        self.fork_scope.as_ref()
594    }
595
596    /// Phase 5a: query whether a fork-local index exists for the
597    /// `(label, column)` pair on the active fork scope. Returns
598    /// `None` outside a fork or when no fork-local build has
599    /// completed for that pair.
600    ///
601    /// The planner consults this to decide whether to emit
602    /// `FusedIndexScan` (returns `Some`) or fall back to the
603    /// inherited primary index via `base_paths` (returns `None`).
604    /// The lookup is a `DashMap::get` on `ForkScope` — O(1) and
605    /// safe to call per query without caching above this layer.
606    #[must_use]
607    pub fn fork_index_exists(
608        &self,
609        label: &str,
610        column: &str,
611    ) -> Option<crate::fork::ForkLocalIndexKind> {
612        self.fork_scope
613            .as_ref()
614            .and_then(|s| s.fork_local_index(label, column))
615    }
616
617    /// Base URI for this storage manager (the directory or remote
618    /// prefix under which dataset directories live).
619    pub fn base_uri(&self) -> &str {
620        &self.base_uri
621    }
622
623    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
624        let schema = self.schema_manager.schema();
625        let name = schema.edge_type_name_by_id(edge_type_id)?;
626        self.pinned_snapshot
627            .as_ref()
628            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
629            // The flush path stamps `lance_version: 0` ("LanceDB tables don't
630            // expose Lance version directly") — 0 is a stub sentinel, not a
631            // real dataset version. Returning it would route adjacency reads
632            // through `checkout_version(0)` (the empty initial version).
633            .filter(|v| *v != 0)
634    }
635
636    /// Returns the version high-water mark from the pinned snapshot or the
637    /// transaction-level version pin, if present.
638    ///
639    /// Used by the SCAN tier (vertex tables, property reads) to filter data
640    /// by version: when set, only rows with
641    /// `_version <= version_high_water_mark` are visible. The edge/adjacency
642    /// path must use [`Self::snapshot_version_hwm`] instead.
643    pub fn version_high_water_mark(&self) -> Option<u64> {
644        self.pinned_snapshot
645            .as_ref()
646            .map(|s| s.version_high_water_mark)
647            .or(self.pinned_version_hwm)
648    }
649
650    /// Version high-water mark from a manifest-pinned (time-travel) snapshot
651    /// ONLY — never from a transaction-level version pin.
652    ///
653    /// The edge/adjacency read path switches to version-filtered CSR reads
654    /// and skips the L0 overlays when a hwm is present. That is correct for
655    /// time-travel (a snapshot is flushed state, with its own fresh
656    /// `AdjacencyManager`), but a transaction pin shares the LIVE adjacency
657    /// manager and needs live CSR + L0 overlays + its tx-L0 — filtering
658    /// there would hide unflushed edges and poison the shared warm cache.
659    /// The edge tier is deliberately not version-pinned for transactions
660    /// (see [`Self::pinned_at_version`]).
661    pub fn snapshot_version_hwm(&self) -> Option<u64> {
662        self.pinned_snapshot
663            .as_ref()
664            .map(|s| s.version_high_water_mark)
665    }
666
667    /// Apply version filtering to a base filter expression.
668    ///
669    /// If a snapshot is pinned, wraps `base_filter` with an additional
670    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
671    pub fn apply_version_filter(&self, base_filter: String) -> String {
672        if let Some(hwm) = self.version_high_water_mark() {
673            format!("({}) AND (_version <= {})", base_filter, hwm)
674        } else {
675            base_filter
676        }
677    }
678
679    /// Build a filter expression that excludes soft-deleted rows and optionally
680    /// includes a user-provided filter.
681    fn build_active_filter(user_filter: Option<&str>) -> String {
682        match user_filter {
683            Some(expr) => format!("({}) AND (_deleted = false)", expr),
684            None => "_deleted = false".to_string(),
685        }
686    }
687
688    pub fn store(&self) -> Arc<dyn ObjectStore> {
689        self.store.clone()
690    }
691
692    /// Get current compaction status.
693    ///
694    /// # Errors
695    ///
696    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
697    pub fn compaction_status(
698        &self,
699    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
700        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
701        Ok(guard.clone())
702    }
703
704    pub async fn compact(&self) -> Result<CompactionStats> {
705        // Backend handles compaction internally via optimize_table()
706        let start = std::time::Instant::now();
707        let schema = self.schema_manager.schema();
708        let mut files_compacted = 0;
709
710        for label in schema.labels.keys() {
711            let name = table_names::vertex_table_name(label);
712            if self.backend.table_exists(&name).await? {
713                self.backend.optimize_table(&name).await?;
714                files_compacted += 1;
715                self.backend.invalidate_cache(&name);
716            }
717        }
718
719        Ok(CompactionStats {
720            files_compacted,
721            bytes_before: 0,
722            bytes_after: 0,
723            duration: start.elapsed(),
724            crdt_merges: 0,
725        })
726    }
727
728    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
729        let _guard = CompactionGuard::new(self.compaction_status.clone())
730            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
731
732        let start = std::time::Instant::now();
733        let name = table_names::vertex_table_name(label);
734
735        if self.backend.table_exists(&name).await? {
736            self.backend.optimize_table(&name).await?;
737            self.backend.invalidate_cache(&name);
738        }
739
740        Ok(CompactionStats {
741            files_compacted: 1,
742            bytes_before: 0,
743            bytes_after: 0,
744            duration: start.elapsed(),
745            crdt_merges: 0,
746        })
747    }
748
749    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
750        let _guard = CompactionGuard::new(self.compaction_status.clone())
751            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
752
753        let start = std::time::Instant::now();
754        let mut files_compacted = 0;
755
756        for dir in ["fwd", "bwd"] {
757            let name = table_names::delta_table_name(edge_type, dir);
758            if self.backend.table_exists(&name).await? {
759                self.backend.optimize_table(&name).await?;
760                files_compacted += 1;
761            }
762        }
763
764        Ok(CompactionStats {
765            files_compacted,
766            bytes_before: 0,
767            bytes_after: 0,
768            duration: start.elapsed(),
769            crdt_merges: 0,
770        })
771    }
772
773    pub async fn wait_for_compaction(&self) -> Result<()> {
774        loop {
775            let in_progress = {
776                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
777            };
778            if !in_progress {
779                return Ok(());
780            }
781            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
782        }
783    }
784
785    pub fn start_background_compaction(
786        self: Arc<Self>,
787        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
788    ) -> tokio::task::JoinHandle<()> {
789        if !self.config.compaction.enabled {
790            return tokio::spawn(async {});
791        }
792
793        tokio::spawn(async move {
794            // Use interval_at to delay the first tick. tokio::time::interval fires
795            // immediately on the first tick, which can race with queries that run
796            // right after database open. Delaying by the check_interval gives
797            // initial queries time to complete before compaction modifies tables
798            // (optimize(All) can GC index files that concurrent queries depend on).
799            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
800            let mut interval =
801                tokio::time::interval_at(start, self.config.compaction.check_interval);
802
803            loop {
804                tokio::select! {
805                    _ = interval.tick() => {
806                        if let Err(e) = self.update_compaction_status().await {
807                            log::error!("Failed to update compaction status: {}", e);
808                            continue;
809                        }
810
811                        if let Some(task) = self.pick_compaction_task() {
812                            log::info!("Triggering background compaction: {:?}", task);
813                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
814                                log::error!("Compaction failed: {}", e);
815                            }
816                        }
817                    }
818                    _ = shutdown_rx.recv() => {
819                        log::info!("Background compaction shutting down");
820                        let _ = self.wait_for_compaction().await;
821                        break;
822                    }
823                }
824            }
825        })
826    }
827
828    async fn update_compaction_status(&self) -> Result<()> {
829        let schema = self.schema_manager.schema();
830        let backend = self.backend.as_ref();
831        let mut total_rows: usize = 0;
832        let mut oldest_ts: Option<i64> = None;
833
834        for name in schema.edge_types.keys() {
835            for dir in ["fwd", "bwd"] {
836                let tbl_name = table_names::delta_table_name(name, dir);
837                if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
838                    continue;
839                }
840                let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
841                if row_count == 0 {
842                    continue;
843                }
844                total_rows += row_count;
845
846                // Query oldest _created_at for age tracking
847                let request =
848                    ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
849                let Ok(batches) = backend.scan(request).await else {
850                    continue;
851                };
852                for batch in batches {
853                    let Some(col) = batch
854                        .column_by_name("_created_at")
855                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
856                    else {
857                        continue;
858                    };
859                    for i in 0..col.len() {
860                        if !col.is_null(i) {
861                            let ts = col.value(i);
862                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
863                        }
864                    }
865                }
866            }
867        }
868
869        let oldest_l1_age = oldest_ts
870            .and_then(|ts| {
871                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
872                SystemTime::now().duration_since(created).ok()
873            })
874            .unwrap_or(Duration::ZERO);
875
876        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
877        // Note: l1_runs is managed by flush_to_l1 (increment) and execute_compaction
878        // (reset). It counts flush generations, not delta table count.
879        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
880        status.oldest_l1_age = oldest_l1_age;
881        Ok(())
882    }
883
884    fn pick_compaction_task(&self) -> Option<CompactionTask> {
885        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
886
887        if status.l1_runs >= self.config.compaction.max_l1_runs {
888            return Some(CompactionTask::ByRunCount);
889        }
890        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
891            return Some(CompactionTask::BySize);
892        }
893        if status.oldest_l1_age >= self.config.compaction.max_l1_age
894            && status.oldest_l1_age > Duration::ZERO
895        {
896            return Some(CompactionTask::ByAge);
897        }
898
899        None
900    }
901
902    /// Optimize a table via the backend, returning `true` on success.
903    async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
904        if let Err(e) = backend.optimize_table(table_name).await {
905            log::warn!("Failed to optimize table {}: {}", table_name, e);
906            return false;
907        }
908        true
909    }
910
911    /// Trigger L1 compaction asynchronously without blocking the caller.
912    /// Safe to call frequently — CompactionGuard prevents concurrent runs.
913    pub fn trigger_async_compaction(self: &Arc<Self>) {
914        let this = Arc::clone(self);
915        tokio::spawn(async move {
916            if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
917                // "Compaction already in progress" is expected when called frequently
918                log::debug!("Post-flush compaction skipped: {}", e);
919            }
920        });
921    }
922
923    pub(crate) async fn execute_compaction(
924        this: Arc<Self>,
925        _task: CompactionTask,
926    ) -> Result<CompactionStats> {
927        let start = std::time::Instant::now();
928        let _guard = CompactionGuard::new(this.compaction_status.clone())
929            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
930
931        let schema = this.schema_manager.schema();
932        let mut files_compacted = 0;
933
934        // ── Tier 2: Semantic compaction ──
935        // Dedup vertices, merge CRDTs, consolidate L1→L2 deltas, clean tombstones
936        let compactor = Compactor::new(Arc::clone(&this));
937        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
938            log::error!(
939                "Semantic compaction failed (continuing with backend optimize): {}",
940                e
941            );
942            Vec::new()
943        });
944
945        // Re-warm adjacency CSR after semantic compaction
946        let am = this.adjacency_manager();
947        for info in &compaction_results {
948            let direction = match info.direction.as_str() {
949                "fwd" => Direction::Outgoing,
950                "bwd" => Direction::Incoming,
951                _ => continue,
952            };
953            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
954                && let Err(e) = am.warm(&this, etid, direction, None).await
955            {
956                log::warn!(
957                    "Failed to re-warm adjacency for {}/{}: {}",
958                    info.edge_type,
959                    info.direction,
960                    e
961                );
962            }
963        }
964
965        // ── Tier 3: Backend optimize ──
966        let backend = this.backend.as_ref();
967
968        // Optimize edge delta and adjacency tables
969        for name in schema.edge_types.keys() {
970            for dir in ["fwd", "bwd"] {
971                let delta = table_names::delta_table_name(name, dir);
972                if Self::try_optimize_table(backend, &delta).await {
973                    files_compacted += 1;
974                }
975                let adj = table_names::adjacency_table_name(name, dir);
976                if Self::try_optimize_table(backend, &adj).await {
977                    files_compacted += 1;
978                }
979            }
980        }
981
982        // Optimize vertex tables
983        for label in schema.labels.keys() {
984            let tbl = table_names::vertex_table_name(label);
985            if Self::try_optimize_table(backend, &tbl).await {
986                files_compacted += 1;
987                backend.invalidate_cache(&tbl);
988            }
989        }
990
991        // Optimize main vertex and edge tables
992        for tbl in [
993            table_names::main_vertex_table_name(),
994            table_names::main_edge_table_name(),
995        ] {
996            if Self::try_optimize_table(backend, tbl).await {
997                files_compacted += 1;
998            }
999        }
1000
1001        {
1002            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
1003            status.total_compactions += 1;
1004            status.l1_runs = 0; // Reset flush generation counter
1005        }
1006
1007        Ok(CompactionStats {
1008            files_compacted,
1009            bytes_before: 0,
1010            bytes_after: 0,
1011            duration: start.elapsed(),
1012            crdt_merges: 0,
1013        })
1014    }
1015
1016    /// Open a LanceDB table for a label.
1017    ///
1018    /// Invalidate cached table state (call after writes).
1019    pub fn invalidate_table_cache(&self, label: &str) {
1020        let name = table_names::vertex_table_name(label);
1021        self.backend.invalidate_cache(&name);
1022    }
1023
1024    pub fn base_path(&self) -> &str {
1025        &self.base_uri
1026    }
1027
1028    pub fn schema_manager(&self) -> &SchemaManager {
1029        &self.schema_manager
1030    }
1031
1032    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
1033        self.schema_manager.clone()
1034    }
1035
    /// Returns the backing `Arc<SchemaManager>` by reference.
    ///
    /// Unlike [`Self::schema_manager`] (which derefs to `&SchemaManager`),
    /// this preserves the `Arc`'s pointer identity. A pinned transaction and
    /// the live session clone the *same* `schema_manager` `Arc`, while forks
    /// hold a distinct one — so this is the correct registry key for the
    /// projection store (see `uni-query`'s `projection_store::for_storage`).
    ///
    /// No refcount change happens here; call `Arc::clone` on the result if
    /// an owned handle is needed.
    #[must_use]
    pub fn schema_manager_arc_ref(&self) -> &Arc<SchemaManager> {
        &self.schema_manager
    }
1047
    /// Get the adjacency manager for the dual-CSR architecture.
    ///
    /// Returns a cloned `Arc`, so the caller shares the same manager instance
    /// held by this storage manager.
    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
        Arc::clone(&self.adjacency_manager)
    }
1052
1053    /// Warm the adjacency manager for a specific edge type and direction.
1054    ///
1055    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
1056    /// Called lazily on first access per edge type or at startup.
1057    pub async fn warm_adjacency(
1058        &self,
1059        edge_type_id: u32,
1060        direction: crate::storage::direction::Direction,
1061        version: Option<u64>,
1062    ) -> anyhow::Result<()> {
1063        self.adjacency_manager
1064            .warm(self, edge_type_id, direction, version)
1065            .await
1066    }
1067
1068    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
1069    ///
1070    /// Uses double-checked locking to ensure only one concurrent warm() per
1071    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
1072    pub async fn warm_adjacency_coalesced(
1073        &self,
1074        edge_type_id: u32,
1075        direction: crate::storage::direction::Direction,
1076        version: Option<u64>,
1077    ) -> anyhow::Result<()> {
1078        self.adjacency_manager
1079            .warm_coalesced(self, edge_type_id, direction, version)
1080            .await
1081    }
1082
1083    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
1084    pub fn has_adjacency_csr(
1085        &self,
1086        edge_type_id: u32,
1087        direction: crate::storage::direction::Direction,
1088    ) -> bool {
1089        self.adjacency_manager.has_csr(edge_type_id, direction)
1090    }
1091
1092    /// Get neighbors at a specific version for snapshot queries.
1093    pub fn get_neighbors_at_version(
1094        &self,
1095        vid: uni_common::core::id::Vid,
1096        edge_type: u32,
1097        direction: crate::storage::direction::Direction,
1098        version: u64,
1099    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
1100        self.adjacency_manager
1101            .get_neighbors_at_version(vid, edge_type, direction, version)
1102    }
1103
1104    /// Get the storage backend.
1105    pub fn backend(&self) -> &dyn StorageBackend {
1106        self.backend.as_ref()
1107    }
1108
1109    /// Get the storage backend as an Arc.
1110    pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
1111        self.backend.clone()
1112    }
1113
1114    /// Rebuild the VidLabelsIndex from the main vertex table.
1115    ///
1116    /// Always called on startup. On a fresh database (no vertex table yet) the
1117    /// index is left empty and filled incrementally by flush-time updates.
1118    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
1119        use crate::storage::vid_labels::VidLabelsIndex;
1120
1121        let backend = self.backend.as_ref();
1122        let vtable = table_names::main_vertex_table_name();
1123
1124        // Check if the table exists (fresh database)
1125        if !backend.table_exists(vtable).await.unwrap_or(false) {
1126            self.vid_labels_index = Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new()));
1127            return Ok(());
1128        }
1129
1130        // Scan all non-deleted vertices and collect (VID, labels)
1131        let request = ScanRequest::all(vtable)
1132            .with_filter("_deleted = false")
1133            .with_limit(100_000);
1134        let batches = backend
1135            .scan(request)
1136            .await
1137            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1138
1139        let mut index = VidLabelsIndex::new();
1140        for batch in batches {
1141            let vid_col = batch
1142                .column_by_name("_vid")
1143                .ok_or_else(|| anyhow!("Missing _vid column"))?
1144                .as_any()
1145                .downcast_ref::<UInt64Array>()
1146                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1147
1148            let labels_col = batch
1149                .column_by_name("labels")
1150                .ok_or_else(|| anyhow!("Missing labels column"))?
1151                .as_any()
1152                .downcast_ref::<arrow_array::ListArray>()
1153                .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1154
1155            for row_idx in 0..batch.num_rows() {
1156                let vid = Vid::from(vid_col.value(row_idx));
1157                let labels_array = labels_col.value(row_idx);
1158                let labels_str_array = labels_array
1159                    .as_any()
1160                    .downcast_ref::<arrow_array::StringArray>()
1161                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1162
1163                let labels: Vec<String> = (0..labels_str_array.len())
1164                    .map(|i| labels_str_array.value(i).to_string())
1165                    .collect();
1166
1167                index.insert(vid, labels);
1168            }
1169        }
1170
1171        self.vid_labels_index = Arc::new(parking_lot::RwLock::new(index));
1172        Ok(())
1173    }
1174
1175    /// Get labels for a VID from the in-memory index.
1176    ///
1177    /// Returns `None` only when the VID is absent from the index (e.g. it was
1178    /// never persisted, or has been deleted).
1179    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1180        let index = self.vid_labels_index.read();
1181        index.get_labels(vid).map(|labels| labels.to_vec())
1182    }
1183
1184    /// Update the VID-to-labels mapping in the index.
1185    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1186        let mut index = self.vid_labels_index.write();
1187        index.insert(vid, labels);
1188    }
1189
1190    /// Remove a VID from the labels index.
1191    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1192        let mut index = self.vid_labels_index.write();
1193        index.remove_vid(vid);
1194    }
1195
1196    pub async fn load_subgraph_cached(
1197        &self,
1198        start_vids: &[Vid],
1199        edge_types: &[u32],
1200        max_hops: usize,
1201        direction: GraphDirection,
1202        _l0: Option<Arc<RwLock<L0Buffer>>>,
1203    ) -> Result<WorkingGraph> {
1204        let mut graph = WorkingGraph::new();
1205
1206        let dir = match direction {
1207            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1208            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1209        };
1210
1211        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1212
1213        // Initialize frontier
1214        let mut frontier: Vec<Vid> = start_vids.to_vec();
1215        let mut visited: HashSet<Vid> = HashSet::new();
1216
1217        // Initialize start vids
1218        for &vid in start_vids {
1219            graph.add_vertex(vid);
1220        }
1221
1222        for _hop in 0..max_hops {
1223            let mut next_frontier = HashSet::new();
1224
1225            for &vid in &frontier {
1226                if visited.contains(&vid) {
1227                    continue;
1228                }
1229                visited.insert(vid);
1230                graph.add_vertex(vid);
1231
1232                for &etype_id in edge_types {
1233                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13).
1234                    // Manifest pin only: a tx version pin shares the LIVE adjacency
1235                    // manager — warming it filtered would poison the shared cache.
1236                    let edge_ver = self.snapshot_version_hwm();
1237                    self.adjacency_manager
1238                        .warm_coalesced(self, etype_id, dir, edge_ver)
1239                        .await?;
1240
1241                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
1242                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1243
1244                    for (neighbor_vid, eid) in edges {
1245                        graph.add_vertex(neighbor_vid);
1246                        if !visited.contains(&neighbor_vid) {
1247                            next_frontier.insert(neighbor_vid);
1248                        }
1249
1250                        if neighbor_is_dst {
1251                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
1252                        } else {
1253                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
1254                        }
1255                    }
1256                }
1257            }
1258            frontier = next_frontier.into_iter().collect();
1259
1260            // Early termination: if frontier is empty, no more vertices to explore
1261            if frontier.is_empty() {
1262                break;
1263            }
1264        }
1265
1266        Ok(graph)
1267    }
1268
    /// Borrow the snapshot manager owned by this storage manager.
    pub fn snapshot_manager(&self) -> &SnapshotManager {
        &self.snapshot_manager
    }
1272
1273    pub fn index_manager(&self) -> IndexManager {
1274        IndexManager::new(&self.base_uri, self.schema_manager.clone())
1275    }
1276
1277    // ========================================================================
1278    // Domain-level scan methods — encapsulate LanceDB queries for consumers
1279    // ========================================================================
1280
1281    /// Scan a per-label vertex table. Returns `None` if the table doesn't exist.
1282    ///
1283    /// Internally opens the table, filters requested columns to those that
1284    /// physically exist, and applies the version HWM filter for snapshot isolation.
1285    pub async fn scan_vertex_table(
1286        &self,
1287        label: &str,
1288        columns: &[&str],
1289        additional_filter: Option<&str>,
1290    ) -> Result<Option<arrow_array::RecordBatch>> {
1291        let backend = self.backend();
1292        let table_name = table_names::vertex_table_name(label);
1293
1294        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1295            return Ok(None);
1296        }
1297
1298        // Filter columns to those that exist in the table
1299        let actual_columns =
1300            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1301                let table_field_names: HashSet<&str> = table_schema
1302                    .fields()
1303                    .iter()
1304                    .map(|f| f.name().as_str())
1305                    .collect();
1306                columns
1307                    .iter()
1308                    .copied()
1309                    .filter(|c| table_field_names.contains(c))
1310                    .map(|s| s.to_string())
1311                    .collect::<Vec<_>>()
1312            } else {
1313                return Ok(None);
1314            };
1315
1316        // Build filter with version HWM + optional additional filter
1317        let filter = match (self.version_high_water_mark(), additional_filter) {
1318            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1319            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1320            (None, Some(f)) => Some(f.to_string()),
1321            (None, None) => None,
1322        };
1323
1324        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1325        if let Some(f) = filter {
1326            request = request.with_filter(f);
1327        }
1328
1329        // Fail closed: a scan error (transient I/O, an unparsable filter, a
1330        // corrupt fragment) must propagate, never be silently mapped to
1331        // `Ok(None)`. Callers treat `Ok(None)` as "no rows" — e.g. the MERGE
1332        // fast path would create a duplicate node on a transient failure (review
1333        // bug #3a) — so an error here must surface as an error. A genuinely-
1334        // absent table is already handled above.
1335        let batches = backend.scan(request).await?;
1336        if batches.is_empty() {
1337            Ok(None)
1338        } else {
1339            Ok(Some(arrow::compute::concat_batches(
1340                &batches[0].schema(),
1341                &batches,
1342            )?))
1343        }
1344    }
1345
1346    /// Scan a delta table for an edge type + direction.
1347    /// Returns `None` if the table doesn't exist.
1348    pub async fn scan_delta_table(
1349        &self,
1350        edge_type: &str,
1351        direction: &str,
1352        columns: &[&str],
1353        additional_filter: Option<&str>,
1354    ) -> Result<Option<arrow_array::RecordBatch>> {
1355        // Edge path: manifest pin only. A transaction version pin must NOT
1356        // version-filter edge reads — the edge tier is not version-pinned
1357        // (the live AdjacencyManager + tx-L0 overlay carry unflushed and
1358        // in-transaction edges), so filtering here would hide a relationship
1359        // the same transaction just created (MERGE read-your-writes).
1360        let edge_hwm = self.snapshot_version_hwm();
1361        let backend = self.backend();
1362        let table_name = table_names::delta_table_name(edge_type, direction);
1363
1364        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1365            return Ok(None);
1366        }
1367
1368        // Filter columns to those that exist
1369        let actual_columns =
1370            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1371                let table_field_names: HashSet<&str> = table_schema
1372                    .fields()
1373                    .iter()
1374                    .map(|f| f.name().as_str())
1375                    .collect();
1376                columns
1377                    .iter()
1378                    .copied()
1379                    .filter(|c| table_field_names.contains(c))
1380                    .map(|s| s.to_string())
1381                    .collect::<Vec<_>>()
1382            } else {
1383                return Ok(None);
1384            };
1385
1386        let filter = match (edge_hwm, additional_filter) {
1387            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1388            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1389            (None, Some(f)) => Some(f.to_string()),
1390            (None, None) => None,
1391        };
1392
1393        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1394        if let Some(f) = filter {
1395            request = request.with_filter(f);
1396        }
1397
1398        // Fail closed: a scan error (transient I/O, an unparsable filter, a
1399        // corrupt fragment) must propagate, never be silently mapped to
1400        // `Ok(None)`. Callers treat `Ok(None)` as "no rows" — e.g. the MERGE
1401        // fast path would create a duplicate node on a transient failure (review
1402        // bug #3a) — so an error here must surface as an error. A genuinely-
1403        // absent table is already handled above.
1404        let batches = backend.scan(request).await?;
1405        if batches.is_empty() {
1406            Ok(None)
1407        } else {
1408            Ok(Some(arrow::compute::concat_batches(
1409                &batches[0].schema(),
1410                &batches,
1411            )?))
1412        }
1413    }
1414
1415    /// Scan the unified main vertex table. Returns `None` if table doesn't exist.
1416    ///
1417    /// Applies version HWM filter internally for snapshot isolation, combined
1418    /// with any caller-provided filter (label conditions, etc.).
1419    pub async fn scan_main_vertex_table(
1420        &self,
1421        columns: &[&str],
1422        filter: Option<&str>,
1423    ) -> Result<Option<arrow_array::RecordBatch>> {
1424        let backend = self.backend();
1425        let table_name = table_names::main_vertex_table_name();
1426
1427        if !backend.table_exists(table_name).await.unwrap_or(false) {
1428            return Ok(None);
1429        }
1430
1431        // Combine caller filter with version HWM for snapshot isolation
1432        let full_filter = match (self.version_high_water_mark(), filter) {
1433            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1434            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1435            (None, Some(f)) => Some(f.to_string()),
1436            (None, None) => None,
1437        };
1438
1439        let request = ScanRequest::all(table_name)
1440            .with_columns(columns.iter().map(|s| s.to_string()).collect());
1441        let request = match full_filter.as_deref() {
1442            Some(f) => request.with_filter(f),
1443            None => request,
1444        };
1445
1446        // Fail closed: a scan error (transient I/O, an unparsable filter, a
1447        // corrupt fragment) must propagate, never be silently mapped to
1448        // `Ok(None)`. Callers treat `Ok(None)` as "no rows" — e.g. the MERGE
1449        // fast path would create a duplicate node on a transient failure (review
1450        // bug #3a) — so an error here must surface as an error. A genuinely-
1451        // absent table is already handled above.
1452        let batches = backend.scan(request).await?;
1453        if batches.is_empty() {
1454            Ok(None)
1455        } else {
1456            Ok(Some(arrow::compute::concat_batches(
1457                &batches[0].schema(),
1458                &batches,
1459            )?))
1460        }
1461    }
1462
1463    /// Scan the main edge table as a stream. Returns `None` if table doesn't exist.
1464    pub async fn scan_main_edge_table_stream(
1465        &self,
1466        filter: Option<&str>,
1467    ) -> Result<
1468        Option<
1469            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1470        >,
1471    > {
1472        let backend = self.backend();
1473        let table_name = table_names::main_edge_table_name();
1474
1475        if !backend.table_exists(table_name).await.unwrap_or(false) {
1476            return Ok(None);
1477        }
1478
1479        let mut request = ScanRequest::all(table_name);
1480        if let Some(f) = filter {
1481            request = request.with_filter(f);
1482        }
1483
1484        let stream = backend.scan_stream(request).await?;
1485        Ok(Some(stream))
1486    }
1487
1488    /// Scan a per-label vertex table as a stream. Returns `None` if table doesn't exist.
1489    pub async fn scan_vertex_table_stream(
1490        &self,
1491        label: &str,
1492    ) -> Result<
1493        Option<
1494            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1495        >,
1496    > {
1497        let backend = self.backend();
1498        let table_name = table_names::vertex_table_name(label);
1499
1500        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1501            return Ok(None);
1502        }
1503
1504        let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1505        Ok(Some(stream))
1506    }
1507
1508    /// Find a vertex VID by external ID. Uses pinned snapshot HWM if present.
1509    pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1510        MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1511            .await
1512    }
1513
1514    /// Map every live vertex that has an external id to its `ext_id`
1515    /// (`_vid` → `ext_id`).
1516    ///
1517    /// `ext_id` is folded into a vertex's content `_uid` but is stripped from
1518    /// query results, so the fork diff/promote engine can't recover it by
1519    /// re-hashing query rows — two vertices differing only by `ext_id` would
1520    /// collapse to one identity (review H4). This exposes the stored `ext_id`
1521    /// so the diff can fold it back into its recomputed UID. Reads through the
1522    /// (branched) backend, so a forked manager sees its own + inherited rows.
1523    /// Covers flushed (Lance) rows; `ext_id` is immutable so no version
1524    /// reconciliation is needed.
1525    pub async fn get_vertex_ext_ids(&self) -> Result<std::collections::HashMap<Vid, String>> {
1526        use arrow_array::StringArray;
1527        let backend = self.backend.as_ref();
1528        let vtable = table_names::main_vertex_table_name();
1529        let mut out = std::collections::HashMap::new();
1530        if !backend.table_exists(vtable).await.unwrap_or(false) {
1531            return Ok(out);
1532        }
1533        let request = ScanRequest::all(vtable)
1534            .with_filter("_deleted = false")
1535            .with_columns(vec!["_vid".to_string(), "ext_id".to_string()]);
1536        let batches = backend
1537            .scan(request)
1538            .await
1539            .map_err(|e| anyhow!("get_vertex_ext_ids: {}", e))?;
1540        for batch in batches {
1541            let vids = batch
1542                .column_by_name("_vid")
1543                .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1544                .ok_or_else(|| anyhow!("get_vertex_ext_ids: missing/invalid _vid column"))?;
1545            let exts = batch
1546                .column_by_name("ext_id")
1547                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1548                .ok_or_else(|| anyhow!("get_vertex_ext_ids: missing/invalid ext_id column"))?;
1549            for i in 0..batch.num_rows() {
1550                if exts.is_null(i) {
1551                    continue;
1552                }
1553                let ext = exts.value(i);
1554                if !ext.is_empty() {
1555                    out.insert(Vid::from(vids.value(i)), ext.to_string());
1556                }
1557            }
1558        }
1559        Ok(out)
1560    }
1561
1562    /// Find labels for a vertex by VID. Uses pinned snapshot HWM if present.
1563    pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1564        MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1565            .await
1566    }
1567
1568    /// Find edges from the main edge table by type names, optionally pushing
1569    /// a bounded endpoint vid set into the scan (review perf #5).
1570    pub async fn find_edges_by_type_names(
1571        &self,
1572        type_names: &[&str],
1573        endpoint_filter: Option<(crate::storage::main_edge::EndpointSide, &[Vid])>,
1574    ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1575        MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names, endpoint_filter).await
1576    }
1577
1578    /// Scan vertex candidates matching a filter. Returns VIDs where `_deleted = false`.
1579    pub async fn scan_vertex_candidates(
1580        &self,
1581        label: &str,
1582        filter: Option<&str>,
1583    ) -> Result<Vec<Vid>> {
1584        let backend = self.backend();
1585        let table_name = table_names::vertex_table_name(label);
1586
1587        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1588            return Ok(Vec::new());
1589        }
1590
1591        let full_filter = match filter {
1592            Some(f) => format!("_deleted = false AND ({})", f),
1593            None => "_deleted = false".to_string(),
1594        };
1595
1596        let request = ScanRequest::all(&table_name)
1597            .with_filter(full_filter)
1598            .with_columns(vec!["_vid".to_string()]);
1599
1600        let batches = backend.scan(request).await?;
1601
1602        let mut vids = Vec::new();
1603        for batch in batches {
1604            let vid_col = batch
1605                .column_by_name("_vid")
1606                .ok_or(anyhow!("Missing _vid"))?
1607                .as_any()
1608                .downcast_ref::<UInt64Array>()
1609                .ok_or(anyhow!("Invalid _vid"))?;
1610            for i in 0..batch.num_rows() {
1611                vids.push(Vid::from(vid_col.value(i)));
1612            }
1613        }
1614        Ok(vids)
1615    }
1616
1617    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1618        let schema = self.schema_manager.schema();
1619        let label_meta = schema
1620            .labels
1621            .get(label)
1622            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1623        let key = format!("vertices_{label}");
1624        match self.fork_branch_for(&key) {
1625            Some(branch) => Ok(VertexDataset::new_branched(
1626                &self.base_uri,
1627                label,
1628                label_meta.id,
1629                branch,
1630            )),
1631            None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1632        }
1633    }
1634
1635    #[cfg(feature = "lance-backend")]
1636    pub fn edge_dataset(
1637        &self,
1638        edge_type: &str,
1639        src_label: &str,
1640        dst_label: &str,
1641    ) -> Result<EdgeDataset> {
1642        let key = format!("edges_{edge_type}");
1643        match self.fork_branch_for(&key) {
1644            Some(branch) => Ok(EdgeDataset::new_branched(
1645                &self.base_uri,
1646                edge_type,
1647                src_label,
1648                dst_label,
1649                branch,
1650            )),
1651            None => Ok(EdgeDataset::new(
1652                &self.base_uri,
1653                edge_type,
1654                src_label,
1655                dst_label,
1656            )),
1657        }
1658    }
1659
1660    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1661        let key = format!("deltas_{edge_type}_{direction}");
1662        match self.fork_branch_for(&key) {
1663            Some(branch) => Ok(DeltaDataset::new_branched(
1664                &self.base_uri,
1665                edge_type,
1666                direction,
1667                branch,
1668            )),
1669            None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1670        }
1671    }
1672
1673    pub fn adjacency_dataset(
1674        &self,
1675        edge_type: &str,
1676        label: &str,
1677        direction: &str,
1678    ) -> Result<AdjacencyDataset> {
1679        // The fork registers adjacency branches under the canonical table
1680        // name (`adjacency_{edge_type}_{direction}`), so the lookup key must
1681        // match it — not the historical `adjacency_{direction}_{edge_type}_
1682        // {label}`, which never resolved a branch. Adjacency is per-`(edge_
1683        // type, direction)`, not per-label. The canonical form is pinned by
1684        // `table_names::tests::adjacency_table_name_is_canonical`. (L8)
1685        let key = crate::backend::table_names::adjacency_table_name(edge_type, direction);
1686        match self.fork_branch_for(&key) {
1687            Some(branch) => Ok(AdjacencyDataset::new_branched(
1688                &self.base_uri,
1689                edge_type,
1690                label,
1691                direction,
1692                branch,
1693            )),
1694            None => Ok(AdjacencyDataset::new(
1695                &self.base_uri,
1696                edge_type,
1697                label,
1698                direction,
1699            )),
1700        }
1701    }
1702
1703    /// Look up the branch name for a dataset under the active fork
1704    /// scope, if any. Returns `None` when not forked, or when the fork
1705    /// hasn't recorded a branch on this dataset yet (Phase 2 territory).
1706    fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1707        self.fork_scope
1708            .as_ref()
1709            .and_then(|s| s.branch_for(dataset_name))
1710    }
1711
1712    /// Get the main vertex dataset for unified vertex storage.
1713    ///
1714    /// The main vertex dataset contains all vertices regardless of label,
1715    /// enabling fast ID-based lookups without knowing the label.
1716    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1717        MainVertexDataset::new(&self.base_uri)
1718    }
1719
1720    /// Get the main edge dataset for unified edge storage.
1721    ///
1722    /// The main edge dataset contains all edges regardless of type,
1723    /// enabling fast ID-based lookups without knowing the edge type.
1724    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1725        MainEdgeDataset::new(&self.base_uri)
1726    }
1727
1728    #[cfg(feature = "lance-backend")]
1729    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
1730        Ok(UidIndex::new(&self.base_uri, label))
1731    }
1732
1733    #[cfg(feature = "lance-backend")]
1734    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1735        let schema = self.schema_manager.schema();
1736        let config = schema
1737            .indexes
1738            .iter()
1739            .find_map(|idx| match idx {
1740                IndexDefinition::Inverted(cfg)
1741                    if cfg.label == label && cfg.property == property =>
1742                {
1743                    Some(cfg.clone())
1744                }
1745                _ => None,
1746            })
1747            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1748
1749        InvertedIndex::new(&self.base_uri, config).await
1750    }
1751
1752    pub async fn vector_search(
1753        &self,
1754        label: &str,
1755        property: &str,
1756        query: &[f32],
1757        k: usize,
1758        filter: Option<&str>,
1759        ctx: Option<&QueryContext>,
1760    ) -> Result<Vec<(Vid, f32)>> {
1761        use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1762
1763        // Look up vector index config to get the correct distance metric.
1764        let schema = self.schema_manager.schema();
1765        let metric = schema
1766            .vector_index_for_property(label, property)
1767            .map(|config| config.metric.clone())
1768            .unwrap_or(DistanceMetric::L2);
1769
1770        let backend = self.backend.as_ref();
1771        let name = table_names::vertex_table_name(label);
1772
1773        let mut results = Vec::new();
1774
1775        // Only search if the table exists
1776        if backend.table_exists(&name).await.unwrap_or(false) {
1777            let backend_metric = match &metric {
1778                DistanceMetric::L2 => BackendMetric::L2,
1779                DistanceMetric::Cosine => BackendMetric::Cosine,
1780                DistanceMetric::Dot => BackendMetric::Dot,
1781                _ => BackendMetric::L2,
1782            };
1783
1784            // Build combined filter: _deleted = false + optional user filter + HWM
1785            let mut filter_parts = vec![Self::build_active_filter(filter)];
1786            if ctx.is_some()
1787                && let Some(hwm) = self.version_high_water_mark()
1788            {
1789                filter_parts.push(format!("_version <= {}", hwm));
1790            }
1791            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1792
1793            let batches = backend
1794                .vector_search(&name, property, query, k, backend_metric, combined_filter)
1795                .await?;
1796
1797            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1798        }
1799
1800        // Merge L0 buffer vertices into results for visibility of unflushed data.
1801        if let Some(qctx) = ctx {
1802            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1803        }
1804
1805        Ok(results)
1806    }
1807
1808    /// Perform a full-text search with BM25 scoring.
1809    ///
1810    /// Returns vertices matching the search query along with their BM25 scores.
1811    /// Results are sorted by score descending (most relevant first).
1812    ///
1813    /// # Arguments
1814    /// * `label` - The label to search within
1815    /// * `property` - The property column to search (must have FTS index)
1816    /// * `query` - The search query text
1817    /// * `k` - Maximum number of results to return
1818    /// * `filter` - Optional Lance filter expression
1819    /// * `ctx` - Optional query context for visibility checks
1820    ///
1821    /// # Returns
1822    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
1823    pub async fn fts_search(
1824        &self,
1825        label: &str,
1826        property: &str,
1827        query: &str,
1828        k: usize,
1829        filter: Option<&str>,
1830        ctx: Option<&QueryContext>,
1831    ) -> Result<Vec<(Vid, f32)>> {
1832        use crate::backend::types::FilterExpr;
1833
1834        let backend = self.backend.as_ref();
1835        let name = table_names::vertex_table_name(label);
1836
1837        let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1838            // Build combined filter: _deleted = false + optional user filter + HWM
1839            let mut filter_parts = vec![Self::build_active_filter(filter)];
1840            if ctx.is_some()
1841                && let Some(hwm) = self.version_high_water_mark()
1842            {
1843                filter_parts.push(format!("_version <= {}", hwm));
1844            }
1845            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1846
1847            let batches = backend
1848                .full_text_search(&name, property, query, k, combined_filter)
1849                .await?;
1850
1851            let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1852            // Results should already be sorted by score from backend, but ensure descending order
1853            fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1854            fts_results
1855        } else {
1856            Vec::new()
1857        };
1858
1859        // Merge L0 buffer vertices for visibility of unflushed data.
1860        if let Some(qctx) = ctx {
1861            merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1862        }
1863
1864        Ok(results)
1865    }
1866
1867    #[cfg(feature = "lance-backend")]
1868    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1869        let index = self.uid_index(label)?;
1870        index.get_vid(uid).await
1871    }
1872
1873    #[cfg(feature = "lance-backend")]
1874    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1875        let index = self.uid_index(label)?;
1876        index.write_mapping(&[(uid, vid)]).await
1877    }
1878
    /// Load the subgraph reachable from `start_vids` in at most `max_hops`
    /// hops, following only the edge type ids in `edge_types` in the given
    /// `direction`.
    ///
    /// Each hop expands every frontier vertex that has not been visited yet,
    /// once per requested edge type. The expansion merges three storage layers
    /// into a per-edge state map keyed by [`Eid`]:
    ///
    /// 1. L2 adjacency (base). Labels are tried one by one until a label
    ///    returns data; these edges enter with version `0`.
    /// 2. L1 deltas, read up to `snapshot_version_hwm()`. An entry replaces the
    ///    current state only if its version is strictly higher.
    /// 3. The optional L0 buffer `l0`, with the same version rule, plus L0 edge
    ///    tombstones, which mark edges as deleted.
    ///
    /// Surviving (non-deleted) edges are added to the returned
    /// [`WorkingGraph`], and their unvisited neighbours form the next frontier.
    /// Traversal stops early once the frontier is empty. All start vertices are
    /// in the graph, even when `max_hops` is `0`.
    ///
    /// # Errors
    /// Returns an error when a requested edge type id is not in the schema.
    /// This is checked only when there is a vertex to expand. Errors from
    /// reading adjacency or delta data are also returned. A failure to build
    /// an adjacency dataset handle for a label is skipped, not reported.
    ///
    /// # Panics
    /// Panics if the schema cannot resolve the name of one of its own label or
    /// edge-type ids (the `unwrap()`s while building the id → name maps).
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // Build maps for ID lookups (id -> name). Only the label *names* are
        // used below (to probe adjacency per label); edge type names are
        // looked up by id for each requested edge type.
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        // Deduplicate the requested edge type ids.
        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        // Initialize frontier
        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        // Add start vertices to graph, so they are present even if no hop runs
        // or no edges are found.
        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                // Each vertex is expanded at most once, even if it appears
                // several times (e.g. duplicates in `start_vids`).
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                // For each edge type we want to traverse
                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // Determine directions
                    // Storage direction: "fwd" or "bwd".
                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // Per-edge state for this (vid, edge type), built up layer
                    // by layer below (L2 -> L1 -> L0).
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    // 1. L2: Adjacency (Base)
                    // In the new storage model, VIDs don't embed label info.
                    // We need to try all labels to find the adjacency data.
                    // Edge version from snapshot (reserved for future version
                    // filtering; currently computed but unused).
                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    // Try each label until we find adjacency data.
                    // NOTE(review): `label_map.values()` iterates in HashMap
                    // order, and `adjacency_dataset` keys its branch lookup
                    // only by (edge_type, direction), so which label "wins"
                    // may be arbitrary. Confirm that this is harmless.
                    let backend = self.backend();
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_backend(backend, vid).await?
                        {
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            break; // Found adjacency data for this vid, no need to try other labels
                        }
                    }

                    // 2. L1: Delta (bounded by the snapshot version HWM); a
                    // delta entry wins only with a strictly higher version.
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas(backend, vid, &schema, self.snapshot_version_hwm())
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // 3. L0: Buffer (unflushed edges and edge tombstones)
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    // Add surviving edges to the graph and queue unvisited
                    // neighbours for the next hop.
                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            // Early termination: if frontier is empty, no more vertices to explore
            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
2018
2019    /// Apply delta entries to edge state map, handling version conflicts.
2020    fn apply_delta_to_edges(
2021        edges: &mut HashMap<Eid, EdgeState>,
2022        delta_entries: Vec<crate::storage::delta::L1Entry>,
2023        neighbor_is_dst: bool,
2024    ) {
2025        for entry in delta_entries {
2026            let neighbor = if neighbor_is_dst {
2027                entry.dst_vid
2028            } else {
2029                entry.src_vid
2030            };
2031            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
2032
2033            if entry.version > current_ver {
2034                edges.insert(
2035                    entry.eid,
2036                    EdgeState {
2037                        neighbor,
2038                        version: entry.version,
2039                        deleted: matches!(entry.op, Op::Delete),
2040                    },
2041                );
2042            }
2043        }
2044    }
2045
2046    /// Apply L0 buffer edges and tombstones to edge state map.
2047    fn apply_l0_to_edges(
2048        edges: &mut HashMap<Eid, EdgeState>,
2049        l0: &L0Buffer,
2050        vid: Vid,
2051        etype_id: u32,
2052        direction: GraphDirection,
2053    ) {
2054        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
2055        for (neighbor, eid, ver) in l0_neighbors {
2056            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
2057            if ver > current_ver {
2058                edges.insert(
2059                    eid,
2060                    EdgeState {
2061                        neighbor,
2062                        version: ver,
2063                        deleted: false,
2064                    },
2065                );
2066            }
2067        }
2068
2069        // Check tombstones in L0
2070        for (eid, state) in edges.iter_mut() {
2071            if l0.is_tombstoned(*eid) {
2072                state.deleted = true;
2073            }
2074        }
2075    }
2076
2077    /// Add non-deleted edges to graph and collect next frontier.
2078    fn add_edges_to_graph(
2079        graph: &mut WorkingGraph,
2080        edges: HashMap<Eid, EdgeState>,
2081        vid: Vid,
2082        etype_id: u32,
2083        neighbor_is_dst: bool,
2084        visited: &HashSet<Vid>,
2085        next_frontier: &mut HashSet<Vid>,
2086    ) {
2087        for (eid, state) in edges {
2088            if state.deleted {
2089                continue;
2090            }
2091            graph.add_vertex(state.neighbor);
2092
2093            if !visited.contains(&state.neighbor) {
2094                next_frontier.insert(state.neighbor);
2095            }
2096
2097            if neighbor_is_dst {
2098                graph.add_edge(vid, state.neighbor, eid, etype_id);
2099            } else {
2100                graph.add_edge(state.neighbor, vid, eid, etype_id);
2101            }
2102        }
2103    }
2104}
2105
2106/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
2107fn extract_vid_score_pairs(
2108    batches: &[arrow_array::RecordBatch],
2109    vid_column: &str,
2110    score_column: &str,
2111) -> Result<Vec<(Vid, f32)>> {
2112    let mut results = Vec::new();
2113    for batch in batches {
2114        let vid_col = batch
2115            .column_by_name(vid_column)
2116            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
2117            .as_any()
2118            .downcast_ref::<UInt64Array>()
2119            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
2120
2121        let score_col = batch
2122            .column_by_name(score_column)
2123            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
2124            .as_any()
2125            .downcast_ref::<Float32Array>()
2126            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
2127
2128        for i in 0..batch.num_rows() {
2129            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
2130        }
2131    }
2132    Ok(results)
2133}
2134
2135/// Extracts a `Vec<f32>` from a JSON property value.
2136///
2137/// Returns `None` if the property is missing, not an array, or contains
2138/// non-numeric elements.
2139fn extract_embedding_from_props(
2140    props: &uni_common::Properties,
2141    property: &str,
2142) -> Option<Vec<f32>> {
2143    let arr = props.get(property)?.as_array()?;
2144    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
2145}
2146
2147/// Merges L0 buffer vertices into LanceDB vector search results.
2148///
2149/// Visits L0 buffers in precedence order (pending flush → main → transaction),
2150/// collects tombstoned VIDs and candidate embeddings, then merges them with the
2151/// existing LanceDB results so that:
2152/// - Tombstoned VIDs are removed (unless re-created in a later L0).
2153/// - VIDs present in both L0 and LanceDB use the L0 distance.
2154/// - New L0-only VIDs are appended.
2155/// - Results are re-sorted by distance ascending and truncated to `k`.
2156fn merge_l0_into_vector_results(
2157    results: &mut Vec<(Vid, f32)>,
2158    ctx: &QueryContext,
2159    label: &str,
2160    property: &str,
2161    query: &[f32],
2162    k: usize,
2163    metric: &DistanceMetric,
2164) {
2165    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
2166    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2167        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2168    buffers.push(Arc::clone(&ctx.l0));
2169    if let Some(ref txn) = ctx.transaction_l0 {
2170        buffers.push(Arc::clone(txn));
2171    }
2172
2173    // Maps VID → distance for L0 candidates (last writer wins).
2174    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2175    // Tombstoned VIDs across all L0 buffers.
2176    let mut tombstoned: HashSet<Vid> = HashSet::new();
2177
2178    for buf_arc in &buffers {
2179        let buf = buf_arc.read();
2180
2181        // Accumulate tombstones.
2182        for &vid in &buf.vertex_tombstones {
2183            tombstoned.insert(vid);
2184        }
2185
2186        // Scan vertices with the target label.
2187        for (&vid, labels) in &buf.vertex_labels {
2188            if !labels.iter().any(|l| l == label) {
2189                continue;
2190            }
2191            if let Some(props) = buf.vertex_properties.get(&vid)
2192                && let Some(emb) = extract_embedding_from_props(props, property)
2193            {
2194                if emb.len() != query.len() {
2195                    continue; // dimension mismatch
2196                }
2197                let dist = metric.compute_distance(&emb, query);
2198                // Last writer wins: later buffer overwrites earlier.
2199                l0_candidates.insert(vid, dist);
2200                // If re-created in a later L0, remove from tombstones.
2201                tombstoned.remove(&vid);
2202            }
2203        }
2204    }
2205
2206    // If no L0 activity affects this search, skip merge.
2207    if l0_candidates.is_empty() && tombstoned.is_empty() {
2208        return;
2209    }
2210
2211    // Remove tombstoned VIDs from LanceDB results.
2212    results.retain(|(vid, _)| !tombstoned.contains(vid));
2213
2214    // Overwrite or append L0 candidates.
2215    for (vid, dist) in &l0_candidates {
2216        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2217            existing.1 = *dist;
2218        } else {
2219            results.push((*vid, *dist));
2220        }
2221    }
2222
2223    // Re-sort by distance ascending.
2224    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2225    results.truncate(k);
2226}
2227
/// Scores how well `text` matches `query` by whitespace-token overlap.
///
/// Both strings are split on whitespace and lower-cased. The score is the
/// number of distinct query tokens that also occur in `text`, divided by the
/// number of distinct query tokens, so it lies in `[0.0, 1.0]`. An empty or
/// all-whitespace query scores `0.0`. This is intended only for the small L0
/// buffer.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    let wanted: HashSet<String> = query.split_whitespace().map(str::to_lowercase).collect();
    if wanted.is_empty() {
        return 0.0;
    }
    let present: HashSet<String> = text.split_whitespace().map(str::to_lowercase).collect();
    let matched = wanted.intersection(&present).count();
    matched as f32 / wanted.len() as f32
}
2245
2246/// Extracts a string slice from a property value.
2247fn extract_text_from_props<'a>(
2248    props: &'a uni_common::Properties,
2249    property: &str,
2250) -> Option<&'a str> {
2251    props.get(property)?.as_str()
2252}
2253
2254/// Merges L0 buffer vertices into LanceDB full-text search results.
2255///
2256/// Follows the same pattern as [`merge_l0_into_vector_results`]: visits L0
2257/// buffers in precedence order, collects tombstoned VIDs and text-match
2258/// candidates, then merges them so that:
2259/// - Tombstoned VIDs are removed (unless re-created in a later L0).
2260/// - VIDs present in both L0 and LanceDB use the L0 score.
2261/// - New L0-only VIDs are appended.
2262/// - Results are re-sorted by score **descending** and truncated to `k`.
2263fn merge_l0_into_fts_results(
2264    results: &mut Vec<(Vid, f32)>,
2265    ctx: &QueryContext,
2266    label: &str,
2267    property: &str,
2268    query: &str,
2269    k: usize,
2270) {
2271    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
2272    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2273        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2274    buffers.push(Arc::clone(&ctx.l0));
2275    if let Some(ref txn) = ctx.transaction_l0 {
2276        buffers.push(Arc::clone(txn));
2277    }
2278
2279    // Maps VID → relevance score for L0 candidates (last writer wins).
2280    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2281    // Tombstoned VIDs across all L0 buffers.
2282    let mut tombstoned: HashSet<Vid> = HashSet::new();
2283
2284    for buf_arc in &buffers {
2285        let buf = buf_arc.read();
2286
2287        // Accumulate tombstones.
2288        for &vid in &buf.vertex_tombstones {
2289            tombstoned.insert(vid);
2290        }
2291
2292        // Scan vertices with the target label.
2293        for (&vid, labels) in &buf.vertex_labels {
2294            if !labels.iter().any(|l| l == label) {
2295                continue;
2296            }
2297            if let Some(props) = buf.vertex_properties.get(&vid)
2298                && let Some(text) = extract_text_from_props(props, property)
2299            {
2300                let score = compute_text_relevance(query, text);
2301                if score > 0.0 {
2302                    // Last writer wins: later buffer overwrites earlier.
2303                    l0_candidates.insert(vid, score);
2304                }
2305                // If re-created in a later L0, remove from tombstones.
2306                tombstoned.remove(&vid);
2307            }
2308        }
2309    }
2310
2311    // If no L0 activity affects this search, skip merge.
2312    if l0_candidates.is_empty() && tombstoned.is_empty() {
2313        return;
2314    }
2315
2316    // Remove tombstoned VIDs from LanceDB results.
2317    results.retain(|(vid, _)| !tombstoned.contains(vid));
2318
2319    // Overwrite or append L0 candidates.
2320    for (vid, score) in &l0_candidates {
2321        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2322            existing.1 = *score;
2323        } else {
2324            results.push((*vid, *score));
2325        }
2326    }
2327
2328    // Re-sort by score descending (higher relevance first).
2329    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2330    results.truncate(k);
2331}