Skip to main content

uni_store/storage/
manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
5use crate::lancedb::LanceDbStore;
6use crate::runtime::WorkingGraph;
7use crate::runtime::context::QueryContext;
8use crate::runtime::l0::L0Buffer;
9use crate::storage::adjacency::AdjacencyDataset;
10use crate::storage::compaction::Compactor;
11use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
12use crate::storage::direction::Direction;
13use crate::storage::edge::EdgeDataset;
14use crate::storage::index::UidIndex;
15use crate::storage::inverted_index::InvertedIndex;
16use crate::storage::main_edge::MainEdgeDataset;
17use crate::storage::main_vertex::MainVertexDataset;
18use crate::storage::vertex::VertexDataset;
19use anyhow::{Result, anyhow};
20use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
21use dashmap::DashMap;
22use futures::TryStreamExt;
23use lancedb::query::{ExecutableQuery, QueryBase, Select};
24use object_store::ObjectStore;
25use object_store::local::LocalFileSystem;
26use parking_lot::RwLock;
27use std::collections::{HashMap, HashSet};
28use std::sync::{Arc, Mutex};
29use std::time::{Duration, SystemTime, UNIX_EPOCH};
30use tracing::warn;
31use uni_common::config::UniConfig;
32use uni_common::core::id::{Eid, UniId, Vid};
33use uni_common::core::schema::{DistanceMetric, IndexDefinition, SchemaManager};
34use uni_common::sync::acquire_mutex;
35
36use crate::snapshot::manager::SnapshotManager;
37use crate::storage::IndexManager;
38use crate::storage::adjacency_manager::AdjacencyManager;
39use crate::storage::resilient_store::ResilientObjectStore;
40
41use uni_common::core::snapshot::SnapshotManifest;
42
43use uni_common::graph::simple_graph::Direction as GraphDirection;
44
/// Edge state during subgraph loading - tracks version and deletion status.
struct EdgeState {
    // The neighboring vertex this edge record points at.
    neighbor: Vid,
    // Version stamp of this edge record (presumably the latest version seen
    // wins during loading — verify at the use site).
    version: u64,
    // Whether this record represents a deletion (tombstone) of the edge.
    deleted: bool,
}
51
/// Central storage coordinator: owns the object store, the LanceDB
/// connection, schema access, snapshot and adjacency managers, and the
/// shared compaction state.
pub struct StorageManager {
    // Root URI the store was opened from (local path or object-store URL).
    base_uri: String,
    // Resilience-wrapped object store used for raw object I/O.
    store: Arc<dyn ObjectStore>,
    // Shared schema (labels, edge types, index definitions).
    schema_manager: Arc<SchemaManager>,
    // Manages snapshot manifests on the object store.
    snapshot_manager: Arc<SnapshotManager>,
    // Dual-CSR adjacency caches, warmed per edge type/direction.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Cache of opened LanceDB tables by label name for vector search performance
    table_cache: DashMap<String, lancedb::Table>,
    pub config: UniConfig,
    // Shared compaction progress/statistics, guarded by a mutex.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Optional pinned snapshot for time-travel
    pinned_snapshot: Option<SnapshotManifest>,
    /// LanceDB store for DataFusion-powered queries.
    lancedb_store: Arc<LanceDbStore>,
    /// In-memory VID-to-labels index for O(1) lookups (optional, configurable)
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
69
/// Helper to manage compaction_in_progress flag
///
/// RAII guard: construction (via `new`) claims the in-progress flag under
/// the status mutex; `Drop` clears it and records the completion time.
struct CompactionGuard {
    status: Arc<Mutex<CompactionStatus>>,
}
74
75impl CompactionGuard {
76    fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
77        let mut s = acquire_mutex(&status, "compaction_status").ok()?;
78        if s.compaction_in_progress {
79            return None;
80        }
81        s.compaction_in_progress = true;
82        Some(Self {
83            status: status.clone(),
84        })
85    }
86}
87
88impl Drop for CompactionGuard {
89    fn drop(&mut self) {
90        // CRITICAL: Never panic in Drop - panicking in drop() = process ABORT.
91        // See issue #18/#150. If the lock is poisoned, log and continue gracefully.
92        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
93            Ok(mut s) => {
94                s.compaction_in_progress = false;
95                s.last_compaction = Some(std::time::SystemTime::now());
96            }
97            Err(e) => {
98                // Lock is poisoned but we're in Drop - cannot panic.
99                // Log the error and continue. System state may be inconsistent but at least
100                // we don't abort the process.
101                log::error!(
102                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
103                     Compaction status may be inconsistent. Issue #18/#150",
104                    e
105                );
106            }
107        }
108    }
109}
110
111impl StorageManager {
112    /// Create a new StorageManager with LanceDB integration.
113    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
114        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
115    }
116
117    /// Create a new StorageManager with custom cache size.
118    pub async fn new_with_cache(
119        base_uri: &str,
120        schema_manager: Arc<SchemaManager>,
121        adjacency_cache_size: usize,
122    ) -> Result<Self> {
123        let config = UniConfig {
124            cache_size: adjacency_cache_size,
125            ..Default::default()
126        };
127        Self::new_with_config(base_uri, schema_manager, config).await
128    }
129
130    /// Create a new StorageManager with custom configuration.
131    pub async fn new_with_config(
132        base_uri: &str,
133        schema_manager: Arc<SchemaManager>,
134        config: UniConfig,
135    ) -> Result<Self> {
136        let store = Self::build_store_from_uri(base_uri)?;
137        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
138    }
139
140    /// Create a new StorageManager using an already-constructed object store.
141    ///
142    /// This is used by higher layers that need explicit store configuration
143    /// (for example custom S3 endpoints in hybrid/cloud modes).
144    pub async fn new_with_store_and_config(
145        base_uri: &str,
146        store: Arc<dyn ObjectStore>,
147        schema_manager: Arc<SchemaManager>,
148        config: UniConfig,
149    ) -> Result<Self> {
150        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
151            .await
152    }
153
154    /// Create a new StorageManager using an already-constructed object store
155    /// and explicit LanceDB storage options.
156    pub async fn new_with_store_and_storage_options(
157        base_uri: &str,
158        store: Arc<dyn ObjectStore>,
159        schema_manager: Arc<SchemaManager>,
160        config: UniConfig,
161        lancedb_storage_options: Option<HashMap<String, String>>,
162    ) -> Result<Self> {
163        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
164            store,
165            config.object_store.clone(),
166        ));
167
168        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
169
170        // Connect to LanceDB
171        let lancedb_store =
172            LanceDbStore::connect_with_storage_options(base_uri, lancedb_storage_options).await?;
173
174        // Perform crash recovery for all known table patterns
175        Self::recover_all_staging_tables(&lancedb_store, &schema_manager).await?;
176
177        let mut sm = Self {
178            base_uri: base_uri.to_string(),
179            store: resilient_store,
180            schema_manager,
181            snapshot_manager,
182            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
183            table_cache: DashMap::new(),
184            config,
185            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
186            pinned_snapshot: None,
187            lancedb_store: Arc::new(lancedb_store),
188            vid_labels_index: None,
189        };
190
191        // Rebuild VidLabelsIndex if enabled
192        if sm.config.enable_vid_labels_index
193            && let Err(e) = sm.rebuild_vid_labels_index().await
194        {
195            warn!(
196                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to LanceDB queries.",
197                e
198            );
199        }
200
201        Ok(sm)
202    }
203
204    /// Recover all staging tables for known table patterns.
205    ///
206    /// This runs on startup to handle crash recovery. It checks for staging tables
207    /// for all vertex labels, adjacency tables, delta tables, and main tables.
208    async fn recover_all_staging_tables(
209        lancedb_store: &LanceDbStore,
210        schema_manager: &SchemaManager,
211    ) -> Result<()> {
212        let schema = schema_manager.schema();
213
214        // Recover main vertex and edge tables
215        lancedb_store
216            .recover_staging(LanceDbStore::main_vertex_table_name())
217            .await?;
218        lancedb_store
219            .recover_staging(LanceDbStore::main_edge_table_name())
220            .await?;
221
222        // Recover per-label vertex tables
223        for label in schema.labels.keys() {
224            let table_name = LanceDbStore::vertex_table_name(label);
225            lancedb_store.recover_staging(&table_name).await?;
226        }
227
228        // Recover adjacency and delta tables for each edge type and direction
229        for edge_type in schema.edge_types.keys() {
230            for direction in &["fwd", "bwd"] {
231                // Recover delta tables
232                let delta_table_name = LanceDbStore::delta_table_name(edge_type, direction);
233                lancedb_store.recover_staging(&delta_table_name).await?;
234
235                // Recover adjacency tables for each label
236                for _label in schema.labels.keys() {
237                    let adj_table_name = LanceDbStore::adjacency_table_name(edge_type, direction);
238                    lancedb_store.recover_staging(&adj_table_name).await?;
239                }
240            }
241        }
242
243        Ok(())
244    }
245
246    fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
247        if base_uri.contains("://") {
248            let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
249            let (store, _path) = object_store::parse_url(&parsed)
250                .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
251            Ok(Arc::from(store))
252        } else {
253            // If local path, ensure it exists.
254            std::fs::create_dir_all(base_uri)?;
255            Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
256        }
257    }
258
259    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
260        Self {
261            base_uri: self.base_uri.clone(),
262            store: self.store.clone(),
263            schema_manager: self.schema_manager.clone(),
264            snapshot_manager: self.snapshot_manager.clone(),
265            // Separate AdjacencyManager for snapshot isolation (Issue #73):
266            // warm() will load only edges visible at the snapshot's HWM.
267            // This prevents live DB's CSR (with all edges) from leaking into snapshots.
268            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
269            table_cache: DashMap::new(),
270            config: self.config.clone(),
271            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
272            pinned_snapshot: Some(snapshot),
273            lancedb_store: self.lancedb_store.clone(),
274            vid_labels_index: self.vid_labels_index.clone(),
275        }
276    }
277
278    pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
279        let schema = self.schema_manager.schema();
280        let name = schema.edge_type_name_by_id(edge_type_id)?;
281        self.pinned_snapshot
282            .as_ref()
283            .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
284    }
285
286    /// Returns the version_high_water_mark from the pinned snapshot if present.
287    ///
288    /// This is used for time-travel queries to filter data by version.
289    /// When a snapshot is pinned, only rows with `_version <= version_high_water_mark`
290    /// should be considered visible.
291    pub fn version_high_water_mark(&self) -> Option<u64> {
292        self.pinned_snapshot
293            .as_ref()
294            .map(|s| s.version_high_water_mark)
295    }
296
297    /// Apply version filtering to a base filter expression.
298    ///
299    /// If a snapshot is pinned, wraps `base_filter` with an additional
300    /// `_version <= hwm` clause. Otherwise returns `base_filter` unchanged.
301    pub fn apply_version_filter(&self, base_filter: String) -> String {
302        if let Some(hwm) = self.version_high_water_mark() {
303            format!("({}) AND (_version <= {})", base_filter, hwm)
304        } else {
305            base_filter
306        }
307    }
308
309    /// Build a filter expression that excludes soft-deleted rows and optionally
310    /// includes a user-provided filter.
311    fn build_active_filter(user_filter: Option<&str>) -> String {
312        match user_filter {
313            Some(expr) => format!("({}) AND (_deleted = false)", expr),
314            None => "_deleted = false".to_string(),
315        }
316    }
317
318    pub fn store(&self) -> Arc<dyn ObjectStore> {
319        self.store.clone()
320    }
321
322    /// Get current compaction status.
323    ///
324    /// # Errors
325    ///
326    /// Returns error if the compaction status lock is poisoned (see issue #18/#150).
327    pub fn compaction_status(
328        &self,
329    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
330        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
331        Ok(guard.clone())
332    }
333
334    pub async fn compact(&self) -> Result<CompactionStats> {
335        // LanceDB handles compaction internally via optimize()
336        // For now, call optimize on vertex tables
337        let start = std::time::Instant::now();
338        let schema = self.schema_manager.schema();
339        let mut files_compacted = 0;
340
341        for label in schema.labels.keys() {
342            let table_name = LanceDbStore::vertex_table_name(label);
343            if self.lancedb_store.table_exists(&table_name).await? {
344                let table = self.lancedb_store.open_table(&table_name).await?;
345                table.optimize(lancedb::table::OptimizeAction::All).await?;
346                files_compacted += 1;
347                self.invalidate_table_cache(label);
348            }
349        }
350
351        Ok(CompactionStats {
352            files_compacted,
353            bytes_before: 0,
354            bytes_after: 0,
355            duration: start.elapsed(),
356            crdt_merges: 0,
357        })
358    }
359
360    pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
361        let _guard = CompactionGuard::new(self.compaction_status.clone())
362            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
363
364        let start = std::time::Instant::now();
365        let table_name = LanceDbStore::vertex_table_name(label);
366
367        if self.lancedb_store.table_exists(&table_name).await? {
368            let table = self.lancedb_store.open_table(&table_name).await?;
369            table.optimize(lancedb::table::OptimizeAction::All).await?;
370            self.invalidate_table_cache(label);
371        }
372
373        Ok(CompactionStats {
374            files_compacted: 1,
375            bytes_before: 0,
376            bytes_after: 0,
377            duration: start.elapsed(),
378            crdt_merges: 0,
379        })
380    }
381
382    pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
383        let _guard = CompactionGuard::new(self.compaction_status.clone())
384            .ok_or_else(|| anyhow!("Compaction already in progress"))?;
385
386        let start = std::time::Instant::now();
387        let mut files_compacted = 0;
388
389        for dir in ["fwd", "bwd"] {
390            let table_name = LanceDbStore::delta_table_name(edge_type, dir);
391            if self.lancedb_store.table_exists(&table_name).await? {
392                let table = self.lancedb_store.open_table(&table_name).await?;
393                table.optimize(lancedb::table::OptimizeAction::All).await?;
394                files_compacted += 1;
395            }
396        }
397
398        Ok(CompactionStats {
399            files_compacted,
400            bytes_before: 0,
401            bytes_after: 0,
402            duration: start.elapsed(),
403            crdt_merges: 0,
404        })
405    }
406
407    pub async fn wait_for_compaction(&self) -> Result<()> {
408        loop {
409            let in_progress = {
410                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
411            };
412            if !in_progress {
413                return Ok(());
414            }
415            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
416        }
417    }
418
    /// Spawn the periodic background compaction task.
    ///
    /// Returns a completed no-op task immediately if compaction is disabled
    /// in config (so callers can still join the handle). Otherwise the task
    /// ticks every `check_interval`, refreshes compaction status, and runs a
    /// compaction when one of the thresholds is exceeded. The task exits when
    /// `shutdown_rx` fires, after waiting for any in-flight compaction.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            // Use interval_at to delay the first tick. tokio::time::interval fires
            // immediately on the first tick, which can race with queries that run
            // right after database open. Delaying by the check_interval gives
            // initial queries time to complete before compaction modifies tables
            // (optimize(All) can GC index files that concurrent queries depend on).
            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
            let mut interval =
                tokio::time::interval_at(start, self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Refresh L1 stats first; skip this tick on failure.
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        // Run at most one compaction per tick.
                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Drain any in-flight compaction before exiting.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
461
    /// Recompute L1 (delta table) statistics and store them in `compaction_status`.
    ///
    /// Scans every edge type's fwd/bwd delta tables, counting non-empty
    /// tables and total rows, and tracks the oldest `_created_at` timestamp
    /// to derive the age of the oldest L1 data. Failures on individual
    /// tables are skipped so one bad table cannot stall status updates.
    async fn update_compaction_status(&self) -> Result<()> {
        let schema = self.schema_manager.schema();
        let mut total_tables = 0;
        let mut total_rows: usize = 0;
        // Oldest `_created_at` (nanoseconds since epoch) across all delta tables.
        let mut oldest_ts: Option<i64> = None;

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let table_name = LanceDbStore::delta_table_name(name, dir);
                // Missing or unopenable tables are simply skipped.
                let Ok(table) = self.lancedb_store.open_table(&table_name).await else {
                    continue;
                };
                let row_count = table.count_rows(None).await.unwrap_or(0);
                if row_count == 0 {
                    continue;
                }
                total_tables += 1;
                total_rows += row_count;

                // Query oldest _created_at for age tracking
                let Ok(stream) = table
                    .query()
                    .select(Select::Columns(vec!["_created_at".to_string()]))
                    .execute()
                    .await
                else {
                    continue;
                };
                let Ok(batches) = stream.try_collect::<Vec<_>>().await else {
                    continue;
                };
                for batch in batches {
                    // Skip batches where the column is missing or of an
                    // unexpected type rather than failing the whole update.
                    let Some(col) = batch
                        .column_by_name("_created_at")
                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
                    else {
                        continue;
                    };
                    for i in 0..col.len() {
                        if !col.is_null(i) {
                            let ts = col.value(i);
                            // Keep the minimum (oldest) timestamp seen so far.
                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
                        }
                    }
                }
            }
        }

        // Convert the oldest timestamp into an age; zero when no data was
        // found (or the timestamp is in the future, e.g. clock skew).
        let oldest_l1_age = oldest_ts
            .and_then(|ts| {
                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
                SystemTime::now().duration_since(created).ok()
            })
            .unwrap_or(Duration::ZERO);

        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
        status.l1_runs = total_tables;
        // Row count times a per-entry estimate approximates the L1 byte size.
        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
        status.oldest_l1_age = oldest_l1_age;
        Ok(())
    }
