1use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
5use crate::lancedb::LanceDbStore;
6use crate::runtime::WorkingGraph;
7use crate::runtime::context::QueryContext;
8use crate::runtime::l0::L0Buffer;
9use crate::storage::adjacency::AdjacencyDataset;
10use crate::storage::delta::{DeltaDataset, Op};
11use crate::storage::edge::EdgeDataset;
12use crate::storage::index::UidIndex;
13use crate::storage::inverted_index::InvertedIndex;
14use crate::storage::main_edge::MainEdgeDataset;
15use crate::storage::main_vertex::MainVertexDataset;
16use crate::storage::vertex::VertexDataset;
17use anyhow::{Result, anyhow};
18use arrow_array::{Array, Float32Array, UInt64Array};
19use dashmap::DashMap;
20use futures::TryStreamExt;
21use lancedb::query::{ExecutableQuery, QueryBase};
22use object_store::ObjectStore;
23use object_store::local::LocalFileSystem;
24use parking_lot::RwLock;
25use std::collections::{HashMap, HashSet};
26use std::sync::{Arc, Mutex};
27use tracing::warn;
28use uni_common::config::UniConfig;
29use uni_common::core::id::{Eid, UniId, Vid};
30use uni_common::core::schema::{DistanceMetric, IndexDefinition, SchemaManager};
31use uni_common::sync::acquire_mutex;
32
33use crate::snapshot::manager::SnapshotManager;
34use crate::storage::IndexManager;
35use crate::storage::adjacency_manager::AdjacencyManager;
36use crate::storage::resilient_store::ResilientObjectStore;
37
38use uni_common::core::snapshot::SnapshotManifest;
39
40use uni_common::graph::simple_graph::Direction as GraphDirection;
41
/// Merge state for a single edge while assembling a subgraph: base adjacency
/// rows, delta entries, and L0 updates are folded into one `EdgeState` per
/// `Eid`, keeping only the highest-versioned view.
struct EdgeState {
    // The vertex on the far side of the edge (dst for fwd, src for bwd).
    neighbor: Vid,
    // Version of the write that produced this state; higher versions win.
    version: u64,
    // True when the latest write for this edge was a delete (tombstone).
    deleted: bool,
}
48
/// Central handle to all persistent graph storage: the object store, the
/// LanceDB connection, per-table caches, compaction state, and (optionally)
/// a pinned snapshot that bounds which versions readers may observe.
pub struct StorageManager {
    // Root URI/path under which all datasets live.
    base_uri: String,
    // Resilience-wrapped object store used for snapshots and datasets.
    store: Arc<dyn ObjectStore>,
    schema_manager: Arc<SchemaManager>,
    snapshot_manager: Arc<SnapshotManager>,
    // Byte-capped cache of adjacency structures (CSR etc.).
    adjacency_manager: Arc<AdjacencyManager>,
    // label -> open LanceDB table handle; invalidated after compaction.
    table_cache: DashMap<String, lancedb::Table>,
    pub config: UniConfig,
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    // When Some, this manager is a read-only view pinned to that snapshot;
    // version filters are derived from it.
    pinned_snapshot: Option<SnapshotManifest>,
    lancedb_store: Arc<LanceDbStore>,
    // Optional in-memory VID -> labels index; None when disabled or when the
    // startup rebuild failed (queries then fall back to LanceDB).
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
66
/// RAII guard that owns the "compaction in progress" flag: constructing it
/// claims the flag (or fails if already claimed), dropping it clears the flag
/// and stamps `last_compaction`.
struct CompactionGuard {
    status: Arc<Mutex<CompactionStatus>>,
}
71
72impl CompactionGuard {
73 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
74 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
75 if s.compaction_in_progress {
76 return None;
77 }
78 s.compaction_in_progress = true;
79 Some(Self {
80 status: status.clone(),
81 })
82 }
83}
84
85impl Drop for CompactionGuard {
86 fn drop(&mut self) {
87 match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
90 Ok(mut s) => {
91 s.compaction_in_progress = false;
92 s.last_compaction = Some(std::time::SystemTime::now());
93 }
94 Err(e) => {
95 log::error!(
99 "CompactionGuard drop failed to acquire poisoned lock: {}. \
100 Compaction status may be inconsistent. Issue #18/#150",
101 e
102 );
103 }
104 }
105 }
106}
107
108impl StorageManager {
    /// Construct a manager rooted at `base_uri` with default configuration.
    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
    }
