1use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Per-edge state record: the neighbor vertex, a version number and a
/// deletion flag.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// used to fold delta entries into a latest-state view — confirm at call sites.
struct EdgeState {
    /// Neighbor vertex of the edge (presumably the "other" endpoint relative
    /// to the vertex being expanded — TODO confirm).
    neighbor: Vid,
    /// Version number attached to this state (presumably the `_version` of the
    /// entry that produced it — TODO confirm).
    version: u64,
    /// `true` when the edge is marked deleted.
    deleted: bool,
}
60
/// Central handle to the graph's persistent storage.
///
/// Bundles the object store, the table-level `StorageBackend`, the schema and
/// snapshot managers, and two in-memory caches (the adjacency CSR cache and the
/// VID→labels index). Point-in-time views are derived with `pinned`,
/// `pinned_at_version` and `at_fork` / `at_fork_with_schema`.
pub struct StorageManager {
    /// Root location of the database. A value without `://` is treated as a
    /// local filesystem path (see `local_fs_root`).
    base_uri: String,
    /// Object store for this manager; `new_with_backend` wraps the caller's
    /// store in a `ResilientObjectStore`.
    store: Arc<dyn ObjectStore>,
    /// Schema (labels, edge types) used to enumerate tables.
    schema_manager: Arc<SchemaManager>,
    /// Snapshot manifest management; forks get a fork-specific instance.
    snapshot_manager: Arc<SnapshotManager>,
    /// Adjacency (CSR) cache; sized from `config.cache_size` at construction.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Configuration this manager was built with.
    pub config: UniConfig,
    /// Shared compaction bookkeeping; `CompactionGuard` toggles
    /// `compaction_in_progress` on it.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Number of flushes currently running, maintained by `FlushInProgressGuard`.
    pub flush_in_progress: std::sync::atomic::AtomicUsize,
    /// When set, reads are bounded by this snapshot's version high-water mark.
    pinned_snapshot: Option<SnapshotManifest>,
    /// Version high-water mark used when no snapshot is pinned
    /// (set by `pinned_at_version`).
    pinned_version_hwm: Option<u64>,
    /// Fork scope attached by `at_fork_with_schema`; `None` otherwise.
    fork_scope: Option<Arc<crate::fork::ForkScope>>,
    /// Table storage backend; wrapped in a `BranchedBackend` for forks.
    backend: Arc<dyn StorageBackend>,
    /// In-memory VID→labels index, rebuilt from the main vertex table on startup.
    vid_labels_index: Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>,
}
104
/// Scope guard that marks a flush as in progress on a `StorageManager`.
///
/// `FlushInProgressGuard::new` increments `StorageManager::flush_in_progress`
/// and dropping the guard decrements it again.
pub struct FlushInProgressGuard {
    /// Manager whose `flush_in_progress` counter this guard holds open.
    storage: Arc<StorageManager>,
}
116
117impl FlushInProgressGuard {
118 pub fn new(storage: &Arc<StorageManager>) -> Self {
119 storage
120 .flush_in_progress
121 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
122 Self {
123 storage: storage.clone(),
124 }
125 }
126}
127
128impl Drop for FlushInProgressGuard {
129 fn drop(&mut self) {
130 self.storage
132 .flush_in_progress
133 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
134 }
135}
136
137fn is_lance_conflict(err: &anyhow::Error) -> bool {
143 let msg = err.to_string();
144 msg.contains("Incompatible transaction") || msg.contains("conflict")
145}
146
147async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
152where
153 F: FnMut() -> Fut,
154 Fut: std::future::Future<Output = anyhow::Result<()>>,
155{
156 for attempt in 0u32..10 {
157 match op().await {
158 Ok(()) => return Ok(()),
159 Err(e) => {
160 if !is_lance_conflict(&e) || attempt == 9 {
161 return Err(e);
162 }
163 let backoff_ms = 1u64 << attempt;
164 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
165 }
166 }
167 }
168 unreachable!("retry loop exits via Ok or Err")
169}
170
171pub async fn merge_insert_batch_with_lance_conflict_retry(
180 backend: &dyn crate::backend::StorageBackend,
181 table_name: &str,
182 batch: arrow_array::RecordBatch,
183 on: &[&str],
184) -> anyhow::Result<()> {
185 retry_on_lance_conflict(|| async {
186 let exists = backend.table_exists(table_name).await?;
187 if !exists {
188 anyhow::bail!(
189 "merge_insert target table '{}' does not exist (partial writes \
190 require the row to already be present; CREATE goes through Append)",
191 table_name
192 );
193 }
194 backend
195 .merge_insert(table_name, on, vec![batch.clone()])
196 .await
197 })
198 .await
199}
200
201pub async fn write_batch_with_lance_conflict_retry(
210 backend: &dyn crate::backend::StorageBackend,
211 table_name: &str,
212 batch: arrow_array::RecordBatch,
213) -> anyhow::Result<()> {
214 use crate::backend::types::WriteMode;
215 retry_on_lance_conflict(|| async {
216 let exists = backend.table_exists(table_name).await?;
217 if exists {
218 backend
219 .write(table_name, vec![batch.clone()], WriteMode::Append)
220 .await
221 } else {
222 backend.create_table(table_name, vec![batch.clone()]).await
223 }
224 })
225 .await
226}
227
/// Scope guard that claims the single compaction slot on a `CompactionStatus`.
///
/// `CompactionGuard::new` sets `compaction_in_progress` (or returns `None` if it
/// is already set). Dropping the guard clears it and records `last_compaction`.
struct CompactionGuard {
    /// Shared status whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
232
233impl CompactionGuard {
234 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
235 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
236 if s.compaction_in_progress {
237 return None;
238 }
239 s.compaction_in_progress = true;
240 Some(Self {
241 status: status.clone(),
242 })
243 }
244}
245
246impl Drop for CompactionGuard {
247 fn drop(&mut self) {
248 match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
251 Ok(mut s) => {
252 s.compaction_in_progress = false;
253 s.last_compaction = Some(std::time::SystemTime::now());
254 }
255 Err(e) => {
256 log::error!(
260 "CompactionGuard drop failed to acquire poisoned lock: {}. \
261 Compaction status may be inconsistent. Issue #18/#150",
262 e
263 );
264 }
265 }
266 }
267}
268
269impl StorageManager {
270 pub async fn new_with_backend(
272 base_uri: &str,
273 store: Arc<dyn ObjectStore>,
274 backend: Arc<dyn StorageBackend>,
275 schema_manager: Arc<SchemaManager>,
276 config: UniConfig,
277 ) -> Result<Self> {
278 let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
279 store,
280 config.object_store.clone(),
281 ));
282
283 let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
284
285 Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
287
288 let mut sm = Self {
289 base_uri: base_uri.to_string(),
290 store: resilient_store,
291 schema_manager,
292 snapshot_manager,
293 adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
294 config,
295 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
296 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
297 pinned_snapshot: None,
298 pinned_version_hwm: None,
299 fork_scope: None,
300 backend,
301 vid_labels_index: Arc::new(parking_lot::RwLock::new(
302 crate::storage::vid_labels::VidLabelsIndex::new(),
303 )),
304 };
305
306 if let Err(e) = sm.rebuild_vid_labels_index().await {
310 warn!(
311 "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
312 e
313 );
314 }
315
316 Ok(sm)
317 }
318
319 #[cfg(feature = "lance-backend")]
321 pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
322 Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
323 }
324
325 #[cfg(feature = "lance-backend")]
327 pub async fn new_with_cache(
328 base_uri: &str,
329 schema_manager: Arc<SchemaManager>,
330 adjacency_cache_size: usize,
331 ) -> Result<Self> {
332 let config = UniConfig {
333 cache_size: adjacency_cache_size,
334 ..Default::default()
335 };
336 Self::new_with_config(base_uri, schema_manager, config).await
337 }
338
339 #[cfg(feature = "lance-backend")]
341 pub async fn new_with_config(
342 base_uri: &str,
343 schema_manager: Arc<SchemaManager>,
344 config: UniConfig,
345 ) -> Result<Self> {
346 let store = Self::build_store_from_uri(base_uri)?;
347 Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
348 }
349
350 #[cfg(feature = "lance-backend")]
352 pub async fn new_with_store_and_config(
353 base_uri: &str,
354 store: Arc<dyn ObjectStore>,
355 schema_manager: Arc<SchemaManager>,
356 config: UniConfig,
357 ) -> Result<Self> {
358 Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
359 .await
360 }
361
362 #[cfg(feature = "lance-backend")]
364 pub async fn new_with_store_and_storage_options(
365 base_uri: &str,
366 store: Arc<dyn ObjectStore>,
367 schema_manager: Arc<SchemaManager>,
368 config: UniConfig,
369 lancedb_storage_options: Option<HashMap<String, String>>,
370 ) -> Result<Self> {
371 let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
372 Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
373 }
374
375 async fn recover_all_staging_tables(
380 backend: &dyn StorageBackend,
381 schema_manager: &SchemaManager,
382 ) -> Result<()> {
383 let schema = schema_manager.schema();
384
385 backend
387 .recover_staging(table_names::main_vertex_table_name())
388 .await?;
389 backend
390 .recover_staging(table_names::main_edge_table_name())
391 .await?;
392
393 for label in schema.labels.keys() {
395 let name = table_names::vertex_table_name(label);
396 backend.recover_staging(&name).await?;
397 }
398
399 for edge_type in schema.edge_types.keys() {
401 for direction in &["fwd", "bwd"] {
402 let delta_name = table_names::delta_table_name(edge_type, direction);
404 backend.recover_staging(&delta_name).await?;
405
406 for _label in schema.labels.keys() {
408 let adj_name = table_names::adjacency_table_name(edge_type, direction);
409 backend.recover_staging(&adj_name).await?;
410 }
411 }
412 }
413
414 Ok(())
415 }
416
417 #[cfg(feature = "lance-backend")]
418 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
419 if base_uri.contains("://") {
420 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
421 let (store, _path) = object_store::parse_url(&parsed)
422 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
423 Ok(Arc::from(store))
424 } else {
425 std::fs::create_dir_all(base_uri)?;
427 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
428 }
429 }
430
431 pub fn local_fs_root(&self) -> Option<std::path::PathBuf> {
437 if self.base_uri.contains("://") {
438 None
439 } else {
440 Some(std::path::PathBuf::from(&self.base_uri))
441 }
442 }
443
444 pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
445 Self {
453 base_uri: self.base_uri.clone(),
454 store: self.store.clone(),
455 schema_manager: self.schema_manager.clone(),
456 snapshot_manager: self.snapshot_manager.clone(),
457 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
461 config: self.config.clone(),
462 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
463 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
464 pinned_snapshot: Some(snapshot),
465 pinned_version_hwm: None,
466 fork_scope: self.fork_scope.clone(),
467 backend: self.backend.clone(),
468 vid_labels_index: Arc::new(parking_lot::RwLock::new(
474 self.vid_labels_index.read().clone(),
475 )),
476 }
477 }
478
479 pub fn pinned_at_version(&self, hwm: u64) -> Self {
498 Self {
499 base_uri: self.base_uri.clone(),
500 store: self.store.clone(),
501 schema_manager: self.schema_manager.clone(),
502 snapshot_manager: self.snapshot_manager.clone(),
503 adjacency_manager: self.adjacency_manager.clone(),
504 config: self.config.clone(),
505 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
506 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
507 pinned_snapshot: None,
508 pinned_version_hwm: Some(hwm),
509 fork_scope: self.fork_scope.clone(),
510 backend: self.backend.clone(),
511 vid_labels_index: Arc::new(parking_lot::RwLock::new(
517 self.vid_labels_index.read().clone(),
518 )),
519 }
520 }
521
522 pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
537 self.at_fork_with_schema(scope, self.schema_manager.clone())
538 }
539
540 pub fn at_fork_with_schema(
548 &self,
549 scope: Arc<crate::fork::ForkScope>,
550 merged_schema: Arc<SchemaManager>,
551 ) -> Self {
552 debug_assert!(
553 self.pinned_snapshot.is_none(),
554 "forking a pinned StorageManager is unsupported in Phase 1"
555 );
556 let branched_backend: Arc<dyn StorageBackend> = Arc::new(
557 crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
558 );
559 let snapshot_manager = Arc::new(SnapshotManager::new_for_fork(
564 self.store.clone(),
565 scope.fork_id(),
566 ));
567 Self {
568 base_uri: self.base_uri.clone(),
569 store: self.store.clone(),
570 schema_manager: merged_schema,
571 snapshot_manager,
572 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
573 config: self.config.clone(),
574 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
575 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
576 pinned_snapshot: None,
577 pinned_version_hwm: None,
578 fork_scope: Some(scope),
579 backend: branched_backend,
580 vid_labels_index: Arc::new(parking_lot::RwLock::new(
586 self.vid_labels_index.read().clone(),
587 )),
588 }
589 }
590
    /// Returns the fork scope attached to this manager.
    ///
    /// `None` unless a scope was attached via `at_fork` / `at_fork_with_schema`.
    /// Views made with `pinned` / `pinned_at_version` inherit the scope of the
    /// manager they were derived from.
    pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
        self.fork_scope.as_ref()
    }
