1use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
5use crate::lancedb::LanceDbStore;
6use crate::runtime::WorkingGraph;
7use crate::runtime::context::QueryContext;
8use crate::runtime::l0::L0Buffer;
9use crate::storage::adjacency::AdjacencyDataset;
10use crate::storage::compaction::Compactor;
11use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
12use crate::storage::direction::Direction;
13use crate::storage::edge::EdgeDataset;
14use crate::storage::index::UidIndex;
15use crate::storage::inverted_index::InvertedIndex;
16use crate::storage::main_edge::MainEdgeDataset;
17use crate::storage::main_vertex::MainVertexDataset;
18use crate::storage::vertex::VertexDataset;
19use anyhow::{Result, anyhow};
20use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
21use dashmap::DashMap;
22use futures::TryStreamExt;
23use lancedb::query::{ExecutableQuery, QueryBase, Select};
24use object_store::ObjectStore;
25use object_store::local::LocalFileSystem;
26use parking_lot::RwLock;
27use std::collections::{HashMap, HashSet};
28use std::sync::{Arc, Mutex};
29use std::time::{Duration, SystemTime, UNIX_EPOCH};
30use tracing::warn;
31use uni_common::config::UniConfig;
32use uni_common::core::id::{Eid, UniId, Vid};
33use uni_common::core::schema::{DistanceMetric, IndexDefinition, SchemaManager};
34use uni_common::sync::acquire_mutex;
35
36use crate::snapshot::manager::SnapshotManager;
37use crate::storage::IndexManager;
38use crate::storage::adjacency_manager::AdjacencyManager;
39use crate::storage::resilient_store::ResilientObjectStore;
40
41use uni_common::core::snapshot::SnapshotManifest;
42
43use uni_common::graph::simple_graph::Direction as GraphDirection;
44
/// Per-edge resolution state used while merging the base adjacency, L1
/// delta entries, and the L0 buffer during subgraph loading.
struct EdgeState {
    // Vertex on the far side of the edge (dst when traversing forward,
    // src when traversing backward).
    neighbor: Vid,
    // Version of the source that produced this state; higher versions win.
    version: u64,
    // Tombstone flag: true when the newest op for this edge is a delete.
    deleted: bool,
}
51
/// Central handle over all persistent graph storage: object store, LanceDB
/// tables, snapshots, adjacency cache, and background compaction state.
pub struct StorageManager {
    // Root URI/path all datasets are addressed under.
    base_uri: String,
    // Resilient (retrying) object-store wrapper; see constructor.
    store: Arc<dyn ObjectStore>,
    schema_manager: Arc<SchemaManager>,
    snapshot_manager: Arc<SnapshotManager>,
    // In-memory adjacency (CSR) cache, capacity-bounded by config.
    adjacency_manager: Arc<AdjacencyManager>,
    // label -> open LanceDB table handle, to avoid re-opening per query.
    table_cache: DashMap<String, lancedb::Table>,
    pub config: UniConfig,
    // Shared flag/record guarded by CompactionGuard for mutual exclusion.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    // When set, reads are bounded by this snapshot's version high-water mark.
    pinned_snapshot: Option<SnapshotManifest>,
    lancedb_store: Arc<LanceDbStore>,
    // Optional in-memory vid -> labels index; None when disabled or when the
    // startup rebuild failed (callers then fall back to LanceDB queries).
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
69
/// RAII token for the single global compaction slot; acquired via
/// `CompactionGuard::new` and released in `Drop`.
struct CompactionGuard {
    // Shared status record whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
74
75impl CompactionGuard {
76 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
77 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
78 if s.compaction_in_progress {
79 return None;
80 }
81 s.compaction_in_progress = true;
82 Some(Self {
83 status: status.clone(),
84 })
85 }
86}
87
impl Drop for CompactionGuard {
    /// Releases the compaction slot: clears the in-progress flag and records
    /// the completion time.
    fn drop(&mut self) {
        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
            Ok(mut s) => {
                s.compaction_in_progress = false;
                s.last_compaction = Some(std::time::SystemTime::now());
            }
            Err(e) => {
                // Poisoned lock: we cannot clear the flag, so later compactions
                // may be blocked. Log loudly instead of panicking inside Drop.
                log::error!(
                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
                    Compaction status may be inconsistent. Issue #18/#150",
                    e
                );
            }
        }
    }
}
110
111impl StorageManager {
    /// Convenience constructor using the default `UniConfig`.
    pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
        Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
    }
116
117 pub async fn new_with_cache(
119 base_uri: &str,
120 schema_manager: Arc<SchemaManager>,
121 adjacency_cache_size: usize,
122 ) -> Result<Self> {
123 let config = UniConfig {
124 cache_size: adjacency_cache_size,
125 ..Default::default()
126 };
127 Self::new_with_config(base_uri, schema_manager, config).await
128 }
129
    /// Resolves `base_uri` into an object store (local path or URL) and then
    /// delegates to `new_with_store_and_config`.
    pub async fn new_with_config(
        base_uri: &str,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        let store = Self::build_store_from_uri(base_uri)?;
        Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
    }
139
    /// Constructor for callers supplying their own `ObjectStore`; passes no
    /// extra LanceDB storage options.
    pub async fn new_with_store_and_config(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
            .await
    }