113
114 pub async fn new_with_cache(
116 base_uri: &str,
117 schema_manager: Arc<SchemaManager>,
118 adjacency_cache_size: usize,
119 ) -> Result<Self> {
120 let config = UniConfig {
121 cache_size: adjacency_cache_size,
122 ..Default::default()
123 };
124 Self::new_with_config(base_uri, schema_manager, config).await
125 }
126
    /// Construct a manager with an explicit `config`, deriving the object
    /// store from `base_uri` (URL scheme or local path).
    pub async fn new_with_config(
        base_uri: &str,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        let store = Self::build_store_from_uri(base_uri)?;
        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
    }
136
    /// Construct a manager with a caller-supplied object store and no extra
    /// LanceDB storage options.
    pub async fn new_with_store_and_config(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
            .await
    }
150
151 pub async fn new_with_store_and_storage_options(
154 base_uri: &str,
155 store: Arc<dyn ObjectStore>,
156 schema_manager: Arc<SchemaManager>,
157 config: UniConfig,
158 lancedb_storage_options: Option<HashMap<String, String>>,
159 ) -> Result<Self> {
160 let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
161 store,
162 config.object_store.clone(),
163 ));
164
165 let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
166
167 let lancedb_store =
169 LanceDbStore::connect_with_storage_options(base_uri, lancedb_storage_options).await?;
170
171 Self::recover_all_staging_tables(&lancedb_store, &schema_manager).await?;
173
174 let mut sm = Self {
175 base_uri: base_uri.to_string(),
176 store: resilient_store,
177 schema_manager,
178 snapshot_manager,
179 adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
180 table_cache: DashMap::new(),
181 config,
182 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
183 pinned_snapshot: None,
184 lancedb_store: Arc::new(lancedb_store),
185 vid_labels_index: None,
186 };
187
188 if sm.config.enable_vid_labels_index
190 && let Err(e) = sm.rebuild_vid_labels_index().await
191 {
192 warn!(
193 "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to LanceDB queries.",
194 e
195 );
196 }
197
198 Ok(sm)
199 }
200
201 async fn recover_all_staging_tables(
206 lancedb_store: &LanceDbStore,
207 schema_manager: &SchemaManager,
208 ) -> Result<()> {
209 let schema = schema_manager.schema();
210
211 lancedb_store
213 .recover_staging(LanceDbStore::main_vertex_table_name())
214 .await?;
215 lancedb_store
216 .recover_staging(LanceDbStore::main_edge_table_name())
217 .await?;
218
219 for label in schema.labels.keys() {
221 let table_name = LanceDbStore::vertex_table_name(label);
222 lancedb_store.recover_staging(&table_name).await?;
223 }
224
225 for edge_type in schema.edge_types.keys() {
227 for direction in &["fwd", "bwd"] {
228 let delta_table_name = LanceDbStore::delta_table_name(edge_type, direction);
230 lancedb_store.recover_staging(&delta_table_name).await?;
231
232 for _label in schema.labels.keys() {
234 let adj_table_name = LanceDbStore::adjacency_table_name(edge_type, direction);
235 lancedb_store.recover_staging(&adj_table_name).await?;
236 }
237 }
238 }
239
240 Ok(())
241 }
242
243 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
244 if base_uri.contains("://") {
245 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
246 let (store, _path) = object_store::parse_url(&parsed)
247 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
248 Ok(Arc::from(store))
249 } else {
250 std::fs::create_dir_all(base_uri)?;
252 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
253 }
254 }
255
256 pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
257 Self {
258 base_uri: self.base_uri.clone(),
259 store: self.store.clone(),
260 schema_manager: self.schema_manager.clone(),
261 snapshot_manager: self.snapshot_manager.clone(),
262 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
266 table_cache: DashMap::new(),
267 config: self.config.clone(),
268 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
269 pinned_snapshot: Some(snapshot),
270 lancedb_store: self.lancedb_store.clone(),
271 vid_labels_index: self.vid_labels_index.clone(),
272 }
273 }
274
275 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
276 let schema = self.schema_manager.schema();
277 let name = schema.edge_type_name_by_id(edge_type_id)?;
278 self.pinned_snapshot
279 .as_ref()
280 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
281 }
282
283 pub fn version_high_water_mark(&self) -> Option<u64> {
289 self.pinned_snapshot
290 .as_ref()
291 .map(|s| s.version_high_water_mark)
292 }
293
294 pub fn apply_version_filter(&self, base_filter: String) -> String {
299 if let Some(hwm) = self.version_high_water_mark() {
300 format!("({}) AND (_version <= {})", base_filter, hwm)
301 } else {
302 base_filter
303 }
304 }
305
306 fn build_active_filter(user_filter: Option<&str>) -> String {
309 match user_filter {
310 Some(expr) => format!("({}) AND (_deleted = false)", expr),
311 None => "_deleted = false".to_string(),
312 }
313 }
314
315 pub fn store(&self) -> Arc<dyn ObjectStore> {
316 self.store.clone()
317 }
318
319 pub fn compaction_status(
325 &self,
326 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
327 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
328 Ok(guard.clone())
329 }
330
331 pub async fn compact(&self) -> Result<CompactionStats> {
332 let start = std::time::Instant::now();
335 let schema = self.schema_manager.schema();
336 let mut files_compacted = 0;
337
338 for label in schema.labels.keys() {
339 let table_name = LanceDbStore::vertex_table_name(label);
340 if self.lancedb_store.table_exists(&table_name).await? {
341 let table = self.lancedb_store.open_table(&table_name).await?;
342 table.optimize(lancedb::table::OptimizeAction::All).await?;
343 files_compacted += 1;
344 self.invalidate_table_cache(label);
345 }
346 }
347
348 Ok(CompactionStats {
349 files_compacted,
350 bytes_before: 0,
351 bytes_after: 0,
352 duration: start.elapsed(),
353 crdt_merges: 0,
354 })
355 }
356
357 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
358 let _guard = CompactionGuard::new(self.compaction_status.clone())
359 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
360
361 let start = std::time::Instant::now();
362 let table_name = LanceDbStore::vertex_table_name(label);
363
364 if self.lancedb_store.table_exists(&table_name).await? {
365 let table = self.lancedb_store.open_table(&table_name).await?;
366 table.optimize(lancedb::table::OptimizeAction::All).await?;
367 self.invalidate_table_cache(label);
368 }
369
370 Ok(CompactionStats {
371 files_compacted: 1,
372 bytes_before: 0,
373 bytes_after: 0,
374 duration: start.elapsed(),
375 crdt_merges: 0,
376 })
377 }
378
379 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
380 let _guard = CompactionGuard::new(self.compaction_status.clone())
381 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
382
383 let start = std::time::Instant::now();
384 let mut files_compacted = 0;
385
386 for dir in ["fwd", "bwd"] {
387 let table_name = LanceDbStore::delta_table_name(edge_type, dir);
388 if self.lancedb_store.table_exists(&table_name).await? {
389 let table = self.lancedb_store.open_table(&table_name).await?;
390 table.optimize(lancedb::table::OptimizeAction::All).await?;
391 files_compacted += 1;
392 }
393 }
394
395 Ok(CompactionStats {
396 files_compacted,
397 bytes_before: 0,
398 bytes_after: 0,
399 duration: start.elapsed(),
400 crdt_merges: 0,
401 })
402 }
403
404 pub async fn wait_for_compaction(&self) -> Result<()> {
405 loop {
406 let in_progress = {
407 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
408 };
409 if !in_progress {
410 return Ok(());
411 }
412 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
413 }
414 }
415
    /// Spawn the periodic background-compaction task.
    ///
    /// When compaction is disabled in config, returns an already-finished
    /// task. Otherwise the task ticks at `config.compaction.check_interval`,
    /// refreshes the compaction status, and runs a pass whenever
    /// `pick_compaction_task` reports a threshold was crossed. A shutdown
    /// signal waits for any in-flight compaction before exiting.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            // Nothing to do; hand back a trivially-completed task.
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Status refresh failure is transient; retry next tick.
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            if let Err(e) = self.execute_compaction(task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Drain any compaction still running before exiting.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
451
452 async fn update_compaction_status(&self) -> Result<()> {
453 let schema = self.schema_manager.schema();
454 let mut total_tables = 0;
455
456 for name in schema.edge_types.keys() {
458 for dir in ["fwd", "bwd"] {
459 let table_name = LanceDbStore::delta_table_name(name, dir);
460 if self.lancedb_store.table_exists(&table_name).await? {
461 total_tables += 1;
462 }
463 }
464 }
465
466 let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
467 status.l1_runs = total_tables;
468 status.l1_size_bytes = 0; Ok(())
470 }
471
472 fn pick_compaction_task(&self) -> Option<CompactionTask> {
473 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
474
475 if status.l1_runs >= self.config.compaction.max_l1_runs {
476 return Some(CompactionTask::ByRunCount);
477 }
478 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
479 return Some(CompactionTask::BySize);
480 }
481 None
484 }
485
486 async fn execute_compaction(&self, _task: CompactionTask) -> Result<CompactionStats> {
487 let start = std::time::Instant::now();
488 let _guard = CompactionGuard::new(self.compaction_status.clone())
490 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
491
492 let schema = self.schema_manager.schema();
493 let mut files_compacted = 0;
494
495 for name in schema.edge_types.keys() {
497 for dir in ["fwd", "bwd"] {
498 let table_name = LanceDbStore::delta_table_name(name, dir);
499 if self.lancedb_store.table_exists(&table_name).await? {
500 let table = self.lancedb_store.open_table(&table_name).await?;
501 table.optimize(lancedb::table::OptimizeAction::All).await?;
502 files_compacted += 1;
503 }
504 }
505 }
506
507 for label in schema.labels.keys() {
509 let table_name = LanceDbStore::vertex_table_name(label);
510 if self.lancedb_store.table_exists(&table_name).await? {
511 let table = self.lancedb_store.open_table(&table_name).await?;
512 table.optimize(lancedb::table::OptimizeAction::All).await?;
513 files_compacted += 1;
514 self.invalidate_table_cache(label);
515 }
516 }
517
518 {
519 let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
520 status.total_compactions += 1;
521 }
522
523 Ok(CompactionStats {
524 files_compacted,
525 bytes_before: 0,
526 bytes_after: 0,
527 duration: start.elapsed(),
528 crdt_merges: 0,
529 })
530 }
531
532 pub async fn get_cached_table(&self, label: &str) -> Result<lancedb::Table> {
534 if let Some(table) = self.table_cache.get(label) {
536 return Ok(table.clone());
537 }
538
539 let table_name = LanceDbStore::vertex_table_name(label);
541 let table = self.lancedb_store.open_table(&table_name).await?;
542
543 self.table_cache.insert(label.to_string(), table.clone());
544 Ok(table)
545 }
546
    /// Drop the cached table handle for `label` (no-op if absent).
    pub fn invalidate_table_cache(&self, label: &str) {
        self.table_cache.remove(label);
    }
