1use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Per-edge bookkeeping record: the vertex at the other end, a version
/// number, and a deletion flag.
///
/// NOTE(review): no use of this type is visible in this section of the file;
/// presumably it holds the latest state of an edge while merging versions — confirm.
struct EdgeState {
    /// Vertex on the other end of the edge.
    neighbor: Vid,
    /// Version associated with this edge state.
    version: u64,
    /// Whether this state marks the edge as deleted (presumably a tombstone).
    deleted: bool,
}
60
/// Entry point for graph storage. It owns the object store, the storage
/// backend, the schema, the snapshot and adjacency managers, and compaction
/// bookkeeping.
///
/// Instances come from the `new*` constructors. [`StorageManager::pinned`] and
/// [`StorageManager::at_fork_with_schema`] derive snapshot-pinned and
/// fork-scoped views that share most `Arc` fields with the parent.
pub struct StorageManager {
    /// Root URI the manager was opened with.
    base_uri: String,
    /// Object store (wrapped in `ResilientObjectStore` by `new_with_backend`).
    store: Arc<dyn ObjectStore>,
    /// Schema used to enumerate labels and edge types.
    schema_manager: Arc<SchemaManager>,
    /// Snapshot manager built on top of `store`.
    snapshot_manager: Arc<SnapshotManager>,
    /// Adjacency cache; derived views get a fresh one with the same byte budget.
    adjacency_manager: Arc<AdjacencyManager>,
    /// Configuration this manager was created with.
    pub config: UniConfig,
    /// Shared compaction state; `compaction_in_progress` is set/cleared by `CompactionGuard`.
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    /// Number of flushes currently running, maintained by `FlushInProgressGuard`.
    pub flush_in_progress: std::sync::atomic::AtomicUsize,
    /// When `Some`, scan helpers add a `_version <= high_water_mark` filter.
    pinned_snapshot: Option<SnapshotManifest>,
    /// Fork this manager is scoped to; `Some` only for managers built by `at_fork*`.
    fork_scope: Option<Arc<crate::fork::ForkScope>>,
    /// Table storage backend (wrapped in `BranchedBackend` for fork views).
    backend: Arc<dyn StorageBackend>,
    /// Optional in-memory VID -> labels index; `None` when disabled or when the
    /// startup rebuild failed.
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
89
/// RAII guard that counts one in-flight flush on a [`StorageManager`].
///
/// [`FlushInProgressGuard::new`] increments `StorageManager::flush_in_progress`,
/// and dropping the guard decrements it again.
pub struct FlushInProgressGuard {
    /// Manager whose `flush_in_progress` counter this guard holds a slot in.
    storage: Arc<StorageManager>,
}
101
102impl FlushInProgressGuard {
103 pub fn new(storage: &Arc<StorageManager>) -> Self {
104 storage
105 .flush_in_progress
106 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
107 Self {
108 storage: storage.clone(),
109 }
110 }
111}
112
113impl Drop for FlushInProgressGuard {
114 fn drop(&mut self) {
115 self.storage
117 .flush_in_progress
118 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
119 }
120}
121
122fn is_lance_conflict(err: &anyhow::Error) -> bool {
128 let msg = err.to_string();
129 msg.contains("Incompatible transaction") || msg.contains("conflict")
130}
131
132async fn retry_on_lance_conflict<F, Fut>(mut op: F) -> anyhow::Result<()>
137where
138 F: FnMut() -> Fut,
139 Fut: std::future::Future<Output = anyhow::Result<()>>,
140{
141 for attempt in 0u32..10 {
142 match op().await {
143 Ok(()) => return Ok(()),
144 Err(e) => {
145 if !is_lance_conflict(&e) || attempt == 9 {
146 return Err(e);
147 }
148 let backoff_ms = 1u64 << attempt;
149 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
150 }
151 }
152 }
153 unreachable!("retry loop exits via Ok or Err")
154}
155
156pub async fn merge_insert_batch_with_lance_conflict_retry(
165 backend: &dyn crate::backend::StorageBackend,
166 table_name: &str,
167 batch: arrow_array::RecordBatch,
168 on: &[&str],
169) -> anyhow::Result<()> {
170 retry_on_lance_conflict(|| async {
171 let exists = backend.table_exists(table_name).await?;
172 if !exists {
173 anyhow::bail!(
174 "merge_insert target table '{}' does not exist (partial writes \
175 require the row to already be present; CREATE goes through Append)",
176 table_name
177 );
178 }
179 backend
180 .merge_insert(table_name, on, vec![batch.clone()])
181 .await
182 })
183 .await
184}
185
186pub async fn write_batch_with_lance_conflict_retry(
195 backend: &dyn crate::backend::StorageBackend,
196 table_name: &str,
197 batch: arrow_array::RecordBatch,
198) -> anyhow::Result<()> {
199 use crate::backend::types::WriteMode;
200 retry_on_lance_conflict(|| async {
201 let exists = backend.table_exists(table_name).await?;
202 if exists {
203 backend
204 .write(table_name, vec![batch.clone()], WriteMode::Append)
205 .await
206 } else {
207 backend.create_table(table_name, vec![batch.clone()]).await
208 }
209 })
210 .await
211}
212
/// Guard that marks a compaction as running in a shared [`CompactionStatus`].
///
/// `CompactionGuard::new` sets `compaction_in_progress`. Dropping the guard
/// clears that flag and stamps `last_compaction`.
struct CompactionGuard {
    /// Status whose `compaction_in_progress` flag this guard owns.
    status: Arc<Mutex<CompactionStatus>>,
}
217
218impl CompactionGuard {
219 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
220 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
221 if s.compaction_in_progress {
222 return None;
223 }
224 s.compaction_in_progress = true;
225 Some(Self {
226 status: status.clone(),
227 })
228 }
229}
230
231impl Drop for CompactionGuard {
232 fn drop(&mut self) {
233 match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
236 Ok(mut s) => {
237 s.compaction_in_progress = false;
238 s.last_compaction = Some(std::time::SystemTime::now());
239 }
240 Err(e) => {
241 log::error!(
245 "CompactionGuard drop failed to acquire poisoned lock: {}. \
246 Compaction status may be inconsistent. Issue #18/#150",
247 e
248 );
249 }
250 }
251 }
252}
253
254impl StorageManager {
255 pub async fn new_with_backend(
257 base_uri: &str,
258 store: Arc<dyn ObjectStore>,
259 backend: Arc<dyn StorageBackend>,
260 schema_manager: Arc<SchemaManager>,
261 config: UniConfig,
262 ) -> Result<Self> {
263 let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
264 store,
265 config.object_store.clone(),
266 ));
267
268 let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));
269
270 Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;
272
273 let mut sm = Self {
274 base_uri: base_uri.to_string(),
275 store: resilient_store,
276 schema_manager,
277 snapshot_manager,
278 adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
279 config,
280 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
281 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
282 pinned_snapshot: None,
283 fork_scope: None,
284 backend,
285 vid_labels_index: None,
286 };
287
288 if sm.config.enable_vid_labels_index
290 && let Err(e) = sm.rebuild_vid_labels_index().await
291 {
292 warn!(
293 "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
294 e
295 );
296 }
297
298 Ok(sm)
299 }
300
301 #[cfg(feature = "lance-backend")]
303 pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
304 Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
305 }
306
307 #[cfg(feature = "lance-backend")]
309 pub async fn new_with_cache(
310 base_uri: &str,
311 schema_manager: Arc<SchemaManager>,
312 adjacency_cache_size: usize,
313 ) -> Result<Self> {
314 let config = UniConfig {
315 cache_size: adjacency_cache_size,
316 ..Default::default()
317 };
318 Self::new_with_config(base_uri, schema_manager, config).await
319 }
320
321 #[cfg(feature = "lance-backend")]
323 pub async fn new_with_config(
324 base_uri: &str,
325 schema_manager: Arc<SchemaManager>,
326 config: UniConfig,
327 ) -> Result<Self> {
328 let store = Self::build_store_from_uri(base_uri)?;
329 Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
330 }
331
332 #[cfg(feature = "lance-backend")]
334 pub async fn new_with_store_and_config(
335 base_uri: &str,
336 store: Arc<dyn ObjectStore>,
337 schema_manager: Arc<SchemaManager>,
338 config: UniConfig,
339 ) -> Result<Self> {
340 Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
341 .await
342 }
343
344 #[cfg(feature = "lance-backend")]
346 pub async fn new_with_store_and_storage_options(
347 base_uri: &str,
348 store: Arc<dyn ObjectStore>,
349 schema_manager: Arc<SchemaManager>,
350 config: UniConfig,
351 lancedb_storage_options: Option<HashMap<String, String>>,
352 ) -> Result<Self> {
353 let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
354 Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
355 }
356
357 async fn recover_all_staging_tables(
362 backend: &dyn StorageBackend,
363 schema_manager: &SchemaManager,
364 ) -> Result<()> {
365 let schema = schema_manager.schema();
366
367 backend
369 .recover_staging(table_names::main_vertex_table_name())
370 .await?;
371 backend
372 .recover_staging(table_names::main_edge_table_name())
373 .await?;
374
375 for label in schema.labels.keys() {
377 let name = table_names::vertex_table_name(label);
378 backend.recover_staging(&name).await?;
379 }
380
381 for edge_type in schema.edge_types.keys() {
383 for direction in &["fwd", "bwd"] {
384 let delta_name = table_names::delta_table_name(edge_type, direction);
386 backend.recover_staging(&delta_name).await?;
387
388 for _label in schema.labels.keys() {
390 let adj_name = table_names::adjacency_table_name(edge_type, direction);
391 backend.recover_staging(&adj_name).await?;
392 }
393 }
394 }
395
396 Ok(())
397 }
398
399 #[cfg(feature = "lance-backend")]
400 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
401 if base_uri.contains("://") {
402 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
403 let (store, _path) = object_store::parse_url(&parsed)
404 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
405 Ok(Arc::from(store))
406 } else {
407 std::fs::create_dir_all(base_uri)?;
409 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
410 }
411 }
412
413 pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
414 Self {
422 base_uri: self.base_uri.clone(),
423 store: self.store.clone(),
424 schema_manager: self.schema_manager.clone(),
425 snapshot_manager: self.snapshot_manager.clone(),
426 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
430 config: self.config.clone(),
431 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
432 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
433 pinned_snapshot: Some(snapshot),
434 fork_scope: self.fork_scope.clone(),
435 backend: self.backend.clone(),
436 vid_labels_index: self.vid_labels_index.clone(),
437 }
438 }
439
440 pub fn at_fork(&self, scope: Arc<crate::fork::ForkScope>) -> Self {
455 self.at_fork_with_schema(scope, self.schema_manager.clone())
456 }
457
458 pub fn at_fork_with_schema(
466 &self,
467 scope: Arc<crate::fork::ForkScope>,
468 merged_schema: Arc<SchemaManager>,
469 ) -> Self {
470 debug_assert!(
471 self.pinned_snapshot.is_none(),
472 "forking a pinned StorageManager is unsupported in Phase 1"
473 );
474 let branched_backend: Arc<dyn StorageBackend> = Arc::new(
475 crate::backend::branched::BranchedBackend::new(self.backend.clone(), scope.clone()),
476 );
477 Self {
478 base_uri: self.base_uri.clone(),
479 store: self.store.clone(),
480 schema_manager: merged_schema,
481 snapshot_manager: self.snapshot_manager.clone(),
482 adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
483 config: self.config.clone(),
484 compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
485 flush_in_progress: std::sync::atomic::AtomicUsize::new(0),
486 pinned_snapshot: None,
487 fork_scope: Some(scope),
488 backend: branched_backend,
489 vid_labels_index: self.vid_labels_index.clone(),
490 }
491 }
492
493 pub fn fork_scope(&self) -> Option<&Arc<crate::fork::ForkScope>> {
495 self.fork_scope.as_ref()
496 }
497
498 #[must_use]
509 pub fn fork_index_exists(
510 &self,
511 label: &str,
512 column: &str,
513 ) -> Option<crate::fork::ForkLocalIndexKind> {
514 self.fork_scope
515 .as_ref()
516 .and_then(|s| s.fork_local_index(label, column))
517 }
518
519 pub fn base_uri(&self) -> &str {
522 &self.base_uri
523 }
524
525 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
526 let schema = self.schema_manager.schema();
527 let name = schema.edge_type_name_by_id(edge_type_id)?;
528 self.pinned_snapshot
529 .as_ref()
530 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
531 }
532
533 pub fn version_high_water_mark(&self) -> Option<u64> {
539 self.pinned_snapshot
540 .as_ref()
541 .map(|s| s.version_high_water_mark)
542 }
543
544 pub fn apply_version_filter(&self, base_filter: String) -> String {
549 if let Some(hwm) = self.version_high_water_mark() {
550 format!("({}) AND (_version <= {})", base_filter, hwm)
551 } else {
552 base_filter
553 }
554 }
555
556 fn build_active_filter(user_filter: Option<&str>) -> String {
559 match user_filter {
560 Some(expr) => format!("({}) AND (_deleted = false)", expr),
561 None => "_deleted = false".to_string(),
562 }
563 }
564
565 pub fn store(&self) -> Arc<dyn ObjectStore> {
566 self.store.clone()
567 }
568
569 pub fn compaction_status(
575 &self,
576 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
577 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
578 Ok(guard.clone())
579 }
580
581 pub async fn compact(&self) -> Result<CompactionStats> {
582 let start = std::time::Instant::now();
584 let schema = self.schema_manager.schema();
585 let mut files_compacted = 0;
586
587 for label in schema.labels.keys() {
588 let name = table_names::vertex_table_name(label);
589 if self.backend.table_exists(&name).await? {
590 self.backend.optimize_table(&name).await?;
591 files_compacted += 1;
592 self.backend.invalidate_cache(&name);
593 }
594 }
595
596 Ok(CompactionStats {
597 files_compacted,
598 bytes_before: 0,
599 bytes_after: 0,
600 duration: start.elapsed(),
601 crdt_merges: 0,
602 })
603 }
604
605 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
606 let _guard = CompactionGuard::new(self.compaction_status.clone())
607 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
608
609 let start = std::time::Instant::now();
610 let name = table_names::vertex_table_name(label);
611
612 if self.backend.table_exists(&name).await? {
613 self.backend.optimize_table(&name).await?;
614 self.backend.invalidate_cache(&name);
615 }
616
617 Ok(CompactionStats {
618 files_compacted: 1,
619 bytes_before: 0,
620 bytes_after: 0,
621 duration: start.elapsed(),
622 crdt_merges: 0,
623 })
624 }
625
626 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
627 let _guard = CompactionGuard::new(self.compaction_status.clone())
628 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
629
630 let start = std::time::Instant::now();
631 let mut files_compacted = 0;
632
633 for dir in ["fwd", "bwd"] {
634 let name = table_names::delta_table_name(edge_type, dir);
635 if self.backend.table_exists(&name).await? {
636 self.backend.optimize_table(&name).await?;
637 files_compacted += 1;
638 }
639 }
640
641 Ok(CompactionStats {
642 files_compacted,
643 bytes_before: 0,
644 bytes_after: 0,
645 duration: start.elapsed(),
646 crdt_merges: 0,
647 })
648 }
649
650 pub async fn wait_for_compaction(&self) -> Result<()> {
651 loop {
652 let in_progress = {
653 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
654 };
655 if !in_progress {
656 return Ok(());
657 }
658 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
659 }
660 }
661
662 pub fn start_background_compaction(
663 self: Arc<Self>,
664 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
665 ) -> tokio::task::JoinHandle<()> {
666 if !self.config.compaction.enabled {
667 return tokio::spawn(async {});
668 }
669
670 tokio::spawn(async move {
671 let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
677 let mut interval =
678 tokio::time::interval_at(start, self.config.compaction.check_interval);
679
680 loop {
681 tokio::select! {
682 _ = interval.tick() => {
683 if let Err(e) = self.update_compaction_status().await {
684 log::error!("Failed to update compaction status: {}", e);
685 continue;
686 }
687
688 if let Some(task) = self.pick_compaction_task() {
689 log::info!("Triggering background compaction: {:?}", task);
690 if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
691 log::error!("Compaction failed: {}", e);
692 }
693 }
694 }
695 _ = shutdown_rx.recv() => {
696 log::info!("Background compaction shutting down");
697 let _ = self.wait_for_compaction().await;
698 break;
699 }
700 }
701 }
702 })
703 }
704
705 async fn update_compaction_status(&self) -> Result<()> {
706 let schema = self.schema_manager.schema();
707 let backend = self.backend.as_ref();
708 let mut total_rows: usize = 0;
709 let mut oldest_ts: Option<i64> = None;
710
711 for name in schema.edge_types.keys() {
712 for dir in ["fwd", "bwd"] {
713 let tbl_name = table_names::delta_table_name(name, dir);
714 if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
715 continue;
716 }
717 let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
718 if row_count == 0 {
719 continue;
720 }
721 total_rows += row_count;
722
723 let request =
725 ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
726 let Ok(batches) = backend.scan(request).await else {
727 continue;
728 };
729 for batch in batches {
730 let Some(col) = batch
731 .column_by_name("_created_at")
732 .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
733 else {
734 continue;
735 };
736 for i in 0..col.len() {
737 if !col.is_null(i) {
738 let ts = col.value(i);
739 oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
740 }
741 }
742 }
743 }
744 }
745
746 let oldest_l1_age = oldest_ts
747 .and_then(|ts| {
748 let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
749 SystemTime::now().duration_since(created).ok()
750 })
751 .unwrap_or(Duration::ZERO);
752
753 let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
754 status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
757 status.oldest_l1_age = oldest_l1_age;
758 Ok(())
759 }
760
761 fn pick_compaction_task(&self) -> Option<CompactionTask> {
762 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
763
764 if status.l1_runs >= self.config.compaction.max_l1_runs {
765 return Some(CompactionTask::ByRunCount);
766 }
767 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
768 return Some(CompactionTask::BySize);
769 }
770 if status.oldest_l1_age >= self.config.compaction.max_l1_age
771 && status.oldest_l1_age > Duration::ZERO
772 {
773 return Some(CompactionTask::ByAge);
774 }
775
776 None
777 }
778
779 async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
781 if let Err(e) = backend.optimize_table(table_name).await {
782 log::warn!("Failed to optimize table {}: {}", table_name, e);
783 return false;
784 }
785 true
786 }
787
788 pub fn trigger_async_compaction(self: &Arc<Self>) {
791 let this = Arc::clone(self);
792 tokio::spawn(async move {
793 if let Err(e) = Self::execute_compaction(this, CompactionTask::ByRunCount).await {
794 log::debug!("Post-flush compaction skipped: {}", e);
796 }
797 });
798 }
799
    /// Runs one full compaction pass over every table known to the schema.
    ///
    /// Steps, in order:
    /// 1. Claims the compaction slot via `CompactionGuard`. This fails with
    ///    "Compaction already in progress" if the slot is taken (or the status
    ///    lock is poisoned).
    /// 2. Runs semantic compaction (`Compactor::compact_all`). A failure there
    ///    is logged and treated as "no results", so backend optimization still runs.
    /// 3. Re-warms adjacency for each edge type/direction reported by step 2.
    /// 4. Calls `optimize_table` on:
    ///    - the delta and adjacency tables of every edge type;
    ///    - every per-label vertex table (also invalidating its cache);
    ///    - the main vertex and edge tables.
    ///    Per-table failures are logged by `try_optimize_table` and skipped.
    /// 5. Increments `total_compactions` and resets `l1_runs` to 0.
    ///
    /// `_task` is accepted but not consulted: every trigger runs the same pass.
    /// `files_compacted` counts successful `optimize_table` calls.
    pub(crate) async fn execute_compaction(
        this: Arc<Self>,
        _task: CompactionTask,
    ) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let _guard = CompactionGuard::new(this.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = this.schema_manager.schema();
        let mut files_compacted = 0;

        // Semantic compaction first; on failure, continue with backend optimize only.
        let compactor = Compactor::new(Arc::clone(&this));
        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
            log::error!(
                "Semantic compaction failed (continuing with backend optimize): {}",
                e
            );
            Vec::new()
        });

        // Re-warm adjacency for the edge types that semantic compaction touched.
        // Direction names other than "fwd"/"bwd" are skipped; warm failures are logged.
        let am = this.adjacency_manager();
        for info in &compaction_results {
            let direction = match info.direction.as_str() {
                "fwd" => Direction::Outgoing,
                "bwd" => Direction::Incoming,
                _ => continue,
            };
            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
                && let Err(e) = am.warm(&this, etid, direction, None).await
            {
                log::warn!(
                    "Failed to re-warm adjacency for {}/{}: {}",
                    info.edge_type,
                    info.direction,
                    e
                );
            }
        }

        let backend = this.backend.as_ref();

        // Edge delta and adjacency tables, both directions.
        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let delta = table_names::delta_table_name(name, dir);
                if Self::try_optimize_table(backend, &delta).await {
                    files_compacted += 1;
                }
                let adj = table_names::adjacency_table_name(name, dir);
                if Self::try_optimize_table(backend, &adj).await {
                    files_compacted += 1;
                }
            }
        }

        // Per-label vertex tables; only these have their cache invalidated here.
        for label in schema.labels.keys() {
            let tbl = table_names::vertex_table_name(label);
            if Self::try_optimize_table(backend, &tbl).await {
                files_compacted += 1;
                backend.invalidate_cache(&tbl);
            }
        }

        // Shared main vertex/edge tables.
        for tbl in [
            table_names::main_vertex_table_name(),
            table_names::main_edge_table_name(),
        ] {
            if Self::try_optimize_table(backend, tbl).await {
                files_compacted += 1;
            }
        }

        // Record completion; the guard's Drop clears `compaction_in_progress`.
        {
            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
            status.l1_runs = 0;
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
892
893 pub fn invalidate_table_cache(&self, label: &str) {
897 let name = table_names::vertex_table_name(label);
898 self.backend.invalidate_cache(&name);
899 }
900
901 pub fn base_path(&self) -> &str {
902 &self.base_uri
903 }
904
905 pub fn schema_manager(&self) -> &SchemaManager {
906 &self.schema_manager
907 }
908
909 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
910 self.schema_manager.clone()
911 }
912
913 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
915 Arc::clone(&self.adjacency_manager)
916 }
917
918 pub async fn warm_adjacency(
923 &self,
924 edge_type_id: u32,
925 direction: crate::storage::direction::Direction,
926 version: Option<u64>,
927 ) -> anyhow::Result<()> {
928 self.adjacency_manager
929 .warm(self, edge_type_id, direction, version)
930 .await
931 }
932
933 pub async fn warm_adjacency_coalesced(
938 &self,
939 edge_type_id: u32,
940 direction: crate::storage::direction::Direction,
941 version: Option<u64>,
942 ) -> anyhow::Result<()> {
943 self.adjacency_manager
944 .warm_coalesced(self, edge_type_id, direction, version)
945 .await
946 }
947
948 pub fn has_adjacency_csr(
950 &self,
951 edge_type_id: u32,
952 direction: crate::storage::direction::Direction,
953 ) -> bool {
954 self.adjacency_manager.has_csr(edge_type_id, direction)
955 }
956
957 pub fn get_neighbors_at_version(
959 &self,
960 vid: uni_common::core::id::Vid,
961 edge_type: u32,
962 direction: crate::storage::direction::Direction,
963 version: u64,
964 ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
965 self.adjacency_manager
966 .get_neighbors_at_version(vid, edge_type, direction, version)
967 }
968
969 pub fn backend(&self) -> &dyn StorageBackend {
971 self.backend.as_ref()
972 }
973
974 pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
976 self.backend.clone()
977 }
978
979 async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
982 use crate::storage::vid_labels::VidLabelsIndex;
983
984 let backend = self.backend.as_ref();
985 let vtable = table_names::main_vertex_table_name();
986
987 if !backend.table_exists(vtable).await.unwrap_or(false) {
989 self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
990 return Ok(());
991 }
992
993 let request = ScanRequest::all(vtable)
995 .with_filter("_deleted = false")
996 .with_limit(100_000);
997 let batches = backend
998 .scan(request)
999 .await
1000 .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;
1001
1002 let mut index = VidLabelsIndex::new();
1003 for batch in batches {
1004 let vid_col = batch
1005 .column_by_name("_vid")
1006 .ok_or_else(|| anyhow!("Missing _vid column"))?
1007 .as_any()
1008 .downcast_ref::<UInt64Array>()
1009 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
1010
1011 let labels_col = batch
1012 .column_by_name("labels")
1013 .ok_or_else(|| anyhow!("Missing labels column"))?
1014 .as_any()
1015 .downcast_ref::<arrow_array::ListArray>()
1016 .ok_or_else(|| anyhow!("Invalid labels column type"))?;
1017
1018 for row_idx in 0..batch.num_rows() {
1019 let vid = Vid::from(vid_col.value(row_idx));
1020 let labels_array = labels_col.value(row_idx);
1021 let labels_str_array = labels_array
1022 .as_any()
1023 .downcast_ref::<arrow_array::StringArray>()
1024 .ok_or_else(|| anyhow!("Invalid labels array element type"))?;
1025
1026 let labels: Vec<String> = (0..labels_str_array.len())
1027 .map(|i| labels_str_array.value(i).to_string())
1028 .collect();
1029
1030 index.insert(vid, labels);
1031 }
1032 }
1033
1034 self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
1035 Ok(())
1036 }
1037
1038 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
1041 self.vid_labels_index.as_ref().and_then(|idx| {
1042 let index = idx.read();
1043 index.get_labels(vid).map(|labels| labels.to_vec())
1044 })
1045 }
1046
1047 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
1050 if let Some(idx) = &self.vid_labels_index {
1051 let mut index = idx.write();
1052 index.insert(vid, labels);
1053 }
1054 }
1055
1056 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
1059 if let Some(idx) = &self.vid_labels_index {
1060 let mut index = idx.write();
1061 index.remove_vid(vid);
1062 }
1063 }
1064
1065 pub async fn load_subgraph_cached(
1066 &self,
1067 start_vids: &[Vid],
1068 edge_types: &[u32],
1069 max_hops: usize,
1070 direction: GraphDirection,
1071 _l0: Option<Arc<RwLock<L0Buffer>>>,
1072 ) -> Result<WorkingGraph> {
1073 let mut graph = WorkingGraph::new();
1074
1075 let dir = match direction {
1076 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
1077 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
1078 };
1079
1080 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
1081
1082 let mut frontier: Vec<Vid> = start_vids.to_vec();
1084 let mut visited: HashSet<Vid> = HashSet::new();
1085
1086 for &vid in start_vids {
1088 graph.add_vertex(vid);
1089 }
1090
1091 for _hop in 0..max_hops {
1092 let mut next_frontier = HashSet::new();
1093
1094 for &vid in &frontier {
1095 if visited.contains(&vid) {
1096 continue;
1097 }
1098 visited.insert(vid);
1099 graph.add_vertex(vid);
1100
1101 for &etype_id in edge_types {
1102 let edge_ver = self.version_high_water_mark();
1104 self.adjacency_manager
1105 .warm_coalesced(self, etype_id, dir, edge_ver)
1106 .await?;
1107
1108 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
1110
1111 for (neighbor_vid, eid) in edges {
1112 graph.add_vertex(neighbor_vid);
1113 if !visited.contains(&neighbor_vid) {
1114 next_frontier.insert(neighbor_vid);
1115 }
1116
1117 if neighbor_is_dst {
1118 graph.add_edge(vid, neighbor_vid, eid, etype_id);
1119 } else {
1120 graph.add_edge(neighbor_vid, vid, eid, etype_id);
1121 }
1122 }
1123 }
1124 }
1125 frontier = next_frontier.into_iter().collect();
1126
1127 if frontier.is_empty() {
1129 break;
1130 }
1131 }
1132
1133 Ok(graph)
1134 }
1135
1136 pub fn snapshot_manager(&self) -> &SnapshotManager {
1137 &self.snapshot_manager
1138 }
1139
1140 pub fn index_manager(&self) -> IndexManager {
1141 IndexManager::new(&self.base_uri, self.schema_manager.clone())
1142 }
1143
1144 pub async fn scan_vertex_table(
1153 &self,
1154 label: &str,
1155 columns: &[&str],
1156 additional_filter: Option<&str>,
1157 ) -> Result<Option<arrow_array::RecordBatch>> {
1158 let backend = self.backend();
1159 let table_name = table_names::vertex_table_name(label);
1160
1161 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1162 return Ok(None);
1163 }
1164
1165 let actual_columns =
1167 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1168 let table_field_names: HashSet<&str> = table_schema
1169 .fields()
1170 .iter()
1171 .map(|f| f.name().as_str())
1172 .collect();
1173 columns
1174 .iter()
1175 .copied()
1176 .filter(|c| table_field_names.contains(c))
1177 .map(|s| s.to_string())
1178 .collect::<Vec<_>>()
1179 } else {
1180 return Ok(None);
1181 };
1182
1183 let filter = match (self.version_high_water_mark(), additional_filter) {
1185 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1186 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1187 (None, Some(f)) => Some(f.to_string()),
1188 (None, None) => None,
1189 };
1190
1191 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1192 if let Some(f) = filter {
1193 request = request.with_filter(f);
1194 }
1195
1196 match backend.scan(request).await {
1197 Ok(batches) => {
1198 if batches.is_empty() {
1199 Ok(None)
1200 } else {
1201 Ok(Some(arrow::compute::concat_batches(
1202 &batches[0].schema(),
1203 &batches,
1204 )?))
1205 }
1206 }
1207 Err(_) => Ok(None),
1208 }
1209 }
1210
1211 pub async fn scan_delta_table(
1214 &self,
1215 edge_type: &str,
1216 direction: &str,
1217 columns: &[&str],
1218 additional_filter: Option<&str>,
1219 ) -> Result<Option<arrow_array::RecordBatch>> {
1220 let backend = self.backend();
1221 let table_name = table_names::delta_table_name(edge_type, direction);
1222
1223 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1224 return Ok(None);
1225 }
1226
1227 let actual_columns =
1229 if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
1230 let table_field_names: HashSet<&str> = table_schema
1231 .fields()
1232 .iter()
1233 .map(|f| f.name().as_str())
1234 .collect();
1235 columns
1236 .iter()
1237 .copied()
1238 .filter(|c| table_field_names.contains(c))
1239 .map(|s| s.to_string())
1240 .collect::<Vec<_>>()
1241 } else {
1242 return Ok(None);
1243 };
1244
1245 let filter = match (self.version_high_water_mark(), additional_filter) {
1246 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1247 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1248 (None, Some(f)) => Some(f.to_string()),
1249 (None, None) => None,
1250 };
1251
1252 let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
1253 if let Some(f) = filter {
1254 request = request.with_filter(f);
1255 }
1256
1257 match backend.scan(request).await {
1258 Ok(batches) => {
1259 if batches.is_empty() {
1260 Ok(None)
1261 } else {
1262 Ok(Some(arrow::compute::concat_batches(
1263 &batches[0].schema(),
1264 &batches,
1265 )?))
1266 }
1267 }
1268 Err(_) => Ok(None),
1269 }
1270 }
1271
1272 pub async fn scan_main_vertex_table(
1277 &self,
1278 columns: &[&str],
1279 filter: Option<&str>,
1280 ) -> Result<Option<arrow_array::RecordBatch>> {
1281 let backend = self.backend();
1282 let table_name = table_names::main_vertex_table_name();
1283
1284 if !backend.table_exists(table_name).await.unwrap_or(false) {
1285 return Ok(None);
1286 }
1287
1288 let full_filter = match (self.version_high_water_mark(), filter) {
1290 (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
1291 (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
1292 (None, Some(f)) => Some(f.to_string()),
1293 (None, None) => None,
1294 };
1295
1296 let request = ScanRequest::all(table_name)
1297 .with_columns(columns.iter().map(|s| s.to_string()).collect());
1298 let request = match full_filter.as_deref() {
1299 Some(f) => request.with_filter(f),
1300 None => request,
1301 };
1302
1303 match backend.scan(request).await {
1304 Ok(batches) => {
1305 if batches.is_empty() {
1306 Ok(None)
1307 } else {
1308 Ok(Some(arrow::compute::concat_batches(
1309 &batches[0].schema(),
1310 &batches,
1311 )?))
1312 }
1313 }
1314 Err(_) => Ok(None),
1315 }
1316 }
1317
1318 pub async fn scan_main_edge_table_stream(
1320 &self,
1321 filter: Option<&str>,
1322 ) -> Result<
1323 Option<
1324 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1325 >,
1326 > {
1327 let backend = self.backend();
1328 let table_name = table_names::main_edge_table_name();
1329
1330 if !backend.table_exists(table_name).await.unwrap_or(false) {
1331 return Ok(None);
1332 }
1333
1334 let mut request = ScanRequest::all(table_name);
1335 if let Some(f) = filter {
1336 request = request.with_filter(f);
1337 }
1338
1339 let stream = backend.scan_stream(request).await?;
1340 Ok(Some(stream))
1341 }
1342
1343 pub async fn scan_vertex_table_stream(
1345 &self,
1346 label: &str,
1347 ) -> Result<
1348 Option<
1349 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1350 >,
1351 > {
1352 let backend = self.backend();
1353 let table_name = table_names::vertex_table_name(label);
1354
1355 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1356 return Ok(None);
1357 }
1358
1359 let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1360 Ok(Some(stream))
1361 }
1362
1363 pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1365 MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1366 .await
1367 }
1368
1369 pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1371 MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1372 .await
1373 }
1374
1375 pub async fn find_edges_by_type_names(
1377 &self,
1378 type_names: &[&str],
1379 ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1380 MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names).await
1381 }
1382
1383 pub async fn scan_vertex_candidates(
1385 &self,
1386 label: &str,
1387 filter: Option<&str>,
1388 ) -> Result<Vec<Vid>> {
1389 let backend = self.backend();
1390 let table_name = table_names::vertex_table_name(label);
1391
1392 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1393 return Ok(Vec::new());
1394 }
1395
1396 let full_filter = match filter {
1397 Some(f) => format!("_deleted = false AND ({})", f),
1398 None => "_deleted = false".to_string(),
1399 };
1400
1401 let request = ScanRequest::all(&table_name)
1402 .with_filter(full_filter)
1403 .with_columns(vec!["_vid".to_string()]);
1404
1405 let batches = backend.scan(request).await?;
1406
1407 let mut vids = Vec::new();
1408 for batch in batches {
1409 let vid_col = batch
1410 .column_by_name("_vid")
1411 .ok_or(anyhow!("Missing _vid"))?
1412 .as_any()
1413 .downcast_ref::<UInt64Array>()
1414 .ok_or(anyhow!("Invalid _vid"))?;
1415 for i in 0..batch.num_rows() {
1416 vids.push(Vid::from(vid_col.value(i)));
1417 }
1418 }
1419 Ok(vids)
1420 }
1421
1422 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1423 let schema = self.schema_manager.schema();
1424 let label_meta = schema
1425 .labels
1426 .get(label)
1427 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1428 let key = format!("vertices_{label}");
1429 match self.fork_branch_for(&key) {
1430 Some(branch) => Ok(VertexDataset::new_branched(
1431 &self.base_uri,
1432 label,
1433 label_meta.id,
1434 branch,
1435 )),
1436 None => Ok(VertexDataset::new(&self.base_uri, label, label_meta.id)),
1437 }
1438 }
1439
/// Builds the `EdgeDataset` handle for `edge_type` between `src_label` and
/// `dst_label`, branched when the fork scope maps `edges_{edge_type}`.
///
/// Currently always returns `Ok`.
#[cfg(feature = "lance-backend")]
pub fn edge_dataset(
    &self,
    edge_type: &str,
    src_label: &str,
    dst_label: &str,
) -> Result<EdgeDataset> {
    let branch = self.fork_branch_for(&format!("edges_{edge_type}"));
    let dataset = if let Some(branch) = branch {
        EdgeDataset::new_branched(&self.base_uri, edge_type, src_label, dst_label, branch)
    } else {
        EdgeDataset::new(&self.base_uri, edge_type, src_label, dst_label)
    };
    Ok(dataset)
}
1464
1465 pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1466 let key = format!("deltas_{edge_type}_{direction}");
1467 match self.fork_branch_for(&key) {
1468 Some(branch) => Ok(DeltaDataset::new_branched(
1469 &self.base_uri,
1470 edge_type,
1471 direction,
1472 branch,
1473 )),
1474 None => Ok(DeltaDataset::new(&self.base_uri, edge_type, direction)),
1475 }
1476 }
1477
1478 pub fn adjacency_dataset(
1479 &self,
1480 edge_type: &str,
1481 label: &str,
1482 direction: &str,
1483 ) -> Result<AdjacencyDataset> {
1484 let key = format!("adjacency_{direction}_{edge_type}_{label}");
1485 match self.fork_branch_for(&key) {
1486 Some(branch) => Ok(AdjacencyDataset::new_branched(
1487 &self.base_uri,
1488 edge_type,
1489 label,
1490 direction,
1491 branch,
1492 )),
1493 None => Ok(AdjacencyDataset::new(
1494 &self.base_uri,
1495 edge_type,
1496 label,
1497 direction,
1498 )),
1499 }
1500 }
1501
1502 fn fork_branch_for(&self, dataset_name: &str) -> Option<String> {
1506 self.fork_scope
1507 .as_ref()
1508 .and_then(|s| s.branch_for(dataset_name))
1509 }
1510
1511 pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1516 MainVertexDataset::new(&self.base_uri)
1517 }
1518
1519 pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1524 MainEdgeDataset::new(&self.base_uri)
1525 }
1526
/// Returns the UID index handle for `label`. Currently always `Ok`.
#[cfg(feature = "lance-backend")]
pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
    let index = UidIndex::new(&self.base_uri, label);
    Ok(index)
}
1531
/// Opens the inverted index defined in the schema for `label`.`property`.
///
/// # Errors
/// Fails if the schema declares no inverted index on that label/property
/// pair, or if `InvertedIndex::new` fails.
#[cfg(feature = "lance-backend")]
pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
    let schema = self.schema_manager.schema();
    let config = schema
        .indexes
        .iter()
        .filter_map(|definition| match definition {
            IndexDefinition::Inverted(cfg) => Some(cfg),
            _ => None,
        })
        .find(|cfg| cfg.label == label && cfg.property == property)
        .cloned()
        .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;

    InvertedIndex::new(&self.base_uri, config).await
}
1550
1551 pub async fn vector_search(
1552 &self,
1553 label: &str,
1554 property: &str,
1555 query: &[f32],
1556 k: usize,
1557 filter: Option<&str>,
1558 ctx: Option<&QueryContext>,
1559 ) -> Result<Vec<(Vid, f32)>> {
1560 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1561
1562 let schema = self.schema_manager.schema();
1564 let metric = schema
1565 .vector_index_for_property(label, property)
1566 .map(|config| config.metric.clone())
1567 .unwrap_or(DistanceMetric::L2);
1568
1569 let backend = self.backend.as_ref();
1570 let name = table_names::vertex_table_name(label);
1571
1572 let mut results = Vec::new();
1573
1574 if backend.table_exists(&name).await.unwrap_or(false) {
1576 let backend_metric = match &metric {
1577 DistanceMetric::L2 => BackendMetric::L2,
1578 DistanceMetric::Cosine => BackendMetric::Cosine,
1579 DistanceMetric::Dot => BackendMetric::Dot,
1580 _ => BackendMetric::L2,
1581 };
1582
1583 let mut filter_parts = vec![Self::build_active_filter(filter)];
1585 if ctx.is_some()
1586 && let Some(hwm) = self.version_high_water_mark()
1587 {
1588 filter_parts.push(format!("_version <= {}", hwm));
1589 }
1590 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1591
1592 let batches = backend
1593 .vector_search(&name, property, query, k, backend_metric, combined_filter)
1594 .await?;
1595
1596 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1597 }
1598
1599 if let Some(qctx) = ctx {
1601 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1602 }
1603
1604 Ok(results)
1605 }
1606
1607 pub async fn fts_search(
1623 &self,
1624 label: &str,
1625 property: &str,
1626 query: &str,
1627 k: usize,
1628 filter: Option<&str>,
1629 ctx: Option<&QueryContext>,
1630 ) -> Result<Vec<(Vid, f32)>> {
1631 use crate::backend::types::FilterExpr;
1632
1633 let backend = self.backend.as_ref();
1634 let name = table_names::vertex_table_name(label);
1635
1636 let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1637 let mut filter_parts = vec![Self::build_active_filter(filter)];
1639 if ctx.is_some()
1640 && let Some(hwm) = self.version_high_water_mark()
1641 {
1642 filter_parts.push(format!("_version <= {}", hwm));
1643 }
1644 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1645
1646 let batches = backend
1647 .full_text_search(&name, property, query, k, combined_filter)
1648 .await?;
1649
1650 let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1651 fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1653 fts_results
1654 } else {
1655 Vec::new()
1656 };
1657
1658 if let Some(qctx) = ctx {
1660 merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1661 }
1662
1663 Ok(results)
1664 }
1665
/// Resolves `uid` to a vertex id through `label`'s UID index.
#[cfg(feature = "lance-backend")]
pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
    let uid_index = self.uid_index(label)?;
    let vid = uid_index.get_vid(uid).await?;
    Ok(vid)
}
1671
/// Records a single `uid -> vid` mapping in `label`'s UID index.
#[cfg(feature = "lance-backend")]
pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
    let mapping = [(uid, vid)];
    let uid_index = self.uid_index(label)?;
    uid_index.write_mapping(&mapping).await
}
1677
1678 pub async fn load_subgraph(
1679 &self,
1680 start_vids: &[Vid],
1681 edge_types: &[u32],
1682 max_hops: usize,
1683 direction: GraphDirection,
1684 l0: Option<&L0Buffer>,
1685 ) -> Result<WorkingGraph> {
1686 let mut graph = WorkingGraph::new();
1687 let schema = self.schema_manager.schema();
1688
1689 let label_map: HashMap<u16, String> = schema
1691 .labels
1692 .values()
1693 .map(|meta| {
1694 (
1695 meta.id,
1696 schema.label_name_by_id(meta.id).unwrap().to_owned(),
1697 )
1698 })
1699 .collect();
1700
1701 let edge_type_map: HashMap<u32, String> = schema
1702 .edge_types
1703 .values()
1704 .map(|meta| {
1705 (
1706 meta.id,
1707 schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
1708 )
1709 })
1710 .collect();
1711
1712 let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();
1713
1714 let mut frontier: Vec<Vid> = start_vids.to_vec();
1716 let mut visited: HashSet<Vid> = HashSet::new();
1717
1718 for &vid in start_vids {
1720 graph.add_vertex(vid);
1721 }
1722
1723 for _hop in 0..max_hops {
1724 let mut next_frontier = HashSet::new();
1725
1726 for &vid in &frontier {
1727 if visited.contains(&vid) {
1728 continue;
1729 }
1730 visited.insert(vid);
1731 graph.add_vertex(vid);
1732
1733 for &etype_id in &target_edge_types {
1735 let etype_name = edge_type_map
1736 .get(&etype_id)
1737 .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;
1738
1739 let (dir_str, neighbor_is_dst) = match direction {
1743 GraphDirection::Outgoing => ("fwd", true),
1744 GraphDirection::Incoming => ("bwd", false),
1745 };
1746
1747 let mut edges: HashMap<Eid, EdgeState> = HashMap::new();
1748
1749 let _edge_ver = self
1754 .pinned_snapshot
1755 .as_ref()
1756 .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));
1757
1758 let backend = self.backend();
1760 for current_src_label in label_map.values() {
1761 let adj_ds =
1762 match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
1763 Ok(ds) => ds,
1764 Err(_) => continue,
1765 };
1766 if let Some((neighbors, eids)) =
1767 adj_ds.read_adjacency_backend(backend, vid).await?
1768 {
1769 for (n, eid) in neighbors.into_iter().zip(eids) {
1770 edges.insert(
1771 eid,
1772 EdgeState {
1773 neighbor: n,
1774 version: 0,
1775 deleted: false,
1776 },
1777 );
1778 }
1779 break; }
1781 }
1782
1783 let delta_ds = self.delta_dataset(etype_name, dir_str)?;
1785 let delta_entries = delta_ds
1786 .read_deltas(backend, vid, &schema, self.version_high_water_mark())
1787 .await?;
1788 Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);
1789
1790 if let Some(l0) = l0 {
1792 Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
1793 }
1794
1795 Self::add_edges_to_graph(
1797 &mut graph,
1798 edges,
1799 vid,
1800 etype_id,
1801 neighbor_is_dst,
1802 &visited,
1803 &mut next_frontier,
1804 );
1805 }
1806 }
1807 frontier = next_frontier.into_iter().collect();
1808
1809 if frontier.is_empty() {
1811 break;
1812 }
1813 }
1814
1815 Ok(graph)
1816 }
1817
1818 fn apply_delta_to_edges(
1820 edges: &mut HashMap<Eid, EdgeState>,
1821 delta_entries: Vec<crate::storage::delta::L1Entry>,
1822 neighbor_is_dst: bool,
1823 ) {
1824 for entry in delta_entries {
1825 let neighbor = if neighbor_is_dst {
1826 entry.dst_vid
1827 } else {
1828 entry.src_vid
1829 };
1830 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1831
1832 if entry.version > current_ver {
1833 edges.insert(
1834 entry.eid,
1835 EdgeState {
1836 neighbor,
1837 version: entry.version,
1838 deleted: matches!(entry.op, Op::Delete),
1839 },
1840 );
1841 }
1842 }
1843 }
1844
1845 fn apply_l0_to_edges(
1847 edges: &mut HashMap<Eid, EdgeState>,
1848 l0: &L0Buffer,
1849 vid: Vid,
1850 etype_id: u32,
1851 direction: GraphDirection,
1852 ) {
1853 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1854 for (neighbor, eid, ver) in l0_neighbors {
1855 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1856 if ver > current_ver {
1857 edges.insert(
1858 eid,
1859 EdgeState {
1860 neighbor,
1861 version: ver,
1862 deleted: false,
1863 },
1864 );
1865 }
1866 }
1867
1868 for (eid, state) in edges.iter_mut() {
1870 if l0.is_tombstoned(*eid) {
1871 state.deleted = true;
1872 }
1873 }
1874 }
1875
1876 fn add_edges_to_graph(
1878 graph: &mut WorkingGraph,
1879 edges: HashMap<Eid, EdgeState>,
1880 vid: Vid,
1881 etype_id: u32,
1882 neighbor_is_dst: bool,
1883 visited: &HashSet<Vid>,
1884 next_frontier: &mut HashSet<Vid>,
1885 ) {
1886 for (eid, state) in edges {
1887 if state.deleted {
1888 continue;
1889 }
1890 graph.add_vertex(state.neighbor);
1891
1892 if !visited.contains(&state.neighbor) {
1893 next_frontier.insert(state.neighbor);
1894 }
1895
1896 if neighbor_is_dst {
1897 graph.add_edge(vid, state.neighbor, eid, etype_id);
1898 } else {
1899 graph.add_edge(state.neighbor, vid, eid, etype_id);
1900 }
1901 }
1902 }
1903}
1904
1905fn extract_vid_score_pairs(
1907 batches: &[arrow_array::RecordBatch],
1908 vid_column: &str,
1909 score_column: &str,
1910) -> Result<Vec<(Vid, f32)>> {
1911 let mut results = Vec::new();
1912 for batch in batches {
1913 let vid_col = batch
1914 .column_by_name(vid_column)
1915 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1916 .as_any()
1917 .downcast_ref::<UInt64Array>()
1918 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1919
1920 let score_col = batch
1921 .column_by_name(score_column)
1922 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1923 .as_any()
1924 .downcast_ref::<Float32Array>()
1925 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1926
1927 for i in 0..batch.num_rows() {
1928 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1929 }
1930 }
1931 Ok(results)
1932}
1933
1934fn extract_embedding_from_props(
1939 props: &uni_common::Properties,
1940 property: &str,
1941) -> Option<Vec<f32>> {
1942 let arr = props.get(property)?.as_array()?;
1943 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1944}
1945
1946fn merge_l0_into_vector_results(
1956 results: &mut Vec<(Vid, f32)>,
1957 ctx: &QueryContext,
1958 label: &str,
1959 property: &str,
1960 query: &[f32],
1961 k: usize,
1962 metric: &DistanceMetric,
1963) {
1964 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1966 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1967 buffers.push(Arc::clone(&ctx.l0));
1968 if let Some(ref txn) = ctx.transaction_l0 {
1969 buffers.push(Arc::clone(txn));
1970 }
1971
1972 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1974 let mut tombstoned: HashSet<Vid> = HashSet::new();
1976
1977 for buf_arc in &buffers {
1978 let buf = buf_arc.read();
1979
1980 for &vid in &buf.vertex_tombstones {
1982 tombstoned.insert(vid);
1983 }
1984
1985 for (&vid, labels) in &buf.vertex_labels {
1987 if !labels.iter().any(|l| l == label) {
1988 continue;
1989 }
1990 if let Some(props) = buf.vertex_properties.get(&vid)
1991 && let Some(emb) = extract_embedding_from_props(props, property)
1992 {
1993 if emb.len() != query.len() {
1994 continue; }
1996 let dist = metric.compute_distance(&emb, query);
1997 l0_candidates.insert(vid, dist);
1999 tombstoned.remove(&vid);
2001 }
2002 }
2003 }
2004
2005 if l0_candidates.is_empty() && tombstoned.is_empty() {
2007 return;
2008 }
2009
2010 results.retain(|(vid, _)| !tombstoned.contains(vid));
2012
2013 for (vid, dist) in &l0_candidates {
2015 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2016 existing.1 = *dist;
2017 } else {
2018 results.push((*vid, *dist));
2019 }
2020 }
2021
2022 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2024 results.truncate(k);
2025}
2026
/// Scores how well `text` matches `query`. The score is the fraction of
/// distinct query terms that also occur in `text`, in `[0.0, 1.0]`.
///
/// Both strings are split case-insensitively on runs of non-alphanumeric
/// characters. Punctuation attached to a word (e.g. `"Rust."`, `"graph,"`)
/// therefore does not prevent a match; whitespace-only splitting would miss
/// these. Returns `0.0` when `query` contains no terms.
///
/// This is a simple term-overlap ratio, separate from the backend's
/// `full_text_search` scoring.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    fn terms(s: &str) -> HashSet<String> {
        s.split(|c: char| !c.is_alphanumeric())
            .filter(|t| !t.is_empty())
            .map(str::to_lowercase)
            .collect()
    }

    let query_terms = terms(query);
    if query_terms.is_empty() {
        return 0.0;
    }
    let text_terms = terms(text);
    let hits = query_terms
        .iter()
        .filter(|t| text_terms.contains(*t))
        .count();
    hits as f32 / query_terms.len() as f32
}
2044
2045fn extract_text_from_props<'a>(
2047 props: &'a uni_common::Properties,
2048 property: &str,
2049) -> Option<&'a str> {
2050 props.get(property)?.as_str()
2051}
2052
2053fn merge_l0_into_fts_results(
2063 results: &mut Vec<(Vid, f32)>,
2064 ctx: &QueryContext,
2065 label: &str,
2066 property: &str,
2067 query: &str,
2068 k: usize,
2069) {
2070 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
2072 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
2073 buffers.push(Arc::clone(&ctx.l0));
2074 if let Some(ref txn) = ctx.transaction_l0 {
2075 buffers.push(Arc::clone(txn));
2076 }
2077
2078 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
2080 let mut tombstoned: HashSet<Vid> = HashSet::new();
2082
2083 for buf_arc in &buffers {
2084 let buf = buf_arc.read();
2085
2086 for &vid in &buf.vertex_tombstones {
2088 tombstoned.insert(vid);
2089 }
2090
2091 for (&vid, labels) in &buf.vertex_labels {
2093 if !labels.iter().any(|l| l == label) {
2094 continue;
2095 }
2096 if let Some(props) = buf.vertex_properties.get(&vid)
2097 && let Some(text) = extract_text_from_props(props, property)
2098 {
2099 let score = compute_text_relevance(query, text);
2100 if score > 0.0 {
2101 l0_candidates.insert(vid, score);
2103 }
2104 tombstoned.remove(&vid);
2106 }
2107 }
2108 }
2109
2110 if l0_candidates.is_empty() && tombstoned.is_empty() {
2112 return;
2113 }
2114
2115 results.retain(|(vid, _)| !tombstoned.contains(vid));
2117
2118 for (vid, score) in &l0_candidates {
2120 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
2121 existing.1 = *score;
2122 } else {
2123 results.push((*vid, *score));
2124 }
2125 }
2126
2127 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2129 results.truncate(k);
2130}