153
    /// Full constructor: wraps `store` in a retrying `ResilientObjectStore`,
    /// connects to LanceDB (with optional backend storage options), replays
    /// crashed staging writes, and optionally rebuilds the in-memory
    /// vid→labels index.
    ///
    /// Index rebuild failure is non-fatal: the manager falls back to LanceDB
    /// queries and only logs a warning.
    pub async fn new_with_store_and_storage_options(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
        lancedb_storage_options: Option<HashMap<String, String>>,
    ) -> Result<Self> {
        // All object-store I/O goes through the resilient wrapper.
        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
            store,
            config.object_store.clone(),
        ));

        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));

        let lancedb_store =
            LanceDbStore::connect_with_storage_options(base_uri, lancedb_storage_options).await?;

        // Crash recovery must complete before the manager is handed out.
        Self::recover_all_staging_tables(&lancedb_store, &schema_manager).await?;

        let mut sm = Self {
            base_uri: base_uri.to_string(),
            store: resilient_store,
            schema_manager,
            snapshot_manager,
            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
            table_cache: DashMap::new(),
            config,
            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
            pinned_snapshot: None,
            lancedb_store: Arc::new(lancedb_store),
            vid_labels_index: None,
        };

        if sm.config.enable_vid_labels_index
            && let Err(e) = sm.rebuild_vid_labels_index().await
        {
            warn!(
                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to LanceDB queries.",
                e
            );
        }

        Ok(sm)
    }
203
204 async fn recover_all_staging_tables(
209 lancedb_store: &LanceDbStore,
210 schema_manager: &SchemaManager,
211 ) -> Result<()> {
212 let schema = schema_manager.schema();
213
214 lancedb_store
216 .recover_staging(LanceDbStore::main_vertex_table_name())
217 .await?;
218 lancedb_store
219 .recover_staging(LanceDbStore::main_edge_table_name())
220 .await?;
221
222 for label in schema.labels.keys() {
224 let table_name = LanceDbStore::vertex_table_name(label);
225 lancedb_store.recover_staging(&table_name).await?;
226 }
227
228 for edge_type in schema.edge_types.keys() {
230 for direction in &["fwd", "bwd"] {
231 let delta_table_name = LanceDbStore::delta_table_name(edge_type, direction);
233 lancedb_store.recover_staging(&delta_table_name).await?;
234
235 for _label in schema.labels.keys() {
237 let adj_table_name = LanceDbStore::adjacency_table_name(edge_type, direction);
238 lancedb_store.recover_staging(&adj_table_name).await?;
239 }
240 }
241 }
242
243 Ok(())
244 }
245
246 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
247 if base_uri.contains("://") {
248 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
249 let (store, _path) = object_store::parse_url(&parsed)
250 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
251 Ok(Arc::from(store))
252 } else {
253 std::fs::create_dir_all(base_uri)?;
255 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
256 }
257 }
258
    /// Clones this manager pinned to `snapshot`: reads become bounded by the
    /// snapshot's version high-water mark.
    ///
    /// The adjacency cache and compaction status are fresh (not shared);
    /// object store, schema, snapshot manager, LanceDB handle, and the
    /// vid→labels index are shared with the parent.
    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
        Self {
            base_uri: self.base_uri.clone(),
            store: self.store.clone(),
            schema_manager: self.schema_manager.clone(),
            snapshot_manager: self.snapshot_manager.clone(),
            // Fresh cache with the same capacity, so pinned reads do not
            // evict the live manager's warmed adjacency data.
            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
            table_cache: DashMap::new(),
            config: self.config.clone(),
            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
            pinned_snapshot: Some(snapshot),
            lancedb_store: self.lancedb_store.clone(),
            vid_labels_index: self.vid_labels_index.clone(),
        }
    }
277
278 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
279 let schema = self.schema_manager.schema();
280 let name = schema.edge_type_name_by_id(edge_type_id)?;
281 self.pinned_snapshot
282 .as_ref()
283 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
284 }
285
    /// The pinned snapshot's version high-water mark, or `None` when this
    /// manager is not pinned (reads then see the latest data).
    pub fn version_high_water_mark(&self) -> Option<u64> {
        self.pinned_snapshot
            .as_ref()
            .map(|s| s.version_high_water_mark)
    }
296
297 pub fn apply_version_filter(&self, base_filter: String) -> String {
302 if let Some(hwm) = self.version_high_water_mark() {
303 format!("({}) AND (_version <= {})", base_filter, hwm)
304 } else {
305 base_filter
306 }
307 }
308
309 fn build_active_filter(user_filter: Option<&str>) -> String {
312 match user_filter {
313 Some(expr) => format!("({}) AND (_deleted = false)", expr),
314 None => "_deleted = false".to_string(),
315 }
316 }
317
    /// A clone of the (resilient) object-store handle.
    pub fn store(&self) -> Arc<dyn ObjectStore> {
        self.store.clone()
    }
321
    /// Snapshot (clone) of the current compaction status.
    ///
    /// # Errors
    /// Returns `LockPoisonedError` when the status mutex is poisoned.
    pub fn compaction_status(
        &self,
    ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
        let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
        Ok(guard.clone())
    }