551
    /// Drop every cached table handle.
    pub fn clear_table_cache(&self) {
        self.table_cache.clear();
    }
556
    /// Root URI/path this manager was constructed with.
    pub fn base_path(&self) -> &str {
        &self.base_uri
    }
560
    /// Borrow the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }
564
565 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
566 self.schema_manager.clone()
567 }
568
569 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
571 Arc::clone(&self.adjacency_manager)
572 }
573
    /// Load adjacency data for (`edge_type_id`, `direction`) into the cache
    /// at `version` (`None` = latest), delegating to the adjacency manager.
    pub async fn warm_adjacency(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm(self, edge_type_id, direction, version)
            .await
    }
588
    /// Coalescing variant of `warm_adjacency` — delegates to the adjacency
    /// manager's `warm_coalesced` (which presumably de-duplicates concurrent
    /// warms for the same key; semantics live in `AdjacencyManager`).
    pub async fn warm_adjacency_coalesced(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm_coalesced(self, edge_type_id, direction, version)
            .await
    }
603
    /// Whether a CSR structure for (`edge_type_id`, `direction`) is already
    /// resident in the adjacency cache.
    pub fn has_adjacency_csr(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
    ) -> bool {
        self.adjacency_manager.has_csr(edge_type_id, direction)
    }