595
596 #[must_use]
607 pub fn fork_index_exists(
608 &self,
609 label: &str,
610 column: &str,
611 ) -> Option<crate::fork::ForkLocalIndexKind> {
612 self.fork_scope
613 .as_ref()
614 .and_then(|s| s.fork_local_index(label, column))
615 }
616
617 pub fn base_uri(&self) -> &str {
620 &self.base_uri
621 }
622
623 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
624 let schema = self.schema_manager.schema();
625 let name = schema.edge_type_name_by_id(edge_type_id)?;
626 self.pinned_snapshot
627 .as_ref()
628 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
629 .filter(|v| *v != 0)
634 }
635
636 pub fn version_high_water_mark(&self) -> Option<u64> {
644 self.pinned_snapshot
645 .as_ref()
646 .map(|s| s.version_high_water_mark)
647 .or(self.pinned_version_hwm)
648 }
649
650 pub fn snapshot_version_hwm(&self) -> Option<u64> {
662 self.pinned_snapshot
663 .as_ref()
664 .map(|s| s.version_high_water_mark)
665 }
666
667 pub fn apply_version_filter(&self, base_filter: String) -> String {
672 if let Some(hwm) = self.version_high_water_mark() {
673 format!("({}) AND (_version <= {})", base_filter, hwm)
674 } else {
675 base_filter
676 }
677 }
678
679 fn build_active_filter(user_filter: Option<&str>) -> String {
682 match user_filter {
683 Some(expr) => format!("({}) AND (_deleted = false)", expr),
684 None => "_deleted = false".to_string(),
685 }
686 }
687
688 pub fn store(&self) -> Arc<dyn ObjectStore> {
689 self.store.clone()
690 }
691
692 pub fn compaction_status(
698 &self,
699 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
700 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
701 Ok(guard.clone())
702 }
703
704 pub async fn compact(&self) -> Result<CompactionStats> {
705 let start = std::time::Instant::now();
707 let schema = self.schema_manager.schema();
708 let mut files_compacted = 0;
709
710 for label in schema.labels.keys() {
711 let name = table_names::vertex_table_name(label);
712 if self.backend.table_exists(&name).await? {
713 self.backend.optimize_table(&name).await?;
714 files_compacted += 1;
715 self.backend.invalidate_cache(&name);
716 }
717 }
718
719 Ok(CompactionStats {
720 files_compacted,
721 bytes_before: 0,
722 bytes_after: 0,
723 duration: start.elapsed(),
724 crdt_merges: 0,
725 })
726 }
727
728 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
729 let _guard = CompactionGuard::new(self.compaction_status.clone())
730 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
731
732 let start = std::time::Instant::now();
733 let name = table_names::vertex_table_name(label);
734
735 if self.backend.table_exists(&name).await? {
736 self.backend.optimize_table(&name).await?;
737 self.backend.invalidate_cache(&name);
738 }
739
740 Ok(CompactionStats {
741 files_compacted: 1,
742 bytes_before: 0,
743 bytes_after: 0,
744 duration: start.elapsed(),
745 crdt_merges: 0,
746 })
747 }
748
749 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
750 let _guard = CompactionGuard::new(self.compaction_status.clone())
751 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
752
753 let start = std::time::Instant::now();
754 let mut files_compacted = 0;
755
756 for dir in ["fwd", "bwd"] {
757 let name = table_names::delta_table_name(edge_type, dir);
758 if self.backend.table_exists(&name).await? {
759 self.backend.optimize_table(&name).await?;
760 files_compacted += 1;
761 }
762 }
763
764 Ok(CompactionStats {
765 files_compacted,
766 bytes_before: 0,
767 bytes_after: 0,
768 duration: start.elapsed(),
769 crdt_merges: 0,
770 })
771 }
772
773 pub async fn wait_for_compaction(&self) -> Result<()> {
774 loop {
775 let in_progress = {
776 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
777 };
778 if !in_progress {
779 return Ok(());
780 }
781 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
782 }
783 }
784
785 pub fn start_background_compaction(
786 self: Arc<Self>,
787 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
788 ) -> tokio::task::JoinHandle<()> {
789 if !self.config.compaction.enabled {
790 return tokio::spawn(async {});
791 }
792
793 tokio::spawn(async move {
794 let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
800 let mut interval =
801 tokio::time::interval_at(start, self.config.compaction.check_interval);
802
803 loop {
804 tokio::select! {
805 _ = interval.tick() => {
806 if let Err(e) = self.update_compaction_status().await {
807 log::error!("Failed to update compaction status: {}", e);
808 continue;
809 }
810
811 if let Some(task) = self.pick_compaction_task() {
812 log::info!("Triggering background compaction: {:?}", task);
813 if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
814 log::error!("Compaction failed: {}", e);
815 }
816 }
817 }
818 _ = shutdown_rx.recv() => {
819 log::info!("Background compaction shutting down");
820 let _ = self.wait_for_compaction().await;
821 break;
822 }
823 }
824 }
825 })
826 }
827
828 async fn update_compaction_status(&self) -> Result<()> {
829 let schema = self.schema_manager.schema();
830 let backend = self.backend.as_ref();
831 let mut total_rows: usize = 0;
832 let mut oldest_ts: Option<i64> = None;
833
834 for name in schema.edge_types.keys() {
835 for dir in ["fwd", "bwd"] {
836 let tbl_name = table_names::delta_table_name(name, dir);
837 if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
838 continue;
839 }
840 let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
841 if row_count == 0 {
842 continue;
843 }
844 total_rows += row_count;
845
846 let request =
848 ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
849 let Ok(batches) = backend.scan(request).await else {
850 continue;
851 };
852 for batch in batches {
853 let Some(col) = batch
854 .column_by_name("_created_at")
855 .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
856 else {
857 continue;
858 };
859 for i in 0..col.len() {
860 if !col.is_null(i) {
861 let ts = col.value(i);
862 oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
863 }
864 }
865 }
866 }
867 }
868
869 let oldest_l1_age = oldest_ts
870 .and_then(|ts| {
871 let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
872 SystemTime::now().duration_since(created).ok()
873 })
874 .unwrap_or(Duration::ZERO);
875
876 let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
877 status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
880 status.oldest_l1_age = oldest_l1_age;
881 Ok(())
882 }
883
884 fn pick_compaction_task(&self) -> Option<CompactionTask> {
885 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
886
887 if status.l1_runs >= self.config.compaction.max_l1_runs {
888 return Some(CompactionTask::ByRunCount);
889 }
890 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
891 return Some(CompactionTask::BySize);
892 }
893 if status.oldest_l1_age >= self.config.compaction.max_l1_age
894 && status.oldest_l1_age > Duration::ZERO
895 {
896 return Some(CompactionTask::ByAge);
897 }
898
899 None
900 }
901
902 async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
904 if let Err(e) = backend.optimize_table(table_name).await {
905 log::warn!("Failed to optimize table {}: {}", table_name, e);
906 return false;
907 }
908 true
909 }
910
911 pub fn trigger_async_compaction(self: &Arc<Self>) {
914 let this = Arc::clone(self);
915 tokio::spawn(async move {
916 if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
917 log::debug!("Post-flush compaction skipped: {}", e);
919 }
920 });
921 }
922
923 pub(crate) async fn execute_compaction(
924 this: Arc<Self>,
925 _task: CompactionTask,
926 ) -> Result<CompactionStats> {
927 let start = std::time::Instant::now();
928 let _guard = CompactionGuard::new(this.compaction_status.clone())
929 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
930
931 let schema = this.schema_manager.schema();
932 let mut files_compacted = 0;
933
934 let compactor = Compactor::new(Arc::clone(&this));
937 let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
938 log::error!(
939 "Semantic compaction failed (continuing with backend optimize): {}",
940 e
941 );
942 Vec::new()
943 });
944
945 let am = this.adjacency_manager();
947 for info in &compaction_results {
948 let direction = match info.direction.as_str() {
949 "fwd" => Direction::Outgoing,
950 "bwd" => Direction::Incoming,
951 _ => continue,
952 };
953 if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
954 && let Err(e) = am.warm(&this, etid, direction, None).await
955 {
956 log::warn!(
957 "Failed to re-warm adjacency for {}/{}: {}",
958 info.edge_type,
959 info.direction,
960 e
961 );
962 }
963 }
964
965 let backend = this.backend.as_ref();
967
968 for name in schema.edge_types.keys() {
970 for dir in ["fwd", "bwd"] {
971 let delta = table_names::delta_table_name(name, dir);
972 if Self::try_optimize_table(backend, &delta).await {
973 files_compacted += 1;
974 }
975 let adj = table_names::adjacency_table_name(name, dir);
976 if Self::try_optimize_table(backend, &adj).await {
977 files_compacted += 1;
978 }
979 }
980 }
981
982 for label in schema.labels.keys() {
984 let tbl = table_names::vertex_table_name(label);
985 if Self::try_optimize_table(backend, &tbl).await {
986 files_compacted += 1;
987 backend.invalidate_cache(&tbl);
988 }
989 }
990
991 for tbl in [
993 table_names::main_vertex_table_name(),
994 table_names::main_edge_table_name(),
995 ] {
996 if Self::try_optimize_table(backend, tbl).await {
997 files_compacted += 1;
998 }
999 }
1000
1001 {
1002 let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
1003 status.total_compactions += 1;
1004 status.l1_runs = 0; }
1006
1007 Ok(CompactionStats {
1008 files_compacted,
1009 bytes_before: 0,
1010 bytes_after: 0,
1011 duration: start.elapsed(),
1012 crdt_merges: 0,
1013 })
1014 }
1015
1016 pub fn invalidate_table_cache(&self, label: &str) {
1020 let name = table_names::vertex_table_name(label);
1021 self.backend.invalidate_cache(&name);
1022 }
1023
1024 pub fn base_path(&self) -> &str {
1025 &self.base_uri
1026 }
1027
1028 pub fn schema_manager(&self) -> &SchemaManager {
1029 &self.schema_manager
1030 }
1031
1032 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
1033 self.schema_manager.clone()
1034 }
1035
    /// Borrows the schema manager's `Arc` itself, so callers can clone it only
    /// when they actually need an owned handle.
    #[must_use]
    pub fn schema_manager_arc_ref(&self) -> &Arc<SchemaManager> {
        &self.schema_manager
    }
