1use crate::backend::StorageBackend;
5#[cfg(feature = "lance-backend")]
6use crate::backend::lance::LanceDbBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9use crate::compaction::{CompactionStats, CompactionStatus, CompactionTask};
10use crate::runtime::WorkingGraph;
11use crate::runtime::context::QueryContext;
12use crate::runtime::l0::L0Buffer;
13use crate::storage::adjacency::AdjacencyDataset;
14use crate::storage::compaction::Compactor;
15use crate::storage::delta::{DeltaDataset, ENTRY_SIZE_ESTIMATE, Op};
16use crate::storage::direction::Direction;
17#[cfg(feature = "lance-backend")]
18use crate::storage::edge::EdgeDataset;
19#[cfg(feature = "lance-backend")]
20use crate::storage::index::UidIndex;
21#[cfg(feature = "lance-backend")]
22use crate::storage::inverted_index::InvertedIndex;
23use crate::storage::main_edge::MainEdgeDataset;
24use crate::storage::main_vertex::MainVertexDataset;
25use crate::storage::vertex::VertexDataset;
26use anyhow::{Result, anyhow};
27use arrow_array::{Array, Float32Array, TimestampNanosecondArray, UInt64Array};
28use object_store::ObjectStore;
29#[cfg(feature = "lance-backend")]
30use object_store::local::LocalFileSystem;
31use parking_lot::RwLock;
32use std::collections::{HashMap, HashSet};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35use tracing::warn;
36use uni_common::config::UniConfig;
37#[cfg(feature = "lance-backend")]
38use uni_common::core::id::UniId;
39use uni_common::core::id::{Eid, Vid};
40#[cfg(feature = "lance-backend")]
41use uni_common::core::schema::IndexDefinition;
42use uni_common::core::schema::{DistanceMetric, SchemaManager};
43use uni_common::sync::acquire_mutex;
44
45use crate::snapshot::manager::SnapshotManager;
46use crate::storage::IndexManager;
47use crate::storage::adjacency_manager::AdjacencyManager;
48use crate::storage::resilient_store::ResilientObjectStore;
49
50use uni_common::core::snapshot::SnapshotManifest;
51
52use uni_common::graph::simple_graph::Direction as GraphDirection;
53
/// Latest observed state of a single edge while merging delta entries:
/// the neighbor vertex, the version that produced this state, and whether
/// the edge was deleted at that version.
/// NOTE(review): purpose inferred from field names; no usage is visible in
/// this chunk — confirm at call sites.
struct EdgeState {
    neighbor: Vid,
    version: u64,
    deleted: bool,
}
60
/// Central coordinator for persisted graph state.
///
/// Owns the object store, the pluggable [`StorageBackend`], the snapshot and
/// adjacency managers, and the compaction status shared with background tasks.
/// A manager may be "pinned" to a snapshot manifest, in which case reads are
/// bounded by that snapshot's version high-water mark.
pub struct StorageManager {
    base_uri: String,
    // Resilience-wrapped object store (see `new_with_backend`).
    store: Arc<dyn ObjectStore>,
    schema_manager: Arc<SchemaManager>,
    snapshot_manager: Arc<SnapshotManager>,
    adjacency_manager: Arc<AdjacencyManager>,
    pub config: UniConfig,
    pub compaction_status: Arc<Mutex<CompactionStatus>>,
    // When set, reads see only data at or below this snapshot's versions.
    pinned_snapshot: Option<SnapshotManifest>,
    backend: Arc<dyn StorageBackend>,
    // Optional in-memory vid -> labels cache; rebuilt on startup when enabled.
    vid_labels_index: Option<Arc<parking_lot::RwLock<crate::storage::vid_labels::VidLabelsIndex>>>,
}
76
/// RAII guard that sets `compaction_in_progress` in the shared
/// [`CompactionStatus`] and clears it again on drop, ensuring at most one
/// compaction runs at a time.
struct CompactionGuard {
    status: Arc<Mutex<CompactionStatus>>,
}
81
82impl CompactionGuard {
83 fn new(status: Arc<Mutex<CompactionStatus>>) -> Option<Self> {
84 let mut s = acquire_mutex(&status, "compaction_status").ok()?;
85 if s.compaction_in_progress {
86 return None;
87 }
88 s.compaction_in_progress = true;
89 Some(Self {
90 status: status.clone(),
91 })
92 }
93}
94
impl Drop for CompactionGuard {
    /// Releases the compaction slot and stamps the completion time.
    ///
    /// If the status mutex is poisoned we can only log the failure: panicking
    /// inside `drop` during unwinding would abort the process.
    fn drop(&mut self) {
        match uni_common::sync::acquire_mutex(&self.status, "compaction_status") {
            Ok(mut s) => {
                s.compaction_in_progress = false;
                s.last_compaction = Some(std::time::SystemTime::now());
            }
            Err(e) => {
                log::error!(
                    "CompactionGuard drop failed to acquire poisoned lock: {}. \
                     Compaction status may be inconsistent. Issue #18/#150",
                    e
                );
            }
        }
    }
}
117
118impl StorageManager {
    /// Creates a manager over an explicit store and backend.
    ///
    /// Wraps `store` in a [`ResilientObjectStore`], recovers any interrupted
    /// staging writes *before* the manager is constructed, then (when enabled)
    /// best-effort rebuilds the in-memory vid -> labels index; a rebuild
    /// failure only logs a warning and falls back to storage queries.
    pub async fn new_with_backend(
        base_uri: &str,
        store: Arc<dyn ObjectStore>,
        backend: Arc<dyn StorageBackend>,
        schema_manager: Arc<SchemaManager>,
        config: UniConfig,
    ) -> Result<Self> {
        let resilient_store: Arc<dyn ObjectStore> = Arc::new(ResilientObjectStore::new(
            store,
            config.object_store.clone(),
        ));

        let snapshot_manager = Arc::new(SnapshotManager::new(resilient_store.clone()));

        // Roll back half-written staging tables before serving any reads.
        Self::recover_all_staging_tables(backend.as_ref(), &schema_manager).await?;

        let mut sm = Self {
            base_uri: base_uri.to_string(),
            store: resilient_store,
            schema_manager,
            snapshot_manager,
            adjacency_manager: Arc::new(AdjacencyManager::new(config.cache_size)),
            config,
            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
            pinned_snapshot: None,
            backend,
            vid_labels_index: None,
        };

        if sm.config.enable_vid_labels_index
            && let Err(e) = sm.rebuild_vid_labels_index().await
        {
            warn!(
                "Failed to rebuild VidLabelsIndex on startup: {}. Falling back to storage queries.",
                e
            );
        }

        Ok(sm)
    }
162
163 #[cfg(feature = "lance-backend")]
165 pub async fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Result<Self> {
166 Self::new_with_config(base_uri, schema_manager, UniConfig::default()).await
167 }
168
169 #[cfg(feature = "lance-backend")]
171 pub async fn new_with_cache(
172 base_uri: &str,
173 schema_manager: Arc<SchemaManager>,
174 adjacency_cache_size: usize,
175 ) -> Result<Self> {
176 let config = UniConfig {
177 cache_size: adjacency_cache_size,
178 ..Default::default()
179 };
180 Self::new_with_config(base_uri, schema_manager, config).await
181 }
182
183 #[cfg(feature = "lance-backend")]
185 pub async fn new_with_config(
186 base_uri: &str,
187 schema_manager: Arc<SchemaManager>,
188 config: UniConfig,
189 ) -> Result<Self> {
190 let store = Self::build_store_from_uri(base_uri)?;
191 Self::new_with_store_and_config(base_uri, store, schema_manager, config).await
192 }
193
194 #[cfg(feature = "lance-backend")]
196 pub async fn new_with_store_and_config(
197 base_uri: &str,
198 store: Arc<dyn ObjectStore>,
199 schema_manager: Arc<SchemaManager>,
200 config: UniConfig,
201 ) -> Result<Self> {
202 Self::new_with_store_and_storage_options(base_uri, store, schema_manager, config, None)
203 .await
204 }
205
206 #[cfg(feature = "lance-backend")]
208 pub async fn new_with_store_and_storage_options(
209 base_uri: &str,
210 store: Arc<dyn ObjectStore>,
211 schema_manager: Arc<SchemaManager>,
212 config: UniConfig,
213 lancedb_storage_options: Option<HashMap<String, String>>,
214 ) -> Result<Self> {
215 let backend = Arc::new(LanceDbBackend::connect(base_uri, lancedb_storage_options).await?);
216 Self::new_with_backend(base_uri, store, backend, schema_manager, config).await
217 }
218
219 async fn recover_all_staging_tables(
224 backend: &dyn StorageBackend,
225 schema_manager: &SchemaManager,
226 ) -> Result<()> {
227 let schema = schema_manager.schema();
228
229 backend
231 .recover_staging(table_names::main_vertex_table_name())
232 .await?;
233 backend
234 .recover_staging(table_names::main_edge_table_name())
235 .await?;
236
237 for label in schema.labels.keys() {
239 let name = table_names::vertex_table_name(label);
240 backend.recover_staging(&name).await?;
241 }
242
243 for edge_type in schema.edge_types.keys() {
245 for direction in &["fwd", "bwd"] {
246 let delta_name = table_names::delta_table_name(edge_type, direction);
248 backend.recover_staging(&delta_name).await?;
249
250 for _label in schema.labels.keys() {
252 let adj_name = table_names::adjacency_table_name(edge_type, direction);
253 backend.recover_staging(&adj_name).await?;
254 }
255 }
256 }
257
258 Ok(())
259 }
260
261 #[cfg(feature = "lance-backend")]
262 fn build_store_from_uri(base_uri: &str) -> Result<Arc<dyn ObjectStore>> {
263 if base_uri.contains("://") {
264 let parsed = url::Url::parse(base_uri).map_err(|e| anyhow!("Invalid base URI: {e}"))?;
265 let (store, _path) = object_store::parse_url(&parsed)
266 .map_err(|e| anyhow!("Failed to parse object store URL: {e}"))?;
267 Ok(Arc::from(store))
268 } else {
269 std::fs::create_dir_all(base_uri)?;
271 Ok(Arc::new(LocalFileSystem::new_with_prefix(base_uri)?))
272 }
273 }
274
    /// Returns a clone of this manager pinned to `snapshot`; reads through the
    /// clone are bounded by the snapshot's version high-water mark.
    ///
    /// The clone gets a *fresh, empty* adjacency cache of the same capacity
    /// and its own compaction status, so it never interferes with the live
    /// manager's caches or background compaction.
    pub fn pinned(&self, snapshot: SnapshotManifest) -> Self {
        Self {
            base_uri: self.base_uri.clone(),
            store: self.store.clone(),
            schema_manager: self.schema_manager.clone(),
            snapshot_manager: self.snapshot_manager.clone(),
            adjacency_manager: Arc::new(AdjacencyManager::new(self.adjacency_manager.max_bytes())),
            config: self.config.clone(),
            compaction_status: Arc::new(Mutex::new(CompactionStatus::default())),
            pinned_snapshot: Some(snapshot),
            backend: self.backend.clone(),
            vid_labels_index: self.vid_labels_index.clone(),
        }
    }
292
293 pub fn get_edge_version_by_id(&self, edge_type_id: u32) -> Option<u64> {
294 let schema = self.schema_manager.schema();
295 let name = schema.edge_type_name_by_id(edge_type_id)?;
296 self.pinned_snapshot
297 .as_ref()
298 .and_then(|s| s.edges.get(name).map(|es| es.lance_version))
299 }
300
301 pub fn version_high_water_mark(&self) -> Option<u64> {
307 self.pinned_snapshot
308 .as_ref()
309 .map(|s| s.version_high_water_mark)
310 }
311
312 pub fn apply_version_filter(&self, base_filter: String) -> String {
317 if let Some(hwm) = self.version_high_water_mark() {
318 format!("({}) AND (_version <= {})", base_filter, hwm)
319 } else {
320 base_filter
321 }
322 }
323
324 fn build_active_filter(user_filter: Option<&str>) -> String {
327 match user_filter {
328 Some(expr) => format!("({}) AND (_deleted = false)", expr),
329 None => "_deleted = false".to_string(),
330 }
331 }
332
333 pub fn store(&self) -> Arc<dyn ObjectStore> {
334 self.store.clone()
335 }
336
337 pub fn compaction_status(
343 &self,
344 ) -> Result<CompactionStatus, uni_common::sync::LockPoisonedError> {
345 let guard = uni_common::sync::acquire_mutex(&self.compaction_status, "compaction_status")?;
346 Ok(guard.clone())
347 }
348
349 pub async fn compact(&self) -> Result<CompactionStats> {
350 let start = std::time::Instant::now();
352 let schema = self.schema_manager.schema();
353 let mut files_compacted = 0;
354
355 for label in schema.labels.keys() {
356 let name = table_names::vertex_table_name(label);
357 if self.backend.table_exists(&name).await? {
358 self.backend.optimize_table(&name).await?;
359 files_compacted += 1;
360 self.backend.invalidate_cache(&name);
361 }
362 }
363
364 Ok(CompactionStats {
365 files_compacted,
366 bytes_before: 0,
367 bytes_after: 0,
368 duration: start.elapsed(),
369 crdt_merges: 0,
370 })
371 }
372
373 pub async fn compact_label(&self, label: &str) -> Result<CompactionStats> {
374 let _guard = CompactionGuard::new(self.compaction_status.clone())
375 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
376
377 let start = std::time::Instant::now();
378 let name = table_names::vertex_table_name(label);
379
380 if self.backend.table_exists(&name).await? {
381 self.backend.optimize_table(&name).await?;
382 self.backend.invalidate_cache(&name);
383 }
384
385 Ok(CompactionStats {
386 files_compacted: 1,
387 bytes_before: 0,
388 bytes_after: 0,
389 duration: start.elapsed(),
390 crdt_merges: 0,
391 })
392 }
393
394 pub async fn compact_edge_type(&self, edge_type: &str) -> Result<CompactionStats> {
395 let _guard = CompactionGuard::new(self.compaction_status.clone())
396 .ok_or_else(|| anyhow!("Compaction already in progress"))?;
397
398 let start = std::time::Instant::now();
399 let mut files_compacted = 0;
400
401 for dir in ["fwd", "bwd"] {
402 let name = table_names::delta_table_name(edge_type, dir);
403 if self.backend.table_exists(&name).await? {
404 self.backend.optimize_table(&name).await?;
405 files_compacted += 1;
406 }
407 }
408
409 Ok(CompactionStats {
410 files_compacted,
411 bytes_before: 0,
412 bytes_after: 0,
413 duration: start.elapsed(),
414 crdt_merges: 0,
415 })
416 }
417
418 pub async fn wait_for_compaction(&self) -> Result<()> {
419 loop {
420 let in_progress = {
421 acquire_mutex(&self.compaction_status, "compaction_status")?.compaction_in_progress
422 };
423 if !in_progress {
424 return Ok(());
425 }
426 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
427 }
428 }
429
    /// Spawns the periodic background-compaction task.
    ///
    /// Returns an immediately-finished no-op task when compaction is disabled
    /// in the config. Otherwise the task ticks every `check_interval`,
    /// refreshes the compaction metrics, and runs at most one compaction per
    /// tick. On shutdown it waits for any in-flight compaction before exiting.
    pub fn start_background_compaction(
        self: Arc<Self>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> tokio::task::JoinHandle<()> {
        if !self.config.compaction.enabled {
            return tokio::spawn(async {});
        }

        tokio::spawn(async move {
            // Delay the first tick by one interval so startup is not followed
            // by an immediate compaction pass.
            let start = tokio::time::Instant::now() + self.config.compaction.check_interval;
            let mut interval =
                tokio::time::interval_at(start, self.config.compaction.check_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = self.update_compaction_status().await {
                            log::error!("Failed to update compaction status: {}", e);
                            continue;
                        }

                        if let Some(task) = self.pick_compaction_task() {
                            log::info!("Triggering background compaction: {:?}", task);
                            if let Err(e) = Self::execute_compaction(Arc::clone(&self), task).await {
                                log::error!("Compaction failed: {}", e);
                            }
                        }
                    }
                    _ = shutdown_rx.recv() => {
                        log::info!("Background compaction shutting down");
                        // Let any in-flight compaction finish before exiting.
                        let _ = self.wait_for_compaction().await;
                        break;
                    }
                }
            }
        })
    }