612
    /// `(neighbor_vid, eid)` pairs for `vid` over `edge_type`/`direction`
    /// as of `version`, served from the adjacency manager.
    pub fn get_neighbors_at_version(
        &self,
        vid: uni_common::core::id::Vid,
        edge_type: u32,
        direction: crate::storage::direction::Direction,
        version: u64,
    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
        self.adjacency_manager
            .get_neighbors_at_version(vid, edge_type, direction, version)
    }
624
    /// Borrow the LanceDB connection wrapper.
    pub fn lancedb_store(&self) -> &LanceDbStore {
        &self.lancedb_store
    }
629
630 pub fn lancedb_store_arc(&self) -> Arc<LanceDbStore> {
632 self.lancedb_store.clone()
633 }
634
    /// Rebuild the in-memory VID→labels index from the main vertex table.
    ///
    /// A table that cannot be opened (e.g. a brand-new store) installs an
    /// empty index and returns `Ok(())`; only query/decoding failures are
    /// surfaced as errors.
    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
        use crate::lancedb::LanceDbStore;
        use crate::storage::vid_labels::VidLabelsIndex;

        let lancedb_store = self.lancedb_store();

        let table = match lancedb_store
            .open_table(LanceDbStore::main_vertex_table_name())
            .await
        {
            Ok(t) => t,
            Err(_) => {
                // No main vertex table yet: start from an empty index.
                self.vid_labels_index =
                    Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
                return Ok(());
            }
        };

        // NOTE(review): hard cap of 100_000 live rows — vertices beyond this
        // limit are silently missing from the index. Confirm this bound is
        // acceptable for large graphs (callers fall back to LanceDB only
        // when the whole index is absent, not per-vid).
        let batches = table
            .query()
            .only_if("_deleted = false")
            .limit(100_000)
            .execute()
            .await
            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| anyhow!("Failed to collect vertex data: {}", e))?;

        let mut index = VidLabelsIndex::new();
        for batch in batches {
            // `_vid` is a UInt64 column; `labels` is a list-of-string column.
            let vid_col = batch
                .column_by_name("_vid")
                .ok_or_else(|| anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;

            let labels_col = batch
                .column_by_name("labels")
                .ok_or_else(|| anyhow!("Missing labels column"))?
                .as_any()
                .downcast_ref::<arrow_array::ListArray>()
                .ok_or_else(|| anyhow!("Invalid labels column type"))?;

            for row_idx in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row_idx));
                // Each list element is itself a StringArray of label names.
                let labels_array = labels_col.value(row_idx);
                let labels_str_array = labels_array
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;

                let labels: Vec<String> = (0..labels_str_array.len())
                    .map(|i| labels_str_array.value(i).to_string())
                    .collect();

                index.insert(vid, labels);
            }
        }

        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
        Ok(())
    }