333
    /// Optimizes every per-label vertex table via Lance `OptimizeAction::All`.
    ///
    /// NOTE(review): unlike `compact_label` / `compact_edge_type`, this does
    /// not take a `CompactionGuard`, so it can overlap other compactions —
    /// confirm whether that is intentional.
    ///
    /// Byte counters in the returned stats are not tracked (reported as 0).
    pub async fn compact(&self) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let schema = self.schema_manager.schema();
        let mut files_compacted = 0;

        for label in schema.labels.keys() {
            let table_name = LanceDbStore::vertex_table_name(label);
            if self.lancedb_store.table_exists(&table_name).await? {
                let table = self.lancedb_store.open_table(&table_name).await?;
                table.optimize(lancedb::table::OptimizeAction::All).await?;
                files_compacted += 1;
                // Cached handles may reference pre-optimize table state.
                self.invalidate_table_cache(label);
            }
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
359
360 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
361 let _guard = CompactionGuard::new(self.compaction_status.clone())
362 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
363
364 let start = std::time::Instant::now();
365 let table_name = LanceDbStore::vertex_table_name(label);
366
367 if self.lancedb_store.table_exists(&table_name).await? {
368 let table = self.lancedb_store.open_table(&table_name).await?;
369 table.optimize(lancedb::table::OptimizeAction::All).await?;
370 self.invalidate_table_cache(label);
371 }
372
373 Ok(CompactionStats {
374 files_compacted: 1,
375 bytes_before: 0,
376 bytes_after: 0,
377 duration: start.elapsed(),
378 crdt_merges: 0,
379 })
380 }
381
382 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
383 let _guard = CompactionGuard::new(self.compaction_status.clone())
384 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
385
386 let start = std::time::Instant::now();
387 let mut files_compacted = 0;
388
389 for dir in ["fwd", "bwd"] {
390 let table_name = LanceDbStore::delta_table_name(edge_type, dir);
391 if self.lancedb_store.table_exists(&table_name).await? {
392 let table = self.lancedb_store.open_table(&table_name).await?;
393 table.optimize(lancedb::table::OptimizeAction::All).await?;
394 files_compacted += 1;
395 }
396 }
397
398 Ok(CompactionStats {
399 files_compacted,
400 bytes_before: 0,
401 bytes_after: 0,
402 duration: start.elapsed(),
403 crdt_merges: 0,
404 })
405 }
406
    /// Waits (async) until no compaction is in progress, polling the status
    /// flag every 100 ms.
    ///
    /// # Errors
    /// Propagates a poisoned status lock.
    pub async fn wait_for_compaction(&self) -> Result<()> {
        loop {
            // Scope the guard so the lock is not held across the sleep.
            let in_progress = {
                acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
            };
            if !in_progress {
                return Ok(());
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }
418
    /// Spawns the periodic background-compaction task.
    ///
    /// Returns a no-op task when compaction is disabled in config. Otherwise
    /// the task wakes every `check_interval` (first tick delayed by one
    /// interval), refreshes the compaction status, and runs a compaction when
    /// any trigger fires. On shutdown it waits for an in-flight compaction to
    /// finish before exiting.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            // Delay the first check by a full interval so startup is not
            // immediately followed by a compaction sweep.
            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
            let mut interval =
                tokio::time::interval_at(start, self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Best-effort: do not abort a running compaction.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
461
    /// Refreshes `compaction_status` by scanning every delta (L1) table:
    /// counts non-empty tables, estimates total size from row counts, and
    /// finds the oldest `_created_at` timestamp.
    ///
    /// Tables that fail to open or query are skipped silently — this status
    /// is best-effort and only drives compaction scheduling.
    async fn update_compaction_status(&self) -> Result<()> {
        let schema = self.schema_manager.schema();
        let mut total_tables = 0;
        let mut total_rows: usize = 0;
        let mut oldest_ts: Option<i64> = None;

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let table_name = LanceDbStore::delta_table_name(name, dir);
                let Ok(table) = self.lancedb_store.open_table(&table_name).await else {
                    continue;
                };
                let row_count = table.count_rows(None).await.unwrap_or(0);
                if row_count == 0 {
                    continue;
                }
                total_tables += 1;
                total_rows += row_count;

                // Scan only the timestamp column to locate the oldest entry.
                let Ok(stream) = table
                    .query()
                    .select(Select::Columns(vec!["_created_at".to_string()]))
                    .execute()
                    .await
                else {
                    continue;
                };
                let Ok(batches) = stream.try_collect::<Vec<_>>().await else {
                    continue;
                };
                for batch in batches {
                    let Some(col) = batch
                        .column_by_name("_created_at")
                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
                    else {
                        continue;
                    };
                    for i in 0..col.len() {
                        if !col.is_null(i) {
                            let ts = col.value(i);
                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
                        }
                    }
                }
            }
        }

        // Convert the oldest creation timestamp into an age; zero when there
        // are no entries or the clock moved backwards.
        let oldest_l1_age = oldest_ts
            .and_then(|ts| {
                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
                SystemTime::now().duration_since(created).ok()
            })
            .unwrap_or(Duration::ZERO);

        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
        status.l1_runs = total_tables;
        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
        status.oldest_l1_age = oldest_l1_age;
        Ok(())
    }
523
524 fn pick_compaction_task(&self) -> Option<CompactionTask> {
525 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
526
527 if status.l1_runs >= self.config.compaction.max_l1_runs {
528 return Some(CompactionTask::ByRunCount);
529 }
530 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
531 return Some(CompactionTask::BySize);
532 }
533 if status.oldest_l1_age >= self.config.compaction.max_l1_age
534 && status.oldest_l1_age > Duration::ZERO
535 {
536 return Some(CompactionTask::ByAge);
537 }
538
539 None
540 }
541
542 async fn optimize_table(store: &LanceDbStore, table_name: &str) -> bool {
544 let Ok(table) = store.open_table(table_name).await else {
545 return false;
546 };
547 if let Err(e) = table.optimize(lancedb::table::OptimizeAction::All).await {
548 log::warn!("Failed to optimize table {}: {}", table_name, e);
549 return false;
550 }
551 true
552 }
553
    /// Runs one full compaction pass; invoked by the background task.
    ///
    /// Order of operations:
    /// 1. Semantic compaction via `Compactor::compact_all` (failure is
    ///    logged and the pass continues with plain Lance optimizes).
    /// 2. Re-warm adjacency caches for every edge type/direction the
    ///    compactor rewrote.
    /// 3. Best-effort Lance `optimize` on all delta, adjacency, per-label
    ///    vertex, and main vertex/edge tables.
    /// 4. Bump the total-compaction counter.
    ///
    /// The `CompactionGuard` makes the pass mutually exclusive with other
    /// compactions; byte counters in the stats are not tracked (0).
    async fn execute_compaction(this: Arc<Self>, _task: CompactionTask) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let _guard = CompactionGuard::new(this.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = this.schema_manager.schema();
        let mut files_compacted = 0;

        let compactor = Compactor::new(Arc::clone(&this));
        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
            log::error!(
                "Semantic compaction failed (continuing with Lance optimize): {}",
                e
            );
            Vec::new()
        });

        // Re-warm adjacency for whatever the semantic compactor touched so
        // readers do not fault on cold caches right after compaction.
        let am = this.adjacency_manager();
        for info in &compaction_results {
            let direction = match info.direction.as_str() {
                "fwd" => Direction::Outgoing,
                "bwd" => Direction::Incoming,
                _ => continue,
            };
            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
                && let Err(e) = am.warm(&this, etid, direction, None).await
            {
                log::warn!(
                    "Failed to re-warm adjacency for {}/{}: {}",
                    info.edge_type,
                    info.direction,
                    e
                );
            }
        }

        let store = &this.lancedb_store;

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let delta = LanceDbStore::delta_table_name(name, dir);
                if Self::optimize_table(store, &delta).await {
                    files_compacted += 1;
                }
                let adj = LanceDbStore::adjacency_table_name(name, dir);
                if Self::optimize_table(store, &adj).await {
                    files_compacted += 1;
                }
            }
        }

        for label in schema.labels.keys() {
            let table_name = LanceDbStore::vertex_table_name(label);
            if Self::optimize_table(store, &table_name).await {
                files_compacted += 1;
                // Cached handles may reference pre-optimize table state.
                this.invalidate_table_cache(label);
            }
        }

        for table_name in [
            LanceDbStore::main_vertex_table_name(),
            LanceDbStore::main_edge_table_name(),
        ] {
            if Self::optimize_table(store, table_name).await {
                files_compacted += 1;
            }
        }

        {
            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
642
    /// Opens the vertex table for `label`, serving repeat calls from the
    /// in-process cache.
    ///
    /// NOTE(review): two concurrent first calls may both open the table and
    /// race on the insert; the loser's handle is simply dropped, which is
    /// harmless. Using a DashMap entry across `.await` would risk deadlock,
    /// hence the check-then-insert pattern.
    pub async fn get_cached_table(&self, label: &str) -> Result<lancedb::Table> {
        if let Some(table) = self.table_cache.get(label) {
            return Ok(table.clone());
        }

        let table_name = LanceDbStore::vertex_table_name(label);
        let table = self.lancedb_store.open_table(&table_name).await?;

        self.table_cache.insert(label.to_string(), table.clone());
        Ok(table)
    }
657
    /// Drops the cached table handle for `label` (e.g. after an optimize).
    pub fn invalidate_table_cache(&self, label: &str) {
        self.table_cache.remove(label);
    }
662
    /// Drops every cached table handle.
    pub fn clear_table_cache(&self) {
        self.table_cache.clear();
    }
667
    /// The base URI/path this manager was constructed with.
    pub fn base_path(&self) -> &str {
        &self.base_uri
    }
671
    /// Borrowed access to the schema manager.
    pub fn schema_manager(&self) -> &SchemaManager {
        &self.schema_manager
    }
675
    /// Owned (ref-counted) handle to the schema manager.
    pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
        self.schema_manager.clone()
    }