523
524    fn pick_compaction_task(&self) -> Option<CompactionTask> {
525        let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
526
527        if status.l1_runs >= self.config.compaction.max_l1_runs {
528            return Some(CompactionTask::ByRunCount);
529        }
530        if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
531            return Some(CompactionTask::BySize);
532        }
533        if status.oldest_l1_age >= self.config.compaction.max_l1_age
534            && status.oldest_l1_age > Duration::ZERO
535        {
536            return Some(CompactionTask::ByAge);
537        }
538
539        None
540    }
541
542    /// Open a table by name and run `optimize(All)`, returning `true` on success.
543    async fn optimize_table(store: &LanceDbStore, table_name: &str) -> bool {
544        let Ok(table) = store.open_table(table_name).await else {
545            return false;
546        };
547        if let Err(e) = table.optimize(lancedb::table::OptimizeAction::All).await {
548            log::warn!("Failed to optimize table {}: {}", table_name, e);
549            return false;
550        }
551        true
552    }
553
    /// Run a full compaction cycle: semantic compaction (Tier 2) followed by
    /// Lance `optimize(All)` (Tier 3) across all tables.
    ///
    /// Claims the compaction guard (errors if a compaction is already
    /// running). Semantic compaction failures are logged and the Lance
    /// optimize pass still runs; individual table optimize failures are
    /// skipped via `optimize_table`.
    async fn execute_compaction(this: Arc<Self>, _task: CompactionTask) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let _guard = CompactionGuard::new(this.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = this.schema_manager.schema();
        let mut files_compacted = 0;

        // ── Tier 2: Semantic compaction ──
        // Dedup vertices, merge CRDTs, consolidate L1→L2 deltas, clean tombstones
        let compactor = Compactor::new(Arc::clone(&this));
        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
            log::error!(
                "Semantic compaction failed (continuing with Lance optimize): {}",
                e
            );
            Vec::new()
        });

        // Re-warm adjacency CSR after semantic compaction
        let am = this.adjacency_manager();
        for info in &compaction_results {
            let direction = match info.direction.as_str() {
                "fwd" => Direction::Outgoing,
                "bwd" => Direction::Incoming,
                // Unknown direction strings are skipped silently.
                _ => continue,
            };
            // Re-warm failures are non-fatal: the CSR will be rebuilt lazily.
            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
                && let Err(e) = am.warm(&this, etid, direction, None).await
            {
                log::warn!(
                    "Failed to re-warm adjacency for {}/{}: {}",
                    info.edge_type,
                    info.direction,
                    e
                );
            }
        }

        // ── Tier 3: Lance optimize ──
        let store = &this.lancedb_store;

        // Optimize edge delta and adjacency tables
        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let delta = LanceDbStore::delta_table_name(name, dir);
                if Self::optimize_table(store, &delta).await {
                    files_compacted += 1;
                }
                let adj = LanceDbStore::adjacency_table_name(name, dir);
                if Self::optimize_table(store, &adj).await {
                    files_compacted += 1;
                }
            }
        }

        // Optimize vertex tables
        for label in schema.labels.keys() {
            let table_name = LanceDbStore::vertex_table_name(label);
            if Self::optimize_table(store, &table_name).await {
                files_compacted += 1;
                // Cached handles may reference GC'd files after optimize.
                this.invalidate_table_cache(label);
            }
        }

        // Optimize main vertex and edge tables
        for table_name in [
            LanceDbStore::main_vertex_table_name(),
            LanceDbStore::main_edge_table_name(),
        ] {
            if Self::optimize_table(store, table_name).await {
                files_compacted += 1;
            }
        }

        // Record the completed cycle (scoped so the lock drops immediately).
        {
            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
642
643    /// Get or open a cached table for a label
644    pub async fn get_cached_table(&self, label: &str) -> Result<lancedb::Table> {
645        // Check cache first
646        if let Some(table) = self.table_cache.get(label) {
647            return Ok(table.clone());
648        }
649
650        // Open and cache
651        let table_name = LanceDbStore::vertex_table_name(label);
652        let table = self.lancedb_store.open_table(&table_name).await?;
653
654        self.table_cache.insert(label.to_string(), table.clone());
655        Ok(table)
656    }
657
658    /// Invalidate cached table (call after writes)
659    pub fn invalidate_table_cache(&self, label: &str) {
660        self.table_cache.remove(label);
661    }
662
663    /// Clear all cached tables
664    pub fn clear_table_cache(&self) {
665        self.table_cache.clear();
666    }
667
668    pub fn base_path(&self) -> &str {
669        &self.base_uri
670    }
671
672    pub fn schema_manager(&self) -> &SchemaManager {
673        &self.schema_manager
674    }
675
676    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
677        self.schema_manager.clone()
678    }
679
680    /// Get the adjacency manager for the dual-CSR architecture.
681    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
682        Arc::clone(&self.adjacency_manager)
683    }
684
685    /// Warm the adjacency manager for a specific edge type and direction.
686    ///
687    /// Builds the Main CSR from L2 adjacency + L1 delta data in storage.
688    /// Called lazily on first access per edge type or at startup.
689    pub async fn warm_adjacency(
690        &self,
691        edge_type_id: u32,
692        direction: crate::storage::direction::Direction,
693        version: Option<u64>,
694    ) -> anyhow::Result<()> {
695        self.adjacency_manager
696            .warm(self, edge_type_id, direction, version)
697            .await
698    }
699
700    /// Coalesced warm_adjacency() to prevent cache stampede (Issue #13).
701    ///
702    /// Uses double-checked locking to ensure only one concurrent warm() per
703    /// (edge_type, direction) key. Subsequent callers wait for the first to complete.
704    pub async fn warm_adjacency_coalesced(
705        &self,
706        edge_type_id: u32,
707        direction: crate::storage::direction::Direction,
708        version: Option<u64>,
709    ) -> anyhow::Result<()> {
710        self.adjacency_manager
711            .warm_coalesced(self, edge_type_id, direction, version)
712            .await
713    }
714
715    /// Check whether the adjacency manager has a CSR for the given edge type and direction.
716    pub fn has_adjacency_csr(
717        &self,
718        edge_type_id: u32,
719        direction: crate::storage::direction::Direction,
720    ) -> bool {
721        self.adjacency_manager.has_csr(edge_type_id, direction)
722    }
723
724    /// Get neighbors at a specific version for snapshot queries.
725    pub fn get_neighbors_at_version(
726        &self,
727        vid: uni_common::core::id::Vid,
728        edge_type: u32,
729        direction: crate::storage::direction::Direction,
730        version: u64,
731    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
732        self.adjacency_manager
733            .get_neighbors_at_version(vid, edge_type, direction, version)
734    }
735
736    /// Get the LanceDB store.
737    pub fn lancedb_store(&self) -> &LanceDbStore {
738        &self.lancedb_store
739    }
740
741    /// Get the LanceDB store as an Arc.
742    pub fn lancedb_store_arc(&self) -> Arc<LanceDbStore> {
743        self.lancedb_store.clone()
744    }
745
    /// Rebuild the VidLabelsIndex from the main vertex table.
    /// This is called on startup if enable_vid_labels_index is true.
    ///
    /// On success, `self.vid_labels_index` holds a freshly-populated index.
    /// If the main vertex table does not exist yet (fresh database), an empty
    /// index is installed instead.
    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
        use crate::lancedb::LanceDbStore;
        use crate::storage::vid_labels::VidLabelsIndex;

        let lancedb_store = self.lancedb_store();

        // Open the main vertex table
        let table = match lancedb_store
            .open_table(LanceDbStore::main_vertex_table_name())
            .await
        {
            Ok(t) => t,
            Err(_) => {
                // Table doesn't exist yet (fresh database)
                self.vid_labels_index =
                    Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
                return Ok(());
            }
        };

        // Scan all non-deleted vertices and collect (VID, labels)
        // NOTE(review): `.limit(100_000)` caps the TOTAL rows returned, not a
        // per-batch size — vertices beyond the first 100k would be missing
        // from the index. Confirm this bound is intentional for large
        // databases (missing VIDs appear to fall back to LanceDB lookups).
        let batches = table
            .query()
            .only_if("_deleted = false")
            .limit(100_000) // Reasonable batch size
            .execute()
            .await
            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| anyhow!("Failed to collect vertex data: {}", e))?;

        let mut index = VidLabelsIndex::new();
        for batch in batches {
            // `_vid` column: u64 vertex ids.
            let vid_col = batch
                .column_by_name("_vid")
                .ok_or_else(|| anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;

            // `labels` column: list<string> per row.
            let labels_col = batch
                .column_by_name("labels")
                .ok_or_else(|| anyhow!("Missing labels column"))?
                .as_any()
                .downcast_ref::<arrow_array::ListArray>()
                .ok_or_else(|| anyhow!("Invalid labels column type"))?;

            for row_idx in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row_idx));
                // Each row's list element is itself a StringArray of labels.
                let labels_array = labels_col.value(row_idx);
                let labels_str_array = labels_array
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;

                let labels: Vec<String> = (0..labels_str_array.len())
                    .map(|i| labels_str_array.value(i).to_string())
                    .collect();

                index.insert(vid, labels);
            }
        }

        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
        Ok(())
    }