704
705 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
708 self.vid_labels_index.as_ref().and_then(|idx| {
709 let index = idx.read();
710 index.get_labels(vid).map(|labels| labels.to_vec())
711 })
712 }
713
714 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
717 if let Some(idx) = &self.vid_labels_index {
718 let mut index = idx.write();
719 index.insert(vid, labels);
720 }
721 }
722
723 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
726 if let Some(idx) = &self.vid_labels_index {
727 let mut index = idx.write();
728 index.remove_vid(vid);
729 }
730 }
731
732 pub async fn load_subgraph_cached(
733 &self,
734 start_vids: &[Vid],
735 edge_types: &[u32],
736 max_hops: usize,
737 direction: GraphDirection,
738 _l0: Option<Arc<RwLock<L0Buffer>>>,
739 ) -> Result<WorkingGraph> {
740 let mut graph = WorkingGraph::new();
741
742 let dir = match direction {
743 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
744 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
745 };
746
747 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
748
749 let mut frontier: Vec<Vid> = start_vids.to_vec();
751 let mut visited: HashSet<Vid> = HashSet::new();
752
753 for &vid in start_vids {
755 graph.add_vertex(vid);
756 }
757
758 for _hop in 0..max_hops {
759 let mut next_frontier = HashSet::new();
760
761 for &vid in &frontier {
762 if visited.contains(&vid) {
763 continue;
764 }
765 visited.insert(vid);
766 graph.add_vertex(vid);
767
768 for &etype_id in edge_types {
769 let edge_ver = self.version_high_water_mark();
771 self.adjacency_manager
772 .warm_coalesced(self, etype_id, dir, edge_ver)
773 .await?;
774
775 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
777
778 for (neighbor_vid, eid) in edges {
779 graph.add_vertex(neighbor_vid);
780 if !visited.contains(&neighbor_vid) {
781 next_frontier.insert(neighbor_vid);
782 }
783
784 if neighbor_is_dst {
785 graph.add_edge(vid, neighbor_vid, eid, etype_id);
786 } else {
787 graph.add_edge(neighbor_vid, vid, eid, etype_id);
788 }
789 }
790 }
791 }
792 frontier = next_frontier.into_iter().collect();
793
794 if frontier.is_empty() {
796 break;
797 }
798 }
799
800 Ok(graph)
801 }
802
    /// Borrow the snapshot manager.
    pub fn snapshot_manager(&self) -> &SnapshotManager {
        &self.snapshot_manager
    }
