1use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
5use crate::lancedb::LanceDbStore;
6use crate::runtime::WorkingGraph;
7use crate::runtime::context::QueryContext;
8use crate::runtime::l0::L0Buffer;
9use crate::storage::adjacency::AdjacencyDataset;
10use crate::storage::compaction::Compactor;
11use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
12use crate::storage::direction::Direction;
13use crate::storage::edge::EdgeDataset;
14use crate::storage::index::UidIndex;
15use crate::storage::inverted_index::InvertedIndex;
16use crate::storage::main_edge::MainEdgeDataset;
17use crate::storage::main_vertex::MainVertexDataset;
18use crate::storage::vertex::VertexDataset;
19use anyhow::{Result, anyhow};
20use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
21use dashmap::DashMap;
22use futures::TryStreamExt;
23use lancedb::query::{ExecutableQuery, QueryBase, Select};
24use object_store::ObjectStore;
25use object_store::local::LocalFileSystem;
26use parking_lot::RwLock;
27use std::collections::{HashMap, HashSet};
28use std::sync::{Arc, Mutex};
29use std::time::{Duration, SystemTime, UNIX_EPOCH};
30use tracing::warn;
31use uni_common::config::UniConfig;
32use uni_common::core::id::{Eid, UniId, Vid};
33use uni_common::core::schema::{DistanceMetric, IndexDefinition, SchemaManager};
34use uni_common::sync::acquire_mutex;
35
36use crate::snapshot::manager::SnapshotManager;
37use crate::storage::IndexManager;
38use crate::storage::adjacency_manager::AdjacencyManager;
39use crate::storage::resilient_store::ResilientObjectStore;
40
41use uni_common::core::snapshot::SnapshotManifest;
42
43use uni_common::graph::simple_graph::Direction as GraphDirection;
44
/// Merge state for a single edge while overlaying persisted adjacency, L1
/// deltas, and unflushed L0 writes during subgraph loading.
/// Conflicts are resolved last-writer-wins on `version`.
struct EdgeState {
    // Vertex on the far side of the edge (dst when traversing fwd, src when bwd).
    neighbor: Vid,
    // Version of the write that produced this state; higher versions win.
    version: u64,
    // True when the newest-known write is a delete (tombstone).
    deleted: bool,
}
51
/// Central handle to on-disk graph state: LanceDB tables, snapshots,
/// adjacency caches, compaction status, and an optional in-memory
/// Vid→labels index.
pub struct StorageManager {
    // Root URI/path of the database; scheme-less values mean local filesystem.
    base_uri: String,
    // Retry-wrapped object store (see `ResilientObjectStore` in the constructor).
    store: Arc<dyn ObjectStore>,
    schema_manager: Arc<SchemaManager>,
    snapshot_manager: Arc<SnapshotManager>,
    // In-memory adjacency (CSR) cache for neighbor lookups.
    adjacency_manager: Arc<AdjacencyManager>,
    // Memoized LanceDB table handles keyed by vertex label.
    table_cache: DashMap<String, lancedb::Table>,
    pub config: UniConfig,
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    // When set, reads are served as of this snapshot (see `pinned`).
    pinned_snapshot: Option<SnapshotManifest>,
    lancedb_store: Arc<LanceDbStore>,
    // Optional Vid→labels fast path; `None` when disabled or rebuild failed.
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
69
/// RAII holder for the `compaction_in_progress` flag: construction sets it,
/// drop clears it and stamps `last_compaction`.
struct CompactionGuard {
    status: Arc<Mutex<CompactionStatus>>,
}
74
75impl CompactionGuard {
76 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
77 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
78 if s.compaction_in_progress {
79 return None;
80 }
81 s.compaction_in_progress = true;
82 Some(Self {
83 status: status.clone(),
84 })
85 }
86}
87
88impl Drop for CompactionGuard {
89 fn drop(&mut self) {
90 match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
93 Ok(mut s) => {
94 s.compaction_in_progress = false;
95 s.last_compaction = Some(std::time::SystemTime::now());
96 }
97 Err(e) => {
98 log::error!(
102 "CompactionGuard drop failed to acquire poisoned lock: {}. \
103 Compaction status may be inconsistent. Issue #18/#150",
104 e
105 );
106 }
107 }
108 }
109}
110
111impl StorageManager {
112 pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
114 Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
115 }
116
117 pub async fn new_with_cache(
119 base_uri: &str,
120 schema_manager: Arc<SchemaManager>,
121 adjacency_cache_size: usize,
122 ) -> Result<Self> {
123 let config = UniConfig {
124 cache_size: adjacency_cache_size,
125 ..Default::default()
126 };
127 Self::new_with_config(base_uri, schema_manager, config).await
128 }
129
130 pub async fn new_with_config(
132 base_uri: &str,
133 schema_manager: Arc<SchemaManager>,
134 config: UniConfig,
135 ) -> Result<Self> {
136 let store = Self::build_store_from_uri(base_uri)?;
137 Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
138 }
139
140 pub async fn new_with_store_and_config(
145 base_uri: &str,
146 store: Arc<dyn ObjectStore>,
147 schema_manager: Arc<SchemaManager>,
148 config: UniConfig,
149 ) -> Result<Self> {
150 Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
151 .await
152 }
153
    /// Fully-parameterized constructor: wraps `store` in a `ResilientObjectStore`,
    /// connects LanceDB (optionally with extra storage options), replays any
    /// crash-staged writes, then optionally rebuilds the in-memory Vid→labels index.
    ///
    /// # Errors
    /// Fails when the LanceDB connection or staging recovery fails; a failed
    /// VidLabelsIndex rebuild is only logged (lookups fall back to LanceDB).
    pub async fn new_with_store_and_storage_options(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
        lancedb_storage_options: Option<HashMap<String, String>>,
    ) -> Result<Self> {
        // Retry/backoff wrapper around the raw store, configured from `config`.
        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
            store,
            config.object_store.clone(),
        ));

        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));

        let lancedb_store =
            LanceDbStore::connect_with_storage_options(base_uri, lancedb_storage_options).await?;

        // Crash recovery: replay staged-but-uncommitted writes before serving reads.
        Self::recover_all_staging_tables(&lancedb_store, &schema_manager).await?;

        let mut sm = Self {
            base_uri: base_uri.to_string(),
            store: resilient_store,
            schema_manager,
            snapshot_manager,
            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
            table_cache: DashMap::new(),
            config,
            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
            pinned_snapshot: None,
            lancedb_store: Arc::new(lancedb_store),
            vid_labels_index: None,
        };

        // Best-effort: an index rebuild failure degrades to LanceDB queries.
        if sm.config.enable_vid_labels_index
            && let Err(e) = sm.rebuild_vid_labels_index().await
        {
            warn!(
                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to LanceDB queries.",
                e
            );
        }

        Ok(sm)
    }