472
    /// Recomputes the L1 (delta-table) metrics that drive compaction
    /// scheduling: non-empty run count, estimated total size, and the age of
    /// the oldest entry.
    ///
    /// Per-table failures (missing table, failed count or scan) are skipped
    /// rather than propagated so one bad table cannot stall the scheduler.
    async fn update_compaction_status(&self) -> Result<()> {
        let schema = self.schema_manager.schema();
        let backend = self.backend.as_ref();
        let mut total_tables = 0;
        let mut total_rows: usize = 0;
        // Minimum `_created_at` (nanoseconds since epoch) over all delta rows.
        let mut oldest_ts: Option<i64> = None;

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let tbl_name = table_names::delta_table_name(name, dir);
                if !backend.table_exists(&tbl_name).await.unwrap_or(false) {
                    continue;
                }
                let row_count = backend.count_rows(&tbl_name, None).await.unwrap_or(0);
                if row_count == 0 {
                    continue;
                }
                total_tables += 1;
                total_rows += row_count;

                // Only the timestamp column is needed to find the oldest entry.
                let request =
                    ScanRequest::all(&tbl_name).with_columns(vec!["_created_at".to_string()]);
                let Ok(batches) = backend.scan(request).await else {
                    continue;
                };
                for batch in batches {
                    let Some(col) = batch
                        .column_by_name("_created_at")
                        .and_then(|c| c.as_any().downcast_ref::<TimestampNanosecondArray>())
                    else {
                        continue;
                    };
                    for i in 0..col.len() {
                        if !col.is_null(i) {
                            let ts = col.value(i);
                            oldest_ts = Some(oldest_ts.map_or(ts, |prev| prev.min(ts)));
                        }
                    }
                }
            }
        }

        // Age of the oldest entry; zero when nothing was seen or the
        // timestamp lies in the future (duration_since fails).
        let oldest_l1_age = oldest_ts
            .and_then(|ts| {
                let created = UNIX_EPOCH + Duration::from_nanos(ts as u64);
                SystemTime::now().duration_since(created).ok()
            })
            .unwrap_or(Duration::ZERO);

        let mut status = acquire_mutex(&self.compaction_status, "compaction_status")?;
        status.l1_runs = total_tables;
        status.l1_size_bytes = (total_rows * ENTRY_SIZE_ESTIMATE) as u64;
        status.oldest_l1_age = oldest_l1_age;
        Ok(())
    }