679
    /// Owned (ref-counted) handle to the adjacency cache manager.
    pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
        Arc::clone(&self.adjacency_manager)
    }
684
    /// Loads adjacency data for (`edge_type_id`, `direction`) into the
    /// cache, optionally pinned to `version`.
    pub async fn warm_adjacency(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm(self, edge_type_id, direction, version)
            .await
    }
699
    /// Like `warm_adjacency`, but via the manager's coalesced path —
    /// presumably deduplicating concurrent warms for the same key (see
    /// `AdjacencyManager::warm_coalesced` for the exact semantics).
    pub async fn warm_adjacency_coalesced(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
        version: Option<u64>,
    ) -> anyhow::Result<()> {
        self.adjacency_manager
            .warm_coalesced(self, edge_type_id, direction, version)
            .await
    }
714
    /// Whether a CSR structure for (`edge_type_id`, `direction`) is already
    /// resident in the adjacency cache.
    pub fn has_adjacency_csr(
        &self,
        edge_type_id: u32,
        direction: crate::storage::direction::Direction,
    ) -> bool {
        self.adjacency_manager.has_csr(edge_type_id, direction)
    }
723
    /// Neighbor list (with edge ids) for `vid` as of `version`, answered
    /// from the adjacency cache.
    pub fn get_neighbors_at_version(
        &self,
        vid: uni_common::core::id::Vid,
        edge_type: u32,
        direction: crate::storage::direction::Direction,
        version: u64,
    ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
        self.adjacency_manager
            .get_neighbors_at_version(vid, edge_type, direction, version)
    }