203
204 async fn recover_all_staging_tables(
209 lancedb_store: &LanceDbStore,
210 schema_manager: &SchemaManager,
211 ) -> Result<()> {
212 let schema = schema_manager.schema();
213
214 lancedb_store
216 .recover_staging(LanceDbStore::main_vertex_table_name())
217 .await?;
218 lancedb_store
219 .recover_staging(LanceDbStore::main_edge_table_name())
220 .await?;
221
222 for label in schema.labels.keys() {
224 let table_name = LanceDbStore::vertex_table_name(label);
225 lancedb_store.recover_staging(&table_name).await?;
226 }
227
228 for edge_type in schema.edge_types.keys() {
230 for direction in &["fwd", "bwd"] {
231 let delta_table_name = LanceDbStore::delta_table_name(edge_type, direction);
233 lancedb_store.recover_staging(&delta_table_name).await?;
234
235 for _label in schema.labels.keys() {
237 let adj_table_name = LanceDbStore::adjacency_table_name(edge_type, direction);
238 lancedb_store.recover_staging(&adj_table_name).await?;
239 }
240 }
241 }
242
243 Ok(())
244 }
245
246 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
247 if base_uri.contains("://") {
248 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
249 let (store, _path) = object_store::parse_url(&parsed)
250 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
251 Ok(Arc::from(store))
252 } else {
253 std::fs::create_dir_all(base_uri)?;
255 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
256 }
257 }
258
259 pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
260 Self {
261 base_uri: self.base_uri.clone(),
262 store: self.store.clone(),
263 schema_manager: self.schema_manager.clone(),
264 snapshot_manager: self.snapshot_manager.clone(),
265 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
269 table_cache: DashMap::new(),
270 config: self.config.clone(),
271 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
272 pinned_snapshot: Some(snapshot),
273 lancedb_store: self.lancedb_store.clone(),
274 vid_labels_index: self.vid_labels_index.clone(),
275 }
276 }
277
278 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
279 let schema = self.schema_manager.schema();
280 let name = schema.edge_type_name_by_id(edge_type_id)?;
281 self.pinned_snapshot
282 .as_ref()
283 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
284 }
285
286 pub fn version_high_water_mark(&self) -> Option<u64> {
292 self.pinned_snapshot
293 .as_ref()
294 .map(|s| s.version_high_water_mark)
295 }
296
297 pub fn apply_version_filter(&self, base_filter: String) -> String {
302 if let Some(hwm) = self.version_high_water_mark() {
303 format!("({}) AND (_version <= {})", base_filter, hwm)
304 } else {
305 base_filter
306 }
307 }
308
309 fn build_active_filter(user_filter: Option<&str>) -> String {
312 match user_filter {
313 Some(expr) => format!("({}) AND (_deleted = false)", expr),
314 None => "_deleted = false".to_string(),
315 }
316 }
317
318 pub fn store(&self) -> Arc<dyn ObjectStore> {
319 self.store.clone()
320 }
321
322 pub fn compaction_status(
328 &self,
329 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
330 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
331 Ok(guard.clone())
332 }
333
334 pub async fn compact(&self) -> Result<CompactionStats> {
335 let start = std::time::Instant::now();
338 let schema = self.schema_manager.schema();
339 let mut files_compacted = 0;
340
341 for label in schema.labels.keys() {
342 let table_name = LanceDbStore::vertex_table_name(label);
343 if self.lancedb_store.table_exists(&table_name).await? {
344 let table = self.lancedb_store.open_table(&table_name).await?;
345 table.optimize(lancedb::table::OptimizeAction::All).await?;
346 files_compacted += 1;
347 self.invalidate_table_cache(label);
348 }
349 }
350
351 Ok(CompactionStats {
352 files_compacted,
353 bytes_before: 0,
354 bytes_after: 0,
355 duration: start.elapsed(),
356 crdt_merges: 0,
357 })
358 }
359
360 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
361 let _guard = CompactionGuard::new(self.compaction_status.clone())
362 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
363
364 let start = std::time::Instant::now();
365 let table_name = LanceDbStore::vertex_table_name(label);
366
367 if self.lancedb_store.table_exists(&table_name).await? {
368 let table = self.lancedb_store.open_table(&table_name).await?;
369 table.optimize(lancedb::table::OptimizeAction::All).await?;
370 self.invalidate_table_cache(label);
371 }
372
373 Ok(CompactionStats {
374 files_compacted: 1,
375 bytes_before: 0,
376 bytes_after: 0,
377 duration: start.elapsed(),
378 crdt_merges: 0,
379 })
380 }
381
382 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
383 let _guard = CompactionGuard::new(self.compaction_status.clone())
384 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
385
386 let start = std::time::Instant::now();
387 let mut files_compacted = 0;
388
389 for dir in ["fwd", "bwd"] {
390 let table_name = LanceDbStore::delta_table_name(edge_type, dir);
391 if self.lancedb_store.table_exists(&table_name).await? {
392 let table = self.lancedb_store.open_table(&table_name).await?;
393 table.optimize(lancedb::table::OptimizeAction::All).await?;
394 files_compacted += 1;
395 }
396 }
397
398 Ok(CompactionStats {
399 files_compacted,
400 bytes_before: 0,
401 bytes_after: 0,
402 duration: start.elapsed(),
403 crdt_merges: 0,
404 })
405 }
406
407 pub async fn wait_for_compaction(&self) -> Result<()> {
408 loop {
409 let in_progress = {
410 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
411 };
412 if !in_progress {
413 return Ok(());
414 }
415 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
416 }
417 }
418
    /// Spawns the periodic background-compaction loop.
    ///
    /// Returns an already-completed task when compaction is disabled so the
    /// caller can `.await` the handle uniformly. On each tick the loop first
    /// refreshes L1 stats, then fires a compaction if any trigger applies; on
    /// shutdown it drains in-flight work before exiting.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Refresh stats first; stale numbers would mis-trigger.
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Let any in-flight compaction finish before exiting.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
454
    /// Recomputes L1 statistics — run count, estimated byte size, and age of
    /// the oldest entry — from every edge-type delta table, and publishes them
    /// into `compaction_status` for `pick_compaction_task` to evaluate.
    ///
    /// The scan is best-effort: missing tables and failed queries are skipped
    /// rather than propagated.
    async fn update_compaction_status(&self) -> Result<()> {
        let schema = self.schema_manager.schema();
        let mut total_tables = 0;
        let mut total_rows: usize = 0;
        // Earliest `_created_at` (nanoseconds) seen across all delta rows.
        let mut oldest_ts: Option<i64> = None;

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let table_name = LanceDbStore::delta_table_name(name, dir);
                let Ok(table) = self.lancedb_store.open_table(&table_name).await else {
                    continue;
                };
                let row_count = table.count_rows(None).await.unwrap_or(0);
                if row_count == 0 {
                    continue;
                }
                total_tables += 1;
                total_rows += row_count;

                // Only the timestamp column is needed to find the oldest entry.
                let Ok(stream) = table
                    .query()
                    .select(Select::Columns(vec!["_created_at".to_string()]))
                    .execute()
                    .await
                else {
                    continue;
                };
                let Ok(batches) = stream.try_collect::<Vec<_>>().await else {
                    continue;
                };
                for batch in batches {
                    let Some(col) = batch
                        .column_by_name("_created_at")
                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
                    else {
                        continue;
                    };
                    for i in 0..col.len() {
                        if !col.is_null(i) {
                            let ts = col.value(i);
                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
                        }
                    }
                }
            }
        }

        // Clock skew (created_at in the future) degrades to ZERO, not an error.
        let oldest_l1_age = oldest_ts
            .and_then(|ts| {
                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
                SystemTime::now().duration_since(created).ok()
            })
            .unwrap_or(Duration::ZERO);

        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
        status.l1_runs = total_tables;
        // Approximate: rows × per-entry size estimate, not actual file bytes.
        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
        status.oldest_l1_age = oldest_l1_age;
        Ok(())
    }