529
530 fn pick_compaction_task(&self) -> Option<CompactionTask> {
531 let status = acquire_mutex(&self.compaction_status, "compaction_status").ok()?;
532
533 if status.l1_runs >= self.config.compaction.max_l1_runs {
534 return Some(CompactionTask::ByRunCount);
535 }
536 if status.l1_size_bytes >= self.config.compaction.max_l1_size_bytes {
537 return Some(CompactionTask::BySize);
538 }
539 if status.oldest_l1_age >= self.config.compaction.max_l1_age
540 && status.oldest_l1_age > Duration::ZERO
541 {
542 return Some(CompactionTask::ByAge);
543 }
544
545 None
546 }
547
548 async fn try_optimize_table(backend: &dyn StorageBackend, table_name: &str) -> bool {
550 if let Err(e) = backend.optimize_table(table_name).await {
551 log::warn!("Failed to optimize table {}: {}", table_name, e);
552 return false;
553 }
554 true
555 }
556
    /// Runs one full compaction pass:
    /// 1. semantic compaction via [`Compactor`] (best-effort);
    /// 2. re-warm adjacency caches for every edge type the compactor touched;
    /// 3. backend-level optimize of all delta, adjacency, per-label vertex,
    ///    and main tables, counting successes into the returned stats.
    ///
    /// `_task` (the trigger reason) currently does not change the work done.
    async fn execute_compaction(this: Arc<Self>, _task: CompactionTask) -> Result<CompactionStats> {
        let start = std::time::Instant::now();
        let _guard = CompactionGuard::new(this.compaction_status.clone())
            .ok_or_else(|| anyhow!("Compaction already in progress"))?;

        let schema = this.schema_manager.schema();
        let mut files_compacted = 0;

        // Semantic compaction is best-effort: on failure we still run the
        // backend optimize passes below.
        let compactor = Compactor::new(Arc::clone(&this));
        let compaction_results = compactor.compact_all().await.unwrap_or_else(|e| {
            log::error!(
                "Semantic compaction failed (continuing with backend optimize): {}",
                e
            );
            Vec::new()
        });

        // Re-warm adjacency for whatever the compactor rewrote so readers get
        // fresh structures; failures here only log.
        let am = this.adjacency_manager();
        for info in &compaction_results {
            let direction = match info.direction.as_str() {
                "fwd" => Direction::Outgoing,
                "bwd" => Direction::Incoming,
                _ => continue,
            };
            if let Some(etid) = schema.edge_type_id_unified_case_insensitive(&info.edge_type)
                && let Err(e) = am.warm(&this, etid, direction, None).await
            {
                log::warn!(
                    "Failed to re-warm adjacency for {}/{}: {}",
                    info.edge_type,
                    info.direction,
                    e
                );
            }
        }

        let backend = this.backend.as_ref();

        for name in schema.edge_types.keys() {
            for dir in ["fwd", "bwd"] {
                let delta = table_names::delta_table_name(name, dir);
                if Self::try_optimize_table(backend, &delta).await {
                    files_compacted += 1;
                }
                let adj = table_names::adjacency_table_name(name, dir);
                if Self::try_optimize_table(backend, &adj).await {
                    files_compacted += 1;
                }
            }
        }

        for label in schema.labels.keys() {
            let tbl = table_names::vertex_table_name(label);
            if Self::try_optimize_table(backend, &tbl).await {
                files_compacted += 1;
                backend.invalidate_cache(&tbl);
            }
        }

        for tbl in [
            table_names::main_vertex_table_name(),
            table_names::main_edge_table_name(),
        ] {
            if Self::try_optimize_table(backend, tbl).await {
                files_compacted += 1;
            }
        }

        {
            let mut status = acquire_mutex(&this.compaction_status, "compaction_status")?;
            status.total_compactions += 1;
        }

        Ok(CompactionStats {
            files_compacted,
            bytes_before: 0,
            bytes_after: 0,
            duration: start.elapsed(),
            crdt_merges: 0,
        })
    }
