1use crate::Vector;
9use crate::db::VectorDbRead;
10use crate::error::{Error, Result};
11use crate::hnsw::{CentroidGraph, build_centroid_graph};
12use crate::model::{Query, ReaderConfig, SearchResult};
13use crate::query_engine::{QueryEngine, QueryEngineOptions};
14use crate::serde::centroid_chunk::CentroidEntry;
15use crate::storage::VectorDbStorageReadExt;
16use crate::storage::merge_operator::VectorDbMergeOperator;
17use async_trait::async_trait;
18use common::StorageSemantics;
19use common::storage::factory::{StorageReaderRuntime, create_storage_read};
20use std::sync::Arc;
21
22pub struct VectorDbReader {
30 query_engine: QueryEngine,
31}
32
33impl VectorDbReader {
34 pub async fn open(config: ReaderConfig) -> Result<Self> {
39 Self::open_with_runtime(config, StorageReaderRuntime::new()).await
40 }
41
42 pub async fn open_with_runtime(
44 config: ReaderConfig,
45 runtime: StorageReaderRuntime,
46 ) -> Result<Self> {
47 let merge_op = VectorDbMergeOperator::new(config.dimensions as usize);
48 let storage = create_storage_read(
49 &config.storage,
50 runtime,
51 StorageSemantics::new().with_merge_operator(Arc::new(merge_op)),
52 slatedb::config::DbReaderOptions::default(),
53 )
54 .await?;
55
56 let dimensions = config.dimensions as usize;
58 let scan_result = storage.scan_all_centroids(dimensions).await?;
59
60 if scan_result.entries.is_empty() {
61 return Err(Error::Storage(
62 "No centroids found in storage. Database must be initialized by VectorDb first."
63 .to_string(),
64 ));
65 }
66
67 let deletions = storage.get_deleted_vectors().await?;
69 let live_centroids: Vec<CentroidEntry> = scan_result
70 .entries
71 .into_iter()
72 .filter(|c| !deletions.contains(c.centroid_id))
73 .collect();
74
75 let centroid_graph = build_centroid_graph(live_centroids, config.distance_metric)?;
76
77 let options = QueryEngineOptions {
78 dimensions: config.dimensions,
79 distance_metric: config.distance_metric,
80 query_pruning_factor: config.query_pruning_factor,
81 };
82
83 let centroid_graph: Arc<dyn CentroidGraph> = Arc::from(centroid_graph);
84 let query_engine = QueryEngine::new(options, centroid_graph, storage);
85 Ok(Self::new(query_engine))
86 }
87
88 pub(crate) fn new(query_engine: QueryEngine) -> Self {
89 Self { query_engine }
90 }
91}
92
93#[async_trait]
94impl VectorDbRead for VectorDbReader {
95 async fn search(&self, query: &Query) -> Result<Vec<SearchResult>> {
96 self.query_engine.search(query).await
97 }
98
99 async fn search_with_nprobe(&self, query: &Query, nprobe: usize) -> Result<Vec<SearchResult>> {
100 self.query_engine.search_with_nprobe(query, nprobe).await
101 }
102
103 async fn get(&self, id: &str) -> Result<Option<Vector>> {
104 self.query_engine.get(id).await
105 }
106}
107
#[cfg(test)]
mod tests {
    use crate::VectorDb;
    use crate::db::VectorDbRead;
    use crate::model::{Config, Query, ReaderConfig, Vector};
    use crate::reader::VectorDbReader;
    use crate::serde::collection_meta::DistanceMetric;
    use common::StorageConfig;
    use common::storage::config::{
        LocalObjectStoreConfig, ObjectStoreConfig, SlateDbStorageConfig,
    };
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds a `StorageConfig::SlateDb` whose local object store lives at
    /// the path of `dir`.
    fn local_storage_config(dir: &TempDir) -> StorageConfig {
        let root = dir.path().to_string_lossy().into_owned();
        let object_store = ObjectStoreConfig::Local(LocalObjectStoreConfig { path: root });

        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: String::from("vector-data"),
            object_store,
            settings_path: None,
            block_cache: None,
        })
    }

    /// Writes three unit vectors through `VectorDb`, then opens a
    /// `VectorDbReader` on the same storage and checks that a search for the
    /// first vector returns it as the top hit.
    #[tokio::test]
    async fn should_search_vectors_via_reader() {
        // given: a database holding three orthogonal unit vectors, flushed
        let temp_dir = TempDir::new().expect("Failed to create temp dir");

        let writer_config = Config {
            storage: local_storage_config(&temp_dir),
            dimensions: 3,
            distance_metric: DistanceMetric::L2,
            flush_interval: Duration::from_secs(60),
            split_threshold_vectors: 10_000,
            ..Default::default()
        };
        let db = VectorDb::open(writer_config).await.unwrap();

        db.write(vec![
            Vector::new("vec-1", vec![1.0, 0.0, 0.0]),
            Vector::new("vec-2", vec![0.0, 1.0, 0.0]),
            Vector::new("vec-3", vec![0.0, 0.0, 1.0]),
        ])
        .await
        .unwrap();
        db.flush().await.unwrap();

        // when: a reader is opened on the same storage and queried
        let reader = VectorDbReader::open(ReaderConfig {
            storage: local_storage_config(&temp_dir),
            dimensions: 3,
            distance_metric: DistanceMetric::L2,
            query_pruning_factor: None,
            metadata_fields: vec![],
        })
        .await
        .unwrap();

        let query = Query::new(vec![1.0, 0.0, 0.0]).with_limit(2);
        let hits = reader.search(&query).await.unwrap();

        // then: the limit is honoured and the exact match ranks first
        assert_eq!(hits.len(), 2);
        assert_eq!(hits[0].vector.id, "vec-1");
    }
}