516
517 fn pick_compaction_task(&self) -> Option<CompactionTask> {
518 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
519
520 if status.l1_runs >= self.config.compaction.max_l1_runs {
521 return Some(CompactionTask::ByRunCount);
522 }
523 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
524 return Some(CompactionTask::BySize);
525 }
526 if status.oldest_l1_age >= self.config.compaction.max_l1_age
527 && status.oldest_l1_age > Duration::ZERO
528 {
529 return Some(CompactionTask::ByAge);
530 }
531
532 None
533 }
534
535 async fn optimize_table(store: &LanceDbStore, table_name: &str) -> bool {
537 let Ok(table) = store.open_table(table_name).await else {
538 return false;
539 };
540 if let Err(e) = table.optimize(lancedb::table::OptimizeAction::All).await {
541 log::warn!("Failed to optimize table {}: {}", table_name, e);
542 return false;
543 }
544 true
545 }
546
    /// Runs one full compaction pass, serialized via `CompactionGuard`:
    /// semantic compaction first, then adjacency re-warm for the edge types it
    /// touched, then Lance file optimization across every table.
    async fn execute_compaction(this: Arc<Self>, _task: CompactionTask) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let _guard = CompactionGuard::new(this.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = this.schema_manager.schema();
        let mut files_compacted = 0;

        // Semantic compaction failure is non-fatal: the file-level optimize
        // passes below still run.
        let compactor = Compactor::new(Arc::clone(&this));
        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
            log::error!(
                "Semantic compaction failed (continuing with Lance optimize): {}",
                e
            );
            Vec::new()
        });

        // Re-warm adjacency caches whose backing data just changed; a failed
        // warm is logged, not propagated.
        let am = this.adjacency_manager();
        for info in &compaction_results {
            let direction = match info.direction.as_str() {
                "fwd" => Direction::Outgoing,
                "bwd" => Direction::Incoming,
                _ => continue,
            };
            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
                && let Err(e) = am.warm(&this, etid, direction, None).await
            {
                log::warn!(
                    "Failed to re-warm adjacency for {}/{}: {}",
                    info.edge_type,
                    info.direction,
                    e
                );
            }
        }

        let store = &this.lancedb_store;

        // File-level optimize: delta + adjacency tables per edge type/direction…
        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let delta = LanceDbStore::delta_table_name(name, dir);
                if Self::optimize_table(store, &delta).await {
                    files_compacted += 1;
                }
                let adj = LanceDbStore::adjacency_table_name(name, dir);
                if Self::optimize_table(store, &adj).await {
                    files_compacted += 1;
                }
            }
        }

        // …then per-label vertex tables (dropping stale cached handles)…
        for label in schema.labels.keys() {
            let table_name = LanceDbStore::vertex_table_name(label);
            if Self::optimize_table(store, &table_name).await {
                files_compacted += 1;
                this.invalidate_table_cache(label);
            }
        }

        // …and finally the two main tables.
        for table_name in [
            LanceDbStore::main_vertex_table_name(),
            LanceDbStore::main_edge_table_name(),
        ] {
            if Self::optimize_table(store, table_name).await {
                files_compacted += 1;
            }
        }

        {
            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
635
636 pub async fn get_cached_table(&self, label: &str) -> Result<lancedb::Table> {
638 if let Some(table) = self.table_cache.get(label) {
640 return Ok(table.clone());
641 }
642
643 let table_name = LanceDbStore::vertex_table_name(label);
645 let table = self.lancedb_store.open_table(&table_name).await?;
646
647 self.table_cache.insert(label.to_string(), table.clone());
648 Ok(table)
649 }
650
    /// Drops the cached LanceDB handle for `label`; the next access reopens it.
    pub fn invalidate_table_cache(&self, label: &str) {
        self.table_cache.remove(label);
    }
655
    /// Drops every cached LanceDB table handle.
    pub fn clear_table_cache(&self) {
        self.table_cache.clear();
    }
660
    /// The base URI/path this manager was constructed with.
    pub fn base_path(&self) -> &str {
        &self.base_uri
    }
664
    /// Borrowed access to the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }
668
669 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
670 self.schema_manager.clone()
671 }
672
673 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
675 Arc::clone(&self.adjacency_manager)
676 }
677
    /// Eagerly loads the adjacency data for one edge type/direction into the
    /// cache, optionally pinned to `version`. Delegates to `AdjacencyManager::warm`.
    pub async fn warm_adjacency(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm(self, edge_type_id, direction, version)
            .await
    }