806
    /// Build a fresh `IndexManager` sharing this manager's base URI, schema,
    /// and LanceDB connection.
    pub fn index_manager(&self) -> IndexManager {
        IndexManager::new(
            &self.base_uri,
            self.schema_manager.clone(),
            self.lancedb_store.clone(),
        )
    }
814
815 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
816 let schema = self.schema_manager.schema();
817 let label_meta = schema
818 .labels
819 .get(label)
820 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
821 Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
822 }
823
    /// Dataset handle for `edge_type` edges from `src_label` to `dst_label`.
    /// Infallible today; `Result` is kept for interface stability.
    pub fn edge_dataset(
        &self,
        edge_type: &str,
        src_label: &str,
        dst_label: &str,
    ) -> Result<EdgeDataset> {
        Ok(EdgeDataset::new(
            &self.base_uri,
            edge_type,
            src_label,
            dst_label,
        ))
    }
837
    /// Dataset handle for the delta (L1) table of `edge_type` in `direction`
    /// ("fwd"/"bwd"). Infallible today; `Result` kept for interface stability.
    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
        Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
    }
841
    /// Dataset handle for the adjacency table of `edge_type` keyed by source
    /// `label` in `direction` ("fwd"/"bwd"). Infallible today; `Result` kept
    /// for interface stability.
    pub fn adjacency_dataset(
        &self,
        edge_type: &str,
        label: &str,
        direction: &str,
    ) -> Result<AdjacencyDataset> {
        Ok(AdjacencyDataset::new(
            &self.base_uri,
            edge_type,
            label,
            direction,
        ))
    }
855
    /// Dataset handle for the label-agnostic main vertex table.
    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
        MainVertexDataset::new(&self.base_uri)
    }
863
    /// Dataset handle for the type-agnostic main edge table.
    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
        MainEdgeDataset::new(&self.base_uri)
    }
871
    /// UID→VID index handle for `label`. Infallible today; `Result` kept for
    /// interface stability.
    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
        Ok(UidIndex::new(&self.base_uri, label))
    }
875
876 pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
877 let schema = self.schema_manager.schema();
878 let config = schema
879 .indexes
880 .iter()
881 .find_map(|idx| match idx {
882 IndexDefinition::Inverted(cfg)
883 if cfg.label == label && cfg.property == property =>
884 {
885 Some(cfg.clone())
886 }
887 _ => None,
888 })
889 .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
890
891 InvertedIndex::new(&self.base_uri, config).await
892 }
893
894 pub async fn vector_search(
895 &self,
896 label: &str,
897 property: &str,
898 query: &[f32],
899 k: usize,
900 filter: Option<&str>,
901 ctx: Option<&QueryContext>,
902 ) -> Result<Vec<(Vid, f32)>> {
903 let schema = self.schema_manager.schema();
905 let metric = schema
906 .vector_index_for_property(label, property)
907 .map(|config| config.metric.clone())
908 .unwrap_or(DistanceMetric::L2);
909
910 let table = self.get_cached_table(label).await.ok();
913
914 let mut results = Vec::new();
915
916 if let Some(table) = table {
917 let distance_type = match &metric {
918 DistanceMetric::L2 => lancedb::DistanceType::L2,
919 DistanceMetric::Cosine => lancedb::DistanceType::Cosine,
920 DistanceMetric::Dot => lancedb::DistanceType::Dot,
921 _ => lancedb::DistanceType::L2,
922 };
923
924 let mut query_builder = table
926 .vector_search(query.to_vec())
927 .map_err(|e| anyhow!("Failed to create vector search: {}", e))?
928 .column(property)
929 .distance_type(distance_type)
930 .limit(k);
931
932 query_builder = query_builder.only_if(Self::build_active_filter(filter));
933
934 if ctx.is_some()
936 && let Some(hwm) = self.version_high_water_mark()
937 {
938 query_builder = query_builder.only_if(format!("_version <= {}", hwm));
939 }
940
941 let batches = query_builder
942 .execute()
943 .await
944 .map_err(|e| anyhow!("Vector search execution failed: {}", e))?
945 .try_collect::<Vec<_>>()
946 .await
947 .map_err(|e| anyhow!("Failed to collect vector search results: {}", e))?;
948
949 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
950 }
951
952 if let Some(qctx) = ctx {
954 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
955 }
956
957 Ok(results)
958 }
959
960 pub async fn fts_search(
976 &self,
977 label: &str,
978 property: &str,
979 query: &str,
980 k: usize,
981 filter: Option<&str>,
982 ctx: Option<&QueryContext>,
983 ) -> Result<Vec<(Vid, f32)>> {
984 use lance_index::scalar::FullTextSearchQuery;
985 use lance_index::scalar::inverted::query::MatchQuery;
986
987 let table = match self.get_cached_table(label).await {
990 Ok(t) => t,
991 Err(_) => return Ok(Vec::new()),
992 };
993
994 let match_query =
996 MatchQuery::new(query.to_string()).with_column(Some(property.to_string()));
997 let fts_query = FullTextSearchQuery {
998 query: match_query.into(),
999 limit: Some(k as i64),
1000 wand_factor: None,
1001 };
1002
1003 let mut query_builder = table.query().full_text_search(fts_query).limit(k);
1004
1005 query_builder = query_builder.only_if(Self::build_active_filter(filter));
1006
1007 if ctx.is_some()
1009 && let Some(hwm) = self.version_high_water_mark()
1010 {
1011 query_builder = query_builder.only_if(format!("_version <= {}", hwm));
1012 }
1013
1014 let batches = query_builder
1015 .execute()
1016 .await
1017 .map_err(|e| anyhow!("FTS search execution failed: {}", e))?
1018 .try_collect::<Vec<_>>()
1019 .await
1020 .map_err(|e| anyhow!("Failed to collect FTS search results: {}", e))?;
1021
1022 let mut results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1023
1024 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1026
1027 Ok(results)
1028 }
1029
    /// Resolve a user-facing `uid` to its internal `Vid` via the per-label
    /// UID index; `None` when unmapped.
    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
        let index = self.uid_index(label)?;
        index.get_vid(uid).await
    }