815
816    /// Get labels for a VID from the in-memory index.
817    /// Returns None if the index is disabled or the VID is not found.
818    pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
819        self.vid_labels_index.as_ref().and_then(|idx| {
820            let index = idx.read();
821            index.get_labels(vid).map(|labels| labels.to_vec())
822        })
823    }
824
825    /// Update the VID-to-labels mapping in the index.
826    /// No-op if the index is disabled.
827    pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
828        if let Some(idx) = &self.vid_labels_index {
829            let mut index = idx.write();
830            index.insert(vid, labels);
831        }
832    }
833
834    /// Remove a VID from the labels index.
835    /// No-op if the index is disabled.
836    pub fn remove_from_vid_labels_index(&self, vid: Vid) {
837        if let Some(idx) = &self.vid_labels_index {
838            let mut index = idx.write();
839            index.remove_vid(vid);
840        }
841    }
842
843    pub async fn load_subgraph_cached(
844        &self,
845        start_vids: &[Vid],
846        edge_types: &[u32],
847        max_hops: usize,
848        direction: GraphDirection,
849        _l0: Option<Arc<RwLock<L0Buffer>>>,
850    ) -> Result<WorkingGraph> {
851        let mut graph = WorkingGraph::new();
852
853        let dir = match direction {
854            GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
855            GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
856        };
857
858        let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
859
860        // Initialize frontier
861        let mut frontier: Vec<Vid> = start_vids.to_vec();
862        let mut visited: HashSet<Vid> = HashSet::new();
863
864        // Initialize start vids
865        for &vid in start_vids {
866            graph.add_vertex(vid);
867        }
868
869        for _hop in 0..max_hops {
870            let mut next_frontier = HashSet::new();
871
872            for &vid in &frontier {
873                if visited.contains(&vid) {
874                    continue;
875                }
876                visited.insert(vid);
877                graph.add_vertex(vid);
878
879                for &etype_id in edge_types {
880                    // Warm adjacency with coalescing to prevent cache stampede (Issue #13)
881                    let edge_ver = self.version_high_water_mark();
882                    self.adjacency_manager
883                        .warm_coalesced(self, etype_id, dir, edge_ver)
884                        .await?;
885
886                    // Get neighbors from AdjacencyManager (Main CSR + overlay)
887                    let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
888
889                    for (neighbor_vid, eid) in edges {
890                        graph.add_vertex(neighbor_vid);
891                        if !visited.contains(&neighbor_vid) {
892                            next_frontier.insert(neighbor_vid);
893                        }
894
895                        if neighbor_is_dst {
896                            graph.add_edge(vid, neighbor_vid, eid, etype_id);
897                        } else {
898                            graph.add_edge(neighbor_vid, vid, eid, etype_id);
899                        }
900                    }
901                }
902            }
903            frontier = next_frontier.into_iter().collect();
904
905            // Early termination: if frontier is empty, no more vertices to explore
906            if frontier.is_empty() {
907                break;
908            }
909        }
910
911        Ok(graph)
912    }
913
    /// Borrow the manager's `SnapshotManager`.
    pub fn snapshot_manager(&self) -> &SnapshotManager {
        &self.snapshot_manager
    }