692
    /// Like `warm_adjacency`, but delegates to the coalescing variant so that
    /// concurrent warm requests for the same key are merged.
    pub async fn warm_adjacency_coalesced(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm_coalesced(self, edge_type_id, direction, version)
            .await
    }
707
    /// Whether a CSR for (`edge_type_id`, `direction`) is already resident in
    /// the adjacency cache.
    pub fn has_adjacency_csr(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
    ) -> bool {
        self.adjacency_manager.has_csr(edge_type_id, direction)
    }
716
    /// Returns `(neighbor_vid, eid)` pairs for `vid` as of `version`,
    /// delegating to the adjacency cache. Requires the relevant CSR to have
    /// been warmed; see `warm_adjacency`.
    pub fn get_neighbors_at_version(
        &self,
        vid: uni_common::core::id::Vid,
        edge_type: u32,
        direction: crate::storage::direction::Direction,
        version: u64,
    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
        self.adjacency_manager
            .get_neighbors_at_version(vid, edge_type, direction, version)
    }
728
    /// Borrowed access to the LanceDB store.
    pub fn lancedb_store(&self) -> &LanceDbStore {
        &self.lancedb_store
    }
733
734 pub fn lancedb_store_arc(&self) -> Arc<LanceDbStore> {
736 self.lancedb_store.clone()
737 }
738
    /// Rebuilds the in-memory Vid→labels index by scanning the main vertex
    /// table; a missing table initializes the index empty.
    ///
    /// NOTE(review): the scan is capped at 100_000 rows — datasets larger than
    /// that would be silently truncated in the index; confirm the cap against
    /// expected vertex counts.
    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
        use crate::lancedb::LanceDbStore;
        use crate::storage::vid_labels::VidLabelsIndex;

        let lancedb_store = self.lancedb_store();

        let table = match lancedb_store
            .open_table(LanceDbStore::main_vertex_table_name())
            .await
        {
            Ok(t) => t,
            Err(_) => {
                // No vertices written yet: publish an empty index.
                self.vid_labels_index =
                    Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
                return Ok(());
            }
        };

        let batches = table
            .query()
            .only_if("_deleted = false")
            .limit(100_000) .execute()
            .await
            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| anyhow!("Failed to collect vertex data: {}", e))?;

        let mut index = VidLabelsIndex::new();
        for batch in batches {
            let vid_col = batch
                .column_by_name("_vid")
                .ok_or_else(|| anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;

            // `labels` is a list-of-strings column: one label set per row.
            let labels_col = batch
                .column_by_name("labels")
                .ok_or_else(|| anyhow!("Missing labels column"))?
                .as_any()
                .downcast_ref::<arrow_array::ListArray>()
                .ok_or_else(|| anyhow!("Invalid labels column type"))?;

            for row_idx in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row_idx));
                let labels_array = labels_col.value(row_idx);
                let labels_str_array = labels_array
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;

                let labels: Vec<String> = (0..labels_str_array.len())
                    .map(|i| labels_str_array.value(i).to_string())
                    .collect();

                index.insert(vid, labels);
            }
        }

        // Publish only once fully built so readers never see a partial index.
        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
        Ok(())
    }