735
    /// Borrowed access to the LanceDB store.
    pub fn lancedb_store(&self) -> &LanceDbStore {
        &self.lancedb_store
    }
740
    /// Owned (ref-counted) handle to the LanceDB store.
    pub fn lancedb_store_arc(&self) -> Arc<LanceDbStore> {
        self.lancedb_store.clone()
    }
745
    /// Rebuilds the in-memory vid→labels index from the main vertex table,
    /// reading only non-deleted rows. A missing table installs an empty
    /// index (fresh database) rather than failing.
    ///
    /// NOTE(review): the scan is capped at 100_000 rows, so on larger graphs
    /// the index is silently incomplete — confirm that lookups fall back to
    /// LanceDB for vids missing from the index.
    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
        use crate::lancedb::LanceDbStore;
        use crate::storage::vid_labels::VidLabelsIndex;

        let lancedb_store = self.lancedb_store();

        let table = match lancedb_store
            .open_table(LanceDbStore::main_vertex_table_name())
            .await
        {
            Ok(t) => t,
            Err(_) => {
                // No main vertex table yet — start with an empty index.
                self.vid_labels_index =
                    Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
                return Ok(());
            }
        };

        let batches = table
            .query()
            .only_if("_deleted = false")
            .limit(100_000)
            .execute()
            .await
            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| anyhow!("Failed to collect vertex data: {}", e))?;

        let mut index = VidLabelsIndex::new();
        for batch in batches {
            let vid_col = batch
                .column_by_name("_vid")
                .ok_or_else(|| anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;

            // `labels` is a list-of-strings column: one label list per row.
            let labels_col = batch
                .column_by_name("labels")
                .ok_or_else(|| anyhow!("Missing labels column"))?
                .as_any()
                .downcast_ref::<arrow_array::ListArray>()
                .ok_or_else(|| anyhow!("Invalid labels column type"))?;

            for row_idx in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row_idx));
                let labels_array = labels_col.value(row_idx);
                let labels_str_array = labels_array
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;

                let labels: Vec<String> = (0..labels_str_array.len())
                    .map(|i| labels_str_array.value(i).to_string())
                    .collect();

                index.insert(vid, labels);
            }
        }

        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
        Ok(())
    }
815
816 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
819 self.vid_labels_index.as_ref().and_then(|idx| {
820 let index = idx.read();
821 index.get_labels(vid).map(|labels| labels.to_vec())
822 })
823 }
824
825 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
828 if let Some(idx) = &self.vid_labels_index {
829 let mut index = idx.write();
830 index.insert(vid, labels);
831 }
832 }
833
834 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
837 if let Some(idx) = &self.vid_labels_index {
838 let mut index = idx.write();
839 index.remove_vid(vid);
840 }
841 }
842
843 pub async fn load_subgraph_cached(
844 &self,
845 start_vids: &[Vid],
846 edge_types: &[u32],
847 max_hops: usize,
848 direction: GraphDirection,
849 _l0: Option<Arc<RwLock<L0Buffer>>>,
850 ) -> Result<WorkingGraph> {
851 let mut graph = WorkingGraph::new();
852
853 let dir = match direction {
854 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
855 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
856 };
857
858 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
859
860 let mut frontier: Vec<Vid> = start_vids.to_vec();
862 let mut visited: HashSet<Vid> = HashSet::new();
863
864 for &vid in start_vids {
866 graph.add_vertex(vid);
867 }
868
869 for _hop in 0..max_hops {
870 let mut next_frontier = HashSet::new();
871
872 for &vid in &frontier {
873 if visited.contains(&vid) {
874 continue;
875 }
876 visited.insert(vid);
877 graph.add_vertex(vid);
878
879 for &etype_id in edge_types {
880 let edge_ver = self.version_high_water_mark();
882 self.adjacency_manager
883 .warm_coalesced(self, etype_id, dir, edge_ver)
884 .await?;
885
886 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
888
889 for (neighbor_vid, eid) in edges {
890 graph.add_vertex(neighbor_vid);
891 if !visited.contains(&neighbor_vid) {
892 next_frontier.insert(neighbor_vid);
893 }
894
895 if neighbor_is_dst {
896 graph.add_edge(vid, neighbor_vid, eid, etype_id);
897 } else {
898 graph.add_edge(neighbor_vid, vid, eid, etype_id);
899 }
900 }
901 }
902 }
903 frontier = next_frontier.into_iter().collect();
904
905 if frontier.is_empty() {
907 break;
908 }
909 }
910
911 Ok(graph)
912 }
913
    /// Borrowed access to the snapshot manager.
    pub fn snapshot_manager(&self) -> &SnapshotManager {
        &self.snapshot_manager
    }