917
918    pub fn index_manager(&self) -> IndexManager {
919        IndexManager::new(
920            &self.base_uri,
921            self.schema_manager.clone(),
922            self.lancedb_store.clone(),
923        )
924    }
925
926    pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
927        let schema = self.schema_manager.schema();
928        let label_meta = schema
929            .labels
930            .get(label)
931            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
932        Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
933    }
934
935    pub fn edge_dataset(
936        &self,
937        edge_type: &str,
938        src_label: &str,
939        dst_label: &str,
940    ) -> Result<EdgeDataset> {
941        Ok(EdgeDataset::new(
942            &self.base_uri,
943            edge_type,
944            src_label,
945            dst_label,
946        ))
947    }
948
    /// Build a `DeltaDataset` (L1 edge deltas) for `edge_type` in the given
    /// storage direction ("fwd" or "bwd").
    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
        Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
    }
952
953    pub fn adjacency_dataset(
954        &self,
955        edge_type: &str,
956        label: &str,
957        direction: &str,
958    ) -> Result<AdjacencyDataset> {
959        Ok(AdjacencyDataset::new(
960            &self.base_uri,
961            edge_type,
962            label,
963            direction,
964        ))
965    }
966
    /// Get the main vertex dataset for unified vertex storage.
    ///
    /// The main vertex dataset contains all vertices regardless of label,
    /// enabling fast ID-based lookups without knowing the label.
    /// The handle is constructed on demand from the manager's base URI.
    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
        MainVertexDataset::new(&self.base_uri)
    }