645
646 pub fn invalidate_table_cache(&self, label: &str) {
650 let name = table_names::vertex_table_name(label);
651 self.backend.invalidate_cache(&name);
652 }
653
654 pub fn base_path(&self) -> &str {
655 &self.base_uri
656 }
657
658 pub fn schema_manager(&self) -> &SchemaManager {
659 &self.schema_manager
660 }
661
662 pub fn schema_manager_arc(&self) -> Arc<SchemaManager> {
663 self.schema_manager.clone()
664 }
665
666 pub fn adjacency_manager(&self) -> Arc<AdjacencyManager> {
668 Arc::clone(&self.adjacency_manager)
669 }
670
671 pub async fn warm_adjacency(
676 &self,
677 edge_type_id: u32,
678 direction: crate::storage::direction::Direction,
679 version: Option<u64>,
680 ) -> anyhow::Result<()> {
681 self.adjacency_manager
682 .warm(self, edge_type_id, direction, version)
683 .await
684 }
685
686 pub async fn warm_adjacency_coalesced(
691 &self,
692 edge_type_id: u32,
693 direction: crate::storage::direction::Direction,
694 version: Option<u64>,
695 ) -> anyhow::Result<()> {
696 self.adjacency_manager
697 .warm_coalesced(self, edge_type_id, direction, version)
698 .await
699 }
700
701 pub fn has_adjacency_csr(
703 &self,
704 edge_type_id: u32,
705 direction: crate::storage::direction::Direction,
706 ) -> bool {
707 self.adjacency_manager.has_csr(edge_type_id, direction)
708 }
709
710 pub fn get_neighbors_at_version(
712 &self,
713 vid: uni_common::core::id::Vid,
714 edge_type: u32,
715 direction: crate::storage::direction::Direction,
716 version: u64,
717 ) -> Vec<(uni_common::core::id::Vid, uni_common::core::id::Eid)> {
718 self.adjacency_manager
719 .get_neighbors_at_version(vid, edge_type, direction, version)
720 }
721
722 pub fn backend(&self) -> &dyn StorageBackend {
724 self.backend.as_ref()
725 }
726
727 pub fn backend_arc(&self) -> Arc<dyn StorageBackend> {
729 self.backend.clone()
730 }
731
    /// Rebuilds the in-memory vid -> labels index from the main vertex table.
    ///
    /// When the table does not exist yet, an empty index is installed. The
    /// scan requests live rows only (`_deleted = false`) and is capped at
    /// 100_000 rows — NOTE(review): graphs larger than that would be silently
    /// truncated here; confirm the limit against expected vertex counts.
    async fn rebuild_vid_labels_index(&mut self) -> Result<()> {
        use crate::storage::vid_labels::VidLabelsIndex;

        let backend = self.backend.as_ref();
        let vtable = table_names::main_vertex_table_name();

        if !backend.table_exists(vtable).await.unwrap_or(false) {
            self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(VidLabelsIndex::new())));
            return Ok(());
        }

        let request = ScanRequest::all(vtable)
            .with_filter("_deleted = false")
            .with_limit(100_000);
        let batches = backend
            .scan(request)
            .await
            .map_err(|e| anyhow!("Failed to query main vertex table: {}", e))?;

        let mut index = VidLabelsIndex::new();
        for batch in batches {
            let vid_col = batch
                .column_by_name("_vid")
                .ok_or_else(|| anyhow!("Missing _vid column"))?
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| anyhow!("Invalid _vid column type"))?;

            // `labels` is a list-of-strings column: one label list per vertex.
            let labels_col = batch
                .column_by_name("labels")
                .ok_or_else(|| anyhow!("Missing labels column"))?
                .as_any()
                .downcast_ref::<arrow_array::ListArray>()
                .ok_or_else(|| anyhow!("Invalid labels column type"))?;

            for row_idx in 0..batch.num_rows() {
                let vid = Vid::from(vid_col.value(row_idx));
                let labels_array = labels_col.value(row_idx);
                let labels_str_array = labels_array
                    .as_any()
                    .downcast_ref::<arrow_array::StringArray>()
                    .ok_or_else(|| anyhow!("Invalid labels array element type"))?;

                let labels: Vec<String> = (0..labels_str_array.len())
                    .map(|i| labels_str_array.value(i).to_string())
                    .collect();

                index.insert(vid, labels);
            }
        }

        self.vid_labels_index = Some(Arc::new(parking_lot::RwLock::new(index)));
        Ok(())
    }
