1use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Small record describing one edge entry: the vertex on the other end, the
/// version the entry carries, and a deletion flag.
///
/// NOTE(review): no use of this type is visible in this part of the file;
/// the field descriptions below are read off the field names — confirm
/// against the code that constructs it.
struct EdgeState {
    /// Vertex on the other end of the edge.
    neighbor: Vid,
    /// Version number attached to this entry.
    version: u64,
    /// `true` when the entry marks the edge as deleted.
    deleted: bool,
}
60
/// Central handle over the storage layer: object store, table backend,
/// schema, snapshots, adjacency cache and compaction bookkeeping.
///
/// Derived views of a manager are produced by [`StorageManager::pinned`],
/// [`StorageManager::pinned_at_version`] and [`StorageManager::at_fork`];
/// they share the store/backend handles and differ in the pinning / fork
/// fields below.
pub struct StorageManager {
    /// URI (or local path) the manager was opened with.
    base_uri: String,
    /// Object store; wrapped in a `ResilientObjectStore` by `new_with_backend`.
    store: Arc<dyn ObjectStore>,
    /// Shared schema manager.
    schema_manager: Arc<SchemaManager>,
    /// Snapshot manager built on top of `store`.
    snapshot_manager: Arc<SnapshotManager>,
    /// Adjacency cache manager, sized from `config.cache_size` at construction.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Configuration this manager was created with.
    pub config: UniConfig,
    /// Compaction bookkeeping, also mutated by `CompactionGuard`.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Counter incremented/decremented by `FlushInProgressGuard`.
    pub flush_in_progress: std::sync::atomic::AtomicUsize,
    /// Snapshot this view is pinned to; set by `pinned`.
    pinned_snapshot: Option<SnapshotManifest>,
    /// Version high-water mark this view is pinned to; set by `pinned_at_version`.
    pinned_version_hwm: Option<u64>,
    /// Fork scope of this view; set by `at_fork_with_schema`.
    fork_scope: Option<Arc<crate::fork::ForkScope>>,
    /// Table backend (wrapped in a `BranchedBackend` for fork views).
    backend: Arc<dyn StorageBackend>,
    /// Optional in-memory VID -> labels index; populated by
    /// `rebuild_vid_labels_index` when `config.enable_vid_labels_index` is set.
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
98
/// Scope guard that keeps `StorageManager::flush_in_progress` incremented
/// for as long as the guard is alive.
pub struct FlushInProgressGuard {
    /// Manager whose counter was incremented on construction.
    storage: Arc<StorageManager>,
}
110
111impl FlushInProgressGuard {
112 pub fn new(storage: &Arc<StorageManager>) -> Self {
113 storage
114 .flush_in_progress
115 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
116 Self {
117 storage: storage.clone(),
118 }
119 }
120}
121
122impl Drop for FlushInProgressGuard {
123 fn drop(&mut self) {
124 self.storage
126 .flush_in_progress
127 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
128 }
129}
130
131fn is_lance_conflict(err: &anyhow::Error) -> bool {
137 let msg = err.to_string();
138 msg.contains("Incompatible transaction") || msg.contains("conflict")
139}
140
141async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
146where
147 F: FnMut() -> Fut,
148 Fut: std::future::Future<Output = anyhow::Result<()>>,
149{
150 for attempt in 0u32..10 {
151 match op().await {
152 Ok(()) => return Ok(()),
153 Err(e) => {
154 if !is_lance_conflict(&e) || attempt == 9 {
155 return Err(e);
156 }
157 let backoff_ms = 1u64 << attempt;
158 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
159 }
160 }
161 }
162 unreachable!("retry loop exits via Ok or Err")
163}
164
165pub async fn merge_insert_batch_with_lance_conflict_retry(
174 backend: &dyn crate::backend::StorageBackend,
175 table_name: &str,
176 batch: arrow_array::RecordBatch,
177 on: &[&str],
178) -> anyhow::Result<()> {
179 retry_on_lance_conflict(|| async {
180 let exists = backend.table_exists(table_name).await?;
181 if !exists {
182 anyhow::bail!(
183 "merge_insert target table '{}' does not exist (partial writes \
184 require the row to already be present; CREATE goes through Append)",
185 table_name
186 );
187 }
188 backend
189 .merge_insert(table_name, on, vec![batch.clone()])
190 .await
191 })
192 .await
193}
194
195pub async fn write_batch_with_lance_conflict_retry(
204 backend: &dyn crate::backend::StorageBackend,
205 table_name: &str,
206 batch: arrow_array::RecordBatch,
207) -> anyhow::Result<()> {
208 use crate::backend::types::WriteMode;
209 retry_on_lance_conflict(|| async {
210 let exists = backend.table_exists(table_name).await?;
211 if exists {
212 backend
213 .write(table_name, vec![batch.clone()], WriteMode::Append)
214 .await
215 } else {
216 backend.create_table(table_name, vec![batch.clone()]).await
217 }
218 })
219 .await
220}
221
/// Scope guard marking a compaction as running: `new` sets
/// `compaction_in_progress`, `Drop` clears it again.
struct CompactionGuard {
    /// Shared status whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
226
227impl CompactionGuard {
228 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
229 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
230 if s.compaction_in_progress {
231 return None;
232 }
233 s.compaction_in_progress = true;
234 Some(Self {
235 status: status.clone(),
236 })
237 }
238}
239
240impl Drop for CompactionGuard {
241 fn drop(&mut self) {
242 match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
245 Ok(mut s) => {
246 s.compaction_in_progress = false;
247 s.last_compaction = Some(std::time::SystemTime::now());
248 }
249 Err(e) => {
250 log::error!(
254 "CompactionGuard drop failed to acquire poisoned lock: {}. \
255 Compaction status may be inconsistent. Issue #18/#150",
256 e
257 );
258 }
259 }
260 }
261}
262
263impl StorageManager {
264 pub async fn new_with_backend(
266 base_uri: &str,
267 store: Arc<dyn ObjectStore>,
268 backend: Arc<dyn StorageBackend>,
269 schema_manager: Arc<SchemaManager>,
270 config: UniConfig,
271 ) -> Result<Self> {
272 let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
273 store,
274 config.object_store.clone(),
275 ));
276
277 let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
278
279 Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
281
282 let mut sm = Self {
283 base_uri: base_uri.to_string(),
284 store: resilient_store,
285 schema_manager,
286 snapshot_manager,
287 adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
288 config,
289 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
290 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
291 pinned_snapshot: None,
292 pinned_version_hwm: None,
293 fork_scope: None,
294 backend,
295 vid_labels_index: None,
296 };
297
298 if sm.config.enable_vid_labels_index
300 && let Err(e) = sm.rebuild_vid_labels_index().await
301 {
302 warn!(
303 "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
304 e
305 );
306 }
307
308 Ok(sm)
309 }
310
311 #[cfg(feature = "lance-backend")]
313 pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
314 Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
315 }
316
317 #[cfg(feature = "lance-backend")]
319 pub async fn new_with_cache(
320 base_uri: &str,
321 schema_manager: Arc<SchemaManager>,
322 adjacency_cache_size: usize,
323 ) -> Result<Self> {
324 let config = UniConfig {
325 cache_size: adjacency_cache_size,
326 ..Default::default()
327 };
328 Self::new_with_config(base_uri, schema_manager, config).await
329 }
330
331 #[cfg(feature = "lance-backend")]
333 pub async fn new_with_config(
334 base_uri: &str,
335 schema_manager: Arc<SchemaManager>,
336 config: UniConfig,
337 ) -> Result<Self> {
338 let store = Self::build_store_from_uri(base_uri)?;
339 Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
340 }
341
342 #[cfg(feature = "lance-backend")]
344 pub async fn new_with_store_and_config(
345 base_uri: &str,
346 store: Arc<dyn ObjectStore>,
347 schema_manager: Arc<SchemaManager>,
348 config: UniConfig,
349 ) -> Result<Self> {
350 Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
351 .await
352 }
353
354 #[cfg(feature = "lance-backend")]
356 pub async fn new_with_store_and_storage_options(
357 base_uri: &str,
358 store: Arc<dyn ObjectStore>,
359 schema_manager: Arc<SchemaManager>,
360 config: UniConfig,
361 lancedb_storage_options: Option<HashMap<String, String>>,
362 ) -> Result<Self> {
363 let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
364 Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
365 }
366
367 async fn recover_all_staging_tables(
372 backend: &dyn StorageBackend,
373 schema_manager: &SchemaManager,
374 ) -> Result<()> {
375 let schema = schema_manager.schema();
376
377 backend
379 .recover_staging(table_names::main_vertex_table_name())
380 .await?;
381 backend
382 .recover_staging(table_names::main_edge_table_name())
383 .await?;
384
385 for label in schema.labels.keys() {
387 let name = table_names::vertex_table_name(label);
388 backend.recover_staging(&name).await?;
389 }
390
391 for edge_type in schema.edge_types.keys() {
393 for direction in &["fwd", "bwd"] {
394 let delta_name = table_names::delta_table_name(edge_type, direction);
396 backend.recover_staging(&delta_name).await?;
397
398 for _label in schema.labels.keys() {
400 let adj_name = table_names::adjacency_table_name(edge_type, direction);
401 backend.recover_staging(&adj_name).await?;
402 }
403 }
404 }
405
406 Ok(())
407 }
408
409 #[cfg(feature = "lance-backend")]
410 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
411 if base_uri.contains("://") {
412 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
413 let (store, _path) = object_store::parse_url(&parsed)
414 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
415 Ok(Arc::from(store))
416 } else {
417 std::fs::create_dir_all(base_uri)?;
419 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
420 }
421 }
422
423 pub fn local_fs_root(&self) -> Option<std::path::PathBuf> {
429 if self.base_uri.contains("://") {
430 None
431 } else {
432 Some(std::path::PathBuf::from(&self.base_uri))
433 }
434 }
435
436 pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
437 Self {
445 base_uri: self.base_uri.clone(),
446 store: self.store.clone(),
447 schema_manager: self.schema_manager.clone(),
448 snapshot_manager: self.snapshot_manager.clone(),
449 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
453 config: self.config.clone(),
454 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
455 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
456 pinned_snapshot: Some(snapshot),
457 pinned_version_hwm: None,
458 fork_scope: self.fork_scope.clone(),
459 backend: self.backend.clone(),
460 vid_labels_index: self.vid_labels_index.clone(),
461 }
462 }
463
464 pub fn pinned_at_version(&self, hwm: u64) -> Self {
483 Self {
484 base_uri: self.base_uri.clone(),
485 store: self.store.clone(),
486 schema_manager: self.schema_manager.clone(),
487 snapshot_manager: self.snapshot_manager.clone(),
488 adjacency_manager: self.adjacency_manager.clone(),
489 config: self.config.clone(),
490 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
491 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
492 pinned_snapshot: None,
493 pinned_version_hwm: Some(hwm),
494 fork_scope: self.fork_scope.clone(),
495 backend: self.backend.clone(),
496 vid_labels_index: self.vid_labels_index.clone(),
497 }
498 }
499
500 pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
515 self.at_fork_with_schema(scope, self.schema_manager.clone())
516 }
517
518 pub fn at_fork_with_schema(
526 &self,
527 scope: Arc<crate::fork::ForkScope>,
528 merged_schema: Arc<SchemaManager>,
529 ) -> Self {
530 debug_assert!(
531 self.pinned_snapshot.is_none(),
532 "forking a pinned StorageManager is unsupported in Phase 1"
533 );
534 let branched_backend: Arc<dyn StorageBackend> = Arc::new(
535 crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
536 );
537 Self {
538 base_uri: self.base_uri.clone(),
539 store: self.store.clone(),
540 schema_manager: merged_schema,
541 snapshot_manager: self.snapshot_manager.clone(),
542 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
543 config: self.config.clone(),
544 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
545 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
546 pinned_snapshot: None,
547 pinned_version_hwm: None,
548 fork_scope: Some(scope),
549 backend: branched_backend,
550 vid_labels_index: self.vid_labels_index.clone(),
551 }
552 }
553
554 pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
556 self.fork_scope.as_ref()
557 }
558
559 #[must_use]
570 pub fn fork_index_exists(
571 &self,
572 label: &str,
573 column: &str,
574 ) -> Option<crate::fork::ForkLocalIndexKind> {
575 self.fork_scope
576 .as_ref()
577 .and_then(|s| s.fork_local_index(label, column))
578 }
579
580 pub fn base_uri(&self) -> &str {
583 &self.base_uri
584 }
585
586 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
587 let schema = self.schema_manager.schema();
588 let name = schema.edge_type_name_by_id(edge_type_id)?;
589 self.pinned_snapshot
590 .as_ref()
591 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
592 .filter(|v| *v != 0)
597 }
598
599 pub fn version_high_water_mark(&self) -> Option<u64> {
607 self.pinned_snapshot
608 .as_ref()
609 .map(|s| s.version_high_water_mark)
610 .or(self.pinned_version_hwm)
611 }
612
613 pub fn snapshot_version_hwm(&self) -> Option<u64> {
625 self.pinned_snapshot
626 .as_ref()
627 .map(|s| s.version_high_water_mark)
628 }
629
630 pub fn apply_version_filter(&self, base_filter: String) -> String {
635 if let Some(hwm) = self.version_high_water_mark() {
636 format!("({}) AND (_version <= {})", base_filter, hwm)
637 } else {
638 base_filter
639 }
640 }
641
642 fn build_active_filter(user_filter: Option<&str>) -> String {
645 match user_filter {
646 Some(expr) => format!("({}) AND (_deleted = false)", expr),
647 None => "_deleted = false".to_string(),
648 }
649 }
650
651 pub fn store(&self) -> Arc<dyn ObjectStore> {
652 self.store.clone()
653 }
654
655 pub fn compaction_status(
661 &self,
662 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
663 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
664 Ok(guard.clone())
665 }
666
667 pub async fn compact(&self) -> Result<CompactionStats> {
668 let start = std::time::Instant::now();
670 let schema = self.schema_manager.schema();
671 let mut files_compacted = 0;
672
673 for label in schema.labels.keys() {
674 let name = table_names::vertex_table_name(label);
675 if self.backend.table_exists(&name).await? {
676 self.backend.optimize_table(&name).await?;
677 files_compacted += 1;
678 self.backend.invalidate_cache(&name);
679 }
680 }
681
682 Ok(CompactionStats {
683 files_compacted,
684 bytes_before: 0,
685 bytes_after: 0,
686 duration: start.elapsed(),
687 crdt_merges: 0,
688 })
689 }
690
691 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
692 let _guard = CompactionGuard::new(self.compaction_status.clone())
693 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
694
695 let start = std::time::Instant::now();
696 let name = table_names::vertex_table_name(label);
697
698 if self.backend.table_exists(&name).await? {
699 self.backend.optimize_table(&name).await?;
700 self.backend.invalidate_cache(&name);
701 }
702
703 Ok(CompactionStats {
704 files_compacted: 1,
705 bytes_before: 0,
706 bytes_after: 0,
707 duration: start.elapsed(),
708 crdt_merges: 0,
709 })
710 }
711
712 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
713 let _guard = CompactionGuard::new(self.compaction_status.clone())
714 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
715
716 let start = std::time::Instant::now();
717 let mut files_compacted = 0;
718
719 for dir in ["fwd", "bwd"] {
720 let name = table_names::delta_table_name(edge_type, dir);
721 if self.backend.table_exists(&name).await? {
722 self.backend.optimize_table(&name).await?;
723 files_compacted += 1;
724 }
725 }
726
727 Ok(CompactionStats {
728 files_compacted,
729 bytes_before: 0,
730 bytes_after: 0,
731 duration: start.elapsed(),
732 crdt_merges: 0,
733 })
734 }
735
736 pub async fn wait_for_compaction(&self) -> Result<()> {
737 loop {
738 let in_progress = {
739 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
740 };
741 if !in_progress {
742 return Ok(());
743 }
744 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
745 }
746 }
747
748 pub fn start_background_compaction(
749 self: Arc<Self>,
750 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
751 ) -> tokio::task::JoinHandle<()> {
752 if !self.config.compaction.enabled {
753 return tokio::spawn(async {});
754 }
755
756 tokio::spawn(async move {
757 let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
763 let mut interval =
764 tokio::time::interval_at(start, self.config.compaction.check_interval);
765
766 loop {
767 tokio::select! {
768 _ = interval.tick() => {
769 if let Err(e) = self.update_compaction_status().await {
770 log::error!("Failed to update compaction status: {}", e);
771 continue;
772 }
773
774 if let Some(task) = self.pick_compaction_task() {
775 log::info!("Triggering background compaction: {:?}", task);
776 if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
777 log::error!("Compaction failed: {}", e);
778 }
779 }
780 }
781 _ = shutdown_rx.recv() => {
782 log::info!("Background compaction shutting down");
783 let _ = self.wait_for_compaction().await;
784 break;
785 }
786 }
787 }
788 })
789 }
790
791 async fn update_compaction_status(&self) -> Result<()> {
792 let schema = self.schema_manager.schema();
793 let backend = self.backend.as_ref();
794 let mut total_rows: usize = 0;
795 let mut oldest_ts: Option<i64> = None;
796
797 for name in schema.edge_types.keys() {
798 for dir in ["fwd", "bwd"] {
799 let tbl_name = table_names::delta_table_name(name, dir);
800 if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
801 continue;
802 }
803 let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
804 if row_count == 0 {
805 continue;
806 }
807 total_rows += row_count;
808
809 let request =
811 ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
812 let Ok(batches) = backend.scan(request).await else {
813 continue;
814 };
815 for batch in batches {
816 let Some(col) = batch
817 .column_by_name("_created_at")
818 .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
819 else {
820 continue;
821 };
822 for i in 0..col.len() {
823 if !col.is_null(i) {
824 let ts = col.value(i);
825 oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
826 }
827 }
828 }
829 }
830 }
831
832 let oldest_l1_age = oldest_ts
833 .and_then(|ts| {
834 let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
835 SystemTime::now().duration_since(created).ok()
836 })
837 .unwrap_or(Duration::ZERO);
838
839 let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
840 status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
843 status.oldest_l1_age = oldest_l1_age;
844 Ok(())
845 }
846
847 fn pick_compaction_task(&self) -> Option<CompactionTask> {
848 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
849
850 if status.l1_runs >= self.config.compaction.max_l1_runs {
851 return Some(CompactionTask::ByRunCount);
852 }
853 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
854 return Some(CompactionTask::BySize);
855 }
856 if status.oldest_l1_age >= self.config.compaction.max_l1_age
857 && status.oldest_l1_age > Duration::ZERO
858 {
859 return Some(CompactionTask::ByAge);
860 }
861
862 None
863 }
864
865 async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
867 if let Err(e) = backend.optimize_table(table_name).await {
868 log::warn!("Failed to optimize table {}: {}", table_name, e);
869 return false;
870 }
871 true
872 }
873
874 pub fn trigger_async_compaction(self: &Arc<Self>) {
877 let this = Arc::clone(self);
878 tokio::spawn(async move {
879 if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
880 log::debug!("Post-flush compaction skipped: {}", e);
882 }
883 });
884 }
885
886 pub(crate) async fn execute_compaction(
887 this: Arc<Self>,
888 _task: CompactionTask,
889 ) -> Result<CompactionStats> {
890 let start = std::time::Instant::now();
891 let _guard = CompactionGuard::new(this.compaction_status.clone())
892 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
893
894 let schema = this.schema_manager.schema();
895 let mut files_compacted = 0;
896
897 let compactor = Compactor::new(Arc::clone(&this));
900 let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
901 log::error!(
902 "Semantic compaction failed (continuing with backend optimize): {}",
903 e
904 );
905 Vec::new()
906 });
907
908 let am = this.adjacency_manager();
910 for info in &compaction_results {
911 let direction = match info.direction.as_str() {
912 "fwd" => Direction::Outgoing,
913 "bwd" => Direction::Incoming,
914 _ => continue,
915 };
916 if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
917 && let Err(e) = am.warm(&this, etid, direction, None).await
918 {
919 log::warn!(
920 "Failed to re-warm adjacency for {}/{}: {}",
921 info.edge_type,
922 info.direction,
923 e
924 );
925 }
926 }
927
928 let backend = this.backend.as_ref();
930
931 for name in schema.edge_types.keys() {
933 for dir in ["fwd", "bwd"] {
934 let delta = table_names::delta_table_name(name, dir);
935 if Self::try_optimize_table(backend, &delta).await {
936 files_compacted += 1;
937 }
938 let adj = table_names::adjacency_table_name(name, dir);
939 if Self::try_optimize_table(backend, &adj).await {
940 files_compacted += 1;
941 }
942 }
943 }
944
945 for label in schema.labels.keys() {
947 let tbl = table_names::vertex_table_name(label);
948 if Self::try_optimize_table(backend, &tbl).await {
949 files_compacted += 1;
950 backend.invalidate_cache(&tbl);
951 }
952 }
953
954 for tbl in [
956 table_names::main_vertex_table_name(),
957 table_names::main_edge_table_name(),
958 ] {
959 if Self::try_optimize_table(backend, tbl).await {
960 files_compacted += 1;
961 }
962 }
963
964 {
965 let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
966 status.total_compactions += 1;
967 status.l1_runs = 0; }
969
970 Ok(CompactionStats {
971 files_compacted,
972 bytes_before: 0,
973 bytes_after: 0,
974 duration: start.elapsed(),
975 crdt_merges: 0,
976 })
977 }
978
979 pub fn invalidate_table_cache(&self, label: &str) {
983 let name = table_names::vertex_table_name(label);
984 self.backend.invalidate_cache(&name);
985 }
986
987 pub fn base_path(&self) -> &str {
988 &self.base_uri
989 }
990
991 pub fn schema_manager(&self) -> &SchemaManager {
992 &self.schema_manager
993 }
994
995 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
996 self.schema_manager.clone()
997 }
998
    /// Borrows the `Arc` holding the schema manager without cloning it.
    #[must_use]
    pub fn schema_manager_arc_ref(&self) -> &Arc<SchemaManager> {
        &self.schema_manager
    }
