Skip to main content

uni_store/storage/
manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
54/// Edge state during subgraph loading - tracks version and deletion status.
55struct EdgeState {
56    neighbor: Vid,
57    version: u64,
58    deleted: bool,
59}
60
61pub struct StorageManager {
62    base_uri: String,
63    store: Arc<dyn ObjectStore>,
64    schema_manager: Arc<SchemaManager>,
65    snapshot_manager: Arc<SnapshotManager>,
66    adjacency_manager: Arc<AdjacencyManager>,
67    pub config: UniConfig,
68    pub compaction_status: Arc<Mutex<CompactionStatus>>,
69    /// Optional pinned snapshot for time-travel
70    pinned_snapshot: Option<SnapshotManifest>,
71    /// Pluggable storage backend.
72    backend: Arc<dyn StorageBackend>,
73    /// In-memory VID-to-labels index for O(1) lookups (optional, configurable)
74    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
75}
76
77/// Helper to manage compaction_in_progress flag
78struct CompactionGuard {
79    status: Arc<Mutex<CompactionStatus>>,
80}
81
82impl CompactionGuard {
83    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
84        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
85        if s.compaction_in_progress {
86            return None;
87        }
88        s.compaction_in_progress = true;
89        Some(Self {
90            status: status.clone(),
91        })
92    }
93}
94
95impl Drop for CompactionGuard {
96    fn drop(&mut self) {
97        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
98        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
99        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
100            Ok(mut s) => {
101                s.compaction_in_progress = false;
102                s.last_compaction = Some(std::time::SystemTime::now());
103            }
104            Err(e) => {
105                // Lock is poisoned but we're in Drop - cannot panic.
106                // Log the error and continue. System state may be inconsistent but at least
107                // we don't abort the process.
108                log::error!(
109                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
110                     Compaction status may be inconsistent. Issue #18/#150",
111                    e
112                );
113            }
114        }
115    }
116}
117
118impl StorageManager {
119    /// Create a new StorageManager with a pre-configured backend.
120    pub async fn new_with_backend(
121        base_uri: &str,
122        store: Arc<dyn ObjectStore>,
123        backend: Arc<dyn StorageBackend>,
124        schema_manager: Arc<SchemaManager>,
125        config: UniConfig,
126    ) -> Result<Self> {
127        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
128            store,
129            config.object_store.clone(),
130        ));
131
132        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
133
134        // Perform crash recovery for all known table patterns
135        Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
136
137        let mut sm = Self {
138            base_uri: base_uri.to_string(),
139            store: resilient_store,
140            schema_manager,
141            snapshot_manager,
142            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
143            config,
144            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
145            pinned_snapshot: None,
146            backend,
147            vid_labels_index: None,
148        };
149
150        // Rebuild VidLabelsIndex if enabled
151        if sm.config.enable_vid_labels_index
152            && let Err(e) = sm.rebuild_vid_labels_index().await
153        {
154            warn!(
155                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
156                e
157            );
158        }
159
160        Ok(sm)
161    }
162
163    /// Create a new StorageManager with LanceDB integration.
164    #[cfg(feature = "lance-backend")]
165    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
166        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
167    }
168
169    /// Create a new StorageManager with custom cache size.
170    #[cfg(feature = "lance-backend")]
171    pub async fn new_with_cache(
172        base_uri: &str,
173        schema_manager: Arc<SchemaManager>,
174        adjacency_cache_size: usize,
175    ) -> Result<Self> {
176        let config = UniConfig {
177            cache_size: adjacency_cache_size,
178            ..Default::default()
179        };
180        Self::new_with_config(base_uri, schema_manager, config).await
181    }
182
183    /// Create a new StorageManager with custom configuration.
184    #[cfg(feature = "lance-backend")]
185    pub async fn new_with_config(
186        base_uri: &str,
187        schema_manager: Arc<SchemaManager>,
188        config: UniConfig,
189    ) -> Result<Self> {
190        let store = Self::build_store_from_uri(base_uri)?;
191        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
192    }
193
194    /// Create a new StorageManager using an already-constructed object store.
195    #[cfg(feature = "lance-backend")]
196    pub async fn new_with_store_and_config(
197        base_uri: &str,
198        store: Arc<dyn ObjectStore>,
199        schema_manager: Arc<SchemaManager>,
200        config: UniConfig,
201    ) -> Result<Self> {
202        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
203            .await
204    }
205
206    /// Create a new StorageManager with LanceDB storage options.
207    #[cfg(feature = "lance-backend")]
208    pub async fn new_with_store_and_storage_options(
209        base_uri: &str,
210        store: Arc<dyn ObjectStore>,
211        schema_manager: Arc<SchemaManager>,
212        config: UniConfig,
213        lancedb_storage_options: Option<HashMap<String, String>>,
214    ) -> Result<Self> {
215        let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
216        Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
217    }
218
219    /// Recover all staging tables for known table patterns.
220    ///
221    /// This runs on startup to handle crash recovery. It checks for staging tables
222    /// for all vertex labels, adjacency tables, delta tables, and main tables.
223    async fn recover_all_staging_tables(
224        backend: &dyn StorageBackend,
225        schema_manager: &SchemaManager,
226    ) -> Result<()> {
227        let schema = schema_manager.schema();
228
229        // Recover main vertex and edge tables
230        backend
231            .recover_staging(table_names::main_vertex_table_name())
232            .await?;
233        backend
234            .recover_staging(table_names::main_edge_table_name())
235            .await?;
236
237        // Recover per-label vertex tables
238        for label in schema.labels.keys() {
239            let name = table_names::vertex_table_name(label);
240            backend.recover_staging(&name).await?;
241        }
242
243        // Recover adjacency and delta tables for each edge type and direction
244        for edge_type in schema.edge_types.keys() {
245            for direction in &["fwd", "bwd"] {
246                // Recover delta tables
247                let delta_name = table_names::delta_table_name(edge_type, direction);
248                backend.recover_staging(&delta_name).await?;
249
250                // Recover adjacency tables for each label
251                for _label in schema.labels.keys() {
252                    let adj_name = table_names::adjacency_table_name(edge_type, direction);
253                    backend.recover_staging(&adj_name).await?;
254                }
255            }
256        }
257
258        Ok(())
259    }
260
261    #[cfg(feature = "lance-backend")]
262    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
263        if base_uri.contains("://") {
264            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
265            let (store, _path) = object_store::parse_url(&parsed)
266                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
267            Ok(Arc::from(store))
268        } else {
269            // If local path, ensure it exists.
270            std::fs::create_dir_all(base_uri)?;
271            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
272        }
273    }
274
275    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
276        Self {
277            base_uri: self.base_uri.clone(),
278            store: self.store.clone(),
279            schema_manager: self.schema_manager.clone(),
280            snapshot_manager: self.snapshot_manager.clone(),
281            // Separate AdjacencyManager for snapshot isolation (Issue #73):
282            // warm() will load only edges visible at the snapshot's HWM.
283            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
284            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
285            config: self.config.clone(),
286            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
287            pinned_snapshot: Some(snapshot),
288            backend: self.backend.clone(),
289            vid_labels_index: self.vid_labels_index.clone(),
290        }
291    }
292
293    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
294        let schema = self.schema_manager.schema();
295        let name = schema.edge_type_name_by_id(edge_type_id)?;
296        self.pinned_snapshot
297            .as_ref()
298            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
299    }
300
301    /// Returns the version_high_water_mark from the pinned snapshot if present.
302    ///
303    /// This is used for time-travel queries to filter data by version.
304    /// When a snapshot is pinned, only rows with `_version <= version_high_water_mark`
305    /// should be considered visible.
306    pub fn version_high_water_mark(&self) -> Option<u64> {
307        self.pinned_snapshot
308            .as_ref()
309            .map(|s| s.version_high_water_mark)
310    }
311
312    /// Apply version filtering to a base filter expression.
313    ///
314    /// If a snapshot is pinned, wraps `base_filter` with an additional
315    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
316    pub fn apply_version_filter(&self, base_filter: String) -> String {
317        if let Some(hwm) = self.version_high_water_mark() {
318            format!("({}) AND (_version <= {})", base_filter, hwm)
319        } else {
320            base_filter
321        }
322    }
323
324    /// Build a filter expression that excludes soft-deleted rows and optionally
325    /// includes a user-provided filter.
326    fn build_active_filter(user_filter: Option<&str>) -> String {
327        match user_filter {
328            Some(expr) => format!("({}) AND (_deleted = false)", expr),
329            None => "_deleted = false".to_string(),
330        }
331    }
332
333    pub fn store(&self) -> Arc<dyn ObjectStore> {
334        self.store.clone()
335    }
336
337    /// Get current compaction status.
338    ///
339    /// # Errors
340    ///
341    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
342    pub fn compaction_status(
343        &self,
344    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
345        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
346        Ok(guard.clone())
347    }
348
349    pub async fn compact(&self) -> Result<CompactionStats> {
350        // Backend handles compaction internally via optimize_table()
351        let start = std::time::Instant::now();
352        let schema = self.schema_manager.schema();
353        let mut files_compacted = 0;
354
355        for label in schema.labels.keys() {
356            let name = table_names::vertex_table_name(label);
357            if self.backend.table_exists(&name).await? {
358                self.backend.optimize_table(&name).await?;
359                files_compacted += 1;
360                self.backend.invalidate_cache(&name);
361            }
362        }
363
364        Ok(CompactionStats {
365            files_compacted,
366            bytes_before: 0,
367            bytes_after: 0,
368            duration: start.elapsed(),
369            crdt_merges: 0,
370        })
371    }
372
373    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
374        let _guard = CompactionGuard::new(self.compaction_status.clone())
375            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
376
377        let start = std::time::Instant::now();
378        let name = table_names::vertex_table_name(label);
379
380        if self.backend.table_exists(&name).await? {
381            self.backend.optimize_table(&name).await?;
382            self.backend.invalidate_cache(&name);
383        }
384
385        Ok(CompactionStats {
386            files_compacted: 1,
387            bytes_before: 0,
388            bytes_after: 0,
389            duration: start.elapsed(),
390            crdt_merges: 0,
391        })
392    }
393
394    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
395        let _guard = CompactionGuard::new(self.compaction_status.clone())
396            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
397
398        let start = std::time::Instant::now();
399        let mut files_compacted = 0;
400
401        for dir in ["fwd", "bwd"] {
402            let name = table_names::delta_table_name(edge_type, dir);
403            if self.backend.table_exists(&name).await? {
404                self.backend.optimize_table(&name).await?;
405                files_compacted += 1;
406            }
407        }
408
409        Ok(CompactionStats {
410            files_compacted,
411            bytes_before: 0,
412            bytes_after: 0,
413            duration: start.elapsed(),
414            crdt_merges: 0,
415        })
416    }
417
418    pub async fn wait_for_compaction(&self) -> Result<()> {
419        loop {
420            let in_progress = {
421                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
422            };
423            if !in_progress {
424                return Ok(());
425            }
426            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
427        }
428    }
429
430    pub fn start_background_compaction(
431        self: Arc<Self>,
432        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
433    ) -> tokio::task::JoinHandle<()> {
434        if !self.config.compaction.enabled {
435            return tokio::spawn(async {});
436        }
437
438        tokio::spawn(async move {
439            // Use interval_at to delay the first tick. tokio::time::interval fires
440            // immediately on the first tick, which can race with queries that run
441            // right after database open. Delaying by the check_interval gives
442            // initial queries time to complete before compaction modifies tables
443            // (optimize(All) can GC index files that concurrent queries depend on).
444            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
445            let mut interval =
446                tokio::time::interval_at(start, self.config.compaction.check_interval);
447
448            loop {
449                tokio::select! {
450                    _ = interval.tick() => {
451                        if let Err(e) = self.update_compaction_status().await {
452                            log::error!("Failed to update compaction status: {}", e);
453                            continue;
454                        }
455
456                        if let Some(task) = self.pick_compaction_task() {
457                            log::info!("Triggering background compaction: {:?}", task);
458                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
459                                log::error!("Compaction failed: {}", e);
460                            }
461                        }
462                    }
463                    _ = shutdown_rx.recv() => {
464                        log::info!("Background compaction shutting down");
465                        let _ = self.wait_for_compaction().await;
466                        break;
467                    }
468                }
469            }
470        })
471    }
472
473    async fn update_compaction_status(&self) -> Result<()> {
474        let schema = self.schema_manager.schema();
475        let backend = self.backend.as_ref();
476        let mut total_tables = 0;
477        let mut total_rows: usize = 0;
478        let mut oldest_ts: Option<i64> = None;
479
480        for name in schema.edge_types.keys() {
481            for dir in ["fwd", "bwd"] {
482                let tbl_name = table_names::delta_table_name(name, dir);
483                if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
484                    continue;
485                }
486                let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
487                if row_count == 0 {
488                    continue;
489                }
490                total_tables += 1;
491                total_rows += row_count;
492
493                // Query oldest _created_at for age tracking
494                let request =
495                    ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
496                let Ok(batches) = backend.scan(request).await else {
497                    continue;
498                };
499                for batch in batches {
500                    let Some(col) = batch
501                        .column_by_name("_created_at")
502                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
503                    else {
504                        continue;
505                    };
506                    for i in 0..col.len() {
507                        if !col.is_null(i) {
508                            let ts = col.value(i);
509                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
510                        }
511                    }
512                }
513            }
514        }
515
516        let oldest_l1_age = oldest_ts
517            .and_then(|ts| {
518                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
519                SystemTime::now().duration_since(created).ok()
520            })
521            .unwrap_or(Duration::ZERO);
522
523        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
524        status.l1_runs = total_tables;
525        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
526        status.oldest_l1_age = oldest_l1_age;
527        Ok(())
528    }
529
530    fn pick_compaction_task(&self) -> Option<CompactionTask> {
531        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
532
533        if status.l1_runs >= self.config.compaction.max_l1_runs {
534            return Some(CompactionTask::ByRunCount);
535        }
536        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
537            return Some(CompactionTask::BySize);
538        }
539        if status.oldest_l1_age >= self.config.compaction.max_l1_age
540            && status.oldest_l1_age > Duration::ZERO
541        {
542            return Some(CompactionTask::ByAge);
543        }
544
545        None
546    }
547
548    /// Optimize a table via the backend, returning `true` on success.
549    async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
550        if let Err(e) = backend.optimize_table(table_name).await {
551            log::warn!("Failed to optimize table {}: {}", table_name, e);
552            return false;
553        }
554        true
555    }
556
557    async fn execute_compaction(this: Arc<Self>, _task: CompactionTask) -> Result<CompactionStats> {
558        let start = std::time::Instant::now();
559        let _guard = CompactionGuard::new(this.compaction_status.clone())
560            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
561
562        let schema = this.schema_manager.schema();
563        let mut files_compacted = 0;
564
565        // ── Tier 2: Semantic compaction ──
566        // Dedup vertices, merge CRDTs, consolidate L1→L2 deltas, clean tombstones
567        let compactor = Compactor::new(Arc::clone(&this));
568        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
569            log::error!(
570                "Semantic compaction failed (continuing with backend optimize): {}",
571                e
572            );
573            Vec::new()
574        });
575
576        // Re-warm adjacency CSR after semantic compaction
577        let am = this.adjacency_manager();
578        for info in &compaction_results {
579            let direction = match info.direction.as_str() {
580                "fwd" => Direction::Outgoing,
581                "bwd" => Direction::Incoming,
582                _ => continue,
583            };
584            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
585                && let Err(e) = am.warm(&this, etid, direction, None).await
586            {
587                log::warn!(
588                    "Failed to re-warm adjacency for {}/{}: {}",
589                    info.edge_type,
590                    info.direction,
591                    e
592                );
593            }
594        }
595
596        // ── Tier 3: Backend optimize ──
597        let backend = this.backend.as_ref();
598
599        // Optimize edge delta and adjacency tables
600        for name in schema.edge_types.keys() {
601            for dir in ["fwd", "bwd"] {
602                let delta = table_names::delta_table_name(name, dir);
603                if Self::try_optimize_table(backend, &delta).await {
604                    files_compacted += 1;
605                }
606                let adj = table_names::adjacency_table_name(name, dir);
607                if Self::try_optimize_table(backend, &adj).await {
608                    files_compacted += 1;
609                }
610            }
611        }
612
613        // Optimize vertex tables
614        for label in schema.labels.keys() {
615            let tbl = table_names::vertex_table_name(label);
616            if Self::try_optimize_table(backend, &tbl).await {
617                files_compacted += 1;
618                backend.invalidate_cache(&tbl);
619            }
620        }
621
622        // Optimize main vertex and edge tables
623        for tbl in [
624            table_names::main_vertex_table_name(),
625            table_names::main_edge_table_name(),
626        ] {
627            if Self::try_optimize_table(backend, tbl).await {
628                files_compacted += 1;
629            }
630        }
631
632        {
633            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
634            status.total_compactions += 1;
635        }
636
637        Ok(CompactionStats {
638            files_compacted,
639            bytes_before: 0,
640            bytes_after: 0,
641            duration: start.elapsed(),
642            crdt_merges: 0,
643        })
644    }
645
646    /// Open a LanceDB table for a label.
647    ///
648    /// Invalidate cached table state (call after writes).
649    pub fn invalidate_table_cache(&self, label: &str) {
650        let name = table_names::vertex_table_name(label);
651        self.backend.invalidate_cache(&name);
652    }
653
654    pub fn base_path(&self) -> &str {
655        &self.base_uri
656    }
657
658    pub fn schema_manager(&self) -> &SchemaManager {
659        &self.schema_manager
660    }
661
662    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
663        self.schema_manager.clone()
664    }
665
666    /// Get the adjacency manager for the dual-CSR architecture.
667    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
668        Arc::clone(&self.adjacency_manager)
669    }
670
671    /// Warm the adjacency manager for a specific edge type and direction.
672    ///
673    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
674    /// Called lazily on first access per edge type or at startup.
675    pub async fn warm_adjacency(
676        &self,
677        edge_type_id: u32,
678        direction: crate::storage::direction::Direction,
679        version: Option<u64>,
680    ) -> anyhow::Result<()> {
681        self.adjacency_manager
682            .warm(self, edge_type_id, direction, version)
683            .await
684    }
685
686    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
687    ///
688    /// Uses double-checked locking to ensure only one concurrent warm() per
689    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
690    pub async fn warm_adjacency_coalesced(
691        &self,
692        edge_type_id: u32,
693        direction: crate::storage::direction::Direction,
694        version: Option<u64>,
695    ) -> anyhow::Result<()> {
696        self.adjacency_manager
697            .warm_coalesced(self, edge_type_id, direction, version)
698            .await
699    }
700
701    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
702    pub fn has_adjacency_csr(
703        &self,
704        edge_type_id: u32,
705        direction: crate::storage::direction::Direction,
706    ) -> bool {
707        self.adjacency_manager.has_csr(edge_type_id, direction)
708    }
709
710    /// Get neighbors at a specific version for snapshot queries.
711    pub fn get_neighbors_at_version(
712        &self,
713        vid: uni_common::core::id::Vid,
714        edge_type: u32,
715        direction: crate::storage::direction::Direction,
716        version: u64,
717    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
718        self.adjacency_manager
719            .get_neighbors_at_version(vid, edge_type, direction, version)
720    }
721
722    /// Get the storage backend.
723    pub fn backend(&self) -> &dyn StorageBackend {
724        self.backend.as_ref()
725    }
726
727    /// Get the storage backend as an Arc.
728    pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
729        self.backend.clone()
730    }
731
732    /// Rebuild the VidLabelsIndex from the main vertex table.
733    /// This is called on startup if enable_vid_labels_index is true.
734    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
735        use crate::storage::vid_labels::VidLabelsIndex;
736
737        let backend = self.backend.as_ref();
738        let vtable = table_names::main_vertex_table_name();
739
740        // Check if the table exists (fresh database)
741        if !backend.table_exists(vtable).await.unwrap_or(false) {
742            self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
743            return Ok(());
744        }
745
746        // Scan all non-deleted vertices and collect (VID, labels)
747        let request = ScanRequest::all(vtable)
748            .with_filter("_deleted = false")
749            .with_limit(100_000);
750        let batches = backend
751            .scan(request)
752            .await
753            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
754
755        let mut index = VidLabelsIndex::new();
756        for batch in batches {
757            let vid_col = batch
758                .column_by_name("_vid")
759                .ok_or_else(|| anyhow!("Missing _vid column"))?
760                .as_any()
761                .downcast_ref::<UInt64Array>()
762                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
763
764            let labels_col = batch
765                .column_by_name("labels")
766                .ok_or_else(|| anyhow!("Missing labels column"))?
767                .as_any()
768                .downcast_ref::<arrow_array::ListArray>()
769                .ok_or_else(|| anyhow!("Invalid labels column type"))?;
770
771            for row_idx in 0..batch.num_rows() {
772                let vid = Vid::from(vid_col.value(row_idx));
773                let labels_array = labels_col.value(row_idx);
774                let labels_str_array = labels_array
775                    .as_any()
776                    .downcast_ref::<arrow_array::StringArray>()
777                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
778
779                let labels: Vec<String> = (0..labels_str_array.len())
780                    .map(|i| labels_str_array.value(i).to_string())
781                    .collect();
782
783                index.insert(vid, labels);
784            }
785        }
786
787        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
788        Ok(())
789    }
790
791    /// Get labels for a VID from the in-memory index.
792    /// Returns None if the index is disabled or the VID is not found.
793    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
794        self.vid_labels_index.as_ref().and_then(|idx| {
795            let index = idx.read();
796            index.get_labels(vid).map(|labels| labels.to_vec())
797        })
798    }
799
800    /// Update the VID-to-labels mapping in the index.
801    /// No-op if the index is disabled.
802    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
803        if let Some(idx) = &self.vid_labels_index {
804            let mut index = idx.write();
805            index.insert(vid, labels);
806        }
807    }
808
809    /// Remove a VID from the labels index.
810    /// No-op if the index is disabled.
811    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
812        if let Some(idx) = &self.vid_labels_index {
813            let mut index = idx.write();
814            index.remove_vid(vid);
815        }
816    }
817
818    pub async fn load_subgraph_cached(
819        &self,
820        start_vids: &[Vid],
821        edge_types: &[u32],
822        max_hops: usize,
823        direction: GraphDirection,
824        _l0: Option<Arc<RwLock<L0Buffer>>>,
825    ) -> Result<WorkingGraph> {
826        let mut graph = WorkingGraph::new();
827
828        let dir = match direction {
829            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
830            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
831        };
832
833        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
834
835        // Initialize frontier
836        let mut frontier: Vec<Vid> = start_vids.to_vec();
837        let mut visited: HashSet<Vid> = HashSet::new();
838
839        // Initialize start vids
840        for &vid in start_vids {
841            graph.add_vertex(vid);
842        }
843
844        for _hop in 0..max_hops {
845            let mut next_frontier = HashSet::new();
846
847            for &vid in &frontier {
848                if visited.contains(&vid) {
849                    continue;
850                }
851                visited.insert(vid);
852                graph.add_vertex(vid);
853
854                for &etype_id in edge_types {
855                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13)
856                    let edge_ver = self.version_high_water_mark();
857                    self.adjacency_manager
858                        .warm_coalesced(self, etype_id, dir, edge_ver)
859                        .await?;
860
861                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
862                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
863
864                    for (neighbor_vid, eid) in edges {
865                        graph.add_vertex(neighbor_vid);
866                        if !visited.contains(&neighbor_vid) {
867                            next_frontier.insert(neighbor_vid);
868                        }
869
870                        if neighbor_is_dst {
871                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
872                        } else {
873                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
874                        }
875                    }
876                }
877            }
878            frontier = next_frontier.into_iter().collect();
879
880            // Early termination: if frontier is empty, no more vertices to explore
881            if frontier.is_empty() {
882                break;
883            }
884        }
885
886        Ok(graph)
887    }
888
889    pub fn snapshot_manager(&self) -> &SnapshotManager {
890        &self.snapshot_manager
891    }
892
893    pub fn index_manager(&self) -> IndexManager {
894        IndexManager::new(
895            &self.base_uri,
896            self.schema_manager.clone(),
897            self.backend.clone(),
898        )
899    }
900
901    // ========================================================================
902    // Domain-level scan methods — encapsulate LanceDB queries for consumers
903    // ========================================================================
904
905    /// Scan a per-label vertex table. Returns `None` if the table doesn't exist.
906    ///
907    /// Internally opens the table, filters requested columns to those that
908    /// physically exist, and applies the version HWM filter for snapshot isolation.
909    pub async fn scan_vertex_table(
910        &self,
911        label: &str,
912        columns: &[&str],
913        additional_filter: Option<&str>,
914    ) -> Result<Option<arrow_array::RecordBatch>> {
915        let backend = self.backend();
916        let table_name = table_names::vertex_table_name(label);
917
918        if !backend.table_exists(&table_name).await.unwrap_or(false) {
919            return Ok(None);
920        }
921
922        // Filter columns to those that exist in the table
923        let actual_columns =
924            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
925                let table_field_names: HashSet<&str> = table_schema
926                    .fields()
927                    .iter()
928                    .map(|f| f.name().as_str())
929                    .collect();
930                columns
931                    .iter()
932                    .copied()
933                    .filter(|c| table_field_names.contains(c))
934                    .map(|s| s.to_string())
935                    .collect::<Vec<_>>()
936            } else {
937                return Ok(None);
938            };
939
940        // Build filter with version HWM + optional additional filter
941        let filter = match (self.version_high_water_mark(), additional_filter) {
942            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
943            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
944            (None, Some(f)) => Some(f.to_string()),
945            (None, None) => None,
946        };
947
948        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
949        if let Some(f) = filter {
950            request = request.with_filter(f);
951        }
952
953        match backend.scan(request).await {
954            Ok(batches) => {
955                if batches.is_empty() {
956                    Ok(None)
957                } else {
958                    Ok(Some(arrow::compute::concat_batches(
959                        &batches[0].schema(),
960                        &batches,
961                    )?))
962                }
963            }
964            Err(_) => Ok(None),
965        }
966    }
967
968    /// Scan a delta table for an edge type + direction.
969    /// Returns `None` if the table doesn't exist.
970    pub async fn scan_delta_table(
971        &self,
972        edge_type: &str,
973        direction: &str,
974        columns: &[&str],
975        additional_filter: Option<&str>,
976    ) -> Result<Option<arrow_array::RecordBatch>> {
977        let backend = self.backend();
978        let table_name = table_names::delta_table_name(edge_type, direction);
979
980        if !backend.table_exists(&table_name).await.unwrap_or(false) {
981            return Ok(None);
982        }
983
984        // Filter columns to those that exist
985        let actual_columns =
986            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
987                let table_field_names: HashSet<&str> = table_schema
988                    .fields()
989                    .iter()
990                    .map(|f| f.name().as_str())
991                    .collect();
992                columns
993                    .iter()
994                    .copied()
995                    .filter(|c| table_field_names.contains(c))
996                    .map(|s| s.to_string())
997                    .collect::<Vec<_>>()
998            } else {
999                return Ok(None);
1000            };
1001
1002        let filter = match (self.version_high_water_mark(), additional_filter) {
1003            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1004            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1005            (None, Some(f)) => Some(f.to_string()),
1006            (None, None) => None,
1007        };
1008
1009        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1010        if let Some(f) = filter {
1011            request = request.with_filter(f);
1012        }
1013
1014        match backend.scan(request).await {
1015            Ok(batches) => {
1016                if batches.is_empty() {
1017                    Ok(None)
1018                } else {
1019                    Ok(Some(arrow::compute::concat_batches(
1020                        &batches[0].schema(),
1021                        &batches,
1022                    )?))
1023                }
1024            }
1025            Err(_) => Ok(None),
1026        }
1027    }
1028
1029    /// Scan the unified main vertex table. Returns `None` if table doesn't exist.
1030    ///
1031    /// Applies version HWM filter internally for snapshot isolation, combined
1032    /// with any caller-provided filter (label conditions, etc.).
1033    pub async fn scan_main_vertex_table(
1034        &self,
1035        columns: &[&str],
1036        filter: Option<&str>,
1037    ) -> Result<Option<arrow_array::RecordBatch>> {
1038        let backend = self.backend();
1039        let table_name = table_names::main_vertex_table_name();
1040
1041        if !backend.table_exists(table_name).await.unwrap_or(false) {
1042            return Ok(None);
1043        }
1044
1045        // Combine caller filter with version HWM for snapshot isolation
1046        let full_filter = match (self.version_high_water_mark(), filter) {
1047            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1048            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1049            (None, Some(f)) => Some(f.to_string()),
1050            (None, None) => None,
1051        };
1052
1053        let request = ScanRequest::all(table_name)
1054            .with_columns(columns.iter().map(|s| s.to_string()).collect());
1055        let request = match full_filter.as_deref() {
1056            Some(f) => request.with_filter(f),
1057            None => request,
1058        };
1059
1060        match backend.scan(request).await {
1061            Ok(batches) => {
1062                if batches.is_empty() {
1063                    Ok(None)
1064                } else {
1065                    Ok(Some(arrow::compute::concat_batches(
1066                        &batches[0].schema(),
1067                        &batches,
1068                    )?))
1069                }
1070            }
1071            Err(_) => Ok(None),
1072        }
1073    }
1074
1075    /// Scan the main edge table as a stream. Returns `None` if table doesn't exist.
1076    pub async fn scan_main_edge_table_stream(
1077        &self,
1078        filter: Option<&str>,
1079    ) -> Result<
1080        Option<
1081            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1082        >,
1083    > {
1084        let backend = self.backend();
1085        let table_name = table_names::main_edge_table_name();
1086
1087        if !backend.table_exists(table_name).await.unwrap_or(false) {
1088            return Ok(None);
1089        }
1090
1091        let mut request = ScanRequest::all(table_name);
1092        if let Some(f) = filter {
1093            request = request.with_filter(f);
1094        }
1095
1096        let stream = backend.scan_stream(request).await?;
1097        Ok(Some(stream))
1098    }
1099
1100    /// Scan a per-label vertex table as a stream. Returns `None` if table doesn't exist.
1101    pub async fn scan_vertex_table_stream(
1102        &self,
1103        label: &str,
1104    ) -> Result<
1105        Option<
1106            std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1107        >,
1108    > {
1109        let backend = self.backend();
1110        let table_name = table_names::vertex_table_name(label);
1111
1112        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1113            return Ok(None);
1114        }
1115
1116        let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1117        Ok(Some(stream))
1118    }
1119
1120    /// Find a vertex VID by external ID. Uses pinned snapshot HWM if present.
1121    pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1122        MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1123            .await
1124    }
1125
1126    /// Find labels for a vertex by VID. Uses pinned snapshot HWM if present.
1127    pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1128        MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1129            .await
1130    }
1131
1132    /// Find edges from the main edge table by type names.
1133    pub async fn find_edges_by_type_names(
1134        &self,
1135        type_names: &[&str],
1136    ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1137        MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names).await
1138    }
1139
1140    /// Scan vertex candidates matching a filter. Returns VIDs where `_deleted = false`.
1141    pub async fn scan_vertex_candidates(
1142        &self,
1143        label: &str,
1144        filter: Option<&str>,
1145    ) -> Result<Vec<Vid>> {
1146        let backend = self.backend();
1147        let table_name = table_names::vertex_table_name(label);
1148
1149        if !backend.table_exists(&table_name).await.unwrap_or(false) {
1150            return Ok(Vec::new());
1151        }
1152
1153        let full_filter = match filter {
1154            Some(f) => format!("_deleted = false AND ({})", f),
1155            None => "_deleted = false".to_string(),
1156        };
1157
1158        let request = ScanRequest::all(&table_name)
1159            .with_filter(full_filter)
1160            .with_columns(vec!["_vid".to_string()]);
1161
1162        let batches = backend.scan(request).await?;
1163
1164        let mut vids = Vec::new();
1165        for batch in batches {
1166            let vid_col = batch
1167                .column_by_name("_vid")
1168                .ok_or(anyhow!("Missing _vid"))?
1169                .as_any()
1170                .downcast_ref::<UInt64Array>()
1171                .ok_or(anyhow!("Invalid _vid"))?;
1172            for i in 0..batch.num_rows() {
1173                vids.push(Vid::from(vid_col.value(i)));
1174            }
1175        }
1176        Ok(vids)
1177    }
1178
1179    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1180        let schema = self.schema_manager.schema();
1181        let label_meta = schema
1182            .labels
1183            .get(label)
1184            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1185        Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
1186    }
1187
1188    #[cfg(feature = "lance-backend")]
1189    pub fn edge_dataset(
1190        &self,
1191        edge_type: &str,
1192        src_label: &str,
1193        dst_label: &str,
1194    ) -> Result<EdgeDataset> {
1195        Ok(EdgeDataset::new(
1196            &self.base_uri,
1197            edge_type,
1198            src_label,
1199            dst_label,
1200        ))
1201    }
1202
1203    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1204        Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
1205    }
1206
1207    pub fn adjacency_dataset(
1208        &self,
1209        edge_type: &str,
1210        label: &str,
1211        direction: &str,
1212    ) -> Result<AdjacencyDataset> {
1213        Ok(AdjacencyDataset::new(
1214            &self.base_uri,
1215            edge_type,
1216            label,
1217            direction,
1218        ))
1219    }
1220
1221    /// Get the main vertex dataset for unified vertex storage.
1222    ///
1223    /// The main vertex dataset contains all vertices regardless of label,
1224    /// enabling fast ID-based lookups without knowing the label.
1225    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1226        MainVertexDataset::new(&self.base_uri)
1227    }
1228
1229    /// Get the main edge dataset for unified edge storage.
1230    ///
1231    /// The main edge dataset contains all edges regardless of type,
1232    /// enabling fast ID-based lookups without knowing the edge type.
1233    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1234        MainEdgeDataset::new(&self.base_uri)
1235    }
1236
1237    #[cfg(feature = "lance-backend")]
1238    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
1239        Ok(UidIndex::new(&self.base_uri, label))
1240    }
1241
1242    #[cfg(feature = "lance-backend")]
1243    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1244        let schema = self.schema_manager.schema();
1245        let config = schema
1246            .indexes
1247            .iter()
1248            .find_map(|idx| match idx {
1249                IndexDefinition::Inverted(cfg)
1250                    if cfg.label == label && cfg.property == property =>
1251                {
1252                    Some(cfg.clone())
1253                }
1254                _ => None,
1255            })
1256            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1257
1258        InvertedIndex::new(&self.base_uri, config).await
1259    }
1260
1261    pub async fn vector_search(
1262        &self,
1263        label: &str,
1264        property: &str,
1265        query: &[f32],
1266        k: usize,
1267        filter: Option<&str>,
1268        ctx: Option<&QueryContext>,
1269    ) -> Result<Vec<(Vid, f32)>> {
1270        use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1271
1272        // Look up vector index config to get the correct distance metric.
1273        let schema = self.schema_manager.schema();
1274        let metric = schema
1275            .vector_index_for_property(label, property)
1276            .map(|config| config.metric.clone())
1277            .unwrap_or(DistanceMetric::L2);
1278
1279        let backend = self.backend.as_ref();
1280        let name = table_names::vertex_table_name(label);
1281
1282        let mut results = Vec::new();
1283
1284        // Only search if the table exists
1285        if backend.table_exists(&name).await.unwrap_or(false) {
1286            let backend_metric = match &metric {
1287                DistanceMetric::L2 => BackendMetric::L2,
1288                DistanceMetric::Cosine => BackendMetric::Cosine,
1289                DistanceMetric::Dot => BackendMetric::Dot,
1290                _ => BackendMetric::L2,
1291            };
1292
1293            // Build combined filter: _deleted = false + optional user filter + HWM
1294            let mut filter_parts = vec![Self::build_active_filter(filter)];
1295            if ctx.is_some()
1296                && let Some(hwm) = self.version_high_water_mark()
1297            {
1298                filter_parts.push(format!("_version <= {}", hwm));
1299            }
1300            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1301
1302            let batches = backend
1303                .vector_search(&name, property, query, k, backend_metric, combined_filter)
1304                .await?;
1305
1306            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1307        }
1308
1309        // Merge L0 buffer vertices into results for visibility of unflushed data.
1310        if let Some(qctx) = ctx {
1311            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1312        }
1313
1314        Ok(results)
1315    }
1316
1317    /// Perform a full-text search with BM25 scoring.
1318    ///
1319    /// Returns vertices matching the search query along with their BM25 scores.
1320    /// Results are sorted by score descending (most relevant first).
1321    ///
1322    /// # Arguments
1323    /// * `label` - The label to search within
1324    /// * `property` - The property column to search (must have FTS index)
1325    /// * `query` - The search query text
1326    /// * `k` - Maximum number of results to return
1327    /// * `filter` - Optional Lance filter expression
1328    /// * `ctx` - Optional query context for visibility checks
1329    ///
1330    /// # Returns
1331    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
1332    pub async fn fts_search(
1333        &self,
1334        label: &str,
1335        property: &str,
1336        query: &str,
1337        k: usize,
1338        filter: Option<&str>,
1339        ctx: Option<&QueryContext>,
1340    ) -> Result<Vec<(Vid, f32)>> {
1341        use crate::backend::types::FilterExpr;
1342
1343        let backend = self.backend.as_ref();
1344        let name = table_names::vertex_table_name(label);
1345
1346        let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1347            // Build combined filter: _deleted = false + optional user filter + HWM
1348            let mut filter_parts = vec![Self::build_active_filter(filter)];
1349            if ctx.is_some()
1350                && let Some(hwm) = self.version_high_water_mark()
1351            {
1352                filter_parts.push(format!("_version <= {}", hwm));
1353            }
1354            let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1355
1356            let batches = backend
1357                .full_text_search(&name, property, query, k, combined_filter)
1358                .await?;
1359
1360            let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1361            // Results should already be sorted by score from backend, but ensure descending order
1362            fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1363            fts_results
1364        } else {
1365            Vec::new()
1366        };
1367
1368        // Merge L0 buffer vertices for visibility of unflushed data.
1369        if let Some(qctx) = ctx {
1370            merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1371        }
1372
1373        Ok(results)
1374    }
1375
1376    #[cfg(feature = "lance-backend")]
1377    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1378        let index = self.uid_index(label)?;
1379        index.get_vid(uid).await
1380    }
1381
1382    #[cfg(feature = "lance-backend")]
1383    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1384        let index = self.uid_index(label)?;
1385        index.write_mapping(&[(uid, vid)]).await
1386    }
1387
1388    pub async fn load_subgraph(
1389        &self,
1390        start_vids: &[Vid],
1391        edge_types: &[u32],
1392        max_hops: usize,
1393        direction: GraphDirection,
1394        l0: Option<&L0Buffer>,
1395    ) -> Result<WorkingGraph> {
1396        let mut graph = WorkingGraph::new();
1397        let schema = self.schema_manager.schema();
1398
1399        // Build maps for ID lookups
1400        let label_map: HashMap<u16, String> = schema
1401            .labels
1402            .values()
1403            .map(|meta| {
1404                (
1405                    meta.id,
1406                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
1407                )
1408            })
1409            .collect();
1410
1411        let edge_type_map: HashMap<u32, String> = schema
1412            .edge_types
1413            .values()
1414            .map(|meta| {
1415                (
1416                    meta.id,
1417                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1418                )
1419            })
1420            .collect();
1421
1422        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1423
1424        // Initialize frontier
1425        let mut frontier: Vec<Vid> = start_vids.to_vec();
1426        let mut visited: HashSet<Vid> = HashSet::new();
1427
1428        // Add start vertices to graph
1429        for &vid in start_vids {
1430            graph.add_vertex(vid);
1431        }
1432
1433        for _hop in 0..max_hops {
1434            let mut next_frontier = HashSet::new();
1435
1436            for &vid in &frontier {
1437                if visited.contains(&vid) {
1438                    continue;
1439                }
1440                visited.insert(vid);
1441                graph.add_vertex(vid);
1442
1443                // For each edge type we want to traverse
1444                for &etype_id in &target_edge_types {
1445                    let etype_name = edge_type_map
1446                        .get(&etype_id)
1447                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1448
1449                    // Determine directions
1450                    // Storage direction: "fwd" or "bwd".
1451                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
1452                    let (dir_str, neighbor_is_dst) = match direction {
1453                        GraphDirection::Outgoing => ("fwd", true),
1454                        GraphDirection::Incoming => ("bwd", false),
1455                    };
1456
1457                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1458
1459                    // 1. L2: Adjacency (Base)
1460                    // In the new storage model, VIDs don't embed label info.
1461                    // We need to try all labels to find the adjacency data.
1462                    // Edge version from snapshot (reserved for future version filtering)
1463                    let _edge_ver = self
1464                        .pinned_snapshot
1465                        .as_ref()
1466                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1467
1468                    // Try each label until we find adjacency data
1469                    let backend = self.backend();
1470                    for current_src_label in label_map.values() {
1471                        let adj_ds =
1472                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1473                                Ok(ds) => ds,
1474                                Err(_) => continue,
1475                            };
1476                        if let Some((neighbors, eids)) =
1477                            adj_ds.read_adjacency_backend(backend, vid).await?
1478                        {
1479                            for (n, eid) in neighbors.into_iter().zip(eids) {
1480                                edges.insert(
1481                                    eid,
1482                                    EdgeState {
1483                                        neighbor: n,
1484                                        version: 0,
1485                                        deleted: false,
1486                                    },
1487                                );
1488                            }
1489                            break; // Found adjacency data for this vid, no need to try other labels
1490                        }
1491                    }
1492
1493                    // 2. L1: Delta
1494                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1495                    let delta_entries = delta_ds
1496                        .read_deltas(backend, vid, &schema, self.version_high_water_mark())
1497                        .await?;
1498                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1499
1500                    // 3. L0: Buffer
1501                    if let Some(l0) = l0 {
1502                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1503                    }
1504
1505                    // Add resulting edges to graph
1506                    Self::add_edges_to_graph(
1507                        &mut graph,
1508                        edges,
1509                        vid,
1510                        etype_id,
1511                        neighbor_is_dst,
1512                        &visited,
1513                        &mut next_frontier,
1514                    );
1515                }
1516            }
1517            frontier = next_frontier.into_iter().collect();
1518
1519            // Early termination: if frontier is empty, no more vertices to explore
1520            if frontier.is_empty() {
1521                break;
1522            }
1523        }
1524
1525        Ok(graph)
1526    }
1527
1528    /// Apply delta entries to edge state map, handling version conflicts.
1529    fn apply_delta_to_edges(
1530        edges: &mut HashMap<Eid, EdgeState>,
1531        delta_entries: Vec<crate::storage::delta::L1Entry>,
1532        neighbor_is_dst: bool,
1533    ) {
1534        for entry in delta_entries {
1535            let neighbor = if neighbor_is_dst {
1536                entry.dst_vid
1537            } else {
1538                entry.src_vid
1539            };
1540            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1541
1542            if entry.version > current_ver {
1543                edges.insert(
1544                    entry.eid,
1545                    EdgeState {
1546                        neighbor,
1547                        version: entry.version,
1548                        deleted: matches!(entry.op, Op::Delete),
1549                    },
1550                );
1551            }
1552        }
1553    }
1554
1555    /// Apply L0 buffer edges and tombstones to edge state map.
1556    fn apply_l0_to_edges(
1557        edges: &mut HashMap<Eid, EdgeState>,
1558        l0: &L0Buffer,
1559        vid: Vid,
1560        etype_id: u32,
1561        direction: GraphDirection,
1562    ) {
1563        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1564        for (neighbor, eid, ver) in l0_neighbors {
1565            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1566            if ver > current_ver {
1567                edges.insert(
1568                    eid,
1569                    EdgeState {
1570                        neighbor,
1571                        version: ver,
1572                        deleted: false,
1573                    },
1574                );
1575            }
1576        }
1577
1578        // Check tombstones in L0
1579        for (eid, state) in edges.iter_mut() {
1580            if l0.is_tombstoned(*eid) {
1581                state.deleted = true;
1582            }
1583        }
1584    }
1585
1586    /// Add non-deleted edges to graph and collect next frontier.
1587    fn add_edges_to_graph(
1588        graph: &mut WorkingGraph,
1589        edges: HashMap<Eid, EdgeState>,
1590        vid: Vid,
1591        etype_id: u32,
1592        neighbor_is_dst: bool,
1593        visited: &HashSet<Vid>,
1594        next_frontier: &mut HashSet<Vid>,
1595    ) {
1596        for (eid, state) in edges {
1597            if state.deleted {
1598                continue;
1599            }
1600            graph.add_vertex(state.neighbor);
1601
1602            if !visited.contains(&state.neighbor) {
1603                next_frontier.insert(state.neighbor);
1604            }
1605
1606            if neighbor_is_dst {
1607                graph.add_edge(vid, state.neighbor, eid, etype_id);
1608            } else {
1609                graph.add_edge(state.neighbor, vid, eid, etype_id);
1610            }
1611        }
1612    }
1613}
1614
1615/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
1616fn extract_vid_score_pairs(
1617    batches: &[arrow_array::RecordBatch],
1618    vid_column: &str,
1619    score_column: &str,
1620) -> Result<Vec<(Vid, f32)>> {
1621    let mut results = Vec::new();
1622    for batch in batches {
1623        let vid_col = batch
1624            .column_by_name(vid_column)
1625            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1626            .as_any()
1627            .downcast_ref::<UInt64Array>()
1628            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1629
1630        let score_col = batch
1631            .column_by_name(score_column)
1632            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1633            .as_any()
1634            .downcast_ref::<Float32Array>()
1635            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1636
1637        for i in 0..batch.num_rows() {
1638            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1639        }
1640    }
1641    Ok(results)
1642}
1643
1644/// Extracts a `Vec<f32>` from a JSON property value.
1645///
1646/// Returns `None` if the property is missing, not an array, or contains
1647/// non-numeric elements.
1648fn extract_embedding_from_props(
1649    props: &uni_common::Properties,
1650    property: &str,
1651) -> Option<Vec<f32>> {
1652    let arr = props.get(property)?.as_array()?;
1653    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1654}
1655
1656/// Merges L0 buffer vertices into LanceDB vector search results.
1657///
1658/// Visits L0 buffers in precedence order (pending flush → main → transaction),
1659/// collects tombstoned VIDs and candidate embeddings, then merges them with the
1660/// existing LanceDB results so that:
1661/// - Tombstoned VIDs are removed (unless re-created in a later L0).
1662/// - VIDs present in both L0 and LanceDB use the L0 distance.
1663/// - New L0-only VIDs are appended.
1664/// - Results are re-sorted by distance ascending and truncated to `k`.
1665fn merge_l0_into_vector_results(
1666    results: &mut Vec<(Vid, f32)>,
1667    ctx: &QueryContext,
1668    label: &str,
1669    property: &str,
1670    query: &[f32],
1671    k: usize,
1672    metric: &DistanceMetric,
1673) {
1674    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
1675    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1676        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1677    buffers.push(Arc::clone(&ctx.l0));
1678    if let Some(ref txn) = ctx.transaction_l0 {
1679        buffers.push(Arc::clone(txn));
1680    }
1681
1682    // Maps VID → distance for L0 candidates (last writer wins).
1683    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1684    // Tombstoned VIDs across all L0 buffers.
1685    let mut tombstoned: HashSet<Vid> = HashSet::new();
1686
1687    for buf_arc in &buffers {
1688        let buf = buf_arc.read();
1689
1690        // Accumulate tombstones.
1691        for &vid in &buf.vertex_tombstones {
1692            tombstoned.insert(vid);
1693        }
1694
1695        // Scan vertices with the target label.
1696        for (&vid, labels) in &buf.vertex_labels {
1697            if !labels.iter().any(|l| l == label) {
1698                continue;
1699            }
1700            if let Some(props) = buf.vertex_properties.get(&vid)
1701                && let Some(emb) = extract_embedding_from_props(props, property)
1702            {
1703                if emb.len() != query.len() {
1704                    continue; // dimension mismatch
1705                }
1706                let dist = metric.compute_distance(&emb, query);
1707                // Last writer wins: later buffer overwrites earlier.
1708                l0_candidates.insert(vid, dist);
1709                // If re-created in a later L0, remove from tombstones.
1710                tombstoned.remove(&vid);
1711            }
1712        }
1713    }
1714
1715    // If no L0 activity affects this search, skip merge.
1716    if l0_candidates.is_empty() && tombstoned.is_empty() {
1717        return;
1718    }
1719
1720    // Remove tombstoned VIDs from LanceDB results.
1721    results.retain(|(vid, _)| !tombstoned.contains(vid));
1722
1723    // Overwrite or append L0 candidates.
1724    for (vid, dist) in &l0_candidates {
1725        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1726            existing.1 = *dist;
1727        } else {
1728            results.push((*vid, *dist));
1729        }
1730    }
1731
1732    // Re-sort by distance ascending.
1733    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1734    results.truncate(k);
1735}
1736
1737/// Computes a simple token-overlap relevance score between a query and text.
1738///
1739/// Returns the fraction of query tokens found in the text (case-insensitive),
1740/// producing a score in [0.0, 1.0]. Sufficient for the small L0 buffer.
1741fn compute_text_relevance(query: &str, text: &str) -> f32 {
1742    let query_tokens: HashSet<String> =
1743        query.split_whitespace().map(|t| t.to_lowercase()).collect();
1744    if query_tokens.is_empty() {
1745        return 0.0;
1746    }
1747    let text_tokens: HashSet<String> = text.split_whitespace().map(|t| t.to_lowercase()).collect();
1748    let hits = query_tokens
1749        .iter()
1750        .filter(|t| text_tokens.contains(t.as_str()))
1751        .count();
1752    hits as f32 / query_tokens.len() as f32
1753}
1754
1755/// Extracts a string slice from a property value.
1756fn extract_text_from_props<'a>(
1757    props: &'a uni_common::Properties,
1758    property: &str,
1759) -> Option<&'a str> {
1760    props.get(property)?.as_str()
1761}
1762
1763/// Merges L0 buffer vertices into LanceDB full-text search results.
1764///
1765/// Follows the same pattern as [`merge_l0_into_vector_results`]: visits L0
1766/// buffers in precedence order, collects tombstoned VIDs and text-match
1767/// candidates, then merges them so that:
1768/// - Tombstoned VIDs are removed (unless re-created in a later L0).
1769/// - VIDs present in both L0 and LanceDB use the L0 score.
1770/// - New L0-only VIDs are appended.
1771/// - Results are re-sorted by score **descending** and truncated to `k`.
1772fn merge_l0_into_fts_results(
1773    results: &mut Vec<(Vid, f32)>,
1774    ctx: &QueryContext,
1775    label: &str,
1776    property: &str,
1777    query: &str,
1778    k: usize,
1779) {
1780    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
1781    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1782        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1783    buffers.push(Arc::clone(&ctx.l0));
1784    if let Some(ref txn) = ctx.transaction_l0 {
1785        buffers.push(Arc::clone(txn));
1786    }
1787
1788    // Maps VID → relevance score for L0 candidates (last writer wins).
1789    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1790    // Tombstoned VIDs across all L0 buffers.
1791    let mut tombstoned: HashSet<Vid> = HashSet::new();
1792
1793    for buf_arc in &buffers {
1794        let buf = buf_arc.read();
1795
1796        // Accumulate tombstones.
1797        for &vid in &buf.vertex_tombstones {
1798            tombstoned.insert(vid);
1799        }
1800
1801        // Scan vertices with the target label.
1802        for (&vid, labels) in &buf.vertex_labels {
1803            if !labels.iter().any(|l| l == label) {
1804                continue;
1805            }
1806            if let Some(props) = buf.vertex_properties.get(&vid)
1807                && let Some(text) = extract_text_from_props(props, property)
1808            {
1809                let score = compute_text_relevance(query, text);
1810                if score > 0.0 {
1811                    // Last writer wins: later buffer overwrites earlier.
1812                    l0_candidates.insert(vid, score);
1813                }
1814                // If re-created in a later L0, remove from tombstones.
1815                tombstoned.remove(&vid);
1816            }
1817        }
1818    }
1819
1820    // If no L0 activity affects this search, skip merge.
1821    if l0_candidates.is_empty() && tombstoned.is_empty() {
1822        return;
1823    }
1824
1825    // Remove tombstoned VIDs from LanceDB results.
1826    results.retain(|(vid, _)| !tombstoned.contains(vid));
1827
1828    // Overwrite or append L0 candidates.
1829    for (vid, score) in &l0_candidates {
1830        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1831            existing.1 = *score;
1832        } else {
1833            results.push((*vid, *score));
1834        }
1835    }
1836
1837    // Re-sort by score descending (higher relevance first).
1838    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1839    results.truncate(k);
1840}