790
791 pub fn get_labels_from_index(&self, vid: Vid) -> Option<Vec<String>> {
794 self.vid_labels_index.as_ref().and_then(|idx| {
795 let index = idx.read();
796 index.get_labels(vid).map(|labels| labels.to_vec())
797 })
798 }
799
800 pub fn update_vid_labels_index(&self, vid: Vid, labels: Vec<String>) {
803 if let Some(idx) = &self.vid_labels_index {
804 let mut index = idx.write();
805 index.insert(vid, labels);
806 }
807 }
808
809 pub fn remove_from_vid_labels_index(&self, vid: Vid) {
812 if let Some(idx) = &self.vid_labels_index {
813 let mut index = idx.write();
814 index.remove_vid(vid);
815 }
816 }
817
818 pub async fn load_subgraph_cached(
819 &self,
820 start_vids: &[Vid],
821 edge_types: &[u32],
822 max_hops: usize,
823 direction: GraphDirection,
824 _l0: Option<Arc<RwLock<L0Buffer>>>,
825 ) -> Result<WorkingGraph> {
826 let mut graph = WorkingGraph::new();
827
828 let dir = match direction {
829 GraphDirection::Outgoing => crate::storage::direction::Direction::Outgoing,
830 GraphDirection::Incoming => crate::storage::direction::Direction::Incoming,
831 };
832
833 let neighbor_is_dst = matches!(direction, GraphDirection::Outgoing);
834
835 let mut frontier: Vec<Vid> = start_vids.to_vec();
837 let mut visited: HashSet<Vid> = HashSet::new();
838
839 for &vid in start_vids {
841 graph.add_vertex(vid);
842 }
843
844 for _hop in 0..max_hops {
845 let mut next_frontier = HashSet::new();
846
847 for &vid in &frontier {
848 if visited.contains(&vid) {
849 continue;
850 }
851 visited.insert(vid);
852 graph.add_vertex(vid);
853
854 for &etype_id in edge_types {
855 let edge_ver = self.version_high_water_mark();
857 self.adjacency_manager
858 .warm_coalesced(self, etype_id, dir, edge_ver)
859 .await?;
860
861 let edges = self.adjacency_manager.get_neighbors(vid, etype_id, dir);
863
864 for (neighbor_vid, eid) in edges {
865 graph.add_vertex(neighbor_vid);
866 if !visited.contains(&neighbor_vid) {
867 next_frontier.insert(neighbor_vid);
868 }
869
870 if neighbor_is_dst {
871 graph.add_edge(vid, neighbor_vid, eid, etype_id);
872 } else {
873 graph.add_edge(neighbor_vid, vid, eid, etype_id);
874 }
875 }
876 }
877 }
878 frontier = next_frontier.into_iter().collect();
879
880 if frontier.is_empty() {
882 break;
883 }
884 }
885
886 Ok(graph)
887 }
888
889 pub fn snapshot_manager(&self) -> &SnapshotManager {
890 &self.snapshot_manager
891 }
892
893 pub fn index_manager(&self) -> IndexManager {
894 IndexManager::new(
895 &self.base_uri,
896 self.schema_manager.clone(),
897 self.backend.clone(),
898 )
899 }
900
    /// Scans `label`'s vertex table with column projection and an optional
    /// extra filter, combined into a single concatenated batch.
    ///
    /// Only columns that actually exist in the table are requested, so callers
    /// may pass a superset. The pinned-snapshot `_version` bound is ANDed with
    /// `additional_filter` when present.
    ///
    /// Returns `Ok(None)` when the table or its schema is unavailable, when
    /// the scan yields no batches — and also when the scan itself fails:
    /// scan errors are deliberately swallowed into `None` here.
    pub async fn scan_vertex_table(
        &self,
        label: &str,
        columns: &[&str],
        additional_filter: Option<&str>,
    ) -> Result<Option<arrow_array::RecordBatch>> {
        let backend = self.backend();
        let table_name = table_names::vertex_table_name(label);

        if !backend.table_exists(&table_name).await.unwrap_or(false) {
            return Ok(None);
        }

        // Project to the intersection of requested and existing columns.
        let actual_columns =
            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
                let table_field_names: HashSet<&str> = table_schema
                    .fields()
                    .iter()
                    .map(|f| f.name().as_str())
                    .collect();
                columns
                    .iter()
                    .copied()
                    .filter(|c| table_field_names.contains(c))
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>()
            } else {
                return Ok(None);
            };

        // Combine snapshot high-water mark (when pinned) with the caller's filter.
        let filter = match (self.version_high_water_mark(), additional_filter) {
            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
            (None, Some(f)) => Some(f.to_string()),
            (None, None) => None,
        };

        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
        if let Some(f) = filter {
            request = request.with_filter(f);
        }

        match backend.scan(request).await {
            Ok(batches) => {
                if batches.is_empty() {
                    Ok(None)
                } else {
                    Ok(Some(arrow::compute::concat_batches(
                        &batches[0].schema(),
                        &batches,
                    )?))
                }
            }
            Err(_) => Ok(None),
        }
    }
967
    /// Scans the delta table for (`edge_type`, `direction`) with column
    /// projection and an optional extra filter, returning one concatenated
    /// batch. Mirrors [`Self::scan_vertex_table`] for edge deltas.
    ///
    /// Returns `Ok(None)` when the table or its schema is unavailable, when
    /// the scan yields no batches, or when the scan itself fails — scan
    /// errors are deliberately swallowed into `None`.
    pub async fn scan_delta_table(
        &self,
        edge_type: &str,
        direction: &str,
        columns: &[&str],
        additional_filter: Option<&str>,
    ) -> Result<Option<arrow_array::RecordBatch>> {
        let backend = self.backend();
        let table_name = table_names::delta_table_name(edge_type, direction);

        if !backend.table_exists(&table_name).await.unwrap_or(false) {
            return Ok(None);
        }

        // Project to the intersection of requested and existing columns.
        let actual_columns =
            if let Some(table_schema) = backend.get_table_schema(&table_name).await? {
                let table_field_names: HashSet<&str> = table_schema
                    .fields()
                    .iter()
                    .map(|f| f.name().as_str())
                    .collect();
                columns
                    .iter()
                    .copied()
                    .filter(|c| table_field_names.contains(c))
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>()
            } else {
                return Ok(None);
            };

        // Combine snapshot high-water mark (when pinned) with the caller's filter.
        let filter = match (self.version_high_water_mark(), additional_filter) {
            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
            (None, Some(f)) => Some(f.to_string()),
            (None, None) => None,
        };

        let mut request = ScanRequest::all(&table_name).with_columns(actual_columns);
        if let Some(f) = filter {
            request = request.with_filter(f);
        }

        match backend.scan(request).await {
            Ok(batches) => {
                if batches.is_empty() {
                    Ok(None)
                } else {
                    Ok(Some(arrow::compute::concat_batches(
                        &batches[0].schema(),
                        &batches,
                    )?))
                }
            }
            Err(_) => Ok(None),
        }
    }
1028
    /// Scans the main vertex table with `columns` and an optional `filter`,
    /// version-bounded when pinned to a snapshot.
    ///
    /// Unlike the per-label scan, requested columns are NOT validated against
    /// the table schema here. Scan errors are swallowed into `Ok(None)`.
    pub async fn scan_main_vertex_table(
        &self,
        columns: &[&str],
        filter: Option<&str>,
    ) -> Result<Option<arrow_array::RecordBatch>> {
        let backend = self.backend();
        let table_name = table_names::main_vertex_table_name();

        if !backend.table_exists(table_name).await.unwrap_or(false) {
            return Ok(None);
        }

        // Combine snapshot high-water mark (when pinned) with the caller's filter.
        let full_filter = match (self.version_high_water_mark(), filter) {
            (Some(hwm), Some(f)) => Some(format!("_version <= {} AND ({})", hwm, f)),
            (Some(hwm), None) => Some(format!("_version <= {}", hwm)),
            (None, Some(f)) => Some(f.to_string()),
            (None, None) => None,
        };

        let request = ScanRequest::all(table_name)
            .with_columns(columns.iter().map(|s| s.to_string()).collect());
        let request = match full_filter.as_deref() {
            Some(f) => request.with_filter(f),
            None => request,
        };

        match backend.scan(request).await {
            Ok(batches) => {
                if batches.is_empty() {
                    Ok(None)
                } else {
                    Ok(Some(arrow::compute::concat_batches(
                        &batches[0].schema(),
                        &batches,
                    )?))
                }
            }
            Err(_) => Ok(None),
        }
    }
