1use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::{ScanRequest, VectorQueryOpts};
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Small record describing the state of one edge: a neighbor vertex id, a
/// version number and a deletion flag.
///
/// NOTE(review): no use of this type is visible in this part of the file;
/// presumably it is consumed further down — confirm before changing fields.
struct EdgeState {
    /// Vertex id stored for the edge (presumably the vertex at the other end).
    neighbor: Vid,
    /// Version number recorded for this state.
    version: u64,
    /// Deletion flag for the edge.
    deleted: bool,
}
60
/// Central handle tying together the object store, the storage backend, the
/// schema and snapshot managers, the adjacency cache and compaction state.
///
/// Derived views of a manager are produced by `pinned`, `pinned_at_version`
/// and `at_fork_with_schema`, which copy or replace individual fields.
pub struct StorageManager {
    /// Base URI the manager was opened with; `local_fs_root` treats a value
    /// without `"://"` as a local filesystem path.
    base_uri: String,
    /// Object store; `new_with_backend` wraps the caller's store in a
    /// `ResilientObjectStore` before storing it here.
    store: Arc<dyn ObjectStore>,
    /// Shared schema manager.
    schema_manager: Arc<SchemaManager>,
    /// Snapshot manager built over `store`.
    snapshot_manager: Arc<SnapshotManager>,
    /// Adjacency cache, sized from `config.cache_size` at construction.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Runtime configuration.
    pub config: UniConfig,
    /// Compaction bookkeeping shared with `CompactionGuard`.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Counter incremented and decremented by `FlushInProgressGuard`.
    pub flush_in_progress: std::sync::atomic::AtomicUsize,
    /// Snapshot this manager is pinned to, if any (set by `pinned`).
    pinned_snapshot: Option<SnapshotManifest>,
    /// Version high-water mark used when no snapshot is pinned (set by
    /// `pinned_at_version`).
    pinned_version_hwm: Option<u64>,
    /// Fork scope, present only on managers created via `at_fork_with_schema`.
    fork_scope: Option<Arc<crate::fork::ForkScope>>,
    /// Storage backend used for all table operations.
    backend: Arc<dyn StorageBackend>,
    /// In-memory VID -> labels index, rebuilt by `rebuild_vid_labels_index`.
    vid_labels_index: Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>,
}
104
/// Guard that keeps a `StorageManager`'s `flush_in_progress` counter
/// incremented for as long as the guard is alive.
pub struct FlushInProgressGuard {
    /// Manager whose counter this guard holds.
    storage: Arc<StorageManager>,
}
116
117impl FlushInProgressGuard {
118 pub fn new(storage: &Arc<StorageManager>) -> Self {
119 storage
120 .flush_in_progress
121 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
122 Self {
123 storage: storage.clone(),
124 }
125 }
126}
127
128impl Drop for FlushInProgressGuard {
129 fn drop(&mut self) {
130 self.storage
132 .flush_in_progress
133 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
134 }
135}
136
137fn is_lance_conflict(err: &anyhow::Error) -> bool {
143 let msg = err.to_string();
144 msg.contains("Incompatible transaction") || msg.contains("conflict")
145}
146
147async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
152where
153 F: FnMut() -> Fut,
154 Fut: std::future::Future<Output = anyhow::Result<()>>,
155{
156 for attempt in 0u32..10 {
157 match op().await {
158 Ok(()) => return Ok(()),
159 Err(e) => {
160 if !is_lance_conflict(&e) || attempt == 9 {
161 return Err(e);
162 }
163 let backoff_ms = 1u64 << attempt;
164 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
165 }
166 }
167 }
168 unreachable!("retry loop exits via Ok or Err")
169}
170
171pub async fn merge_insert_batch_with_lance_conflict_retry(
180 backend: &dyn crate::backend::StorageBackend,
181 table_name: &str,
182 batch: arrow_array::RecordBatch,
183 on: &[&str],
184) -> anyhow::Result<()> {
185 retry_on_lance_conflict(|| async {
186 let exists = backend.table_exists(table_name).await?;
187 if !exists {
188 anyhow::bail!(
189 "merge_insert target table '{}' does not exist (partial writes \
190 require the row to already be present; CREATE goes through Append)",
191 table_name
192 );
193 }
194 backend
195 .merge_insert(table_name, on, vec![batch.clone()])
196 .await
197 })
198 .await
199}
200
201pub async fn write_batch_with_lance_conflict_retry(
210 backend: &dyn crate::backend::StorageBackend,
211 table_name: &str,
212 batch: arrow_array::RecordBatch,
213) -> anyhow::Result<()> {
214 use crate::backend::types::WriteMode;
215 retry_on_lance_conflict(|| async {
216 let exists = backend.table_exists(table_name).await?;
217 if exists {
218 backend
219 .write(table_name, vec![batch.clone()], WriteMode::Append)
220 .await
221 } else {
222 backend.create_table(table_name, vec![batch.clone()]).await
223 }
224 })
225 .await
226}
227
/// Guard representing an in-progress compaction.
///
/// Constructing it sets `compaction_in_progress` on the shared status;
/// dropping it clears the flag and records `last_compaction`.
struct CompactionGuard {
    /// Shared compaction status this guard updates.
    status: Arc<Mutex<CompactionStatus>>,
}
232
233impl CompactionGuard {
234 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
235 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
236 if s.compaction_in_progress {
237 return None;
238 }
239 s.compaction_in_progress = true;
240 Some(Self {
241 status: status.clone(),
242 })
243 }
244}
245
246impl Drop for CompactionGuard {
247 fn drop(&mut self) {
248 match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
251 Ok(mut s) => {
252 s.compaction_in_progress = false;
253 s.last_compaction = Some(std::time::SystemTime::now());
254 }
255 Err(e) => {
256 log::error!(
260 "CompactionGuard drop failed to acquire poisoned lock: {}. \
261 Compaction status may be inconsistent. Issue #18/#150",
262 e
263 );
264 }
265 }
266 }
267}
268
269impl StorageManager {
270 pub async fn new_with_backend(
272 base_uri: &str,
273 store: Arc<dyn ObjectStore>,
274 backend: Arc<dyn StorageBackend>,
275 schema_manager: Arc<SchemaManager>,
276 config: UniConfig,
277 ) -> Result<Self> {
278 let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
279 store,
280 config.object_store.clone(),
281 ));
282
283 let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
284
285 Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
287
288 let mut sm = Self {
289 base_uri: base_uri.to_string(),
290 store: resilient_store,
291 schema_manager,
292 snapshot_manager,
293 adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
294 config,
295 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
296 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
297 pinned_snapshot: None,
298 pinned_version_hwm: None,
299 fork_scope: None,
300 backend,
301 vid_labels_index: Arc::new(parking_lot::RwLock::new(
302 crate::storage::vid_labels::VidLabelsIndex::new(),
303 )),
304 };
305
306 if let Err(e) = sm.rebuild_vid_labels_index().await {
310 warn!(
311 "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
312 e
313 );
314 }
315
316 Ok(sm)
317 }
318
319 #[cfg(feature = "lance-backend")]
321 pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
322 Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
323 }
324
325 #[cfg(feature = "lance-backend")]
327 pub async fn new_with_cache(
328 base_uri: &str,
329 schema_manager: Arc<SchemaManager>,
330 adjacency_cache_size: usize,
331 ) -> Result<Self> {
332 let config = UniConfig {
333 cache_size: adjacency_cache_size,
334 ..Default::default()
335 };
336 Self::new_with_config(base_uri, schema_manager, config).await
337 }
338
339 #[cfg(feature = "lance-backend")]
341 pub async fn new_with_config(
342 base_uri: &str,
343 schema_manager: Arc<SchemaManager>,
344 config: UniConfig,
345 ) -> Result<Self> {
346 let store = Self::build_store_from_uri(base_uri)?;
347 Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
348 }
349
350 #[cfg(feature = "lance-backend")]
352 pub async fn new_with_store_and_config(
353 base_uri: &str,
354 store: Arc<dyn ObjectStore>,
355 schema_manager: Arc<SchemaManager>,
356 config: UniConfig,
357 ) -> Result<Self> {
358 Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
359 .await
360 }
361
362 #[cfg(feature = "lance-backend")]
364 pub async fn new_with_store_and_storage_options(
365 base_uri: &str,
366 store: Arc<dyn ObjectStore>,
367 schema_manager: Arc<SchemaManager>,
368 config: UniConfig,
369 lancedb_storage_options: Option<HashMap<String, String>>,
370 ) -> Result<Self> {
371 let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
372 Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
373 }
374
375 async fn recover_all_staging_tables(
380 backend: &dyn StorageBackend,
381 schema_manager: &SchemaManager,
382 ) -> Result<()> {
383 let schema = schema_manager.schema();
384
385 backend
387 .recover_staging(table_names::main_vertex_table_name())
388 .await?;
389 backend
390 .recover_staging(table_names::main_edge_table_name())
391 .await?;
392
393 for label in schema.labels.keys() {
395 let name = table_names::vertex_table_name(label);
396 backend.recover_staging(&name).await?;
397 }
398
399 for edge_type in schema.edge_types.keys() {
401 for direction in &["fwd", "bwd"] {
402 let delta_name = table_names::delta_table_name(edge_type, direction);
404 backend.recover_staging(&delta_name).await?;
405
406 for _label in schema.labels.keys() {
408 let adj_name = table_names::adjacency_table_name(edge_type, direction);
409 backend.recover_staging(&adj_name).await?;
410 }
411 }
412 }
413
414 Ok(())
415 }
416
417 #[cfg(feature = "lance-backend")]
418 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
419 if base_uri.contains("://") {
420 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
421 let (store, _path) = object_store::parse_url(&parsed)
422 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
423 Ok(Arc::from(store))
424 } else {
425 std::fs::create_dir_all(base_uri)?;
427 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
428 }
429 }
430
431 pub fn local_fs_root(&self) -> Option<std::path::PathBuf> {
437 if self.base_uri.contains("://") {
438 None
439 } else {
440 Some(std::path::PathBuf::from(&self.base_uri))
441 }
442 }
443
444 pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
445 Self {
453 base_uri: self.base_uri.clone(),
454 store: self.store.clone(),
455 schema_manager: self.schema_manager.clone(),
456 snapshot_manager: self.snapshot_manager.clone(),
457 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
461 config: self.config.clone(),
462 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
463 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
464 pinned_snapshot: Some(snapshot),
465 pinned_version_hwm: None,
466 fork_scope: self.fork_scope.clone(),
467 backend: self.backend.clone(),
468 vid_labels_index: Arc::new(parking_lot::RwLock::new(
474 self.vid_labels_index.read().clone(),
475 )),
476 }
477 }
478
479 pub fn pinned_at_version(&self, hwm: u64) -> Self {
498 Self {
499 base_uri: self.base_uri.clone(),
500 store: self.store.clone(),
501 schema_manager: self.schema_manager.clone(),
502 snapshot_manager: self.snapshot_manager.clone(),
503 adjacency_manager: self.adjacency_manager.clone(),
504 config: self.config.clone(),
505 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
506 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
507 pinned_snapshot: None,
508 pinned_version_hwm: Some(hwm),
509 fork_scope: self.fork_scope.clone(),
510 backend: self.backend.clone(),
511 vid_labels_index: Arc::new(parking_lot::RwLock::new(
517 self.vid_labels_index.read().clone(),
518 )),
519 }
520 }
521
522 pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
537 self.at_fork_with_schema(scope, self.schema_manager.clone())
538 }
539
540 pub fn at_fork_with_schema(
548 &self,
549 scope: Arc<crate::fork::ForkScope>,
550 merged_schema: Arc<SchemaManager>,
551 ) -> Self {
552 debug_assert!(
553 self.pinned_snapshot.is_none(),
554 "forking a pinned StorageManager is unsupported in Phase 1"
555 );
556 let branched_backend: Arc<dyn StorageBackend> = Arc::new(
557 crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
558 );
559 let snapshot_manager = Arc::new(SnapshotManager::new_for_fork(
564 self.store.clone(),
565 scope.fork_id(),
566 ));
567 Self {
568 base_uri: self.base_uri.clone(),
569 store: self.store.clone(),
570 schema_manager: merged_schema,
571 snapshot_manager,
572 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
573 config: self.config.clone(),
574 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
575 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
576 pinned_snapshot: None,
577 pinned_version_hwm: None,
578 fork_scope: Some(scope),
579 backend: branched_backend,
580 vid_labels_index: Arc::new(parking_lot::RwLock::new(
586 self.vid_labels_index.read().clone(),
587 )),
588 }
589 }
590
591 pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
593 self.fork_scope.as_ref()
594 }
595
596 #[must_use]
607 pub fn fork_index_exists(
608 &self,
609 label: &str,
610 column: &str,
611 ) -> Option<crate::fork::ForkLocalIndexKind> {
612 self.fork_scope
613 .as_ref()
614 .and_then(|s| s.fork_local_index(label, column))
615 }
616
617 pub fn base_uri(&self) -> &str {
620 &self.base_uri
621 }
622
623 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
624 let schema = self.schema_manager.schema();
625 let name = schema.edge_type_name_by_id(edge_type_id)?;
626 self.pinned_snapshot
627 .as_ref()
628 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
629 .filter(|v| *v != 0)
634 }
635
636 pub fn version_high_water_mark(&self) -> Option<u64> {
644 self.pinned_snapshot
645 .as_ref()
646 .map(|s| s.version_high_water_mark)
647 .or(self.pinned_version_hwm)
648 }
649
650 pub fn snapshot_version_hwm(&self) -> Option<u64> {
662 self.pinned_snapshot
663 .as_ref()
664 .map(|s| s.version_high_water_mark)
665 }
666
667 pub fn apply_version_filter(&self, base_filter: String) -> String {
672 if let Some(hwm) = self.version_high_water_mark() {
673 format!("({}) AND (_version <= {})", base_filter, hwm)
674 } else {
675 base_filter
676 }
677 }
678
679 fn build_active_filter(user_filter: Option<&str>) -> String {
682 match user_filter {
683 Some(expr) => format!("({}) AND (_deleted = false)", expr),
684 None => "_deleted = false".to_string(),
685 }
686 }
687
688 pub fn store(&self) -> Arc<dyn ObjectStore> {
689 self.store.clone()
690 }
691
692 pub fn compaction_status(
698 &self,
699 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
700 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
701 Ok(guard.clone())
702 }
703
704 pub async fn compact(&self) -> Result<CompactionStats> {
705 let start = std::time::Instant::now();
707 let schema = self.schema_manager.schema();
708 let mut files_compacted = 0;
709
710 for label in schema.labels.keys() {
711 let name = table_names::vertex_table_name(label);
712 if self.backend.table_exists(&name).await? {
713 self.backend.optimize_table(&name).await?;
714 files_compacted += 1;
715 self.backend.invalidate_cache(&name);
716 }
717 }
718
719 Ok(CompactionStats {
720 files_compacted,
721 bytes_before: 0,
722 bytes_after: 0,
723 duration: start.elapsed(),
724 crdt_merges: 0,
725 })
726 }
727
728 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
729 let _guard = CompactionGuard::new(self.compaction_status.clone())
730 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
731
732 let start = std::time::Instant::now();
733 let name = table_names::vertex_table_name(label);
734
735 if self.backend.table_exists(&name).await? {
736 self.backend.optimize_table(&name).await?;
737 self.backend.invalidate_cache(&name);
738 }
739
740 Ok(CompactionStats {
741 files_compacted: 1,
742 bytes_before: 0,
743 bytes_after: 0,
744 duration: start.elapsed(),
745 crdt_merges: 0,
746 })
747 }
748
749 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
750 let _guard = CompactionGuard::new(self.compaction_status.clone())
751 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
752
753 let start = std::time::Instant::now();
754 let mut files_compacted = 0;
755
756 for dir in ["fwd", "bwd"] {
757 let name = table_names::delta_table_name(edge_type, dir);
758 if self.backend.table_exists(&name).await? {
759 self.backend.optimize_table(&name).await?;
760 files_compacted += 1;
761 }
762 }
763
764 Ok(CompactionStats {
765 files_compacted,
766 bytes_before: 0,
767 bytes_after: 0,
768 duration: start.elapsed(),
769 crdt_merges: 0,
770 })
771 }
772
773 pub async fn wait_for_compaction(&self) -> Result<()> {
774 loop {
775 let in_progress = {
776 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
777 };
778 if !in_progress {
779 return Ok(());
780 }
781 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
782 }
783 }
784
785 pub fn start_background_compaction(
786 self: Arc<Self>,
787 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
788 ) -> tokio::task::JoinHandle<()> {
789 if !self.config.compaction.enabled {
790 return tokio::spawn(async {});
791 }
792
793 tokio::spawn(async move {
794 let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
800 let mut interval =
801 tokio::time::interval_at(start, self.config.compaction.check_interval);
802
803 loop {
804 tokio::select! {
805 _ = interval.tick() => {
806 if let Err(e) = self.update_compaction_status().await {
807 log::error!("Failed to update compaction status: {}", e);
808 continue;
809 }
810
811 if let Some(task) = self.pick_compaction_task() {
812 log::info!("Triggering background compaction: {:?}", task);
813 if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
814 log::error!("Compaction failed: {}", e);
815 }
816 }
817 }
818 _ = shutdown_rx.recv() => {
819 log::info!("Background compaction shutting down");
820 let _ = self.wait_for_compaction().await;
821 break;
822 }
823 }
824 }
825 })
826 }
827
828 async fn update_compaction_status(&self) -> Result<()> {
829 let schema = self.schema_manager.schema();
830 let backend = self.backend.as_ref();
831 let mut total_rows: usize = 0;
832 let mut oldest_ts: Option<i64> = None;
833
834 for name in schema.edge_types.keys() {
835 for dir in ["fwd", "bwd"] {
836 let tbl_name = table_names::delta_table_name(name, dir);
837 if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
838 continue;
839 }
840 let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
841 if row_count == 0 {
842 continue;
843 }
844 total_rows += row_count;
845
846 let request =
848 ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
849 let Ok(batches) = backend.scan(request).await else {
850 continue;
851 };
852 for batch in batches {
853 let Some(col) = batch
854 .column_by_name("_created_at")
855 .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
856 else {
857 continue;
858 };
859 for i in 0..col.len() {
860 if !col.is_null(i) {
861 let ts = col.value(i);
862 oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
863 }
864 }
865 }
866 }
867 }
868
869 let oldest_l1_age = oldest_ts
870 .and_then(|ts| {
871 let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
872 SystemTime::now().duration_since(created).ok()
873 })
874 .unwrap_or(Duration::ZERO);
875
876 let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
877 status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
880 status.oldest_l1_age = oldest_l1_age;
881 Ok(())
882 }
883
884 fn pick_compaction_task(&self) -> Option<CompactionTask> {
885 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
886
887 if status.l1_runs >= self.config.compaction.max_l1_runs {
888 return Some(CompactionTask::ByRunCount);
889 }
890 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
891 return Some(CompactionTask::BySize);
892 }
893 if status.oldest_l1_age >= self.config.compaction.max_l1_age
894 && status.oldest_l1_age > Duration::ZERO
895 {
896 return Some(CompactionTask::ByAge);
897 }
898
899 None
900 }
901
902 async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
904 if let Err(e) = backend.optimize_table(table_name).await {
905 log::warn!("Failed to optimize table {}: {}", table_name, e);
906 return false;
907 }
908 true
909 }
910
911 pub fn trigger_async_compaction(self: &Arc<Self>) {
914 let this = Arc::clone(self);
915 tokio::spawn(async move {
916 if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
917 log::debug!("Post-flush compaction skipped: {}", e);
919 }
920 });
921 }
922
923 pub(crate) async fn execute_compaction(
924 this: Arc<Self>,
925 _task: CompactionTask,
926 ) -> Result<CompactionStats> {
927 let start = std::time::Instant::now();
928 let _guard = CompactionGuard::new(this.compaction_status.clone())
929 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
930
931 let schema = this.schema_manager.schema();
932 let mut files_compacted = 0;
933
934 let compactor = Compactor::new(Arc::clone(&this));
937 let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
938 log::error!(
939 "Semantic compaction failed (continuing with backend optimize): {}",
940 e
941 );
942 Vec::new()
943 });
944
945 let am = this.adjacency_manager();
947 for info in &compaction_results {
948 let direction = match info.direction.as_str() {
949 "fwd" => Direction::Outgoing,
950 "bwd" => Direction::Incoming,
951 _ => continue,
952 };
953 if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
954 && let Err(e) = am.warm(&this, etid, direction, None).await
955 {
956 log::warn!(
957 "Failed to re-warm adjacency for {}/{}: {}",
958 info.edge_type,
959 info.direction,
960 e
961 );
962 }
963 }
964
965 let backend = this.backend.as_ref();
967
968 for name in schema.edge_types.keys() {
970 for dir in ["fwd", "bwd"] {
971 let delta = table_names::delta_table_name(name, dir);
972 if Self::try_optimize_table(backend, &delta).await {
973 files_compacted += 1;
974 }
975 let adj = table_names::adjacency_table_name(name, dir);
976 if Self::try_optimize_table(backend, &adj).await {
977 files_compacted += 1;
978 }
979 }
980 }
981
982 for label in schema.labels.keys() {
984 let tbl = table_names::vertex_table_name(label);
985 if Self::try_optimize_table(backend, &tbl).await {
986 files_compacted += 1;
987 backend.invalidate_cache(&tbl);
988 }
989 }
990
991 for tbl in [
993 table_names::main_vertex_table_name(),
994 table_names::main_edge_table_name(),
995 ] {
996 if Self::try_optimize_table(backend, tbl).await {
997 files_compacted += 1;
998 }
999 }
1000
1001 {
1002 let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
1003 status.total_compactions += 1;
1004 status.l1_runs = 0; }
1006
1007 Ok(CompactionStats {
1008 files_compacted,
1009 bytes_before: 0,
1010 bytes_after: 0,
1011 duration: start.elapsed(),
1012 crdt_merges: 0,
1013 })
1014 }
1015
1016 pub fn invalidate_table_cache(&self, label: &str) {
1020 let name = table_names::vertex_table_name(label);
1021 self.backend.invalidate_cache(&name);
1022 }
1023
1024 pub fn base_path(&self) -> &str {
1025 &self.base_uri
1026 }
1027
1028 pub fn schema_manager(&self) -> &SchemaManager {
1029 &self.schema_manager
1030 }
1031
1032 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
1033 self.schema_manager.clone()
1034 }
1035
1036 #[must_use]
1044 pub fn schema_manager_arc_ref(&self) -> &Arc<SchemaManager> {
1045 &self.schema_manager
1046 }
1047
1048 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
1050 Arc::clone(&self.adjacency_manager)
1051 }
1052
1053 pub async fn warm_adjacency(
1058 &self,
1059 edge_type_id: u32,
1060 direction: crate::storage::direction::Direction,
1061 version: Option<u64>,
1062 ) -> anyhow::Result<()> {
1063 self.adjacency_manager
1064 .warm(self, edge_type_id, direction, version)
1065 .await
1066 }
1067
1068 pub async fn warm_adjacency_coalesced(
1073 &self,
1074 edge_type_id: u32,
1075 direction: crate::storage::direction::Direction,
1076 version: Option<u64>,
1077 ) -> anyhow::Result<()> {
1078 self.adjacency_manager
1079 .warm_coalesced(self, edge_type_id, direction, version)
1080 .await
1081 }
1082
1083 pub fn has_adjacency_csr(
1085 &self,
1086 edge_type_id: u32,
1087 direction: crate::storage::direction::Direction,
1088 ) -> bool {
1089 self.adjacency_manager.has_csr(edge_type_id, direction)
1090 }
1091
1092 pub fn get_neighbors_at_version(
1094 &self,
1095 vid: uni_common::core::id::Vid,
1096 edge_type: u32,
1097 direction: crate::storage::direction::Direction,
1098 version: u64,
1099 ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
1100 self.adjacency_manager
1101 .get_neighbors_at_version(vid, edge_type, direction, version)
1102 }
1103
1104 pub fn backend(&self) -> &dyn StorageBackend {
1106 self.backend.as_ref()
1107 }
1108
1109 pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
1111 self.backend.clone()
1112 }
1113
1114 async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
1119 use crate::storage::vid_labels::VidLabelsIndex;
1120
1121 let backend = self.backend.as_ref();
1122 let vtable = table_names::main_vertex_table_name();
1123
1124 if !backend.table_exists(vtable).await.unwrap_or(false) {
1126 self.vid_labels_index = Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new()));
1127 return Ok(());
1128 }
1129
1130 let request = ScanRequest::all(vtable)
1132 .with_filter("_deleted = false")
1133 .with_limit(100_000);
1134 let batches = backend
1135 .scan(request)
1136 .await
1137 .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1138
1139 let mut index = VidLabelsIndex::new();
1140 for batch in batches {
1141 let vid_col = batch
1142 .column_by_name("_vid")
1143 .ok_or_else(|| anyhow!("Missing _vid column"))?
1144 .as_any()
1145 .downcast_ref::<UInt64Array>()
1146 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1147
1148 let labels_col = batch
1149 .column_by_name("labels")
1150 .ok_or_else(|| anyhow!("Missing labels column"))?
1151 .as_any()
1152 .downcast_ref::<arrow_array::ListArray>()
1153 .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1154
1155 for row_idx in 0..batch.num_rows() {
1156 let vid = Vid::from(vid_col.value(row_idx));
1157 let labels_array = labels_col.value(row_idx);
1158 let labels_str_array = labels_array
1159 .as_any()
1160 .downcast_ref::<arrow_array::StringArray>()
1161 .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1162
1163 let labels: Vec<String> = (0..labels_str_array.len())
1164 .map(|i| labels_str_array.value(i).to_string())
1165 .collect();
1166
1167 index.insert(vid, labels);
1168 }
1169 }
1170
1171 self.vid_labels_index = Arc::new(parking_lot::RwLock::new(index));
1172 Ok(())
1173 }
1174
1175 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1180 let index = self.vid_labels_index.read();
1181 index.get_labels(vid).map(|labels| labels.to_vec())
1182 }
1183
1184 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1186 let mut index = self.vid_labels_index.write();
1187 index.insert(vid, labels);
1188 }
1189
1190 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1192 let mut index = self.vid_labels_index.write();
1193 index.remove_vid(vid);
1194 }
1195
1196 pub async fn load_subgraph_cached(
1197 &self,
1198 start_vids: &[Vid],
1199 edge_types: &[u32],
1200 max_hops: usize,
1201 direction: GraphDirection,
1202 _l0: Option<Arc<RwLock<L0Buffer>>>,
1203 ) -> Result<WorkingGraph> {
1204 let mut graph = WorkingGraph::new();
1205
1206 let dir = match direction {
1207 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1208 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1209 };
1210
1211 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1212
1213 let mut frontier: Vec<Vid> = start_vids.to_vec();
1215 let mut visited: HashSet<Vid> = HashSet::new();
1216
1217 for &vid in start_vids {
1219 graph.add_vertex(vid);
1220 }
1221
1222 for _hop in 0..max_hops {
1223 let mut next_frontier = HashSet::new();
1224
1225 for &vid in &frontier {
1226 if visited.contains(&vid) {
1227 continue;
1228 }
1229 visited.insert(vid);
1230 graph.add_vertex(vid);
1231
1232 for &etype_id in edge_types {
1233 let edge_ver = self.snapshot_version_hwm();
1237 self.adjacency_manager
1238 .warm_coalesced(self, etype_id, dir, edge_ver)
1239 .await?;
1240
1241 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1243
1244 for (neighbor_vid, eid) in edges {
1245 graph.add_vertex(neighbor_vid);
1246 if !visited.contains(&neighbor_vid) {
1247 next_frontier.insert(neighbor_vid);
1248 }
1249
1250 if neighbor_is_dst {
1251 graph.add_edge(vid, neighbor_vid, eid, etype_id);
1252 } else {
1253 graph.add_edge(neighbor_vid, vid, eid, etype_id);
1254 }
1255 }
1256 }
1257 }
1258 frontier = next_frontier.into_iter().collect();
1259
1260 if frontier.is_empty() {
1262 break;
1263 }
1264 }
1265
1266 Ok(graph)
1267 }
1268
1269 pub fn snapshot_manager(&self) -> &SnapshotManager {
1270 &self.snapshot_manager
1271 }
1272
1273 pub fn index_manager(&self) -> IndexManager {
1274 IndexManager::new(&self.base_uri, self.schema_manager.clone())
1275 .with_backend(self.backend_arc())
1276 }
1277
1278 pub async fn scan_vertex_table(
1287 &self,
1288 label: &str,
1289 columns: &[&str],
1290 additional_filter: Option<&str>,
1291 ) -> Result<Option<arrow_array::RecordBatch>> {
1292 let backend = self.backend();
1293 let table_name = table_names::vertex_table_name(label);
1294
1295 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1296 return Ok(None);
1297 }
1298
1299 let actual_columns =
1301 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1302 let table_field_names: HashSet<&str> = table_schema
1303 .fields()
1304 .iter()
1305 .map(|f| f.name().as_str())
1306 .collect();
1307 columns
1308 .iter()
1309 .copied()
1310 .filter(|c| table_field_names.contains(c))
1311 .map(|s| s.to_string())
1312 .collect::<Vec<_>>()
1313 } else {
1314 return Ok(None);
1315 };
1316
1317 let filter = match (self.version_high_water_mark(), additional_filter) {
1319 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1320 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1321 (None, Some(f)) => Some(f.to_string()),
1322 (None, None) => None,
1323 };
1324
1325 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1326 if let Some(f) = filter {
1327 request = request.with_filter(f);
1328 }
1329
1330 let batches = backend.scan(request).await?;
1337 if batches.is_empty() {
1338 Ok(None)
1339 } else {
1340 Ok(Some(arrow::compute::concat_batches(
1341 &batches[0].schema(),
1342 &batches,
1343 )?))
1344 }
1345 }
1346
1347 pub async fn scan_delta_table(
1350 &self,
1351 edge_type: &str,
1352 direction: &str,
1353 columns: &[&str],
1354 additional_filter: Option<&str>,
1355 ) -> Result<Option<arrow_array::RecordBatch>> {
1356 let edge_hwm = self.snapshot_version_hwm();
1362 let backend = self.backend();
1363 let table_name = table_names::delta_table_name(edge_type, direction);
1364
1365 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1366 return Ok(None);
1367 }
1368
1369 let actual_columns =
1371 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1372 let table_field_names: HashSet<&str> = table_schema
1373 .fields()
1374 .iter()
1375 .map(|f| f.name().as_str())
1376 .collect();
1377 columns
1378 .iter()
1379 .copied()
1380 .filter(|c| table_field_names.contains(c))
1381 .map(|s| s.to_string())
1382 .collect::<Vec<_>>()
1383 } else {
1384 return Ok(None);
1385 };
1386
1387 let filter = match (edge_hwm, additional_filter) {
1388 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1389 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1390 (None, Some(f)) => Some(f.to_string()),
1391 (None, None) => None,
1392 };
1393
1394 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1395 if let Some(f) = filter {
1396 request = request.with_filter(f);
1397 }
1398
1399 let batches = backend.scan(request).await?;
1406 if batches.is_empty() {
1407 Ok(None)
1408 } else {
1409 Ok(Some(arrow::compute::concat_batches(
1410 &batches[0].schema(),
1411 &batches,
1412 )?))
1413 }
1414 }
1415
1416 pub async fn scan_main_vertex_table(
1421 &self,
1422 columns: &[&str],
1423 filter: Option<&str>,
1424 ) -> Result<Option<arrow_array::RecordBatch>> {
1425 let backend = self.backend();
1426 let table_name = table_names::main_vertex_table_name();
1427
1428 if !backend.table_exists(table_name).await.unwrap_or(false) {
1429 return Ok(None);
1430 }
1431
1432 let full_filter = match (self.version_high_water_mark(), filter) {
1434 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1435 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1436 (None, Some(f)) => Some(f.to_string()),
1437 (None, None) => None,
1438 };
1439
1440 let request = ScanRequest::all(table_name)
1441 .with_columns(columns.iter().map(|s| s.to_string()).collect());
1442 let request = match full_filter.as_deref() {
1443 Some(f) => request.with_filter(f),
1444 None => request,
1445 };
1446
1447 let batches = backend.scan(request).await?;
1454 if batches.is_empty() {
1455 Ok(None)
1456 } else {
1457 Ok(Some(arrow::compute::concat_batches(
1458 &batches[0].schema(),
1459 &batches,
1460 )?))
1461 }
1462 }
1463
1464 pub async fn scan_main_edge_table_stream(
1466 &self,
1467 filter: Option<&str>,
1468 ) -> Result<
1469 Option<
1470 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1471 >,
1472 > {
1473 let backend = self.backend();
1474 let table_name = table_names::main_edge_table_name();
1475
1476 if !backend.table_exists(table_name).await.unwrap_or(false) {
1477 return Ok(None);
1478 }
1479
1480 let mut request = ScanRequest::all(table_name);
1481 if let Some(f) = filter {
1482 request = request.with_filter(f);
1483 }
1484
1485 let stream = backend.scan_stream(request).await?;
1486 Ok(Some(stream))
1487 }
1488
1489 pub async fn scan_vertex_table_stream(
1491 &self,
1492 label: &str,
1493 ) -> Result<
1494 Option<
1495 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1496 >,
1497 > {
1498 let backend = self.backend();
1499 let table_name = table_names::vertex_table_name(label);
1500
1501 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1502 return Ok(None);
1503 }
1504
1505 let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1506 Ok(Some(stream))
1507 }
1508
1509 pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1511 MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1512 .await
1513 }
1514
1515 pub async fn get_vertex_ext_ids(&self) -> Result<std::collections::HashMap<Vid, String>> {
1527 use arrow_array::StringArray;
1528 let backend = self.backend.as_ref();
1529 let vtable = table_names::main_vertex_table_name();
1530 let mut out = std::collections::HashMap::new();
1531 if !backend.table_exists(vtable).await.unwrap_or(false) {
1532 return Ok(out);
1533 }
1534 let request = ScanRequest::all(vtable)
1535 .with_filter("_deleted = false")
1536 .with_columns(vec!["_vid".to_string(), "ext_id".to_string()]);
1537 let batches = backend
1538 .scan(request)
1539 .await
1540 .map_err(|e| anyhow!("get_vertex_ext_ids: {}", e))?;
1541 for batch in batches {
1542 let vids = batch
1543 .column_by_name("_vid")
1544 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1545 .ok_or_else(|| anyhow!("get_vertex_ext_ids: missing/invalid _vid column"))?;
1546 let exts = batch
1547 .column_by_name("ext_id")
1548 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1549 .ok_or_else(|| anyhow!("get_vertex_ext_ids: missing/invalid ext_id column"))?;
1550 for i in 0..batch.num_rows() {
1551 if exts.is_null(i) {
1552 continue;
1553 }
1554 let ext = exts.value(i);
1555 if !ext.is_empty() {
1556 out.insert(Vid::from(vids.value(i)), ext.to_string());
1557 }
1558 }
1559 }
1560 Ok(out)
1561 }
1562
1563 pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1565 MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1566 .await
1567 }
1568
1569 pub async fn find_edges_by_type_names(
1572 &self,
1573 type_names: &[&str],
1574 endpoint_filter: Option<(crate::storage::main_edge::EndpointSide, &[Vid])>,
1575 ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1576 MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names, endpoint_filter).await
1577 }
1578
1579 pub async fn scan_vertex_candidates(
1581 &self,
1582 label: &str,
1583 filter: Option<&str>,
1584 ) -> Result<Vec<Vid>> {
1585 let backend = self.backend();
1586 let table_name = table_names::vertex_table_name(label);
1587
1588 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1589 return Ok(Vec::new());
1590 }
1591
1592 let full_filter = match filter {
1593 Some(f) => format!("_deleted = false AND ({})", f),
1594 None => "_deleted = false".to_string(),
1595 };
1596
1597 let request = ScanRequest::all(&table_name)
1598 .with_filter(full_filter)
1599 .with_columns(vec!["_vid".to_string()]);
1600
1601 let batches = backend.scan(request).await?;
1602
1603 let mut vids = Vec::new();
1604 for batch in batches {
1605 let vid_col = batch
1606 .column_by_name("_vid")
1607 .ok_or(anyhow!("Missing _vid"))?
1608 .as_any()
1609 .downcast_ref::<UInt64Array>()
1610 .ok_or(anyhow!("Invalid _vid"))?;
1611 for i in 0..batch.num_rows() {
1612 vids.push(Vid::from(vid_col.value(i)));
1613 }
1614 }
1615 Ok(vids)
1616 }
1617
1618 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1619 let schema = self.schema_manager.schema();
1620 let label_meta = schema
1621 .labels
1622 .get(label)
1623 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1624 let key = format!("vertices_{label}");
1625 match self.fork_branch_for(&key) {
1626 Some(branch) => Ok(VertexDataset::new_branched(
1627 &self.base_uri,
1628 label,
1629 label_meta.id,
1630 branch,
1631 )),
1632 None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1633 }
1634 }
1635
/// Creates the [`EdgeDataset`] handle for `edge_type` between the given labels.
///
/// If the fork scope maps `edges_{edge_type}` to a branch, the branched
/// constructor is used, otherwise the plain one. This function itself never
/// returns an error.
#[cfg(feature = "lance-backend")]
pub fn edge_dataset(
    &self,
    edge_type: &str,
    src_label: &str,
    dst_label: &str,
) -> Result<EdgeDataset> {
    let dataset_key = format!("edges_{edge_type}");
    let dataset = if let Some(branch) = self.fork_branch_for(&dataset_key) {
        EdgeDataset::new_branched(&self.base_uri, edge_type, src_label, dst_label, branch)
    } else {
        EdgeDataset::new(&self.base_uri, edge_type, src_label, dst_label)
    };
    Ok(dataset)
}
1660
1661 pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1662 let key = format!("deltas_{edge_type}_{direction}");
1663 match self.fork_branch_for(&key) {
1664 Some(branch) => Ok(DeltaDataset::new_branched(
1665 &self.base_uri,
1666 edge_type,
1667 direction,
1668 branch,
1669 )),
1670 None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1671 }
1672 }
1673
1674 pub fn adjacency_dataset(
1675 &self,
1676 edge_type: &str,
1677 label: &str,
1678 direction: &str,
1679 ) -> Result<AdjacencyDataset> {
1680 let key = crate::backend::table_names::adjacency_table_name(edge_type, direction);
1687 match self.fork_branch_for(&key) {
1688 Some(branch) => Ok(AdjacencyDataset::new_branched(
1689 &self.base_uri,
1690 edge_type,
1691 label,
1692 direction,
1693 branch,
1694 )),
1695 None => Ok(AdjacencyDataset::new(
1696 &self.base_uri,
1697 edge_type,
1698 label,
1699 direction,
1700 )),
1701 }
1702 }
1703
1704 fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1708 self.fork_scope
1709 .as_ref()
1710 .and_then(|s| s.branch_for(dataset_name))
1711 }
1712
1713 pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1718 MainVertexDataset::new(&self.base_uri)
1719 }
1720
1721 pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1726 MainEdgeDataset::new(&self.base_uri)
1727 }
1728
/// Creates the [`UidIndex`] handle for `label` under this manager's base URI.
///
/// This function itself never returns an error.
#[cfg(feature = "lance-backend")]
pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
    let index = UidIndex::new(&self.base_uri, label);
    Ok(index)
}
1733
1734 #[cfg(feature = "lance-backend")]
1735 pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1736 let schema = self.schema_manager.schema();
1737 let config = schema
1738 .indexes
1739 .iter()
1740 .find_map(|idx| match idx {
1741 IndexDefinition::Inverted(cfg)
1742 if cfg.label == label && cfg.property == property =>
1743 {
1744 Some(cfg.clone())
1745 }
1746 _ => None,
1747 })
1748 .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1749
1750 InvertedIndex::new(&self.base_uri, config).await
1751 }
1752
1753 #[expect(clippy::too_many_arguments)]
1754 pub async fn vector_search(
1755 &self,
1756 label: &str,
1757 property: &str,
1758 query: &[f32],
1759 k: usize,
1760 filter: Option<&str>,
1761 opts: VectorQueryOpts,
1762 ctx: Option<&QueryContext>,
1763 ) -> Result<Vec<(Vid, f32)>> {
1764 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1765
1766 let schema = self.schema_manager.schema();
1768 let metric = schema
1769 .vector_index_for_property(label, property)
1770 .map(|config| config.metric.clone())
1771 .unwrap_or(DistanceMetric::L2);
1772
1773 let backend = self.backend.as_ref();
1774 let name = table_names::vertex_table_name(label);
1775
1776 let mut results = Vec::new();
1777
1778 if backend.table_exists(&name).await.unwrap_or(false) {
1780 let backend_metric = match &metric {
1781 DistanceMetric::L2 => BackendMetric::L2,
1782 DistanceMetric::Cosine => BackendMetric::Cosine,
1783 DistanceMetric::Dot => BackendMetric::Dot,
1784 _ => BackendMetric::L2,
1785 };
1786
1787 let mut filter_parts = vec![Self::build_active_filter(filter)];
1789 if ctx.is_some()
1790 && let Some(hwm) = self.version_high_water_mark()
1791 {
1792 filter_parts.push(format!("_version <= {}", hwm));
1793 }
1794 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1795
1796 let batches = backend
1797 .vector_search(
1798 &name,
1799 property,
1800 query,
1801 k,
1802 backend_metric,
1803 combined_filter,
1804 opts,
1805 )
1806 .await?;
1807
1808 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1809 }
1810
1811 if let Some(qctx) = ctx {
1813 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1814 }
1815
1816 Ok(results)
1817 }
1818
1819 #[expect(clippy::too_many_arguments)]
1830 pub async fn muvera_fde_candidates(
1831 &self,
1832 label: &str,
1833 fde_column: &str,
1834 fde_query: &[f32],
1835 k: usize,
1836 filter: Option<&str>,
1837 opts: VectorQueryOpts,
1838 ctx: Option<&QueryContext>,
1839 ) -> Result<Vec<(Vid, f32)>> {
1840 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1841
1842 let backend = self.backend.as_ref();
1843 let name = table_names::vertex_table_name(label);
1844 if !backend.table_exists(&name).await.unwrap_or(false) {
1845 return Ok(Vec::new());
1847 }
1848
1849 let mut filter_parts = vec![Self::build_active_filter(filter)];
1850 if ctx.is_some()
1851 && let Some(hwm) = self.version_high_water_mark()
1852 {
1853 filter_parts.push(format!("_version <= {}", hwm));
1854 }
1855 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1856
1857 let batches = backend
1858 .vector_search(
1859 &name,
1860 fde_column,
1861 fde_query,
1862 k,
1863 BackendMetric::Dot,
1864 combined_filter,
1865 opts,
1866 )
1867 .await?;
1868 extract_vid_score_pairs(&batches, "_vid", "_distance")
1869 }
1870
1871 #[expect(clippy::too_many_arguments)]
1892 pub async fn multivector_search(
1893 &self,
1894 label: &str,
1895 property: &str,
1896 query: &[Vec<f32>],
1897 k: usize,
1898 filter: Option<&str>,
1899 opts: VectorQueryOpts,
1900 ctx: Option<&QueryContext>,
1901 ) -> Result<Vec<(Vid, f32)>> {
1902 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1903
1904 let schema = self.schema_manager.schema();
1905 let metric = schema
1906 .vector_index_for_property(label, property)
1907 .map(|config| config.metric.clone())
1908 .unwrap_or(DistanceMetric::Cosine);
1909
1910 let backend = self.backend.as_ref();
1911 let name = table_names::vertex_table_name(label);
1912
1913 let branched = self
1924 .fork_scope
1925 .as_ref()
1926 .is_some_and(|s| s.branch_for(&name).is_some());
1927 if branched {
1928 if !backend.table_exists(&name).await.unwrap_or(false) {
1929 return Ok(Vec::new());
1931 }
1932 let mut filter_parts = vec![Self::build_active_filter(filter)];
1933 if ctx.is_some()
1934 && let Some(hwm) = self.version_high_water_mark()
1935 {
1936 filter_parts.push(format!("_version <= {}", hwm));
1937 }
1938 let request = ScanRequest::all(&name)
1939 .with_filter(filter_parts.join(" AND "))
1940 .with_columns(vec!["_vid".to_string()]);
1941 let batches = backend.scan(request).await?;
1942 let mut results = Vec::new();
1943 for batch in batches {
1944 let vid_col = batch
1945 .column_by_name("_vid")
1946 .ok_or(anyhow!("Missing _vid"))?
1947 .as_any()
1948 .downcast_ref::<UInt64Array>()
1949 .ok_or(anyhow!("Invalid _vid"))?;
1950 for i in 0..batch.num_rows() {
1951 results.push((Vid::from(vid_col.value(i)), 0.0_f32));
1952 }
1953 }
1954 return Ok(results);
1955 }
1956
1957 let mut results = Vec::new();
1958 if backend.table_exists(&name).await.unwrap_or(false) {
1959 let backend_metric = match &metric {
1960 DistanceMetric::L2 => BackendMetric::L2,
1961 DistanceMetric::Cosine => BackendMetric::Cosine,
1962 DistanceMetric::Dot => BackendMetric::Dot,
1963 _ => BackendMetric::Cosine,
1964 };
1965
1966 let mut filter_parts = vec![Self::build_active_filter(filter)];
1967 if ctx.is_some()
1968 && let Some(hwm) = self.version_high_water_mark()
1969 {
1970 filter_parts.push(format!("_version <= {}", hwm));
1971 }
1972 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1973
1974 let batches = backend
1975 .multivector_search(
1976 &name,
1977 property,
1978 query,
1979 k,
1980 backend_metric,
1981 combined_filter,
1982 opts,
1983 )
1984 .await?;
1985 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1986 }
1987
1988 Ok(results)
1989 }
1990
1991 pub async fn fts_search(
2007 &self,
2008 label: &str,
2009 property: &str,
2010 query: &str,
2011 k: usize,
2012 filter: Option<&str>,
2013 ctx: Option<&QueryContext>,
2014 ) -> Result<Vec<(Vid, f32)>> {
2015 use crate::backend::types::FilterExpr;
2016
2017 let backend = self.backend.as_ref();
2018 let name = table_names::vertex_table_name(label);
2019
2020 let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
2021 let mut filter_parts = vec![Self::build_active_filter(filter)];
2023 if ctx.is_some()
2024 && let Some(hwm) = self.version_high_water_mark()
2025 {
2026 filter_parts.push(format!("_version <= {}", hwm));
2027 }
2028 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
2029
2030 let batches = backend
2031 .full_text_search(&name, property, query, k, combined_filter)
2032 .await?;
2033
2034 let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
2035 fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2037 fts_results
2038 } else {
2039 Vec::new()
2040 };
2041
2042 if let Some(qctx) = ctx {
2044 merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
2045 }
2046
2047 Ok(results)
2048 }
2049
/// Resolves `uid` to a vertex id through the UID index of `label`.
///
/// # Errors
///
/// Propagates errors from creating the index handle and from the lookup.
#[cfg(feature = "lance-backend")]
pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
    let uid_index = self.uid_index(label)?;
    let lookup = uid_index.get_vid(uid);
    lookup.await
}
2055
/// Records the `uid -> vid` mapping in the UID index of `label`.
///
/// # Errors
///
/// Propagates errors from creating the index handle and from the write.
#[cfg(feature = "lance-backend")]
pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
    let uid_index = self.uid_index(label)?;
    let mapping = [(uid, vid)];
    uid_index.write_mapping(&mapping).await
}
2061
2062 pub async fn load_subgraph(
2063 &self,
2064 start_vids: &[Vid],
2065 edge_types: &[u32],
2066 max_hops: usize,
2067 direction: GraphDirection,
2068 l0: Option<&L0Buffer>,
2069 ) -> Result<WorkingGraph> {
2070 let mut graph = WorkingGraph::new();
2071 let schema = self.schema_manager.schema();
2072
2073 let label_map: HashMap<u16, String> = schema
2075 .labels
2076 .values()
2077 .map(|meta| {
2078 (
2079 meta.id,
2080 schema.label_name_by_id(meta.id).unwrap().to_owned(),
2081 )
2082 })
2083 .collect();
2084
2085 let edge_type_map: HashMap<u32, String> = schema
2086 .edge_types
2087 .values()
2088 .map(|meta| {
2089 (
2090 meta.id,
2091 schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
2092 )
2093 })
2094 .collect();
2095
2096 let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
2097
2098 let mut frontier: Vec<Vid> = start_vids.to_vec();
2100 let mut visited: HashSet<Vid> = HashSet::new();
2101
2102 for &vid in start_vids {
2104 graph.add_vertex(vid);
2105 }
2106
2107 for _hop in 0..max_hops {
2108 let mut next_frontier = HashSet::new();
2109
2110 for &vid in &frontier {
2111 if visited.contains(&vid) {
2112 continue;
2113 }
2114 visited.insert(vid);
2115 graph.add_vertex(vid);
2116
2117 for &etype_id in &target_edge_types {
2119 let etype_name = edge_type_map
2120 .get(&etype_id)
2121 .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
2122
2123 let (dir_str, neighbor_is_dst) = match direction {
2127 GraphDirection::Outgoing => ("fwd", true),
2128 GraphDirection::Incoming => ("bwd", false),
2129 };
2130
2131 let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
2132
2133 let _edge_ver = self
2138 .pinned_snapshot
2139 .as_ref()
2140 .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
2141
2142 let backend = self.backend();
2144 for current_src_label in label_map.values() {
2145 let adj_ds =
2146 match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
2147 Ok(ds) => ds,
2148 Err(_) => continue,
2149 };
2150 if let Some((neighbors, eids)) =
2151 adj_ds.read_adjacency_backend(backend, vid).await?
2152 {
2153 for (n, eid) in neighbors.into_iter().zip(eids) {
2154 edges.insert(
2155 eid,
2156 EdgeState {
2157 neighbor: n,
2158 version: 0,
2159 deleted: false,
2160 },
2161 );
2162 }
2163 break; }
2165 }
2166
2167 let delta_ds = self.delta_dataset(etype_name, dir_str)?;
2169 let delta_entries = delta_ds
2170 .read_deltas(backend, vid, &schema, self.snapshot_version_hwm())
2171 .await?;
2172 Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
2173
2174 if let Some(l0) = l0 {
2176 Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
2177 }
2178
2179 Self::add_edges_to_graph(
2181 &mut graph,
2182 edges,
2183 vid,
2184 etype_id,
2185 neighbor_is_dst,
2186 &visited,
2187 &mut next_frontier,
2188 );
2189 }
2190 }
2191 frontier = next_frontier.into_iter().collect();
2192
2193 if frontier.is_empty() {
2195 break;
2196 }
2197 }
2198
2199 Ok(graph)
2200 }
2201
2202 fn apply_delta_to_edges(
2204 edges: &mut HashMap<Eid, EdgeState>,
2205 delta_entries: Vec<crate::storage::delta::L1Entry>,
2206 neighbor_is_dst: bool,
2207 ) {
2208 for entry in delta_entries {
2209 let neighbor = if neighbor_is_dst {
2210 entry.dst_vid
2211 } else {
2212 entry.src_vid
2213 };
2214 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
2215
2216 if entry.version > current_ver {
2217 edges.insert(
2218 entry.eid,
2219 EdgeState {
2220 neighbor,
2221 version: entry.version,
2222 deleted: matches!(entry.op, Op::Delete),
2223 },
2224 );
2225 }
2226 }
2227 }
2228
2229 fn apply_l0_to_edges(
2231 edges: &mut HashMap<Eid, EdgeState>,
2232 l0: &L0Buffer,
2233 vid: Vid,
2234 etype_id: u32,
2235 direction: GraphDirection,
2236 ) {
2237 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
2238 for (neighbor, eid, ver) in l0_neighbors {
2239 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
2240 if ver > current_ver {
2241 edges.insert(
2242 eid,
2243 EdgeState {
2244 neighbor,
2245 version: ver,
2246 deleted: false,
2247 },
2248 );
2249 }
2250 }
2251
2252 for (eid, state) in edges.iter_mut() {
2254 if l0.is_tombstoned(*eid) {
2255 state.deleted = true;
2256 }
2257 }
2258 }
2259
2260 fn add_edges_to_graph(
2262 graph: &mut WorkingGraph,
2263 edges: HashMap<Eid, EdgeState>,
2264 vid: Vid,
2265 etype_id: u32,
2266 neighbor_is_dst: bool,
2267 visited: &HashSet<Vid>,
2268 next_frontier: &mut HashSet<Vid>,
2269 ) {
2270 for (eid, state) in edges {
2271 if state.deleted {
2272 continue;
2273 }
2274 graph.add_vertex(state.neighbor);
2275
2276 if !visited.contains(&state.neighbor) {
2277 next_frontier.insert(state.neighbor);
2278 }
2279
2280 if neighbor_is_dst {
2281 graph.add_edge(vid, state.neighbor, eid, etype_id);
2282 } else {
2283 graph.add_edge(state.neighbor, vid, eid, etype_id);
2284 }
2285 }
2286 }
2287}
2288
2289fn extract_vid_score_pairs(
2291 batches: &[arrow_array::RecordBatch],
2292 vid_column: &str,
2293 score_column: &str,
2294) -> Result<Vec<(Vid, f32)>> {
2295 let mut results = Vec::new();
2296 for batch in batches {
2297 let vid_col = batch
2298 .column_by_name(vid_column)
2299 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
2300 .as_any()
2301 .downcast_ref::<UInt64Array>()
2302 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
2303
2304 let score_col = batch
2305 .column_by_name(score_column)
2306 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
2307 .as_any()
2308 .downcast_ref::<Float32Array>()
2309 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
2310
2311 for i in 0..batch.num_rows() {
2312 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
2313 }
2314 }
2315 Ok(results)
2316}
2317
2318fn extract_embedding_from_props(
2323 props: &uni_common::Properties,
2324 property: &str,
2325) -> Option<Vec<f32>> {
2326 let arr = props.get(property)?.as_array()?;
2327 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
2328}
2329
2330fn merge_l0_into_vector_results(
2340 results: &mut Vec<(Vid, f32)>,
2341 ctx: &QueryContext,
2342 label: &str,
2343 property: &str,
2344 query: &[f32],
2345 k: usize,
2346 metric: &DistanceMetric,
2347) {
2348 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2350 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2351 buffers.push(Arc::clone(&ctx.l0));
2352 if let Some(ref txn) = ctx.transaction_l0 {
2353 buffers.push(Arc::clone(txn));
2354 }
2355
2356 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2358 let mut tombstoned: HashSet<Vid> = HashSet::new();
2360
2361 for buf_arc in &buffers {
2362 let buf = buf_arc.read();
2363
2364 for &vid in &buf.vertex_tombstones {
2366 tombstoned.insert(vid);
2367 }
2368
2369 for (&vid, labels) in &buf.vertex_labels {
2371 if !labels.iter().any(|l| l == label) {
2372 continue;
2373 }
2374 if let Some(props) = buf.vertex_properties.get(&vid)
2375 && let Some(emb) = extract_embedding_from_props(props, property)
2376 {
2377 if emb.len() != query.len() {
2378 continue; }
2380 let dist = metric.compute_distance(&emb, query);
2381 l0_candidates.insert(vid, dist);
2383 tombstoned.remove(&vid);
2385 }
2386 }
2387 }
2388
2389 if l0_candidates.is_empty() && tombstoned.is_empty() {
2391 return;
2392 }
2393
2394 results.retain(|(vid, _)| !tombstoned.contains(vid));
2396
2397 for (vid, dist) in &l0_candidates {
2399 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2400 existing.1 = *dist;
2401 } else {
2402 results.push((*vid, *dist));
2403 }
2404 }
2405
2406 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2408 results.truncate(k);
2409}
2410
2411pub fn collect_l0_label_candidates(ctx: &QueryContext, label: &str) -> (Vec<Vid>, HashSet<Vid>) {
2425 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2427 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2428 buffers.push(Arc::clone(&ctx.l0));
2429 if let Some(ref txn) = ctx.transaction_l0 {
2430 buffers.push(Arc::clone(txn));
2431 }
2432
2433 let mut live: HashSet<Vid> = HashSet::new();
2434 let mut tombstoned: HashSet<Vid> = HashSet::new();
2435
2436 for buf_arc in &buffers {
2437 let buf = buf_arc.read();
2438
2439 for &vid in &buf.vertex_tombstones {
2441 tombstoned.insert(vid);
2442 live.remove(&vid);
2443 }
2444
2445 for (&vid, labels) in &buf.vertex_labels {
2448 if !labels.iter().any(|l| l == label) {
2449 continue;
2450 }
2451 if buf.vertex_properties.contains_key(&vid) {
2452 live.insert(vid);
2453 tombstoned.remove(&vid);
2454 }
2455 }
2456 }
2457
2458 (live.into_iter().collect(), tombstoned)
2459}
2460
/// Scores `text` against `query` as the fraction of distinct query tokens found in the text.
///
/// Both strings are split on whitespace and lower-cased; duplicate tokens are
/// counted once. The result lies in `0.0..=1.0` and is `0.0` when the query
/// has no tokens.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    let tokenize = |input: &str| -> HashSet<String> {
        input
            .split_whitespace()
            .map(|token| token.to_lowercase())
            .collect()
    };

    let wanted = tokenize(query);
    if wanted.is_empty() {
        return 0.0;
    }

    let present = tokenize(text);
    let matched = wanted.intersection(&present).count();
    matched as f32 / wanted.len() as f32
}
2478
2479fn extract_text_from_props<'a>(
2481 props: &'a uni_common::Properties,
2482 property: &str,
2483) -> Option<&'a str> {
2484 props.get(property)?.as_str()
2485}
2486
2487fn merge_l0_into_fts_results(
2497 results: &mut Vec<(Vid, f32)>,
2498 ctx: &QueryContext,
2499 label: &str,
2500 property: &str,
2501 query: &str,
2502 k: usize,
2503) {
2504 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2506 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2507 buffers.push(Arc::clone(&ctx.l0));
2508 if let Some(ref txn) = ctx.transaction_l0 {
2509 buffers.push(Arc::clone(txn));
2510 }
2511
2512 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2514 let mut tombstoned: HashSet<Vid> = HashSet::new();
2516
2517 for buf_arc in &buffers {
2518 let buf = buf_arc.read();
2519
2520 for &vid in &buf.vertex_tombstones {
2522 tombstoned.insert(vid);
2523 }
2524
2525 for (&vid, labels) in &buf.vertex_labels {
2527 if !labels.iter().any(|l| l == label) {
2528 continue;
2529 }
2530 if let Some(props) = buf.vertex_properties.get(&vid)
2531 && let Some(text) = extract_text_from_props(props, property)
2532 {
2533 let score = compute_text_relevance(query, text);
2534 if score > 0.0 {
2535 l0_candidates.insert(vid, score);
2537 }
2538 tombstoned.remove(&vid);
2540 }
2541 }
2542 }
2543
2544 if l0_candidates.is_empty() && tombstoned.is_empty() {
2546 return;
2547 }
2548
2549 results.retain(|(vid, _)| !tombstoned.contains(vid));
2551
2552 for (vid, score) in &l0_candidates {
2554 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2555 existing.1 = *score;
2556 } else {
2557 results.push((*vid, *score));
2558 }
2559 }
2560
2561 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2563 results.truncate(k);
2564}