974
    /// Get the main edge dataset for unified edge storage.
    ///
    /// The main edge dataset contains all edges regardless of type,
    /// enabling fast ID-based lookups without knowing the edge type.
    /// The handle is constructed on demand from the manager's base URI.
    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
        MainEdgeDataset::new(&self.base_uri)
    }
982
    /// Build the UID → VID index handle for `label`.
    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
        Ok(UidIndex::new(&self.base_uri, label))
    }
986
987    pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
988        let schema = self.schema_manager.schema();
989        let config = schema
990            .indexes
991            .iter()
992            .find_map(|idx| match idx {
993                IndexDefinition::Inverted(cfg)
994                    if cfg.label == label && cfg.property == property =>
995                {
996                    Some(cfg.clone())
997                }
998                _ => None,
999            })
1000            .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1001
1002        InvertedIndex::new(&self.base_uri, config).await
1003    }
1004
1005    pub async fn vector_search(
1006        &self,
1007        label: &str,
1008        property: &str,
1009        query: &[f32],
1010        k: usize,
1011        filter: Option<&str>,
1012        ctx: Option<&QueryContext>,
1013    ) -> Result<Vec<(Vid, f32)>> {
1014        // Look up vector index config to get the correct distance metric.
1015        let schema = self.schema_manager.schema();
1016        let metric = schema
1017            .vector_index_for_property(label, property)
1018            .map(|config| config.metric.clone())
1019            .unwrap_or(DistanceMetric::L2);
1020
1021        // Try to open the cached table; if the label has no data yet the Lance
1022        // table won't exist. In that case fall back to L0-only results.
1023        let table = self.get_cached_table(label).await.ok();
1024
1025        let mut results = Vec::new();
1026
1027        if let Some(table) = table {
1028            let distance_type = match &metric {
1029                DistanceMetric::L2 => lancedb::DistanceType::L2,
1030                DistanceMetric::Cosine => lancedb::DistanceType::Cosine,
1031                DistanceMetric::Dot => lancedb::DistanceType::Dot,
1032                _ => lancedb::DistanceType::L2,
1033            };
1034
1035            // Use LanceDB's vector search API
1036            let mut query_builder = table
1037                .vector_search(query.to_vec())
1038                .map_err(|e| anyhow!("Failed to create vector search: {}", e))?
1039                .column(property)
1040                .distance_type(distance_type)
1041                .limit(k);
1042
1043            query_builder = query_builder.only_if(Self::build_active_filter(filter));
1044
1045            // Apply version filtering if snapshot is pinned
1046            if ctx.is_some()
1047                && let Some(hwm) = self.version_high_water_mark()
1048            {
1049                query_builder = query_builder.only_if(format!("_version <= {}", hwm));
1050            }
1051
1052            let batches = query_builder
1053                .execute()
1054                .await
1055                .map_err(|e| anyhow!("Vector search execution failed: {}", e))?
1056                .try_collect::<Vec<_>>()
1057                .await
1058                .map_err(|e| anyhow!("Failed to collect vector search results: {}", e))?;
1059
1060            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1061        }
1062
1063        // Merge L0 buffer vertices into results for visibility of unflushed data.
1064        if let Some(qctx) = ctx {
1065            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1066        }
1067
1068        Ok(results)
1069    }
1070
1071    /// Perform a full-text search with BM25 scoring.
1072    ///
1073    /// Returns vertices matching the search query along with their BM25 scores.
1074    /// Results are sorted by score descending (most relevant first).
1075    ///
1076    /// # Arguments
1077    /// * `label` - The label to search within
1078    /// * `property` - The property column to search (must have FTS index)
1079    /// * `query` - The search query text
1080    /// * `k` - Maximum number of results to return
1081    /// * `filter` - Optional Lance filter expression
1082    /// * `ctx` - Optional query context for visibility checks
1083    ///
1084    /// # Returns
1085    /// Vector of (Vid, score) tuples, where score is the BM25 relevance score.
1086    pub async fn fts_search(
1087        &self,
1088        label: &str,
1089        property: &str,
1090        query: &str,
1091        k: usize,
1092        filter: Option<&str>,
1093        ctx: Option<&QueryContext>,
1094    ) -> Result<Vec<(Vid, f32)>> {
1095        use lance_index::scalar::FullTextSearchQuery;
1096        use lance_index::scalar::inverted::query::MatchQuery;
1097
1098        // Try to open the cached table; if the label has no data yet the Lance
1099        // table won't exist. Fall through to L0 merge with empty results.
1100        let table = self.get_cached_table(label).await.ok();
1101
1102        let mut results = if let Some(table) = table {
1103            // Build the FTS query with specific column
1104            let match_query =
1105                MatchQuery::new(query.to_string()).with_column(Some(property.to_string()));
1106            let fts_query = FullTextSearchQuery {
1107                query: match_query.into(),
1108                limit: Some(k as i64),
1109                wand_factor: None,
1110            };
1111
1112            let mut query_builder = table.query().full_text_search(fts_query).limit(k);
1113
1114            query_builder = query_builder.only_if(Self::build_active_filter(filter));
1115
1116            // Apply version filtering if snapshot is pinned
1117            if ctx.is_some()
1118                && let Some(hwm) = self.version_high_water_mark()
1119            {
1120                query_builder = query_builder.only_if(format!("_version <= {}", hwm));
1121            }
1122
1123            let batches = query_builder
1124                .execute()
1125                .await
1126                .map_err(|e| anyhow!("FTS search execution failed: {}", e))?
1127                .try_collect::<Vec<_>>()
1128                .await
1129                .map_err(|e| anyhow!("Failed to collect FTS search results: {}", e))?;
1130
1131            let mut lance_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1132            // Results should already be sorted by score from Lance, but ensure descending order
1133            lance_results
1134                .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1135            lance_results
1136        } else {
1137            Vec::new()
1138        };
1139
1140        // Merge L0 buffer vertices for visibility of unflushed data.
1141        if let Some(qctx) = ctx {
1142            merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1143        }
1144
1145        Ok(results)
1146    }
1147
1148    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1149        let index = self.uid_index(label)?;
1150        index.get_vid(uid).await
1151    }
1152
1153    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1154        let index = self.uid_index(label)?;
1155        index.write_mapping(&[(uid, vid)]).await
1156    }
1157
1158    pub async fn load_subgraph(
1159        &self,
1160        start_vids: &[Vid],
1161        edge_types: &[u32],
1162        max_hops: usize,
1163        direction: GraphDirection,
1164        l0: Option<&L0Buffer>,
1165    ) -> Result<WorkingGraph> {
1166        let mut graph = WorkingGraph::new();
1167        let schema = self.schema_manager.schema();
1168
1169        // Build maps for ID lookups
1170        let label_map: HashMap<u16, String> = schema
1171            .labels
1172            .values()
1173            .map(|meta| {
1174                (
1175                    meta.id,
1176                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
1177                )
1178            })
1179            .collect();
1180
1181        let edge_type_map: HashMap<u32, String> = schema
1182            .edge_types
1183            .values()
1184            .map(|meta| {
1185                (
1186                    meta.id,
1187                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1188                )
1189            })
1190            .collect();
1191
1192        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1193
1194        // Initialize frontier
1195        let mut frontier: Vec<Vid> = start_vids.to_vec();
1196        let mut visited: HashSet<Vid> = HashSet::new();
1197
1198        // Add start vertices to graph
1199        for &vid in start_vids {
1200            graph.add_vertex(vid);
1201        }
1202
1203        for _hop in 0..max_hops {
1204            let mut next_frontier = HashSet::new();
1205
1206            for &vid in &frontier {
1207                if visited.contains(&vid) {
1208                    continue;
1209                }
1210                visited.insert(vid);
1211                graph.add_vertex(vid);
1212
1213                // For each edge type we want to traverse
1214                for &etype_id in &target_edge_types {
1215                    let etype_name = edge_type_map
1216                        .get(&etype_id)
1217                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1218
1219                    // Determine directions
1220                    // Storage direction: "fwd" or "bwd".
1221                    // Query direction: Outgoing -> "fwd", Incoming -> "bwd".
1222                    let (dir_str, neighbor_is_dst) = match direction {
1223                        GraphDirection::Outgoing => ("fwd", true),
1224                        GraphDirection::Incoming => ("bwd", false),
1225                    };
1226
1227                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1228
1229                    // 1. L2: Adjacency (Base)
1230                    // In the new storage model, VIDs don't embed label info.
1231                    // We need to try all labels to find the adjacency data.
1232                    // Edge version from snapshot (reserved for future version filtering)
1233                    let _edge_ver = self
1234                        .pinned_snapshot
1235                        .as_ref()
1236                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1237
1238                    // Try each label until we find adjacency data
1239                    let lancedb_store = self.lancedb_store();
1240                    for current_src_label in label_map.values() {
1241                        let adj_ds =
1242                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1243                                Ok(ds) => ds,
1244                                Err(_) => continue,
1245                            };
1246                        if let Some((neighbors, eids)) =
1247                            adj_ds.read_adjacency_lancedb(lancedb_store, vid).await?
1248                        {
1249                            for (n, eid) in neighbors.into_iter().zip(eids) {
1250                                edges.insert(
1251                                    eid,
1252                                    EdgeState {
1253                                        neighbor: n,
1254                                        version: 0,
1255                                        deleted: false,
1256                                    },
1257                                );
1258                            }
1259                            break; // Found adjacency data for this vid, no need to try other labels
1260                        }
1261                    }
1262
1263                    // 2. L1: Delta
1264                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1265                    let delta_entries = delta_ds
1266                        .read_deltas_lancedb(
1267                            lancedb_store,
1268                            vid,
1269                            &schema,
1270                            self.version_high_water_mark(),
1271                        )
1272                        .await?;
1273                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1274
1275                    // 3. L0: Buffer
1276                    if let Some(l0) = l0 {
1277                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1278                    }
1279
1280                    // Add resulting edges to graph
1281                    Self::add_edges_to_graph(
1282                        &mut graph,
1283                        edges,
1284                        vid,
1285                        etype_id,
1286                        neighbor_is_dst,
1287                        &visited,
1288                        &mut next_frontier,
1289                    );
1290                }
1291            }
1292            frontier = next_frontier.into_iter().collect();
1293
1294            // Early termination: if frontier is empty, no more vertices to explore
1295            if frontier.is_empty() {
1296                break;
1297            }
1298        }
1299
1300        Ok(graph)
1301    }
1302
1303    /// Apply delta entries to edge state map, handling version conflicts.
1304    fn apply_delta_to_edges(
1305        edges: &mut HashMap<Eid, EdgeState>,
1306        delta_entries: Vec<crate::storage::delta::L1Entry>,
1307        neighbor_is_dst: bool,
1308    ) {
1309        for entry in delta_entries {
1310            let neighbor = if neighbor_is_dst {
1311                entry.dst_vid
1312            } else {
1313                entry.src_vid
1314            };
1315            let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1316
1317            if entry.version > current_ver {
1318                edges.insert(
1319                    entry.eid,
1320                    EdgeState {
1321                        neighbor,
1322                        version: entry.version,
1323                        deleted: matches!(entry.op, Op::Delete),
1324                    },
1325                );
1326            }
1327        }
1328    }
1329
1330    /// Apply L0 buffer edges and tombstones to edge state map.
1331    fn apply_l0_to_edges(
1332        edges: &mut HashMap<Eid, EdgeState>,
1333        l0: &L0Buffer,
1334        vid: Vid,
1335        etype_id: u32,
1336        direction: GraphDirection,
1337    ) {
1338        let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1339        for (neighbor, eid, ver) in l0_neighbors {
1340            let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1341            if ver > current_ver {
1342                edges.insert(
1343                    eid,
1344                    EdgeState {
1345                        neighbor,
1346                        version: ver,
1347                        deleted: false,
1348                    },
1349                );
1350            }
1351        }
1352
1353        // Check tombstones in L0
1354        for (eid, state) in edges.iter_mut() {
1355            if l0.is_tombstoned(*eid) {
1356                state.deleted = true;
1357            }
1358        }
1359    }
1360
1361    /// Add non-deleted edges to graph and collect next frontier.
1362    fn add_edges_to_graph(
1363        graph: &mut WorkingGraph,
1364        edges: HashMap<Eid, EdgeState>,
1365        vid: Vid,
1366        etype_id: u32,
1367        neighbor_is_dst: bool,
1368        visited: &HashSet<Vid>,
1369        next_frontier: &mut HashSet<Vid>,
1370    ) {
1371        for (eid, state) in edges {
1372            if state.deleted {
1373                continue;
1374            }
1375            graph.add_vertex(state.neighbor);
1376
1377            if !visited.contains(&state.neighbor) {
1378                next_frontier.insert(state.neighbor);
1379            }
1380
1381            if neighbor_is_dst {
1382                graph.add_edge(vid, state.neighbor, eid, etype_id);
1383            } else {
1384                graph.add_edge(state.neighbor, vid, eid, etype_id);
1385            }
1386        }
1387    }
1388}
1389
1390/// Extracts `(Vid, f32)` pairs from record batches using the given VID and score column names.
1391fn extract_vid_score_pairs(
1392    batches: &[arrow_array::RecordBatch],
1393    vid_column: &str,
1394    score_column: &str,
1395) -> Result<Vec<(Vid, f32)>> {
1396    let mut results = Vec::new();
1397    for batch in batches {
1398        let vid_col = batch
1399            .column_by_name(vid_column)
1400            .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1401            .as_any()
1402            .downcast_ref::<UInt64Array>()
1403            .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1404
1405        let score_col = batch
1406            .column_by_name(score_column)
1407            .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1408            .as_any()
1409            .downcast_ref::<Float32Array>()
1410            .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1411
1412        for i in 0..batch.num_rows() {
1413            results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1414        }
1415    }
1416    Ok(results)
1417}
1418
1419/// Extracts a `Vec<f32>` from a JSON property value.
1420///
1421/// Returns `None` if the property is missing, not an array, or contains
1422/// non-numeric elements.
1423fn extract_embedding_from_props(
1424    props: &uni_common::Properties,
1425    property: &str,
1426) -> Option<Vec<f32>> {
1427    let arr = props.get(property)?.as_array()?;
1428    arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1429}
1430
1431/// Merges L0 buffer vertices into LanceDB vector search results.
1432///
1433/// Visits L0 buffers in precedence order (pending flush → main → transaction),
1434/// collects tombstoned VIDs and candidate embeddings, then merges them with the
1435/// existing LanceDB results so that:
1436/// - Tombstoned VIDs are removed (unless re-created in a later L0).
1437/// - VIDs present in both L0 and LanceDB use the L0 distance.
1438/// - New L0-only VIDs are appended.
1439/// - Results are re-sorted by distance ascending and truncated to `k`.
1440fn merge_l0_into_vector_results(
1441    results: &mut Vec<(Vid, f32)>,
1442    ctx: &QueryContext,
1443    label: &str,
1444    property: &str,
1445    query: &[f32],
1446    k: usize,
1447    metric: &DistanceMetric,
1448) {
1449    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
1450    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1451        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1452    buffers.push(Arc::clone(&ctx.l0));
1453    if let Some(ref txn) = ctx.transaction_l0 {
1454        buffers.push(Arc::clone(txn));
1455    }
1456
1457    // Maps VID → distance for L0 candidates (last writer wins).
1458    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1459    // Tombstoned VIDs across all L0 buffers.
1460    let mut tombstoned: HashSet<Vid> = HashSet::new();
1461
1462    for buf_arc in &buffers {
1463        let buf = buf_arc.read();
1464
1465        // Accumulate tombstones.
1466        for &vid in &buf.vertex_tombstones {
1467            tombstoned.insert(vid);
1468        }
1469
1470        // Scan vertices with the target label.
1471        for (&vid, labels) in &buf.vertex_labels {
1472            if !labels.iter().any(|l| l == label) {
1473                continue;
1474            }
1475            if let Some(props) = buf.vertex_properties.get(&vid)
1476                && let Some(emb) = extract_embedding_from_props(props, property)
1477            {
1478                if emb.len() != query.len() {
1479                    continue; // dimension mismatch
1480                }
1481                let dist = metric.compute_distance(&emb, query);
1482                // Last writer wins: later buffer overwrites earlier.
1483                l0_candidates.insert(vid, dist);
1484                // If re-created in a later L0, remove from tombstones.
1485                tombstoned.remove(&vid);
1486            }
1487        }
1488    }
1489
1490    // If no L0 activity affects this search, skip merge.
1491    if l0_candidates.is_empty() && tombstoned.is_empty() {
1492        return;
1493    }
1494
1495    // Remove tombstoned VIDs from LanceDB results.
1496    results.retain(|(vid, _)| !tombstoned.contains(vid));
1497
1498    // Overwrite or append L0 candidates.
1499    for (vid, dist) in &l0_candidates {
1500        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1501            existing.1 = *dist;
1502        } else {
1503            results.push((*vid, *dist));
1504        }
1505    }
1506
1507    // Re-sort by distance ascending.
1508    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1509    results.truncate(k);
1510}
1511
/// Computes a simple token-overlap relevance score between a query and text.
///
/// Returns the fraction of distinct, lowercased query tokens found in the
/// text, producing a score in [0.0, 1.0]. Sufficient for the small L0 buffer.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    // Distinct, case-folded query tokens; an empty query matches nothing.
    let wanted: HashSet<String> = query.split_whitespace().map(str::to_lowercase).collect();
    if wanted.is_empty() {
        return 0.0;
    }
    let available: HashSet<String> =
        text.split_whitespace().map(str::to_lowercase).collect();
    let matched = wanted.intersection(&available).count();
    matched as f32 / wanted.len() as f32
}
1529
1530/// Extracts a string slice from a property value.
1531fn extract_text_from_props<'a>(
1532    props: &'a uni_common::Properties,
1533    property: &str,
1534) -> Option<&'a str> {
1535    props.get(property)?.as_str()
1536}
1537
1538/// Merges L0 buffer vertices into LanceDB full-text search results.
1539///
1540/// Follows the same pattern as [`merge_l0_into_vector_results`]: visits L0
1541/// buffers in precedence order, collects tombstoned VIDs and text-match
1542/// candidates, then merges them so that:
1543/// - Tombstoned VIDs are removed (unless re-created in a later L0).
1544/// - VIDs present in both L0 and LanceDB use the L0 score.
1545/// - New L0-only VIDs are appended.
1546/// - Results are re-sorted by score **descending** and truncated to `k`.
1547fn merge_l0_into_fts_results(
1548    results: &mut Vec<(Vid, f32)>,
1549    ctx: &QueryContext,
1550    label: &str,
1551    property: &str,
1552    query: &str,
1553    k: usize,
1554) {
1555    // Collect all L0 buffers in precedence order (earliest first, last writer wins).
1556    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1557        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1558    buffers.push(Arc::clone(&ctx.l0));
1559    if let Some(ref txn) = ctx.transaction_l0 {
1560        buffers.push(Arc::clone(txn));
1561    }
1562
1563    // Maps VID → relevance score for L0 candidates (last writer wins).
1564    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1565    // Tombstoned VIDs across all L0 buffers.
1566    let mut tombstoned: HashSet<Vid> = HashSet::new();
1567
1568    for buf_arc in &buffers {
1569        let buf = buf_arc.read();
1570
1571        // Accumulate tombstones.
1572        for &vid in &buf.vertex_tombstones {
1573            tombstoned.insert(vid);
1574        }
1575
1576        // Scan vertices with the target label.
1577        for (&vid, labels) in &buf.vertex_labels {
1578            if !labels.iter().any(|l| l == label) {
1579                continue;
1580            }
1581            if let Some(props) = buf.vertex_properties.get(&vid)
1582                && let Some(text) = extract_text_from_props(props, property)
1583            {
1584                let score = compute_text_relevance(query, text);
1585                if score > 0.0 {
1586                    // Last writer wins: later buffer overwrites earlier.
1587                    l0_candidates.insert(vid, score);
1588                }
1589                // If re-created in a later L0, remove from tombstones.
1590                tombstoned.remove(&vid);
1591            }
1592        }
1593    }
1594
1595    // If no L0 activity affects this search, skip merge.
1596    if l0_candidates.is_empty() && tombstoned.is_empty() {
1597        return;
1598    }
1599
1600    // Remove tombstoned VIDs from LanceDB results.
1601    results.retain(|(vid, _)| !tombstoned.contains(vid));
1602
1603    // Overwrite or append L0 candidates.
1604    for (vid, score) in &l0_candidates {
1605        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1606            existing.1 = *score;
1607        } else {
1608            results.push((*vid, *score));
1609        }
1610    }
1611
1612    // Re-sort by score descending (higher relevance first).
1613    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1614    results.truncate(k);
1615}