1074
1075 pub async fn scan_main_edge_table_stream(
1077 &self,
1078 filter: Option<&str>,
1079 ) -> Result<
1080 Option<
1081 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1082 >,
1083 > {
1084 let backend = self.backend();
1085 let table_name = table_names::main_edge_table_name();
1086
1087 if !backend.table_exists(table_name).await.unwrap_or(false) {
1088 return Ok(None);
1089 }
1090
1091 let mut request = ScanRequest::all(table_name);
1092 if let Some(f) = filter {
1093 request = request.with_filter(f);
1094 }
1095
1096 let stream = backend.scan_stream(request).await?;
1097 Ok(Some(stream))
1098 }
1099
1100 pub async fn scan_vertex_table_stream(
1102 &self,
1103 label: &str,
1104 ) -> Result<
1105 Option<
1106 std::pin::Pin<Box<dyn futures::Stream<Item = Result<arrow_array::RecordBatch>> + Send>>,
1107 >,
1108 > {
1109 let backend = self.backend();
1110 let table_name = table_names::vertex_table_name(label);
1111
1112 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1113 return Ok(None);
1114 }
1115
1116 let stream = backend.scan_stream(ScanRequest::all(&table_name)).await?;
1117 Ok(Some(stream))
1118 }
1119
1120 pub async fn find_vertex_by_ext_id(&self, ext_id: &str) -> Result<Option<Vid>> {
1122 MainVertexDataset::find_by_ext_id(self.backend(), ext_id, self.version_high_water_mark())
1123 .await
1124 }
1125
1126 pub async fn find_vertex_labels_by_vid(&self, vid: Vid) -> Result<Option<Vec<String>>> {
1128 MainVertexDataset::find_labels_by_vid(self.backend(), vid, self.version_high_water_mark())
1129 .await
1130 }
1131
1132 pub async fn find_edges_by_type_names(
1134 &self,
1135 type_names: &[&str],
1136 ) -> Result<Vec<(Eid, Vid, Vid, String, uni_common::Properties)>> {
1137 MainEdgeDataset::find_edges_by_type_names(self.backend(), type_names).await
1138 }
1139
1140 pub async fn scan_vertex_candidates(
1142 &self,
1143 label: &str,
1144 filter: Option<&str>,
1145 ) -> Result<Vec<Vid>> {
1146 let backend = self.backend();
1147 let table_name = table_names::vertex_table_name(label);
1148
1149 if !backend.table_exists(&table_name).await.unwrap_or(false) {
1150 return Ok(Vec::new());
1151 }
1152
1153 let full_filter = match filter {
1154 Some(f) => format!("_deleted = false AND ({})", f),
1155 None => "_deleted = false".to_string(),
1156 };
1157
1158 let request = ScanRequest::all(&table_name)
1159 .with_filter(full_filter)
1160 .with_columns(vec!["_vid".to_string()]);
1161
1162 let batches = backend.scan(request).await?;
1163
1164 let mut vids = Vec::new();
1165 for batch in batches {
1166 let vid_col = batch
1167 .column_by_name("_vid")
1168 .ok_or(anyhow!("Missing _vid"))?
1169 .as_any()
1170 .downcast_ref::<UInt64Array>()
1171 .ok_or(anyhow!("Invalid _vid"))?;
1172 for i in 0..batch.num_rows() {
1173 vids.push(Vid::from(vid_col.value(i)));
1174 }
1175 }
1176 Ok(vids)
1177 }
1178
1179 pub fn vertex_dataset(&self, label: &str) -> Result<VertexDataset> {
1180 let schema = self.schema_manager.schema();
1181 let label_meta = schema
1182 .labels
1183 .get(label)
1184 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
1185 Ok(VertexDataset::new(&self.base_uri, label, label_meta.id))
1186 }
1187
1188 #[cfg(feature = "lance-backend")]
1189 pub fn edge_dataset(
1190 &self,
1191 edge_type: &str,
1192 src_label: &str,
1193 dst_label: &str,
1194 ) -> Result<EdgeDataset> {
1195 Ok(EdgeDataset::new(
1196 &self.base_uri,
1197 edge_type,
1198 src_label,
1199 dst_label,
1200 ))
1201 }
1202
1203 pub fn delta_dataset(&self, edge_type: &str, direction: &str) -> Result<DeltaDataset> {
1204 Ok(DeltaDataset::new(&self.base_uri, edge_type, direction))
1205 }
1206
1207 pub fn adjacency_dataset(
1208 &self,
1209 edge_type: &str,
1210 label: &str,
1211 direction: &str,
1212 ) -> Result<AdjacencyDataset> {
1213 Ok(AdjacencyDataset::new(
1214 &self.base_uri,
1215 edge_type,
1216 label,
1217 direction,
1218 ))
1219 }
1220
1221 pub fn main_vertex_dataset(&self) -> MainVertexDataset {
1226 MainVertexDataset::new(&self.base_uri)
1227 }
1228
1229 pub fn main_edge_dataset(&self) -> MainEdgeDataset {
1234 MainEdgeDataset::new(&self.base_uri)
1235 }
1236
1237 #[cfg(feature = "lance-backend")]
1238 pub fn uid_index(&self, label: &str) -> Result<UidIndex> {
1239 Ok(UidIndex::new(&self.base_uri, label))
1240 }
1241
1242 #[cfg(feature = "lance-backend")]
1243 pub async fn inverted_index(&self, label: &str, property: &str) -> Result<InvertedIndex> {
1244 let schema = self.schema_manager.schema();
1245 let config = schema
1246 .indexes
1247 .iter()
1248 .find_map(|idx| match idx {
1249 IndexDefinition::Inverted(cfg)
1250 if cfg.label == label && cfg.property == property =>
1251 {
1252 Some(cfg.clone())
1253 }
1254 _ => None,
1255 })
1256 .ok_or_else(|| anyhow!("Inverted index not found for {}.{}", label, property))?;
1257
1258 InvertedIndex::new(&self.base_uri, config).await
1259 }
1260
1261 pub async fn vector_search(
1262 &self,
1263 label: &str,
1264 property: &str,
1265 query: &[f32],
1266 k: usize,
1267 filter: Option<&str>,
1268 ctx: Option<&QueryContext>,
1269 ) -> Result<Vec<(Vid, f32)>> {
1270 use crate::backend::types::{DistanceMetric as BackendMetric, FilterExpr};
1271
1272 let schema = self.schema_manager.schema();
1274 let metric = schema
1275 .vector_index_for_property(label, property)
1276 .map(|config| config.metric.clone())
1277 .unwrap_or(DistanceMetric::L2);
1278
1279 let backend = self.backend.as_ref();
1280 let name = table_names::vertex_table_name(label);
1281
1282 let mut results = Vec::new();
1283
1284 if backend.table_exists(&name).await.unwrap_or(false) {
1286 let backend_metric = match &metric {
1287 DistanceMetric::L2 => BackendMetric::L2,
1288 DistanceMetric::Cosine => BackendMetric::Cosine,
1289 DistanceMetric::Dot => BackendMetric::Dot,
1290 _ => BackendMetric::L2,
1291 };
1292
1293 let mut filter_parts = vec![Self::build_active_filter(filter)];
1295 if ctx.is_some()
1296 && let Some(hwm) = self.version_high_water_mark()
1297 {
1298 filter_parts.push(format!("_version <= {}", hwm));
1299 }
1300 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1301
1302 let batches = backend
1303 .vector_search(&name, property, query, k, backend_metric, combined_filter)
1304 .await?;
1305
1306 results = extract_vid_score_pairs(&batches, "_vid", "_distance")?;
1307 }
1308
1309 if let Some(qctx) = ctx {
1311 merge_l0_into_vector_results(&mut results, qctx, label, property, query, k, &metric);
1312 }
1313
1314 Ok(results)
1315 }
1316
1317 pub async fn fts_search(
1333 &self,
1334 label: &str,
1335 property: &str,
1336 query: &str,
1337 k: usize,
1338 filter: Option<&str>,
1339 ctx: Option<&QueryContext>,
1340 ) -> Result<Vec<(Vid, f32)>> {
1341 use crate::backend::types::FilterExpr;
1342
1343 let backend = self.backend.as_ref();
1344 let name = table_names::vertex_table_name(label);
1345
1346 let mut results = if backend.table_exists(&name).await.unwrap_or(false) {
1347 let mut filter_parts = vec![Self::build_active_filter(filter)];
1349 if ctx.is_some()
1350 && let Some(hwm) = self.version_high_water_mark()
1351 {
1352 filter_parts.push(format!("_version <= {}", hwm));
1353 }
1354 let combined_filter = FilterExpr::Sql(filter_parts.join(" AND "));
1355
1356 let batches = backend
1357 .full_text_search(&name, property, query, k, combined_filter)
1358 .await?;
1359
1360 let mut fts_results = extract_vid_score_pairs(&batches, "_vid", "_score")?;
1361 fts_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1363 fts_results
1364 } else {
1365 Vec::new()
1366 };
1367
1368 if let Some(qctx) = ctx {
1370 merge_l0_into_fts_results(&mut results, qctx, label, property, query, k);
1371 }
1372
1373 Ok(results)
1374 }
1375
1376 #[cfg(feature = "lance-backend")]
1377 pub async fn get_vertex_by_uid(&self, uid: &UniId, label: &str) -> Result<Option<Vid>> {
1378 let index = self.uid_index(label)?;
1379 index.get_vid(uid).await
1380 }
1381
1382 #[cfg(feature = "lance-backend")]
1383 pub async fn insert_vertex_with_uid(&self, label: &str, vid: Vid, uid: UniId) -> Result<()> {
1384 let index = self.uid_index(label)?;
1385 index.write_mapping(&[(uid, vid)]).await
1386 }
1387
    /// Loads an in-memory [`WorkingGraph`] by BFS expansion from `start_vids`,
    /// following only the edge types in `edge_types` for up to `max_hops`
    /// hops in `direction`.
    ///
    /// Per vertex and edge type, edge state is layered in order: persisted
    /// adjacency (base, version 0), then L1 delta entries, then the optional
    /// unflushed `l0` buffer — later layers win by version, and L0 tombstones
    /// win unconditionally.
    pub async fn load_subgraph(
        &self,
        start_vids: &[Vid],
        edge_types: &[u32],
        max_hops: usize,
        direction: GraphDirection,
        l0: Option<&L0Buffer>,
    ) -> Result<WorkingGraph> {
        let mut graph = WorkingGraph::new();
        let schema = self.schema_manager.schema();

        // Label id -> name; names drive adjacency-dataset path construction.
        let label_map: HashMap<u16, String> = schema
            .labels
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.label_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        // Edge-type id -> name, for delta/adjacency dataset lookup.
        let edge_type_map: HashMap<u32, String> = schema
            .edge_types
            .values()
            .map(|meta| {
                (
                    meta.id,
                    schema.edge_type_name_by_id(meta.id).unwrap().to_owned(),
                )
            })
            .collect();

        let target_edge_types: HashSet<u32> = edge_types.iter().cloned().collect();

        let mut frontier: Vec<Vid> = start_vids.to_vec();
        let mut visited: HashSet<Vid> = HashSet::new();

        // Seed vertices are always present, even if they turn out isolated.
        for &vid in start_vids {
            graph.add_vertex(vid);
        }

        for _hop in 0..max_hops {
            let mut next_frontier = HashSet::new();

            for &vid in &frontier {
                if visited.contains(&vid) {
                    continue;
                }
                visited.insert(vid);
                graph.add_vertex(vid);

                for &etype_id in &target_edge_types {
                    let etype_name = edge_type_map
                        .get(&etype_id)
                        .ok_or_else(|| anyhow!("Unknown edge type ID: {}", etype_id))?;

                    // "fwd" adjacency lists dst neighbors; "bwd" lists src.
                    let (dir_str, neighbor_is_dst) = match direction {
                        GraphDirection::Outgoing => ("fwd", true),
                        GraphDirection::Incoming => ("bwd", false),
                    };

                    // Edge state keyed by Eid; overlays below replace entries
                    // only when they carry a higher version.
                    let mut edges: HashMap<Eid, EdgeState> = HashMap::new();

                    // NOTE(review): the pinned-snapshot edge version is looked
                    // up but currently unused — confirm whether adjacency
                    // reads should be pinned to it.
                    let _edge_ver = self
                        .pinned_snapshot
                        .as_ref()
                        .and_then(|s| s.edges.get(etype_name).map(|es| es.lance_version));

                    let backend = self.backend();
                    // The source label of `vid` is unknown here, so probe the
                    // adjacency dataset of every label and stop at the first
                    // one that yields rows for this vertex.
                    for current_src_label in label_map.values() {
                        let adj_ds =
                            match self.adjacency_dataset(etype_name, current_src_label, dir_str) {
                                Ok(ds) => ds,
                                Err(_) => continue,
                            };
                        if let Some((neighbors, eids)) =
                            adj_ds.read_adjacency_backend(backend, vid).await?
                        {
                            for (n, eid) in neighbors.into_iter().zip(eids) {
                                edges.insert(
                                    eid,
                                    EdgeState {
                                        neighbor: n,
                                        // Base layer: version 0 so any delta wins.
                                        version: 0,
                                        deleted: false,
                                    },
                                );
                            }
                            break;
                        }
                    }

                    // Overlay L1 deltas, bounded by the version high-water mark.
                    let delta_ds = self.delta_dataset(etype_name, dir_str)?;
                    let delta_entries = delta_ds
                        .read_deltas(backend, vid, &schema, self.version_high_water_mark())
                        .await?;
                    Self::apply_delta_to_edges(&mut edges, delta_entries, neighbor_is_dst);

                    // Overlay unflushed in-memory edges last.
                    if let Some(l0) = l0 {
                        Self::apply_l0_to_edges(&mut edges, l0, vid, etype_id, direction);
                    }

                    Self::add_edges_to_graph(
                        &mut graph,
                        edges,
                        vid,
                        etype_id,
                        neighbor_is_dst,
                        &visited,
                        &mut next_frontier,
                    );
                }
            }
            frontier = next_frontier.into_iter().collect();

            if frontier.is_empty() {
                break;
            }
        }

        Ok(graph)
    }