917
    /// Builds a fresh `IndexManager` over this manager's base URI, schema,
    /// and LanceDB handle.
    pub fn index_manager(&self) -> IndexManager {
        IndexManager::new(
            &self.base_uri,
            self.schema_manager.clone(),
            self.lancedb_store.clone(),
        )
    }
925
926 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
927 let schema = self.schema_manager.schema();
928 let label_meta = schema
929 .labels
930 .get(label)
931 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
932 Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
933 }
934
    /// Builds an `EdgeDataset` handle for the given edge type and endpoint
    /// labels. Currently infallible; `Result` keeps the signature stable.
    pub fn edge_dataset(
        &self,
        edge_type: &str,
        src_label: &str,
        dst_label: &str,
    ) -> Result<EdgeDataset> {
        Ok(EdgeDataset::new(
            &self.base_uri,
            edge_type,
            src_label,
            dst_label,
        ))
    }
948
    /// Builds a `DeltaDataset` handle for (`edge_type`, `direction`), where
    /// `direction` is the "fwd"/"bwd" table suffix.
    pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
        Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
    }
952
    /// Builds an `AdjacencyDataset` handle for (`edge_type`, `label`,
    /// `direction`). Currently infallible; `Result` keeps the signature stable.
    pub fn adjacency_dataset(
        &self,
        edge_type: &str,
        label: &str,
        direction: &str,
    ) -> Result<AdjacencyDataset> {
        Ok(AdjacencyDataset::new(
            &self.base_uri,
            edge_type,
            label,
            direction,
        ))
    }
966
    /// Handle to the label-agnostic main vertex dataset.
    pub fn main_vertex_dataset(&self) -> MainVertexDataset {
        MainVertexDataset::new(&self.base_uri)
    }
974
    /// Handle to the type-agnostic main edge dataset.
    pub fn main_edge_dataset(&self) -> MainEdgeDataset {
        MainEdgeDataset::new(&self.base_uri)
    }
982
    /// Handle to the UID→VID index for `label`.
    pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
        Ok(UidIndex::new(&self.base_uri, label))
    }
986
987 pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
988 let schema = self.schema_manager.schema();
989 let config = schema
990 .indexes
991 .iter()
992 .find_map(|idx| match idx {
993 IndexDefinition::Inverted(cfg)
994 if cfg.label == label && cfg.property == property =>
995 {
996 Some(cfg.clone())
997 }
998 _ => None,
999 })
1000 .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1001
1002 InvertedIndex::new(&self.base_uri, config).await
1003 }
1004
    /// k-NN vector search over `label`.`property`.
    ///
    /// Uses the distance metric declared by the schema's vector index
    /// (defaulting to L2), always excludes soft-deleted rows, applies the
    /// optional user `filter`, bounds pinned reads by the version high-water
    /// mark, and — when a query context is supplied — merges unflushed L0
    /// rows into the result set. A missing table yields L0-only results.
    pub async fn vector_search(
        &self,
        label: &str,
        property: &str,
        query: &[f32],
        k: usize,
        filter: Option<&str>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<(Vid, f32)>> {
        let schema = self.schema_manager.schema();
        let metric = schema
            .vector_index_for_property(label, property)
            .map(|config| config.metric.clone())
            .unwrap_or(DistanceMetric::L2);

        // `ok()` on purpose: a missing/unopenable table is not an error here.
        let table = self.get_cached_table(label).await.ok();

        let mut results = Vec::new();

        if let Some(table) = table {
            let distance_type = match &metric {
                DistanceMetric::L2 => lancedb::DistanceType::L2,
                DistanceMetric::Cosine => lancedb::DistanceType::Cosine,
                DistanceMetric::Dot => lancedb::DistanceType::Dot,
                _ => lancedb::DistanceType::L2,
            };

            let mut query_builder = table
                .vector_search(query.to_vec())
                .map_err(|e| anyhow!("Failed to create vector search: {}", e))?
                .column(property)
                .distance_type(distance_type)
                .limit(k);

            query_builder = query_builder.only_if(Self::build_active_filter(filter));

            // Pinned reads must not see rows written after the snapshot.
            if ctx.is_some()
                && let Some(hwm) = self.version_high_water_mark()
            {
                query_builder = query_builder.only_if(format!("_version <= {}", hwm));
            }

            let batches = query_builder
                .execute()
                .await
                .map_err(|e| anyhow!("Vector search execution failed: {}", e))?
                .try_collect::<Vec<_>>()
                .await
                .map_err(|e| anyhow!("Failed to collect vector search results: {}", e))?;

            results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
        }

        if let Some(qctx) = ctx {
            // Fold in vertices that only exist in the write buffer (L0).
            merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
        }

        Ok(results)
    }
1070
    /// Full-text search over `label`.`property`.
    ///
    /// Mirrors `vector_search`: soft-deleted rows are excluded, the optional
    /// user `filter` is applied, pinned reads are bounded by the version
    /// high-water mark, and L0 rows are merged in when a query context is
    /// given. Lance results are sorted by descending `_score` before merging;
    /// a missing table yields L0-only results.
    pub async fn fts_search(
        &self,
        label: &str,
        property: &str,
        query: &str,
        k: usize,
        filter: Option<&str>,
        ctx: Option<&QueryContext>,
    ) -> Result<Vec<(Vid, f32)>> {
        use lance_index::scalar::FullTextSearchQuery;
        use lance_index::scalar::inverted::query::MatchQuery;

        // `ok()` on purpose: a missing/unopenable table is not an error here.
        let table = self.get_cached_table(label).await.ok();

        let mut results = if let Some(table) = table {
            let match_query =
                MatchQuery::new(query.to_string()).with_column(Some(property.to_string()));
            let fts_query = FullTextSearchQuery {
                query: match_query.into(),
                limit: Some(k as i64),
                wand_factor: None,
            };

            let mut query_builder = table.query().full_text_search(fts_query).limit(k);

            query_builder = query_builder.only_if(Self::build_active_filter(filter));

            // Pinned reads must not see rows written after the snapshot.
            if ctx.is_some()
                && let Some(hwm) = self.version_high_water_mark()
            {
                query_builder = query_builder.only_if(format!("_version <= {}", hwm));
            }

            let batches = query_builder
                .execute()
                .await
                .map_err(|e| anyhow!("FTS search execution failed: {}", e))?
                .try_collect::<Vec<_>>()
                .await
                .map_err(|e| anyhow!("Failed to collect FTS search results: {}", e))?;

            let mut lance_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
            // Descending score; NaN-safe via the Equal fallback.
            lance_results
                .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
            lance_results
        } else {
            Vec::new()
        };

        if let Some(qctx) = ctx {
            // Fold in vertices that only exist in the write buffer (L0).
            merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
        }

        Ok(results)
    }
1147
    /// Resolves an external UID to its internal VID via the label's UID index.
    pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
        let index = self.uid_index(label)?;
        index.get_vid(uid).await
    }