808
809 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
812 self.vid_labels_index.as_ref().and_then(|idx| {
813 let index = idx.read();
814 index.get_labels(vid).map(|labels| labels.to_vec())
815 })
816 }
817
818 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
821 if let Some(idx) = &self.vid_labels_index {
822 let mut index = idx.write();
823 index.insert(vid, labels);
824 }
825 }
826
827 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
830 if let Some(idx) = &self.vid_labels_index {
831 let mut index = idx.write();
832 index.remove_vid(vid);
833 }
834 }
835
836 pub async fn load_subgraph_cached(
837 &self,
838 start_vids: &[Vid],
839 edge_types: &[u32],
840 max_hops: usize,
841 direction: GraphDirection,
842 _l0: Option<Arc<RwLock<L0Buffer>>>,
843 ) -> Result<WorkingGraph> {
844 let mut graph = WorkingGraph::new();
845
846 let dir = match direction {
847 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
848 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
849 };
850
851 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
852
853 let mut frontier: Vec<Vid> = start_vids.to_vec();
855 let mut visited: HashSet<Vid> = HashSet::new();
856
857 for &vid in start_vids {
859 graph.add_vertex(vid);
860 }
861
862 for _hop in 0..max_hops {
863 let mut next_frontier = HashSet::new();
864
865 for &vid in &frontier {
866 if visited.contains(&vid) {
867 continue;
868 }
869 visited.insert(vid);
870 graph.add_vertex(vid);
871
872 for &etype_id in edge_types {
873 let edge_ver = self.version_high_water_mark();
875 self.adjacency_manager
876 .warm_coalesced(self, etype_id, dir, edge_ver)
877 .await?;
878
879 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
881
882 for (neighbor_vid, eid) in edges {
883 graph.add_vertex(neighbor_vid);
884 if !visited.contains(&neighbor_vid) {
885 next_frontier.insert(neighbor_vid);
886 }
887
888 if neighbor_is_dst {
889 graph.add_edge(vid, neighbor_vid, eid, etype_id);
890 } else {
891 graph.add_edge(neighbor_vid, vid, eid, etype_id);
892 }
893 }
894 }
895 }
896 frontier = next_frontier.into_iter().collect();
897
898 if frontier.is_empty() {
900 break;
901 }
902 }
903
904 Ok(graph)
905 }
906
    /// Borrowed access to the snapshot manager.
    pub fn snapshot_manager(&self) -> &SnapshotManager {
        &self.snapshot_manager
    }