1034
    /// Record the `uid` → `vid` mapping in the per-label UID index.
    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
        let index = self.uid_index(label)?;
        index.write_mapping(&[(uid, vid)]).await
    }
1039
    /// BFS expansion from `start_vids` over `edge_types` for up to
    /// `max_hops`, reading base adjacency directly from LanceDB (bypassing
    /// the adjacency cache) and layering delta entries, then optional
    /// unflushed `l0` edges, on top. Highest version wins per edge; deleted
    /// edges are skipped when building the graph.
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // id -> name maps; names are needed to derive dataset/table names.
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        // Seed vertices are always part of the result, even with zero hops.
        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // "fwd" stores src->dst (neighbor = dst);
                    // "bwd" stores dst->src (neighbor = src).
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // Per-eid merge state: base rows, then deltas, then L0.
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    // Snapshot-pinned edge version; currently unused here
                    // (delta reads filter by the HWM below instead).
                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    let lancedb_store = self.lancedb_store();
                    // The adjacency dataset is keyed by source label, which
                    // is unknown for `vid`, so probe each label until one
                    // returns rows. Base rows enter with version 0 so any
                    // delta entry supersedes them.
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_lancedb(lancedb_store, vid).await?
                        {
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            // Found the vid's source label; stop probing.
                            break;
                        }
                    }

                    // Overlay delta entries bounded by the version HWM.
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas_lancedb(
                            lancedb_store,
                            vid,
                            &schema,
                            self.version_high_water_mark(),
                        )
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // Finally overlay unflushed in-memory edges, if given.
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
1184
1185 fn apply_delta_to_edges(
1187 edges: &mut HashMap<Eid, EdgeState>,
1188 delta_entries: Vec<crate::storage::delta::L1Entry>,
1189 neighbor_is_dst: bool,
1190 ) {
1191 for entry in delta_entries {
1192 let neighbor = if neighbor_is_dst {
1193 entry.dst_vid
1194 } else {
1195 entry.src_vid
1196 };
1197 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1198
1199 if entry.version > current_ver {
1200 edges.insert(
1201 entry.eid,
1202 EdgeState {
1203 neighbor,
1204 version: entry.version,
1205 deleted: matches!(entry.op, Op::Delete),
1206 },
1207 );
1208 }
1209 }
1210 }
1211
1212 fn apply_l0_to_edges(
1214 edges: &mut HashMap<Eid, EdgeState>,
1215 l0: &L0Buffer,
1216 vid: Vid,
1217 etype_id: u32,
1218 direction: GraphDirection,
1219 ) {
1220 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1221 for (neighbor, eid, ver) in l0_neighbors {
1222 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1223 if ver > current_ver {
1224 edges.insert(
1225 eid,
1226 EdgeState {
1227 neighbor,
1228 version: ver,
1229 deleted: false,
1230 },
1231 );
1232 }
1233 }
1234
1235 for (eid, state) in edges.iter_mut() {
1237 if l0.is_tombstoned(*eid) {
1238 state.deleted = true;
1239 }
1240 }
1241 }
1242
1243 fn add_edges_to_graph(
1245 graph: &mut WorkingGraph,
1246 edges: HashMap<Eid, EdgeState>,
1247 vid: Vid,
1248 etype_id: u32,
1249 neighbor_is_dst: bool,
1250 visited: &HashSet<Vid>,
1251 next_frontier: &mut HashSet<Vid>,
1252 ) {
1253 for (eid, state) in edges {
1254 if state.deleted {
1255 continue;
1256 }
1257 graph.add_vertex(state.neighbor);
1258
1259 if !visited.contains(&state.neighbor) {
1260 next_frontier.insert(state.neighbor);
1261 }
1262
1263 if neighbor_is_dst {
1264 graph.add_edge(vid, state.neighbor, eid, etype_id);
1265 } else {
1266 graph.add_edge(state.neighbor, vid, eid, etype_id);
1267 }
1268 }
1269 }
1270}
1271
1272fn extract_vid_score_pairs(
1274 batches: &[arrow_array::RecordBatch],
1275 vid_column: &str,
1276 score_column: &str,
1277) -> Result<Vec<(Vid, f32)>> {
1278 let mut results = Vec::new();
1279 for batch in batches {
1280 let vid_col = batch
1281 .column_by_name(vid_column)
1282 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1283 .as_any()
1284 .downcast_ref::<UInt64Array>()
1285 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1286
1287 let score_col = batch
1288 .column_by_name(score_column)
1289 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1290 .as_any()
1291 .downcast_ref::<Float32Array>()
1292 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1293
1294 for i in 0..batch.num_rows() {
1295 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1296 }
1297 }
1298 Ok(results)
1299}
1300
1301fn extract_embedding_from_props(
1306 props: &uni_common::Properties,
1307 property: &str,
1308) -> Option<Vec<f32>> {
1309 let arr = props.get(property)?.as_array()?;
1310 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1311}
1312
/// Merge unflushed L0 vertices from `ctx` into Lance-sourced vector-search
/// `results`: L0 tombstones remove hits, L0 vertices carrying `label` and a
/// valid `property` embedding are (re)scored against `query` with `metric`,
/// then everything is re-sorted ascending by distance and truncated to `k`.
fn merge_l0_into_vector_results(
    results: &mut Vec<(Vid, f32)>,
    ctx: &QueryContext,
    label: &str,
    property: &str,
    query: &[f32],
    k: usize,
    metric: &DistanceMetric,
) {
    // Buffers are visited in order: pending-flush L0s, then the active L0,
    // then the transaction L0. NOTE(review): later buffers override earlier
    // ones (insert replaces, re-insert clears a tombstone) — this assumes
    // the order above is oldest-to-newest; confirm against QueryContext.
    let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
        ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
    buffers.push(Arc::clone(&ctx.l0));
    if let Some(ref txn) = ctx.transaction_l0 {
        buffers.push(Arc::clone(txn));
    }

    let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
    let mut tombstoned: HashSet<Vid> = HashSet::new();

    for buf_arc in &buffers {
        let buf = buf_arc.read();

        for &vid in &buf.vertex_tombstones {
            tombstoned.insert(vid);
        }

        for (&vid, labels) in &buf.vertex_labels {
            // Only vertices carrying the searched label participate.
            if !labels.iter().any(|l| l == label) {
                continue;
            }
            if let Some(props) = buf.vertex_properties.get(&vid)
                && let Some(emb) = extract_embedding_from_props(props, property)
            {
                // Dimension mismatch: skip rather than compute garbage.
                if emb.len() != query.len() {
                    continue;
                }
                let dist = metric.compute_distance(&emb, query);
                l0_candidates.insert(vid, dist);
                // A (re)insert in this or a later buffer revives a vertex
                // tombstoned by an earlier buffer.
                tombstoned.remove(&vid);
            }
        }
    }

    if l0_candidates.is_empty() && tombstoned.is_empty() {
        return;
    }

    // Drop Lance hits that L0 has deleted.
    results.retain(|(vid, _)| !tombstoned.contains(vid));

    // L0 distances override Lance distances for the same vid.
    for (vid, dist) in &l0_candidates {
        if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
            existing.1 = *dist;
        } else {
            results.push((*vid, *dist));
        }
    }

    // Ascending: smaller distance = better match.
    results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
    results.truncate(k);
}