1527
1528 fn apply_delta_to_edges(
1530 edges: &mut HashMap<Eid, EdgeState>,
1531 delta_entries: Vec<crate::storage::delta::L1Entry>,
1532 neighbor_is_dst: bool,
1533 ) {
1534 for entry in delta_entries {
1535 let neighbor = if neighbor_is_dst {
1536 entry.dst_vid
1537 } else {
1538 entry.src_vid
1539 };
1540 let current_ver = edges.get(&entry.eid).map(|s| s.version).unwrap_or(0);
1541
1542 if entry.version > current_ver {
1543 edges.insert(
1544 entry.eid,
1545 EdgeState {
1546 neighbor,
1547 version: entry.version,
1548 deleted: matches!(entry.op, Op::Delete),
1549 },
1550 );
1551 }
1552 }
1553 }
1554
1555 fn apply_l0_to_edges(
1557 edges: &mut HashMap<Eid, EdgeState>,
1558 l0: &L0Buffer,
1559 vid: Vid,
1560 etype_id: u32,
1561 direction: GraphDirection,
1562 ) {
1563 let l0_neighbors = l0.get_neighbors(vid, etype_id, direction);
1564 for (neighbor, eid, ver) in l0_neighbors {
1565 let current_ver = edges.get(&eid).map(|s| s.version).unwrap_or(0);
1566 if ver > current_ver {
1567 edges.insert(
1568 eid,
1569 EdgeState {
1570 neighbor,
1571 version: ver,
1572 deleted: false,
1573 },
1574 );
1575 }
1576 }
1577
1578 for (eid, state) in edges.iter_mut() {
1580 if l0.is_tombstoned(*eid) {
1581 state.deleted = true;
1582 }
1583 }
1584 }
1585
1586 fn add_edges_to_graph(
1588 graph: &mut WorkingGraph,
1589 edges: HashMap<Eid, EdgeState>,
1590 vid: Vid,
1591 etype_id: u32,
1592 neighbor_is_dst: bool,
1593 visited: &HashSet<Vid>,
1594 next_frontier: &mut HashSet<Vid>,
1595 ) {
1596 for (eid, state) in edges {
1597 if state.deleted {
1598 continue;
1599 }
1600 graph.add_vertex(state.neighbor);
1601
1602 if !visited.contains(&state.neighbor) {
1603 next_frontier.insert(state.neighbor);
1604 }
1605
1606 if neighbor_is_dst {
1607 graph.add_edge(vid, state.neighbor, eid, etype_id);
1608 } else {
1609 graph.add_edge(state.neighbor, vid, eid, etype_id);
1610 }
1611 }
1612 }
1613}
1614
1615fn extract_vid_score_pairs(
1617 batches: &[arrow_array::RecordBatch],
1618 vid_column: &str,
1619 score_column: &str,
1620) -> Result<Vec<(Vid, f32)>> {
1621 let mut results = Vec::new();
1622 for batch in batches {
1623 let vid_col = batch
1624 .column_by_name(vid_column)
1625 .ok_or_else(|| anyhow!("Missing {} column", vid_column))?
1626 .as_any()
1627 .downcast_ref::<UInt64Array>()
1628 .ok_or_else(|| anyhow!("Invalid {} column type", vid_column))?;
1629
1630 let score_col = batch
1631 .column_by_name(score_column)
1632 .ok_or_else(|| anyhow!("Missing {} column", score_column))?
1633 .as_any()
1634 .downcast_ref::<Float32Array>()
1635 .ok_or_else(|| anyhow!("Invalid {} column type", score_column))?;
1636
1637 for i in 0..batch.num_rows() {
1638 results.push((Vid::from(vid_col.value(i)), score_col.value(i)));
1639 }
1640 }
1641 Ok(results)
1642}
1643
1644fn extract_embedding_from_props(
1649 props: &uni_common::Properties,
1650 property: &str,
1651) -> Option<Vec<f32>> {
1652 let arr = props.get(property)?.as_array()?;
1653 arr.iter().map(|v| v.as_f64().map(|f| f as f32)).collect()
1654}
1655
1656fn merge_l0_into_vector_results(
1666 results: &mut Vec<(Vid, f32)>,
1667 ctx: &QueryContext,
1668 label: &str,
1669 property: &str,
1670 query: &[f32],
1671 k: usize,
1672 metric: &DistanceMetric,
1673) {
1674 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1676 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1677 buffers.push(Arc::clone(&ctx.l0));
1678 if let Some(ref txn) = ctx.transaction_l0 {
1679 buffers.push(Arc::clone(txn));
1680 }
1681
1682 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1684 let mut tombstoned: HashSet<Vid> = HashSet::new();
1686
1687 for buf_arc in &buffers {
1688 let buf = buf_arc.read();
1689
1690 for &vid in &buf.vertex_tombstones {
1692 tombstoned.insert(vid);
1693 }
1694
1695 for (&vid, labels) in &buf.vertex_labels {
1697 if !labels.iter().any(|l| l == label) {
1698 continue;
1699 }
1700 if let Some(props) = buf.vertex_properties.get(&vid)
1701 && let Some(emb) = extract_embedding_from_props(props, property)
1702 {
1703 if emb.len() != query.len() {
1704 continue; }
1706 let dist = metric.compute_distance(&emb, query);
1707 l0_candidates.insert(vid, dist);
1709 tombstoned.remove(&vid);
1711 }
1712 }
1713 }
1714
1715 if l0_candidates.is_empty() && tombstoned.is_empty() {
1717 return;
1718 }
1719
1720 results.retain(|(vid, _)| !tombstoned.contains(vid));
1722
1723 for (vid, dist) in &l0_candidates {
1725 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1726 existing.1 = *dist;
1727 } else {
1728 results.push((*vid, *dist));
1729 }
1730 }
1731
1732 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1734 results.truncate(k);
1735}
1736
/// Naive bag-of-words relevance: the fraction of distinct (lowercased,
/// whitespace-split) query tokens that also occur in `text`.
///
/// Returns a value in `[0.0, 1.0]`; an empty/whitespace-only query scores 0.0.
fn compute_text_relevance(query: &str, text: &str) -> f32 {
    let query_tokens: HashSet<String> =
        query.split_whitespace().map(str::to_lowercase).collect();
    if query_tokens.is_empty() {
        return 0.0;
    }

    let text_tokens: HashSet<String> =
        text.split_whitespace().map(str::to_lowercase).collect();

    let mut hits = 0usize;
    for token in &query_tokens {
        if text_tokens.contains(token.as_str()) {
            hits += 1;
        }
    }
    hits as f32 / query_tokens.len() as f32
}
1754
1755fn extract_text_from_props<'a>(
1757 props: &'a uni_common::Properties,
1758 property: &str,
1759) -> Option<&'a str> {
1760 props.get(property)?.as_str()
1761}
1762
1763fn merge_l0_into_fts_results(
1773 results: &mut Vec<(Vid, f32)>,
1774 ctx: &QueryContext,
1775 label: &str,
1776 property: &str,
1777 query: &str,
1778 k: usize,
1779) {
1780 let mut buffers: Vec<Arc<parking_lot::RwLock<L0Buffer>>> =
1782 ctx.pending_flush_l0s.iter().map(Arc::clone).collect();
1783 buffers.push(Arc::clone(&ctx.l0));
1784 if let Some(ref txn) = ctx.transaction_l0 {
1785 buffers.push(Arc::clone(txn));
1786 }
1787
1788 let mut l0_candidates: HashMap<Vid, f32> = HashMap::new();
1790 let mut tombstoned: HashSet<Vid> = HashSet::new();
1792
1793 for buf_arc in &buffers {
1794 let buf = buf_arc.read();
1795
1796 for &vid in &buf.vertex_tombstones {
1798 tombstoned.insert(vid);
1799 }
1800
1801 for (&vid, labels) in &buf.vertex_labels {
1803 if !labels.iter().any(|l| l == label) {
1804 continue;
1805 }
1806 if let Some(props) = buf.vertex_properties.get(&vid)
1807 && let Some(text) = extract_text_from_props(props, property)
1808 {
1809 let score = compute_text_relevance(query, text);
1810 if score > 0.0 {
1811 l0_candidates.insert(vid, score);
1813 }
1814 tombstoned.remove(&vid);
1816 }
1817 }
1818 }
1819
1820 if l0_candidates.is_empty() && tombstoned.is_empty() {
1822 return;
1823 }
1824
1825 results.retain(|(vid, _)| !tombstoned.contains(vid));
1827
1828 for (vid, score) in &l0_candidates {
1830 if let Some(existing) = results.iter_mut().find(|(v, _)| v == vid) {
1831 existing.1 = *score;
1832 } else {
1833 results.push((*vid, *score));
1834 }
1835 }
1836
1837 results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1839 results.truncate(k);
1840}