1010
1011 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
1013 Arc::clone(&self.adjacency_manager)
1014 }
1015
1016 pub async fn warm_adjacency(
1021 &self,
1022 edge_type_id: u32,
1023 direction: crate::storage::direction::Direction,
1024 version: Option<u64>,
1025 ) -> anyhow::Result<()> {
1026 self.adjacency_manager
1027 .warm(self, edge_type_id, direction, version)
1028 .await
1029 }
1030
1031 pub async fn warm_adjacency_coalesced(
1036 &self,
1037 edge_type_id: u32,
1038 direction: crate::storage::direction::Direction,
1039 version: Option<u64>,
1040 ) -> anyhow::Result<()> {
1041 self.adjacency_manager
1042 .warm_coalesced(self, edge_type_id, direction, version)
1043 .await
1044 }
1045
1046 pub fn has_adjacency_csr(
1048 &self,
1049 edge_type_id: u32,
1050 direction: crate::storage::direction::Direction,
1051 ) -> bool {
1052 self.adjacency_manager.has_csr(edge_type_id, direction)
1053 }
1054
1055 pub fn get_neighbors_at_version(
1057 &self,
1058 vid: uni_common::core::id::Vid,
1059 edge_type: u32,
1060 direction: crate::storage::direction::Direction,
1061 version: u64,
1062 ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
1063 self.adjacency_manager
1064 .get_neighbors_at_version(vid, edge_type, direction, version)
1065 }
1066
1067 pub fn backend(&self) -> &dyn StorageBackend {
1069 self.backend.as_ref()
1070 }
1071
1072 pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
1074 self.backend.clone()
1075 }
1076
1077 async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
1080 use crate::storage::vid_labels::VidLabelsIndex;
1081
1082 let backend = self.backend.as_ref();
1083 let vtable = table_names::main_vertex_table_name();
1084
1085 if !backend.table_exists(vtable).await.unwrap_or(false) {
1087 self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
1088 return Ok(());
1089 }
1090
1091 let request = ScanRequest::all(vtable)
1093 .with_filter("_deleted = false")
1094 .with_limit(100_000);
1095 let batches = backend
1096 .scan(request)
1097 .await
1098 .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1099
1100 let mut index = VidLabelsIndex::new();
1101 for batch in batches {
1102 let vid_col = batch
1103 .column_by_name("_vid")
1104 .ok_or_else(|| anyhow!("Missing _vid column"))?
1105 .as_any()
1106 .downcast_ref::<UInt64Array>()
1107 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1108
1109 let labels_col = batch
1110 .column_by_name("labels")
1111 .ok_or_else(|| anyhow!("Missing labels column"))?
1112 .as_any()
1113 .downcast_ref::<arrow_array::ListArray>()
1114 .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1115
1116 for row_idx in 0..batch.num_rows() {
1117 let vid = Vid::from(vid_col.value(row_idx));
1118 let labels_array = labels_col.value(row_idx);
1119 let labels_str_array = labels_array
1120 .as_any()
1121 .downcast_ref::<arrow_array::StringArray>()
1122 .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1123
1124 let labels: Vec<String> = (0..labels_str_array.len())
1125 .map(|i| labels_str_array.value(i).to_string())
1126 .collect();
1127
1128 index.insert(vid, labels);
1129 }
1130 }
1131
1132 self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
1133 Ok(())
1134 }
1135
1136 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1139 self.vid_labels_index.as_ref().and_then(|idx| {
1140 let index = idx.read();
1141 index.get_labels(vid).map(|labels| labels.to_vec())
1142 })
1143 }
1144
1145 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1148 if let Some(idx) = &self.vid_labels_index {
1149 let mut index = idx.write();
1150 index.insert(vid, labels);
1151 }
1152 }
1153
1154 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1157 if let Some(idx) = &self.vid_labels_index {
1158 let mut index = idx.write();
1159 index.remove_vid(vid);
1160 }
1161 }
1162
1163 pub async fn load_subgraph_cached(
1164 &self,
1165 start_vids: &[Vid],
1166 edge_types: &[u32],
1167 max_hops: usize,
1168 direction: GraphDirection,
1169 _l0: Option<Arc<RwLock<L0Buffer>>>,
1170 ) -> Result<WorkingGraph> {
1171 let mut graph = WorkingGraph::new();
1172
1173 let dir = match direction {
1174 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1175 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1176 };
1177
1178 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1179
1180 let mut frontier: Vec<Vid> = start_vids.to_vec();
1182 let mut visited: HashSet<Vid> = HashSet::new();
1183
1184 for &vid in start_vids {
1186 graph.add_vertex(vid);
1187 }
1188
1189 for _hop in 0..max_hops {
1190 let mut next_frontier = HashSet::new();
1191
1192 for &vid in &frontier {
1193 if visited.contains(&vid) {
1194 continue;
1195 }
1196 visited.insert(vid);
1197 graph.add_vertex(vid);
1198
1199 for &etype_id in edge_types {
1200 let edge_ver = self.snapshot_version_hwm();
1204 self.adjacency_manager
1205 .warm_coalesced(self, etype_id, dir, edge_ver)
1206 .await?;
1207
1208 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1210
1211 for (neighbor_vid, eid) in edges {
1212 graph.add_vertex(neighbor_vid);
1213 if !visited.contains(&neighbor_vid) {
1214 next_frontier.insert(neighbor_vid);
1215 }
1216
1217 if neighbor_is_dst {
1218 graph.add_edge(vid, neighbor_vid, eid, etype_id);
1219 } else {
1220 graph.add_edge(neighbor_vid, vid, eid, etype_id);
1221 }
1222 }
1223 }
1224 }
1225 frontier = next_frontier.into_iter().collect();
1226
1227 if frontier.is_empty() {
1229 break;
1230 }
1231 }
1232
1233 Ok(graph)
1234 }
1235
1236 pub fn snapshot_manager(&self) -> &SnapshotManager {
1237 &self.snapshot_manager
1238 }
1239
1240 pub fn index_manager(&self) -> IndexManager {
1241 IndexManager::new(&self.base_uri, self.schema_manager.clone())
1242 }
1243
1244 pub async fn scan_vertex_table(
1253 &self,
1254 label: &str,
1255 columns: &[&str],
1256 additional_filter: Option<&str>,
1257 ) -> Result<Option<arrow_array::RecordBatch>> {
1258 let backend = self.backend();
1259 let table_name = table_names::vertex_table_name(label);
1260
1261 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1262 return Ok(None);
1263 }
1264
1265 let actual_columns =
1267 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1268 let table_field_names: HashSet<&str> = table_schema
1269 .fields()
1270 .iter()
1271 .map(|f| f.name().as_str())
1272 .collect();
1273 columns
1274 .iter()
1275 .copied()
1276 .filter(|c| table_field_names.contains(c))
1277 .map(|s| s.to_string())
1278 .collect::<Vec<_>>()
1279 } else {
1280 return Ok(None);
1281 };
1282
1283 let filter = match (self.version_high_water_mark(), additional_filter) {
1285 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1286 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1287 (None, Some(f)) => Some(f.to_string()),
1288 (None, None) => None,
1289 };
1290
1291 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1292 if let Some(f) = filter {
1293 request = request.with_filter(f);
1294 }
1295
1296 let batches = backend.scan(request).await?;
1303 if batches.is_empty() {
1304 Ok(None)
1305 } else {
1306 Ok(Some(arrow::compute::concat_batches(
1307 &batches[0].schema(),
1308 &batches,
1309 )?))
1310 }
1311 }
1312
1313 pub async fn scan_delta_table(
1316 &self,
1317 edge_type: &str,
1318 direction: &str,
1319 columns: &[&str],
1320 additional_filter: Option<&str>,
1321 ) -> Result<Option<arrow_array::RecordBatch>> {
1322 let edge_hwm = self.snapshot_version_hwm();
1328 let backend = self.backend();
1329 let table_name = table_names::delta_table_name(edge_type, direction);
1330
1331 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1332 return Ok(None);
1333 }
1334
1335 let actual_columns =
1337 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1338 let table_field_names: HashSet<&str> = table_schema
1339 .fields()
1340 .iter()
1341 .map(|f| f.name().as_str())
1342 .collect();
1343 columns
1344 .iter()
1345 .copied()
1346 .filter(|c| table_field_names.contains(c))
1347 .map(|s| s.to_string())
1348 .collect::<Vec<_>>()
1349 } else {
1350 return Ok(None);
1351 };
1352
1353 let filter = match (edge_hwm, additional_filter) {
1354 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1355 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1356 (None, Some(f)) => Some(f.to_string()),
1357 (None, None) => None,
1358 };
1359
1360 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1361 if let Some(f) = filter {
1362 request = request.with_filter(f);
1363 }
1364
1365 let batches = backend.scan(request).await?;
1372 if batches.is_empty() {
1373 Ok(None)
1374 } else {
1375 Ok(Some(arrow::compute::concat_batches(
1376 &batches[0].schema(),
1377 &batches,
1378 )?))
1379 }
1380 }
1381
1382 pub async fn scan_main_vertex_table(
1387 &self,
1388 columns: &[&str],
1389 filter: Option<&str>,
1390 ) -> Result<Option<arrow_array::RecordBatch>> {
1391 let backend = self.backend();
1392 let table_name = table_names::main_vertex_table_name();
1393
1394 if !backend.table_exists(table_name).await.unwrap_or(false) {
1395 return Ok(None);
1396 }
1397
1398 let full_filter = match (self.version_high_water_mark(), filter) {
1400 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1401 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1402 (None, Some(f)) => Some(f.to_string()),
1403 (None, None) => None,
1404 };
1405
1406 let request = ScanRequest::all(table_name)
1407 .with_columns(columns.iter().map(|s| s.to_string()).collect());
1408 let request = match full_filter.as_deref() {
1409 Some(f) => request.with_filter(f),
1410 None => request,
1411 };
1412
1413 let batches = backend.scan(request).await?;
1420 if batches.is_empty() {
1421 Ok(None)
1422 } else {
1423 Ok(Some(arrow::compute::concat_batches(
1424 &batches[0].schema(),
1425 &batches,
1426 )?))
1427 }
1428 }
1429
1430 pub async fn scan_main_edge_table_stream(
1432 &self,
1433 filter: Option<&str>,
1434 ) -> Result<
1435 Option<
1436 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1437 >,
1438 > {
1439 let backend = self.backend();
1440 let table_name = table_names::main_edge_table_name();
1441
1442 if !backend.table_exists(table_name).await.unwrap_or(false) {
1443 return Ok(None);
1444 }
1445
1446 let mut request = ScanRequest::all(table_name);
1447 if let Some(f) = filter {
1448 request = request.with_filter(f);
1449 }
1450
1451 let stream = backend.scan_stream(request).await?;
1452 Ok(Some(stream))
1453 }
1454
1455 pub async fn scan_vertex_table_stream(
1457 &self,
1458 label: &str,
1459 ) -> Result<
1460 Option<
1461 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1462 >,
1463 > {
1464 let backend = self.backend();
1465 let table_name = table_names::vertex_table_name(label);
1466
1467 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1468 return Ok(None);
1469 }
1470
1471 let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1472 Ok(Some(stream))
1473 }
1474
1475 pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1477 MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1478 .await
1479 }
1480
1481 pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1483 MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1484 .await
1485 }
1486
1487 pub async fn find_edges_by_type_names(
1490 &self,
1491 type_names: &[&str],
1492 endpoint_filter: Option<(crate::storage::main_edge::EndpointSide, &[Vid])>,
1493 ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1494 MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names, endpoint_filter).await
1495 }
1496
1497 pub async fn scan_vertex_candidates(
1499 &self,
1500 label: &str,
1501 filter: Option<&str>,
1502 ) -> Result<Vec<Vid>> {
1503 let backend = self.backend();
1504 let table_name = table_names::vertex_table_name(label);
1505
1506 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1507 return Ok(Vec::new());
1508 }
1509
1510 let full_filter = match filter {
1511 Some(f) => format!("_deleted = false AND ({})", f),
1512 None => "_deleted = false".to_string(),
1513 };
1514
1515 let request = ScanRequest::all(&table_name)
1516 .with_filter(full_filter)
1517 .with_columns(vec!["_vid".to_string()]);
1518
1519 let batches = backend.scan(request).await?;
1520
1521 let mut vids = Vec::new();
1522 for batch in batches {
1523 let vid_col = batch
1524 .column_by_name("_vid")
1525 .ok_or(anyhow!("Missing _vid"))?
1526 .as_any()
1527 .downcast_ref::<UInt64Array>()
1528 .ok_or(anyhow!("Invalid _vid"))?;
1529 for i in 0..batch.num_rows() {
1530 vids.push(Vid::from(vid_col.value(i)));
1531 }
1532 }
1533 Ok(vids)
1534 }
1535
1536 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1537 let schema = self.schema_manager.schema();
1538 let label_meta = schema
1539 .labels
1540 .get(label)
1541 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1542 let key = format!("vertices_{label}");
1543 match self.fork_branch_for(&key) {
1544 Some(branch) => Ok(VertexDataset::new_branched(
1545 &self.base_uri,
1546 label,
1547 label_meta.id,
1548 branch,
1549 )),
1550 None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1551 }
1552 }
1553
/// Builds the [`EdgeDataset`] handle for `edge_type` between `src_label` and `dst_label`.
///
/// The dataset is branched when the fork scope names a branch for
/// `edges_{edge_type}`, and unbranched otherwise.
#[cfg(feature = "lance-backend")]
pub fn edge_dataset(
    &self,
    edge_type: &str,
    src_label: &str,
    dst_label: &str,
) -> Result<EdgeDataset> {
    let dataset_key = format!("edges_{edge_type}");
    let dataset = if let Some(branch) = self.fork_branch_for(&dataset_key) {
        EdgeDataset::new_branched(&self.base_uri, edge_type, src_label, dst_label, branch)
    } else {
        EdgeDataset::new(&self.base_uri, edge_type, src_label, dst_label)
    };
    Ok(dataset)
}
1578
1579 pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1580 let key = format!("deltas_{edge_type}_{direction}");
1581 match self.fork_branch_for(&key) {
1582 Some(branch) => Ok(DeltaDataset::new_branched(
1583 &self.base_uri,
1584 edge_type,
1585 direction,
1586 branch,
1587 )),
1588 None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1589 }
1590 }
1591
1592 pub fn adjacency_dataset(
1593 &self,
1594 edge_type: &str,
1595 label: &str,
1596 direction: &str,
1597 ) -> Result<AdjacencyDataset> {
1598 let key = format!("adjacency_{direction}_{edge_type}_{label}");
1599 match self.fork_branch_for(&key) {
1600 Some(branch) => Ok(AdjacencyDataset::new_branched(
1601 &self.base_uri,
1602 edge_type,
1603 label,
1604 direction,
1605 branch,
1606 )),
1607 None => Ok(AdjacencyDataset::new(
1608 &self.base_uri,
1609 edge_type,
1610 label,
1611 direction,
1612 )),
1613 }
1614 }
1615
1616 fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1620 self.fork_scope
1621 .as_ref()
1622 .and_then(|s| s.branch_for(dataset_name))
1623 }
1624
1625 pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1630 MainVertexDataset::new(&self.base_uri)
1631 }
1632
1633 pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1638 MainEdgeDataset::new(&self.base_uri)
1639 }
1640
/// Builds the [`UidIndex`] handle for `label` rooted at this manager's base URI.
#[cfg(feature = "lance-backend")]
pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
    let index = UidIndex::new(&self.base_uri, label);
    Ok(index)
}
1645
1646 #[cfg(feature = "lance-backend")]
1647 pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1648 let schema = self.schema_manager.schema();
1649 let config = schema
1650 .indexes
1651 .iter()
1652 .find_map(|idx| match idx {
1653 IndexDefinition::Inverted(cfg)
1654 if cfg.label == label && cfg.property == property =>
1655 {
1656 Some(cfg.clone())
1657 }
1658 _ => None,
1659 })
1660 .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1661
1662 InvertedIndex::new(&self.base_uri, config).await
1663 }
1664
1665 pub async fn vector_search(
1666 &self,
1667 label: &str,
1668 property: &str,
1669 query: &[f32],
1670 k: usize,
1671 filter: Option<&str>,
1672 ctx: Option<&QueryContext>,
1673 ) -> Result<Vec<(Vid, f32)>> {
1674 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1675
1676 let schema = self.schema_manager.schema();
1678 let metric = schema
1679 .vector_index_for_property(label, property)
1680 .map(|config| config.metric.clone())
1681 .unwrap_or(DistanceMetric::L2);
1682
1683 let backend = self.backend.as_ref();
1684 let name = table_names::vertex_table_name(label);
1685
1686 let mut results = Vec::new();
1687
1688 if backend.table_exists(&name).await.unwrap_or(false) {
1690 let backend_metric = match &metric {
1691 DistanceMetric::L2 => BackendMetric::L2,
1692 DistanceMetric::Cosine => BackendMetric::Cosine,
1693 DistanceMetric::Dot => BackendMetric::Dot,
1694 _ => BackendMetric::L2,
1695 };
1696
1697 let mut filter_parts = vec![Self::build_active_filter(filter)];
1699 if ctx.is_some()
1700 && let Some(hwm) = self.version_high_water_mark()
1701 {
1702 filter_parts.push(format!("_version <= {}", hwm));
1703 }
1704 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1705
1706 let batches = backend
1707 .vector_search(&name, property, query, k, backend_metric, combined_filter)
1708 .await?;
1709
1710 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1711 }
1712
1713 if let Some(qctx) = ctx {
1715 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1716 }
1717
1718 Ok(results)
1719 }
1720
1721 pub async fn fts_search(
1737 &self,
1738 label: &str,
1739 property: &str,
1740 query: &str,
1741 k: usize,
1742 filter: Option<&str>,
1743 ctx: Option<&QueryContext>,
1744 ) -> Result<Vec<(Vid, f32)>> {
1745 use crate::backend::types::FilterExpr;
1746
1747 let backend = self.backend.as_ref();
1748 let name = table_names::vertex_table_name(label);
1749
1750 let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1751 let mut filter_parts = vec![Self::build_active_filter(filter)];
1753 if ctx.is_some()
1754 && let Some(hwm) = self.version_high_water_mark()
1755 {
1756 filter_parts.push(format!("_version <= {}", hwm));
1757 }
1758 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1759
1760 let batches = backend
1761 .full_text_search(&name, property, query, k, combined_filter)
1762 .await?;
1763
1764 let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1765 fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1767 fts_results
1768 } else {
1769 Vec::new()
1770 };
1771
1772 if let Some(qctx) = ctx {
1774 merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1775 }
1776
1777 Ok(results)
1778 }
1779
/// Resolves `uid` to a vertex id through the UID index of `label`.
#[cfg(feature = "lance-backend")]
pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
    self.uid_index(label)?.get_vid(uid).await
}
1785
/// Records the `uid` -> `vid` mapping in the UID index of `label`.
#[cfg(feature = "lance-backend")]
pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
    let mapping = [(uid, vid)];
    self.uid_index(label)?.write_mapping(&mapping).await
}
1791
1792 pub async fn load_subgraph(
1793 &self,
1794 start_vids: &[Vid],
1795 edge_types: &[u32],
1796 max_hops: usize,
1797 direction: GraphDirection,
1798 l0: Option<&L0Buffer>,
1799 ) -> Result<WorkingGraph> {
1800 let mut graph = WorkingGraph::new();
1801 let schema = self.schema_manager.schema();
1802
1803 let label_map: HashMap<u16, String> = schema
1805 .labels
1806 .values()
1807 .map(|meta| {
1808 (
1809 meta.id,
1810 schema.label_name_by_id(meta.id).unwrap().to_owned(),
1811 )
1812 })
1813 .collect();
1814
1815 let edge_type_map: HashMap<u32, String> = schema
1816 .edge_types
1817 .values()
1818 .map(|meta| {
1819 (
1820 meta.id,
1821 schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1822 )
1823 })
1824 .collect();
1825
1826 let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1827
1828 let mut frontier: Vec<Vid> = start_vids.to_vec();
1830 let mut visited: HashSet<Vid> = HashSet::new();
1831
1832 for &vid in start_vids {
1834 graph.add_vertex(vid);
1835 }
1836
1837 for _hop in 0..max_hops {
1838 let mut next_frontier = HashSet::new();
1839
1840 for &vid in &frontier {
1841 if visited.contains(&vid) {
1842 continue;
1843 }
1844 visited.insert(vid);
1845 graph.add_vertex(vid);
1846
1847 for &etype_id in &target_edge_types {
1849 let etype_name = edge_type_map
1850 .get(&etype_id)
1851 .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1852
1853 let (dir_str, neighbor_is_dst) = match direction {
1857 GraphDirection::Outgoing => ("fwd", true),
1858 GraphDirection::Incoming => ("bwd", false),
1859 };
1860
1861 let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1862
1863 let _edge_ver = self
1868 .pinned_snapshot
1869 .as_ref()
1870 .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1871
1872 let backend = self.backend();
1874 for current_src_label in label_map.values() {
1875 let adj_ds =
1876 match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1877 Ok(ds) => ds,
1878 Err(_) => continue,
1879 };
1880 if let Some((neighbors, eids)) =
1881 adj_ds.read_adjacency_backend(backend, vid).await?
1882 {
1883 for (n, eid) in neighbors.into_iter().zip(eids) {
1884 edges.insert(
1885 eid,
1886 EdgeState {
1887 neighbor: n,
1888 version: 0,
1889 deleted: false,
1890 },
1891 );
1892 }
1893 break; }
1895 }
1896
1897 let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1899 let delta_entries = delta_ds
1900 .read_deltas(backend, vid, &schema, self.snapshot_version_hwm())
1901 .await?;
1902 Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1903
1904 if let Some(l0) = l0 {
1906 Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1907 }
1908
1909 Self::add_edges_to_graph(
1911 &mut graph,
1912 edges,
1913 vid,
1914 etype_id,
1915 neighbor_is_dst,
1916 &visited,
1917 &mut next_frontier,
1918 );
1919 }
1920 }
1921 frontier = next_frontier.into_iter().collect();
1922
1923 if frontier.is_empty() {
1925 break;
1926 }
1927 }
1928
1929 Ok(graph)
1930 }
1931
1932 fn apply_delta_to_edges(
1934 edges: &mut HashMap<Eid, EdgeState>,
1935 delta_entries: Vec<crate::storage::delta::L1Entry>,
1936 neighbor_is_dst: bool,
1937 ) {
1938 for entry in delta_entries {
1939 let neighbor = if neighbor_is_dst {
1940 entry.dst_vid
1941 } else {
1942 entry.src_vid
1943 };
1944 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1945
1946 if entry.version > current_ver {
1947 edges.insert(
1948 entry.eid,
1949 EdgeState {
1950 neighbor,
1951 version: entry.version,
1952 deleted: matches!(entry.op, Op::Delete),
1953 },
1954 );
1955 }
1956 }
1957 }
1958
1959 fn apply_l0_to_edges(
1961 edges: &mut HashMap<Eid, EdgeState>,
1962 l0: &L0Buffer,
1963 vid: Vid,
1964 etype_id: u32,
1965 direction: GraphDirection,
1966 ) {
1967 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1968 for (neighbor, eid, ver) in l0_neighbors {
1969 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1970 if ver > current_ver {
1971 edges.insert(
1972 eid,
1973 EdgeState {
1974 neighbor,
1975 version: ver,
1976 deleted: false,
1977 },
1978 );
1979 }
1980 }
1981
1982 for (eid, state) in edges.iter_mut() {
1984 if l0.is_tombstoned(*eid) {
1985 state.deleted = true;
1986 }
1987 }
1988 }
1989
1990 fn add_edges_to_graph(
1992 graph: &mut WorkingGraph,
1993 edges: HashMap<Eid, EdgeState>,
1994 vid: Vid,
1995 etype_id: u32,
1996 neighbor_is_dst: bool,
1997 visited: &HashSet<Vid>,
1998 next_frontier: &mut HashSet<Vid>,
1999 ) {
2000 for (eid, state) in edges {
2001 if state.deleted {
2002 continue;
2003 }
2004 graph.add_vertex(state.neighbor);
2005
2006 if !visited.contains(&state.neighbor) {
2007 next_frontier.insert(state.neighbor);
2008 }
2009
2010 if neighbor_is_dst {
2011 graph.add_edge(vid, state.neighbor, eid, etype_id);
2012 } else {
2013 graph.add_edge(state.neighbor, vid, eid, etype_id);
2014 }
2015 }
2016 }
2017}
2018
2019fn extract_vid_score_pairs(
2021 batches: &[arrow_array::RecordBatch],
2022 vid_column: &str,
2023 score_column: &str,
2024) -> Result<Vec<(Vid, f32)>> {
2025 let mut results = Vec::new();
2026 for batch in batches {
2027 let vid_col = batch
2028 .column_by_name(vid_column)
2029 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
2030 .as_any()
2031 .downcast_ref::<UInt64Array>()
2032 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
2033
2034 let score_col = batch
2035 .column_by_name(score_column)
2036 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
2037 .as_any()
2038 .downcast_ref::<Float32Array>()
2039 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
2040
2041 for i in 0..batch.num_rows() {
2042 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
2043 }
2044 }
2045 Ok(results)
2046}
2047
2048fn extract_embedding_from_props(
2053 props: &uni_common::Properties,
2054 property: &str,
2055) -> Option<Vec<f32>> {
2056 let arr = props.get(property)?.as_array()?;
2057 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
2058}
2059
2060fn merge_l0_into_vector_results(
2070 results: &mut Vec<(Vid, f32)>,
2071 ctx: &QueryContext,
2072 label: &str,
2073 property: &str,
2074 query: &[f32],
2075 k: usize,
2076 metric: &DistanceMetric,
2077) {
2078 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2080 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2081 buffers.push(Arc::clone(&ctx.l0));
2082 if let Some(ref txn) = ctx.transaction_l0 {
2083 buffers.push(Arc::clone(txn));
2084 }
2085
2086 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2088 let mut tombstoned: HashSet<Vid> = HashSet::new();
2090
2091 for buf_arc in &buffers {
2092 let buf = buf_arc.read();
2093
2094 for &vid in &buf.vertex_tombstones {
2096 tombstoned.insert(vid);
2097 }
2098
2099 for (&vid, labels) in &buf.vertex_labels {
2101 if !labels.iter().any(|l| l == label) {
2102 continue;
2103 }
2104 if let Some(props) = buf.vertex_properties.get(&vid)
2105 && let Some(emb) = extract_embedding_from_props(props, property)
2106 {
2107 if emb.len() != query.len() {
2108 continue; }
2110 let dist = metric.compute_distance(&emb, query);
2111 l0_candidates.insert(vid, dist);
2113 tombstoned.remove(&vid);
2115 }
2116 }
2117 }
2118
2119 if l0_candidates.is_empty() && tombstoned.is_empty() {
2121 return;
2122 }
2123
2124 results.retain(|(vid, _)| !tombstoned.contains(vid));
2126
2127 for (vid, dist) in &l0_candidates {
2129 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2130 existing.1 = *dist;
2131 } else {
2132 results.push((*vid, *dist));
2133 }
2134 }
2135
2136 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2138 results.truncate(k);
2139}
2140
/// Scores `text` against `query` as the fraction of distinct query tokens found in it.
///
/// Both strings are split on whitespace and lower-cased; duplicate tokens count once.
/// Returns `0.0` when `query` contains no tokens, otherwise a value in `0.0..=1.0`.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    let lowercase_tokens = |input: &str| -> HashSet<String> {
        input.split_whitespace().map(str::to_lowercase).collect()
    };

    let wanted = lowercase_tokens(query);
    if wanted.is_empty() {
        return 0.0;
    }

    let present = lowercase_tokens(text);
    let matched = wanted.intersection(&present).count();
    matched as f32 / wanted.len() as f32
}
2158
2159fn extract_text_from_props<'a>(
2161 props: &'a uni_common::Properties,
2162 property: &str,
2163) -> Option<&'a str> {
2164 props.get(property)?.as_str()
2165}
2166
2167fn merge_l0_into_fts_results(
2177 results: &mut Vec<(Vid, f32)>,
2178 ctx: &QueryContext,
2179 label: &str,
2180 property: &str,
2181 query: &str,
2182 k: usize,
2183) {
2184 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2186 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2187 buffers.push(Arc::clone(&ctx.l0));
2188 if let Some(ref txn) = ctx.transaction_l0 {
2189 buffers.push(Arc::clone(txn));
2190 }
2191
2192 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2194 let mut tombstoned: HashSet<Vid> = HashSet::new();
2196
2197 for buf_arc in &buffers {
2198 let buf = buf_arc.read();
2199
2200 for &vid in &buf.vertex_tombstones {
2202 tombstoned.insert(vid);
2203 }
2204
2205 for (&vid, labels) in &buf.vertex_labels {
2207 if !labels.iter().any(|l| l == label) {
2208 continue;
2209 }
2210 if let Some(props) = buf.vertex_properties.get(&vid)
2211 && let Some(text) = extract_text_from_props(props, property)
2212 {
2213 let score = compute_text_relevance(query, text);
2214 if score > 0.0 {
2215 l0_candidates.insert(vid, score);
2217 }
2218 tombstoned.remove(&vid);
2220 }
2221 }
2222 }
2223
2224 if l0_candidates.is_empty() && tombstoned.is_empty() {
2226 return;
2227 }
2228
2229 results.retain(|(vid, _)| !tombstoned.contains(vid));
2231
2232 for (vid, score) in &l0_candidates {
2234 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2235 existing.1 = *score;
2236 } else {
2237 results.push((*vid, *score));
2238 }
2239 }
2240
2241 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2243 results.truncate(k);
2244}