1152
    /// Persists a single UID→VID mapping in the label's UID index.
    pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
        let index = self.uid_index(label)?;
        index.write_mapping(&[(uid, vid)]).await
    }
1157
    /// BFS subgraph expansion from `start_vids` over `edge_types`, up to
    /// `max_hops`, reading adjacency directly from LanceDB (bypassing the
    /// adjacency cache) and overlaying L1 deltas plus the optional L0
    /// buffer. Higher-versioned sources win per edge; tombstoned edges are
    /// excluded by `add_edges_to_graph`.
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // id → name maps so the numeric ids can be resolved to table names.
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // Forward traversal: neighbor is the destination vertex.
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // eid → latest-known state; deltas/L0 overwrite by version.
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    // Pinned edge version; computed but not used below.
                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    let lancedb_store = self.lancedb_store();
                    // The source label of `vid` is unknown here, so probe the
                    // adjacency dataset of each label until one has the row.
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_lancedb(lancedb_store, vid).await?
                        {
                            // Base adjacency rows carry version 0 so that any
                            // delta/L0 entry (version > 0) supersedes them.
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            break;
                        }
                    }

                    // Overlay L1 deltas (bounded by the pinned high-water mark).
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas_lancedb(
                            lancedb_store,
                            vid,
                            &schema,
                            self.version_high_water_mark(),
                        )
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // Overlay unflushed writes, when an L0 buffer was supplied.
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
1302
1303 fn apply_delta_to_edges(
1305 edges: &mut HashMap<Eid, EdgeState>,
1306 delta_entries: Vec<crate::storage::delta::L1Entry>,
1307 neighbor_is_dst: bool,
1308 ) {
1309 for entry in delta_entries {
1310 let neighbor = if neighbor_is_dst {
1311 entry.dst_vid
1312 } else {
1313 entry.src_vid
1314 };
1315 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1316
1317 if entry.version > current_ver {
1318 edges.insert(
1319 entry.eid,
1320 EdgeState {
1321 neighbor,
1322 version: entry.version,
1323 deleted: matches!(entry.op, Op::Delete),
1324 },
1325 );
1326 }
1327 }
1328 }
1329
1330 fn apply_l0_to_edges(
1332 edges: &mut HashMap<Eid, EdgeState>,
1333 l0: &L0Buffer,
1334 vid: Vid,
1335 etype_id: u32,
1336 direction: GraphDirection,
1337 ) {
1338 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1339 for (neighbor, eid, ver) in l0_neighbors {
1340 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1341 if ver > current_ver {
1342 edges.insert(
1343 eid,
1344 EdgeState {
1345 neighbor,
1346 version: ver,
1347 deleted: false,
1348 },
1349 );
1350 }
1351 }
1352
1353 for (eid, state) in edges.iter_mut() {
1355 if l0.is_tombstoned(*eid) {
1356 state.deleted = true;
1357 }
1358 }
1359 }
1360
1361 fn add_edges_to_graph(
1363 graph: &mut WorkingGraph,
1364 edges: HashMap<Eid, EdgeState>,
1365 vid: Vid,
1366 etype_id: u32,
1367 neighbor_is_dst: bool,
1368 visited: &HashSet<Vid>,
1369 next_frontier: &mut HashSet<Vid>,
1370 ) {
1371 for (eid, state) in edges {
1372 if state.deleted {
1373 continue;
1374 }
1375 graph.add_vertex(state.neighbor);
1376
1377 if !visited.contains(&state.neighbor) {
1378 next_frontier.insert(state.neighbor);
1379 }
1380
1381 if neighbor_is_dst {
1382 graph.add_edge(vid, state.neighbor, eid, etype_id);
1383 } else {
1384 graph.add_edge(state.neighbor, vid, eid, etype_id);
1385 }
1386 }
1387 }
1388}
1389
1390fn extract_vid_score_pairs(
1392 batches: &[arrow_array::RecordBatch],
1393 vid_column: &str,
1394 score_column: &str,
1395) -> Result<Vec<(Vid, f32)>> {
1396 let mut results = Vec::new();
1397 for batch in batches {
1398 let vid_col = batch
1399 .column_by_name(vid_column)
1400 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1401 .as_any()
1402 .downcast_ref::<UInt64Array>()
1403 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1404
1405 let score_col = batch
1406 .column_by_name(score_column)
1407 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1408 .as_any()
1409 .downcast_ref::<Float32Array>()
1410 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1411
1412 for i in 0..batch.num_rows() {
1413 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1414 }
1415 }
1416 Ok(results)
1417}
1418
1419fn extract_embedding_from_props(
1424 props: &uni_common::Properties,
1425 property: &str,
1426) -> Option<Vec<f32>> {
1427 let arr = props.get(property)?.as_array()?;
1428 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1429}
1430
1431fn merge_l0_into_vector_results(
1441 results: &mut Vec<(Vid, f32)>,
1442 ctx: &QueryContext,
1443 label: &str,
1444 property: &str,
1445 query: &[f32],
1446 k: usize,
1447 metric: &DistanceMetric,
1448) {
1449 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1451 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1452 buffers.push(Arc::clone(&ctx.l0));
1453 if let Some(ref txn) = ctx.transaction_l0 {
1454 buffers.push(Arc::clone(txn));
1455 }
1456
1457 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1459 let mut tombstoned: HashSet<Vid> = HashSet::new();
1461
1462 for buf_arc in &buffers {
1463 let buf = buf_arc.read();
1464
1465 for &vid in &buf.vertex_tombstones {
1467 tombstoned.insert(vid);
1468 }
1469
1470 for (&vid, labels) in &buf.vertex_labels {
1472 if !labels.iter().any(|l| l == label) {
1473 continue;
1474 }
1475 if let Some(props) = buf.vertex_properties.get(&vid)
1476 && let Some(emb) = extract_embedding_from_props(props, property)
1477 {
1478 if emb.len() != query.len() {
1479 continue; }
1481 let dist = metric.compute_distance(&emb, query);
1482 l0_candidates.insert(vid, dist);
1484 tombstoned.remove(&vid);
1486 }
1487 }
1488 }
1489
1490 if l0_candidates.is_empty() && tombstoned.is_empty() {
1492 return;
1493 }
1494
1495 results.retain(|(vid, _)| !tombstoned.contains(vid));
1497
1498 for (vid, dist) in &l0_candidates {
1500 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1501 existing.1 = *dist;
1502 } else {
1503 results.push((*vid, *dist));
1504 }
1505 }
1506
1507 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1509 results.truncate(k);
1510}
1511
/// Score how well `text` matches `query`: the fraction of distinct,
/// lowercased query tokens that also appear in `text` (range 0.0..=1.0).
/// An empty query scores 0.0.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    let wanted: HashSet<String> = query.split_whitespace().map(str::to_lowercase).collect();
    if wanted.is_empty() {
        return 0.0;
    }
    let present: HashSet<String> = text.split_whitespace().map(str::to_lowercase).collect();
    let matched = wanted.intersection(&present).count();
    matched as f32 / wanted.len() as f32
}
1529
1530fn extract_text_from_props<'a>(
1532 props: &'a uni_common::Properties,
1533 property: &str,
1534) -> Option<&'a str> {
1535 props.get(property)?.as_str()
1536}
1537
1538fn merge_l0_into_fts_results(
1548 results: &mut Vec<(Vid, f32)>,
1549 ctx: &QueryContext,
1550 label: &str,
1551 property: &str,
1552 query: &str,
1553 k: usize,
1554) {
1555 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1557 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1558 buffers.push(Arc::clone(&ctx.l0));
1559 if let Some(ref txn) = ctx.transaction_l0 {
1560 buffers.push(Arc::clone(txn));
1561 }
1562
1563 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1565 let mut tombstoned: HashSet<Vid> = HashSet::new();
1567
1568 for buf_arc in &buffers {
1569 let buf = buf_arc.read();
1570
1571 for &vid in &buf.vertex_tombstones {
1573 tombstoned.insert(vid);
1574 }
1575
1576 for (&vid, labels) in &buf.vertex_labels {
1578 if !labels.iter().any(|l| l == label) {
1579 continue;
1580 }
1581 if let Some(props) = buf.vertex_properties.get(&vid)
1582 && let Some(text) = extract_text_from_props(props, property)
1583 {
1584 let score = compute_text_relevance(query, text);
1585 if score > 0.0 {
1586 l0_candidates.insert(vid, score);
1588 }
1589 tombstoned.remove(&vid);
1591 }
1592 }
1593 }
1594
1595 if l0_candidates.is_empty() && tombstoned.is_empty() {
1597 return;
1598 }
1599
1600 results.retain(|(vid, _)| !tombstoned.contains(vid));
1602
1603 for (vid, score) in &l0_candidates {
1605 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1606 existing.1 = *score;
1607 } else {
1608 results.push((*vid, *score));
1609 }
1610 }
1611
1612 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1614 results.truncate(k);
1615}