910
911 pub fn index_manager(&self) -> IndexManager {
912 IndexManager::new(
913 &self.base_uri,
914 self.schema_manager.clone(),
915 self.lancedb_store.clone(),
916 )
917 }
918
919 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
920 let schema = self.schema_manager.schema();
921 let label_meta = schema
922 .labels
923 .get(label)
924 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
925 Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
926 }
927
928 pub fn edge_dataset(
929 &self,
930 edge_type: &str,
931 src_label: &str,
932 dst_label: &str,
933 ) -> Result<EdgeDataset> {
934 Ok(EdgeDataset::new(
935 &self.base_uri,
936 edge_type,
937 src_label,
938 dst_label,
939 ))
940 }
941
942 pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
943 Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
944 }
945
946 pub fn adjacency_dataset(
947 &self,
948 edge_type: &str,
949 label: &str,
950 direction: &str,
951 ) -> Result<AdjacencyDataset> {
952 Ok(AdjacencyDataset::new(
953 &self.base_uri,
954 edge_type,
955 label,
956 direction,
957 ))
958 }
959
    /// Handle to the main (cross-label) vertex dataset.
    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
        MainVertexDataset::new(&self.base_uri)
    }
967
    /// Handle to the main (cross-type) edge dataset.
    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
        MainEdgeDataset::new(&self.base_uri)
    }