1047
1048 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
1050 Arc::clone(&self.adjacency_manager)
1051 }
1052
1053 pub async fn warm_adjacency(
1058 &self,
1059 edge_type_id: u32,
1060 direction: crate::storage::direction::Direction,
1061 version: Option<u64>,
1062 ) -> anyhow::Result<()> {
1063 self.adjacency_manager
1064 .warm(self, edge_type_id, direction, version)
1065 .await
1066 }
1067
1068 pub async fn warm_adjacency_coalesced(
1073 &self,
1074 edge_type_id: u32,
1075 direction: crate::storage::direction::Direction,
1076 version: Option<u64>,
1077 ) -> anyhow::Result<()> {
1078 self.adjacency_manager
1079 .warm_coalesced(self, edge_type_id, direction, version)
1080 .await
1081 }
1082
1083 pub fn has_adjacency_csr(
1085 &self,
1086 edge_type_id: u32,
1087 direction: crate::storage::direction::Direction,
1088 ) -> bool {
1089 self.adjacency_manager.has_csr(edge_type_id, direction)
1090 }
1091
1092 pub fn get_neighbors_at_version(
1094 &self,
1095 vid: uni_common::core::id::Vid,
1096 edge_type: u32,
1097 direction: crate::storage::direction::Direction,
1098 version: u64,
1099 ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
1100 self.adjacency_manager
1101 .get_neighbors_at_version(vid, edge_type, direction, version)
1102 }
1103
1104 pub fn backend(&self) -> &dyn StorageBackend {
1106 self.backend.as_ref()
1107 }
1108
1109 pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
1111 self.backend.clone()
1112 }
1113
1114 async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
1119 use crate::storage::vid_labels::VidLabelsIndex;
1120
1121 let backend = self.backend.as_ref();
1122 let vtable = table_names::main_vertex_table_name();
1123
1124 if !backend.table_exists(vtable).await.unwrap_or(false) {
1126 self.vid_labels_index = Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new()));
1127 return Ok(());
1128 }
1129
1130 let request = ScanRequest::all(vtable)
1132 .with_filter("_deleted = false")
1133 .with_limit(100_000);
1134 let batches = backend
1135 .scan(request)
1136 .await
1137 .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1138
1139 let mut index = VidLabelsIndex::new();
1140 for batch in batches {
1141 let vid_col = batch
1142 .column_by_name("_vid")
1143 .ok_or_else(|| anyhow!("Missing _vid column"))?
1144 .as_any()
1145 .downcast_ref::<UInt64Array>()
1146 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1147
1148 let labels_col = batch
1149 .column_by_name("labels")
1150 .ok_or_else(|| anyhow!("Missing labels column"))?
1151 .as_any()
1152 .downcast_ref::<arrow_array::ListArray>()
1153 .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1154
1155 for row_idx in 0..batch.num_rows() {
1156 let vid = Vid::from(vid_col.value(row_idx));
1157 let labels_array = labels_col.value(row_idx);
1158 let labels_str_array = labels_array
1159 .as_any()
1160 .downcast_ref::<arrow_array::StringArray>()
1161 .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1162
1163 let labels: Vec<String> = (0..labels_str_array.len())
1164 .map(|i| labels_str_array.value(i).to_string())
1165 .collect();
1166
1167 index.insert(vid, labels);
1168 }
1169 }
1170
1171 self.vid_labels_index = Arc::new(parking_lot::RwLock::new(index));
1172 Ok(())
1173 }
1174
1175 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1180 let index = self.vid_labels_index.read();
1181 index.get_labels(vid).map(|labels| labels.to_vec())
1182 }
1183
1184 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1186 let mut index = self.vid_labels_index.write();
1187 index.insert(vid, labels);
1188 }
1189
1190 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1192 let mut index = self.vid_labels_index.write();
1193 index.remove_vid(vid);
1194 }
1195
1196 pub async fn load_subgraph_cached(
1197 &self,
1198 start_vids: &[Vid],
1199 edge_types: &[u32],
1200 max_hops: usize,
1201 direction: GraphDirection,
1202 _l0: Option<Arc<RwLock<L0Buffer>>>,
1203 ) -> Result<WorkingGraph> {
1204 let mut graph = WorkingGraph::new();
1205
1206 let dir = match direction {
1207 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1208 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1209 };
1210
1211 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1212
1213 let mut frontier: Vec<Vid> = start_vids.to_vec();
1215 let mut visited: HashSet<Vid> = HashSet::new();
1216
1217 for &vid in start_vids {
1219 graph.add_vertex(vid);
1220 }
1221
1222 for _hop in 0..max_hops {
1223 let mut next_frontier = HashSet::new();
1224
1225 for &vid in &frontier {
1226 if visited.contains(&vid) {
1227 continue;
1228 }
1229 visited.insert(vid);
1230 graph.add_vertex(vid);
1231
1232 for &etype_id in edge_types {
1233 let edge_ver = self.snapshot_version_hwm();
1237 self.adjacency_manager
1238 .warm_coalesced(self, etype_id, dir, edge_ver)
1239 .await?;
1240
1241 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1243
1244 for (neighbor_vid, eid) in edges {
1245 graph.add_vertex(neighbor_vid);
1246 if !visited.contains(&neighbor_vid) {
1247 next_frontier.insert(neighbor_vid);
1248 }
1249
1250 if neighbor_is_dst {
1251 graph.add_edge(vid, neighbor_vid, eid, etype_id);
1252 } else {
1253 graph.add_edge(neighbor_vid, vid, eid, etype_id);
1254 }
1255 }
1256 }
1257 }
1258 frontier = next_frontier.into_iter().collect();
1259
1260 if frontier.is_empty() {
1262 break;
1263 }
1264 }
1265
1266 Ok(graph)
1267 }
1268
1269 pub fn snapshot_manager(&self) -> &SnapshotManager {
1270 &self.snapshot_manager
1271 }
1272
1273 pub fn index_manager(&self) -> IndexManager {
1274 IndexManager::new(&self.base_uri, self.schema_manager.clone())
1275 }
1276
1277 pub async fn scan_vertex_table(
1286 &self,
1287 label: &str,
1288 columns: &[&str],
1289 additional_filter: Option<&str>,
1290 ) -> Result<Option<arrow_array::RecordBatch>> {
1291 let backend = self.backend();
1292 let table_name = table_names::vertex_table_name(label);
1293
1294 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1295 return Ok(None);
1296 }
1297
1298 let actual_columns =
1300 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1301 let table_field_names: HashSet<&str> = table_schema
1302 .fields()
1303 .iter()
1304 .map(|f| f.name().as_str())
1305 .collect();
1306 columns
1307 .iter()
1308 .copied()
1309 .filter(|c| table_field_names.contains(c))
1310 .map(|s| s.to_string())
1311 .collect::<Vec<_>>()
1312 } else {
1313 return Ok(None);
1314 };
1315
1316 let filter = match (self.version_high_water_mark(), additional_filter) {
1318 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1319 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1320 (None, Some(f)) => Some(f.to_string()),
1321 (None, None) => None,
1322 };
1323
1324 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1325 if let Some(f) = filter {
1326 request = request.with_filter(f);
1327 }
1328
1329 let batches = backend.scan(request).await?;
1336 if batches.is_empty() {
1337 Ok(None)
1338 } else {
1339 Ok(Some(arrow::compute::concat_batches(
1340 &batches[0].schema(),
1341 &batches,
1342 )?))
1343 }
1344 }
1345
1346 pub async fn scan_delta_table(
1349 &self,
1350 edge_type: &str,
1351 direction: &str,
1352 columns: &[&str],
1353 additional_filter: Option<&str>,
1354 ) -> Result<Option<arrow_array::RecordBatch>> {
1355 let edge_hwm = self.snapshot_version_hwm();
1361 let backend = self.backend();
1362 let table_name = table_names::delta_table_name(edge_type, direction);
1363
1364 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1365 return Ok(None);
1366 }
1367
1368 let actual_columns =
1370 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1371 let table_field_names: HashSet<&str> = table_schema
1372 .fields()
1373 .iter()
1374 .map(|f| f.name().as_str())
1375 .collect();
1376 columns
1377 .iter()
1378 .copied()
1379 .filter(|c| table_field_names.contains(c))
1380 .map(|s| s.to_string())
1381 .collect::<Vec<_>>()
1382 } else {
1383 return Ok(None);
1384 };
1385
1386 let filter = match (edge_hwm, additional_filter) {
1387 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1388 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1389 (None, Some(f)) => Some(f.to_string()),
1390 (None, None) => None,
1391 };
1392
1393 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1394 if let Some(f) = filter {
1395 request = request.with_filter(f);
1396 }
1397
1398 let batches = backend.scan(request).await?;
1405 if batches.is_empty() {
1406 Ok(None)
1407 } else {
1408 Ok(Some(arrow::compute::concat_batches(
1409 &batches[0].schema(),
1410 &batches,
1411 )?))
1412 }
1413 }
1414
1415 pub async fn scan_main_vertex_table(
1420 &self,
1421 columns: &[&str],
1422 filter: Option<&str>,
1423 ) -> Result<Option<arrow_array::RecordBatch>> {
1424 let backend = self.backend();
1425 let table_name = table_names::main_vertex_table_name();
1426
1427 if !backend.table_exists(table_name).await.unwrap_or(false) {
1428 return Ok(None);
1429 }
1430
1431 let full_filter = match (self.version_high_water_mark(), filter) {
1433 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1434 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1435 (None, Some(f)) => Some(f.to_string()),
1436 (None, None) => None,
1437 };
1438
1439 let request = ScanRequest::all(table_name)
1440 .with_columns(columns.iter().map(|s| s.to_string()).collect());
1441 let request = match full_filter.as_deref() {
1442 Some(f) => request.with_filter(f),
1443 None => request,
1444 };
1445
1446 let batches = backend.scan(request).await?;
1453 if batches.is_empty() {
1454 Ok(None)
1455 } else {
1456 Ok(Some(arrow::compute::concat_batches(
1457 &batches[0].schema(),
1458 &batches,
1459 )?))
1460 }
1461 }
1462
1463 pub async fn scan_main_edge_table_stream(
1465 &self,
1466 filter: Option<&str>,
1467 ) -> Result<
1468 Option<
1469 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1470 >,
1471 > {
1472 let backend = self.backend();
1473 let table_name = table_names::main_edge_table_name();
1474
1475 if !backend.table_exists(table_name).await.unwrap_or(false) {
1476 return Ok(None);
1477 }
1478
1479 let mut request = ScanRequest::all(table_name);
1480 if let Some(f) = filter {
1481 request = request.with_filter(f);
1482 }
1483
1484 let stream = backend.scan_stream(request).await?;
1485 Ok(Some(stream))
1486 }
1487
1488 pub async fn scan_vertex_table_stream(
1490 &self,
1491 label: &str,
1492 ) -> Result<
1493 Option<
1494 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1495 >,
1496 > {
1497 let backend = self.backend();
1498 let table_name = table_names::vertex_table_name(label);
1499
1500 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1501 return Ok(None);
1502 }
1503
1504 let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1505 Ok(Some(stream))
1506 }
1507
1508 pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1510 MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1511 .await
1512 }
1513
1514 pub async fn get_vertex_ext_ids(&self) -> Result<std::collections::HashMap<Vid, String>> {
1526 use arrow_array::StringArray;
1527 let backend = self.backend.as_ref();
1528 let vtable = table_names::main_vertex_table_name();
1529 let mut out = std::collections::HashMap::new();
1530 if !backend.table_exists(vtable).await.unwrap_or(false) {
1531 return Ok(out);
1532 }
1533 let request = ScanRequest::all(vtable)
1534 .with_filter("_deleted = false")
1535 .with_columns(vec!["_vid".to_string(), "ext_id".to_string()]);
1536 let batches = backend
1537 .scan(request)
1538 .await
1539 .map_err(|e| anyhow!("get_vertex_ext_ids: {}", e))?;
1540 for batch in batches {
1541 let vids = batch
1542 .column_by_name("_vid")
1543 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1544 .ok_or_else(|| anyhow!("get_vertex_ext_ids: missing/invalid _vid column"))?;
1545 let exts = batch
1546 .column_by_name("ext_id")
1547 .and_then(|c| c.as_any().downcast_ref::<StringArray>())
1548 .ok_or_else(|| anyhow!("get_vertex_ext_ids: missing/invalid ext_id column"))?;
1549 for i in 0..batch.num_rows() {
1550 if exts.is_null(i) {
1551 continue;
1552 }
1553 let ext = exts.value(i);
1554 if !ext.is_empty() {
1555 out.insert(Vid::from(vids.value(i)), ext.to_string());
1556 }
1557 }
1558 }
1559 Ok(out)
1560 }
1561
1562 pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1564 MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1565 .await
1566 }
1567
1568 pub async fn find_edges_by_type_names(
1571 &self,
1572 type_names: &[&str],
1573 endpoint_filter: Option<(crate::storage::main_edge::EndpointSide, &[Vid])>,
1574 ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1575 MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names, endpoint_filter).await
1576 }
1577
1578 pub async fn scan_vertex_candidates(
1580 &self,
1581 label: &str,
1582 filter: Option<&str>,
1583 ) -> Result<Vec<Vid>> {
1584 let backend = self.backend();
1585 let table_name = table_names::vertex_table_name(label);
1586
1587 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1588 return Ok(Vec::new());
1589 }
1590
1591 let full_filter = match filter {
1592 Some(f) => format!("_deleted = false AND ({})", f),
1593 None => "_deleted = false".to_string(),
1594 };
1595
1596 let request = ScanRequest::all(&table_name)
1597 .with_filter(full_filter)
1598 .with_columns(vec!["_vid".to_string()]);
1599
1600 let batches = backend.scan(request).await?;
1601
1602 let mut vids = Vec::new();
1603 for batch in batches {
1604 let vid_col = batch
1605 .column_by_name("_vid")
1606 .ok_or(anyhow!("Missing _vid"))?
1607 .as_any()
1608 .downcast_ref::<UInt64Array>()
1609 .ok_or(anyhow!("Invalid _vid"))?;
1610 for i in 0..batch.num_rows() {
1611 vids.push(Vid::from(vid_col.value(i)));
1612 }
1613 }
1614 Ok(vids)
1615 }
1616
1617 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1618 let schema = self.schema_manager.schema();
1619 let label_meta = schema
1620 .labels
1621 .get(label)
1622 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1623 let key = format!("vertices_{label}");
1624 match self.fork_branch_for(&key) {
1625 Some(branch) => Ok(VertexDataset::new_branched(
1626 &self.base_uri,
1627 label,
1628 label_meta.id,
1629 branch,
1630 )),
1631 None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1632 }
1633 }
1634
/// Returns a handle to the edge dataset of `edge_type` between `src_label` and
/// `dst_label`.
///
/// If the active fork scope maps `edges_{edge_type}` to a branch, the branched
/// dataset is returned instead. This never fails in its current form.
#[cfg(feature = "lance-backend")]
pub fn edge_dataset(
    &self,
    edge_type: &str,
    src_label: &str,
    dst_label: &str,
) -> Result<EdgeDataset> {
    let dataset_key = format!("edges_{edge_type}");
    let dataset = if let Some(branch) = self.fork_branch_for(&dataset_key) {
        EdgeDataset::new_branched(&self.base_uri, edge_type, src_label, dst_label, branch)
    } else {
        EdgeDataset::new(&self.base_uri, edge_type, src_label, dst_label)
    };
    Ok(dataset)
}
1659
1660 pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1661 let key = format!("deltas_{edge_type}_{direction}");
1662 match self.fork_branch_for(&key) {
1663 Some(branch) => Ok(DeltaDataset::new_branched(
1664 &self.base_uri,
1665 edge_type,
1666 direction,
1667 branch,
1668 )),
1669 None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1670 }
1671 }
1672
1673 pub fn adjacency_dataset(
1674 &self,
1675 edge_type: &str,
1676 label: &str,
1677 direction: &str,
1678 ) -> Result<AdjacencyDataset> {
1679 let key = crate::backend::table_names::adjacency_table_name(edge_type, direction);
1686 match self.fork_branch_for(&key) {
1687 Some(branch) => Ok(AdjacencyDataset::new_branched(
1688 &self.base_uri,
1689 edge_type,
1690 label,
1691 direction,
1692 branch,
1693 )),
1694 None => Ok(AdjacencyDataset::new(
1695 &self.base_uri,
1696 edge_type,
1697 label,
1698 direction,
1699 )),
1700 }
1701 }
1702
1703 fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1707 self.fork_scope
1708 .as_ref()
1709 .and_then(|s| s.branch_for(dataset_name))
1710 }
1711
1712 pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1717 MainVertexDataset::new(&self.base_uri)
1718 }
1719
1720 pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1725 MainEdgeDataset::new(&self.base_uri)
1726 }
1727
/// Returns the UID index for `label` rooted at `base_uri`. This never fails in
/// its current form.
#[cfg(feature = "lance-backend")]
pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
    let index = UidIndex::new(&self.base_uri, label);
    Ok(index)
}
1732
/// Opens the inverted index declared in the schema for `label.property`.
///
/// The first `IndexDefinition::Inverted` entry whose label and property match is
/// used.
///
/// # Errors
/// Fails if no such index is declared, or if opening the index fails.
#[cfg(feature = "lance-backend")]
pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
    let schema = self.schema_manager.schema();
    let found = schema
        .indexes
        .iter()
        .filter_map(|idx| {
            if let IndexDefinition::Inverted(cfg) = idx {
                Some(cfg)
            } else {
                None
            }
        })
        .find(|cfg| cfg.label == label && cfg.property == property)
        .cloned();

    let Some(config) = found else {
        return Err(anyhow!("Inverted index not found for {}.{}", label, property));
    };
    InvertedIndex::new(&self.base_uri, config).await
}
1751
1752 pub async fn vector_search(
1753 &self,
1754 label: &str,
1755 property: &str,
1756 query: &[f32],
1757 k: usize,
1758 filter: Option<&str>,
1759 ctx: Option<&QueryContext>,
1760 ) -> Result<Vec<(Vid, f32)>> {
1761 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1762
1763 let schema = self.schema_manager.schema();
1765 let metric = schema
1766 .vector_index_for_property(label, property)
1767 .map(|config| config.metric.clone())
1768 .unwrap_or(DistanceMetric::L2);
1769
1770 let backend = self.backend.as_ref();
1771 let name = table_names::vertex_table_name(label);
1772
1773 let mut results = Vec::new();
1774
1775 if backend.table_exists(&name).await.unwrap_or(false) {
1777 let backend_metric = match &metric {
1778 DistanceMetric::L2 => BackendMetric::L2,
1779 DistanceMetric::Cosine => BackendMetric::Cosine,
1780 DistanceMetric::Dot => BackendMetric::Dot,
1781 _ => BackendMetric::L2,
1782 };
1783
1784 let mut filter_parts = vec![Self::build_active_filter(filter)];
1786 if ctx.is_some()
1787 && let Some(hwm) = self.version_high_water_mark()
1788 {
1789 filter_parts.push(format!("_version <= {}", hwm));
1790 }
1791 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1792
1793 let batches = backend
1794 .vector_search(&name, property, query, k, backend_metric, combined_filter)
1795 .await?;
1796
1797 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1798 }
1799
1800 if let Some(qctx) = ctx {
1802 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1803 }
1804
1805 Ok(results)
1806 }
1807
1808 pub async fn fts_search(
1824 &self,
1825 label: &str,
1826 property: &str,
1827 query: &str,
1828 k: usize,
1829 filter: Option<&str>,
1830 ctx: Option<&QueryContext>,
1831 ) -> Result<Vec<(Vid, f32)>> {
1832 use crate::backend::types::FilterExpr;
1833
1834 let backend = self.backend.as_ref();
1835 let name = table_names::vertex_table_name(label);
1836
1837 let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1838 let mut filter_parts = vec![Self::build_active_filter(filter)];
1840 if ctx.is_some()
1841 && let Some(hwm) = self.version_high_water_mark()
1842 {
1843 filter_parts.push(format!("_version <= {}", hwm));
1844 }
1845 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1846
1847 let batches = backend
1848 .full_text_search(&name, property, query, k, combined_filter)
1849 .await?;
1850
1851 let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1852 fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1854 fts_results
1855 } else {
1856 Vec::new()
1857 };
1858
1859 if let Some(qctx) = ctx {
1861 merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1862 }
1863
1864 Ok(results)
1865 }
1866
/// Looks up the vid mapped to `uid` in the UID index of `label`.
#[cfg(feature = "lance-backend")]
pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
    self.uid_index(label)?.get_vid(uid).await
}
1872
/// Records a single `uid -> vid` mapping in the UID index of `label`.
#[cfg(feature = "lance-backend")]
pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
    self.uid_index(label)?
        .write_mapping(&[(uid, vid)])
        .await
}
1878
1879 pub async fn load_subgraph(
1880 &self,
1881 start_vids: &[Vid],
1882 edge_types: &[u32],
1883 max_hops: usize,
1884 direction: GraphDirection,
1885 l0: Option<&L0Buffer>,
1886 ) -> Result<WorkingGraph> {
1887 let mut graph = WorkingGraph::new();
1888 let schema = self.schema_manager.schema();
1889
1890 let label_map: HashMap<u16, String> = schema
1892 .labels
1893 .values()
1894 .map(|meta| {
1895 (
1896 meta.id,
1897 schema.label_name_by_id(meta.id).unwrap().to_owned(),
1898 )
1899 })
1900 .collect();
1901
1902 let edge_type_map: HashMap<u32, String> = schema
1903 .edge_types
1904 .values()
1905 .map(|meta| {
1906 (
1907 meta.id,
1908 schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1909 )
1910 })
1911 .collect();
1912
1913 let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1914
1915 let mut frontier: Vec<Vid> = start_vids.to_vec();
1917 let mut visited: HashSet<Vid> = HashSet::new();
1918
1919 for &vid in start_vids {
1921 graph.add_vertex(vid);
1922 }
1923
1924 for _hop in 0..max_hops {
1925 let mut next_frontier = HashSet::new();
1926
1927 for &vid in &frontier {
1928 if visited.contains(&vid) {
1929 continue;
1930 }
1931 visited.insert(vid);
1932 graph.add_vertex(vid);
1933
1934 for &etype_id in &target_edge_types {
1936 let etype_name = edge_type_map
1937 .get(&etype_id)
1938 .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1939
1940 let (dir_str, neighbor_is_dst) = match direction {
1944 GraphDirection::Outgoing => ("fwd", true),
1945 GraphDirection::Incoming => ("bwd", false),
1946 };
1947
1948 let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1949
1950 let _edge_ver = self
1955 .pinned_snapshot
1956 .as_ref()
1957 .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1958
1959 let backend = self.backend();
1961 for current_src_label in label_map.values() {
1962 let adj_ds =
1963 match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1964 Ok(ds) => ds,
1965 Err(_) => continue,
1966 };
1967 if let Some((neighbors, eids)) =
1968 adj_ds.read_adjacency_backend(backend, vid).await?
1969 {
1970 for (n, eid) in neighbors.into_iter().zip(eids) {
1971 edges.insert(
1972 eid,
1973 EdgeState {
1974 neighbor: n,
1975 version: 0,
1976 deleted: false,
1977 },
1978 );
1979 }
1980 break; }
1982 }
1983
1984 let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1986 let delta_entries = delta_ds
1987 .read_deltas(backend, vid, &schema, self.snapshot_version_hwm())
1988 .await?;
1989 Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1990
1991 if let Some(l0) = l0 {
1993 Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1994 }
1995
1996 Self::add_edges_to_graph(
1998 &mut graph,
1999 edges,
2000 vid,
2001 etype_id,
2002 neighbor_is_dst,
2003 &visited,
2004 &mut next_frontier,
2005 );
2006 }
2007 }
2008 frontier = next_frontier.into_iter().collect();
2009
2010 if frontier.is_empty() {
2012 break;
2013 }
2014 }
2015
2016 Ok(graph)
2017 }
2018
2019 fn apply_delta_to_edges(
2021 edges: &mut HashMap<Eid, EdgeState>,
2022 delta_entries: Vec<crate::storage::delta::L1Entry>,
2023 neighbor_is_dst: bool,
2024 ) {
2025 for entry in delta_entries {
2026 let neighbor = if neighbor_is_dst {
2027 entry.dst_vid
2028 } else {
2029 entry.src_vid
2030 };
2031 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
2032
2033 if entry.version > current_ver {
2034 edges.insert(
2035 entry.eid,
2036 EdgeState {
2037 neighbor,
2038 version: entry.version,
2039 deleted: matches!(entry.op, Op::Delete),
2040 },
2041 );
2042 }
2043 }
2044 }
2045
2046 fn apply_l0_to_edges(
2048 edges: &mut HashMap<Eid, EdgeState>,
2049 l0: &L0Buffer,
2050 vid: Vid,
2051 etype_id: u32,
2052 direction: GraphDirection,
2053 ) {
2054 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
2055 for (neighbor, eid, ver) in l0_neighbors {
2056 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
2057 if ver > current_ver {
2058 edges.insert(
2059 eid,
2060 EdgeState {
2061 neighbor,
2062 version: ver,
2063 deleted: false,
2064 },
2065 );
2066 }
2067 }
2068
2069 for (eid, state) in edges.iter_mut() {
2071 if l0.is_tombstoned(*eid) {
2072 state.deleted = true;
2073 }
2074 }
2075 }
2076
2077 fn add_edges_to_graph(
2079 graph: &mut WorkingGraph,
2080 edges: HashMap<Eid, EdgeState>,
2081 vid: Vid,
2082 etype_id: u32,
2083 neighbor_is_dst: bool,
2084 visited: &HashSet<Vid>,
2085 next_frontier: &mut HashSet<Vid>,
2086 ) {
2087 for (eid, state) in edges {
2088 if state.deleted {
2089 continue;
2090 }
2091 graph.add_vertex(state.neighbor);
2092
2093 if !visited.contains(&state.neighbor) {
2094 next_frontier.insert(state.neighbor);
2095 }
2096
2097 if neighbor_is_dst {
2098 graph.add_edge(vid, state.neighbor, eid, etype_id);
2099 } else {
2100 graph.add_edge(state.neighbor, vid, eid, etype_id);
2101 }
2102 }
2103 }
2104}
2105
2106fn extract_vid_score_pairs(
2108 batches: &[arrow_array::RecordBatch],
2109 vid_column: &str,
2110 score_column: &str,
2111) -> Result<Vec<(Vid, f32)>> {
2112 let mut results = Vec::new();
2113 for batch in batches {
2114 let vid_col = batch
2115 .column_by_name(vid_column)
2116 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
2117 .as_any()
2118 .downcast_ref::<UInt64Array>()
2119 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
2120
2121 let score_col = batch
2122 .column_by_name(score_column)
2123 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
2124 .as_any()
2125 .downcast_ref::<Float32Array>()
2126 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
2127
2128 for i in 0..batch.num_rows() {
2129 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
2130 }
2131 }
2132 Ok(results)
2133}
2134
2135fn extract_embedding_from_props(
2140 props: &uni_common::Properties,
2141 property: &str,
2142) -> Option<Vec<f32>> {
2143 let arr = props.get(property)?.as_array()?;
2144 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
2145}
2146
2147fn merge_l0_into_vector_results(
2157 results: &mut Vec<(Vid, f32)>,
2158 ctx: &QueryContext,
2159 label: &str,
2160 property: &str,
2161 query: &[f32],
2162 k: usize,
2163 metric: &DistanceMetric,
2164) {
2165 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2167 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2168 buffers.push(Arc::clone(&ctx.l0));
2169 if let Some(ref txn) = ctx.transaction_l0 {
2170 buffers.push(Arc::clone(txn));
2171 }
2172
2173 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2175 let mut tombstoned: HashSet<Vid> = HashSet::new();
2177
2178 for buf_arc in &buffers {
2179 let buf = buf_arc.read();
2180
2181 for &vid in &buf.vertex_tombstones {
2183 tombstoned.insert(vid);
2184 }
2185
2186 for (&vid, labels) in &buf.vertex_labels {
2188 if !labels.iter().any(|l| l == label) {
2189 continue;
2190 }
2191 if let Some(props) = buf.vertex_properties.get(&vid)
2192 && let Some(emb) = extract_embedding_from_props(props, property)
2193 {
2194 if emb.len() != query.len() {
2195 continue; }
2197 let dist = metric.compute_distance(&emb, query);
2198 l0_candidates.insert(vid, dist);
2200 tombstoned.remove(&vid);
2202 }
2203 }
2204 }
2205
2206 if l0_candidates.is_empty() && tombstoned.is_empty() {
2208 return;
2209 }
2210
2211 results.retain(|(vid, _)| !tombstoned.contains(vid));
2213
2214 for (vid, dist) in &l0_candidates {
2216 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2217 existing.1 = *dist;
2218 } else {
2219 results.push((*vid, *dist));
2220 }
2221 }
2222
2223 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2225 results.truncate(k);
2226}
2227
/// Scores how well `text` matches `query` for in-memory (L0) full-text results.
///
/// Both strings are split into lowercase tokens at every non-alphanumeric
/// character, so punctuation does not prevent a match (`"world!"` yields
/// `world`). The score is the fraction of distinct query tokens that also occur
/// in `text`, in `[0.0, 1.0]`. Returns `0.0` when the query has no tokens.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    /// Distinct lowercase alphanumeric tokens of `s`.
    fn tokens(s: &str) -> HashSet<String> {
        s.split(|c: char| !c.is_alphanumeric())
            .filter(|t| !t.is_empty())
            .map(str::to_lowercase)
            .collect()
    }

    let query_tokens = tokens(query);
    if query_tokens.is_empty() {
        return 0.0;
    }
    let text_tokens = tokens(text);
    let hits = query_tokens.intersection(&text_tokens).count();
    hits as f32 / query_tokens.len() as f32
}
2245
2246fn extract_text_from_props<'a>(
2248 props: &'a uni_common::Properties,
2249 property: &str,
2250) -> Option<&'a str> {
2251 props.get(property)?.as_str()
2252}
2253
2254fn merge_l0_into_fts_results(
2264 results: &mut Vec<(Vid, f32)>,
2265 ctx: &QueryContext,
2266 label: &str,
2267 property: &str,
2268 query: &str,
2269 k: usize,
2270) {
2271 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2273 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2274 buffers.push(Arc::clone(&ctx.l0));
2275 if let Some(ref txn) = ctx.transaction_l0 {
2276 buffers.push(Arc::clone(txn));
2277 }
2278
2279 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2281 let mut tombstoned: HashSet<Vid> = HashSet::new();
2283
2284 for buf_arc in &buffers {
2285 let buf = buf_arc.read();
2286
2287 for &vid in &buf.vertex_tombstones {
2289 tombstoned.insert(vid);
2290 }
2291
2292 for (&vid, labels) in &buf.vertex_labels {
2294 if !labels.iter().any(|l| l == label) {
2295 continue;
2296 }
2297 if let Some(props) = buf.vertex_properties.get(&vid)
2298 && let Some(text) = extract_text_from_props(props, property)
2299 {
2300 let score = compute_text_relevance(query, text);
2301 if score > 0.0 {
2302 l0_candidates.insert(vid, score);
2304 }
2305 tombstoned.remove(&vid);
2307 }
2308 }
2309 }
2310
2311 if l0_candidates.is_empty() && tombstoned.is_empty() {
2313 return;
2314 }
2315
2316 results.retain(|(vid, _)| !tombstoned.contains(vid));
2318
2319 for (vid, score) in &l0_candidates {
2321 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2322 existing.1 = *score;
2323 } else {
2324 results.push((*vid, *score));
2325 }
2326 }
2327
2328 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2330 results.truncate(k);
2331}