975
976 pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
977 Ok(UidIndex::new(&self.base_uri, label))
978 }
979
980 pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
981 let schema = self.schema_manager.schema();
982 let config = schema
983 .indexes
984 .iter()
985 .find_map(|idx| match idx {
986 IndexDefinition::Inverted(cfg)
987 if cfg.label == label && cfg.property == property =>
988 {
989 Some(cfg.clone())
990 }
991 _ => None,
992 })
993 .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
994
995 InvertedIndex::new(&self.base_uri, config).await
996 }
997
998 pub async fn vector_search(
999 &self,
1000 label: &str,
1001 property: &str,
1002 query: &[f32],
1003 k: usize,
1004 filter: Option<&str>,
1005 ctx: Option<&QueryContext>,
1006 ) -> Result<Vec<(Vid, f32)>> {
1007 let schema = self.schema_manager.schema();
1009 let metric = schema
1010 .vector_index_for_property(label, property)
1011 .map(|config| config.metric.clone())
1012 .unwrap_or(DistanceMetric::L2);
1013
1014 let table = self.get_cached_table(label).await.ok();
1017
1018 let mut results = Vec::new();
1019
1020 if let Some(table) = table {
1021 let distance_type = match &metric {
1022 DistanceMetric::L2 => lancedb::DistanceType::L2,
1023 DistanceMetric::Cosine => lancedb::DistanceType::Cosine,
1024 DistanceMetric::Dot => lancedb::DistanceType::Dot,
1025 _ => lancedb::DistanceType::L2,
1026 };
1027
1028 let mut query_builder = table
1030 .vector_search(query.to_vec())
1031 .map_err(|e| anyhow!("Failed to create vector search: {}", e))?
1032 .column(property)
1033 .distance_type(distance_type)
1034 .limit(k);
1035
1036 query_builder = query_builder.only_if(Self::build_active_filter(filter));
1037
1038 if ctx.is_some()
1040 && let Some(hwm) = self.version_high_water_mark()
1041 {
1042 query_builder = query_builder.only_if(format!("_version <= {}", hwm));
1043 }
1044
1045 let batches = query_builder
1046 .execute()
1047 .await
1048 .map_err(|e| anyhow!("Vector search execution failed: {}", e))?
1049 .try_collect::<Vec<_>>()
1050 .await
1051 .map_err(|e| anyhow!("Failed to collect vector search results: {}", e))?;
1052
1053 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1054 }
1055
1056 if let Some(qctx) = ctx {
1058 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1059 }
1060
1061 Ok(results)
1062 }
1063
1064 pub async fn fts_search(
1080 &self,
1081 label: &str,
1082 property: &str,
1083 query: &str,
1084 k: usize,
1085 filter: Option<&str>,
1086 ctx: Option<&QueryContext>,
1087 ) -> Result<Vec<(Vid, f32)>> {
1088 use lance_index::scalar::FullTextSearchQuery;
1089 use lance_index::scalar::inverted::query::MatchQuery;
1090
1091 let table = match self.get_cached_table(label).await {
1094 Ok(t) => t,
1095 Err(_) => return Ok(Vec::new()),
1096 };
1097
1098 let match_query =
1100 MatchQuery::new(query.to_string()).with_column(Some(property.to_string()));
1101 let fts_query = FullTextSearchQuery {
1102 query: match_query.into(),
1103 limit: Some(k as i64),
1104 wand_factor: None,
1105 };
1106
1107 let mut query_builder = table.query().full_text_search(fts_query).limit(k);
1108
1109 query_builder = query_builder.only_if(Self::build_active_filter(filter));
1110
1111 if ctx.is_some()
1113 && let Some(hwm) = self.version_high_water_mark()
1114 {
1115 query_builder = query_builder.only_if(format!("_version <= {}", hwm));
1116 }
1117
1118 let batches = query_builder
1119 .execute()
1120 .await
1121 .map_err(|e| anyhow!("FTS search execution failed: {}", e))?
1122 .try_collect::<Vec<_>>()
1123 .await
1124 .map_err(|e| anyhow!("Failed to collect FTS search results: {}", e))?;
1125
1126 let mut results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1127
1128 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1130
1131 Ok(results)
1132 }
1133
1134 pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1135 let index = self.uid_index(label)?;
1136 index.get_vid(uid).await
1137 }
1138
1139 pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1140 let index = self.uid_index(label)?;
1141 index.write_mapping(&[(uid, vid)]).await
1142 }
1143
    /// BFS expansion from `start_vids` over `edge_types`, up to `max_hops`,
    /// reading persisted adjacency, then L1 deltas, then (optionally) the L0
    /// buffer — each layer overlaying the previous, last-writer-wins by version.
    ///
    /// Unlike `load_subgraph_cached`, this path reads LanceDB directly and
    /// honors the un-flushed L0 buffer when provided.
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // id → name maps so per-id lookups below don't re-query the schema.
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // Outgoing reads the fwd table (neighbor = dst);
                    // incoming reads bwd (neighbor = src).
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // Merge buffer for this (vid, edge type): eid → newest state.
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    // Base layer: persisted adjacency. Tried per source label;
                    // the first label with data wins (hence the break).
                    let lancedb_store = self.lancedb_store();
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_lancedb(lancedb_store, vid).await?
                        {
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        // Base layer: version 0 so any delta wins.
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            break; }
                    }

                    // Second layer: L1 deltas (respecting any pinned high-water mark).
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas_lancedb(
                            lancedb_store,
                            vid,
                            &schema,
                            self.version_high_water_mark(),
                        )
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // Top layer: un-flushed L0 writes and tombstones.
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
1288
1289 fn apply_delta_to_edges(
1291 edges: &mut HashMap<Eid, EdgeState>,
1292 delta_entries: Vec<crate::storage::delta::L1Entry>,
1293 neighbor_is_dst: bool,
1294 ) {
1295 for entry in delta_entries {
1296 let neighbor = if neighbor_is_dst {
1297 entry.dst_vid
1298 } else {
1299 entry.src_vid
1300 };
1301 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1302
1303 if entry.version > current_ver {
1304 edges.insert(
1305 entry.eid,
1306 EdgeState {
1307 neighbor,
1308 version: entry.version,
1309 deleted: matches!(entry.op, Op::Delete),
1310 },
1311 );
1312 }
1313 }
1314 }
1315
1316 fn apply_l0_to_edges(
1318 edges: &mut HashMap<Eid, EdgeState>,
1319 l0: &L0Buffer,
1320 vid: Vid,
1321 etype_id: u32,
1322 direction: GraphDirection,
1323 ) {
1324 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1325 for (neighbor, eid, ver) in l0_neighbors {
1326 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1327 if ver > current_ver {
1328 edges.insert(
1329 eid,
1330 EdgeState {
1331 neighbor,
1332 version: ver,
1333 deleted: false,
1334 },
1335 );
1336 }
1337 }
1338
1339 for (eid, state) in edges.iter_mut() {
1341 if l0.is_tombstoned(*eid) {
1342 state.deleted = true;
1343 }
1344 }
1345 }
1346
1347 fn add_edges_to_graph(
1349 graph: &mut WorkingGraph,
1350 edges: HashMap<Eid, EdgeState>,
1351 vid: Vid,
1352 etype_id: u32,
1353 neighbor_is_dst: bool,
1354 visited: &HashSet<Vid>,
1355 next_frontier: &mut HashSet<Vid>,
1356 ) {
1357 for (eid, state) in edges {
1358 if state.deleted {
1359 continue;
1360 }
1361 graph.add_vertex(state.neighbor);
1362
1363 if !visited.contains(&state.neighbor) {
1364 next_frontier.insert(state.neighbor);
1365 }
1366
1367 if neighbor_is_dst {
1368 graph.add_edge(vid, state.neighbor, eid, etype_id);
1369 } else {
1370 graph.add_edge(state.neighbor, vid, eid, etype_id);
1371 }
1372 }
1373 }
1374}
1375
1376fn extract_vid_score_pairs(
1378 batches: &[arrow_array::RecordBatch],
1379 vid_column: &str,
1380 score_column: &str,
1381) -> Result<Vec<(Vid, f32)>> {
1382 let mut results = Vec::new();
1383 for batch in batches {
1384 let vid_col = batch
1385 .column_by_name(vid_column)
1386 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1387 .as_any()
1388 .downcast_ref::<UInt64Array>()
1389 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1390
1391 let score_col = batch
1392 .column_by_name(score_column)
1393 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1394 .as_any()
1395 .downcast_ref::<Float32Array>()
1396 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1397
1398 for i in 0..batch.num_rows() {
1399 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1400 }
1401 }
1402 Ok(results)
1403}
1404
1405fn extract_embedding_from_props(
1410 props: &uni_common::Properties,
1411 property: &str,
1412) -> Option<Vec<f32>> {
1413 let arr = props.get(property)?.as_array()?;
1414 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1415}
1416
1417fn merge_l0_into_vector_results(
1427 results: &mut Vec<(Vid, f32)>,
1428 ctx: &QueryContext,
1429 label: &str,
1430 property: &str,
1431 query: &[f32],
1432 k: usize,
1433 metric: &DistanceMetric,
1434) {
1435 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1437 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1438 buffers.push(Arc::clone(&ctx.l0));
1439 if let Some(ref txn) = ctx.transaction_l0 {
1440 buffers.push(Arc::clone(txn));
1441 }
1442
1443 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1445 let mut tombstoned: HashSet<Vid> = HashSet::new();
1447
1448 for buf_arc in &buffers {
1449 let buf = buf_arc.read();
1450
1451 for &vid in &buf.vertex_tombstones {
1453 tombstoned.insert(vid);
1454 }
1455
1456 for (&vid, labels) in &buf.vertex_labels {
1458 if !labels.iter().any(|l| l == label) {
1459 continue;
1460 }
1461 if let Some(props) = buf.vertex_properties.get(&vid)
1462 && let Some(emb) = extract_embedding_from_props(props, property)
1463 {
1464 if emb.len() != query.len() {
1465 continue; }
1467 let dist = metric.compute_distance(&emb, query);
1468 l0_candidates.insert(vid, dist);
1470 tombstoned.remove(&vid);
1472 }
1473 }
1474 }
1475
1476 if l0_candidates.is_empty() && tombstoned.is_empty() {
1478 return;
1479 }
1480
1481 results.retain(|(vid, _)| !tombstoned.contains(vid));
1483
1484 for (vid, dist) in &l0_candidates {
1486 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1487 existing.1 = *dist;
1488 } else {
1489 results.push((*vid, *dist));
1490 }
1491 }
1492
1493 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1495 results.truncate(k);
1496}