1use crate::*;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use uuid::Uuid;
6use chrono::Utc;
7use serde::{Serialize, Deserialize};
8
9use async_trait::async_trait;
11
12#[async_trait]
13pub trait StorageBackend: Send + Sync {
14 async fn exists(&self) -> bool;
15 async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()>;
16 async fn get_item(&self, id: &uuid::Uuid) -> Result<Option<VectorItem>>;
17 async fn insert_item(&mut self, item: &VectorItem) -> Result<()>;
18 async fn insert_items(&mut self, items: &[VectorItem]) -> Result<()> {
19 for item in items {
21 self.insert_item(item).await?;
22 }
23 Ok(())
24 }
25 async fn update_item(&mut self, item: &VectorItem) -> Result<()>;
26 async fn delete_item(&mut self, id: &uuid::Uuid) -> Result<()>;
27 async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>>;
28 async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>>;
29 async fn begin_transaction(&mut self) -> Result<()>;
30 async fn commit_transaction(&mut self) -> Result<()>;
31 async fn rollback_transaction(&mut self) -> Result<()>;
32 async fn delete_index(&mut self) -> Result<()>;
33 async fn get_stats(&self) -> Result<IndexStats>;
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct CreateIndexConfig {
39 #[serde(default = "default_version")]
40 pub version: u32,
41
42 #[serde(default)]
43 pub delete_if_exists: bool,
44
45 #[serde(default = "default_distance_metric")]
46 pub distance_metric: DistanceMetric,
47
48 #[serde(default)]
49 pub metadata_config: MetadataConfig,
50
51 #[serde(default)]
52 pub hnsw_config: HnswConfig,
53}
54
/// Serde default for `CreateIndexConfig::version`.
fn default_version() -> u32 {
    1
}
56fn default_distance_metric() -> DistanceMetric { DistanceMetric::Cosine }
57
58impl Default for CreateIndexConfig {
59 fn default() -> Self {
60 Self {
61 version: default_version(),
62 delete_if_exists: false,
63 distance_metric: default_distance_metric(),
64 metadata_config: MetadataConfig::default(),
65 hnsw_config: HnswConfig::default(),
66 }
67 }
68}
69
70pub struct LocalIndex {
72 #[allow(dead_code)]
73 path: PathBuf,
74 #[allow(dead_code)]
75 index_name: String,
76 storage: Arc<RwLock<Box<dyn StorageBackend>>>,
77 config: Arc<RwLock<Option<CreateIndexConfig>>>,
78}
79
80impl LocalIndex {
81 pub fn with_storage(
83 folder_path: impl AsRef<Path>,
84 index_name: Option<String>,
85 storage: Box<dyn StorageBackend>
86 ) -> Result<Self> {
87 let path = folder_path.as_ref().to_path_buf();
88 let index_name = index_name.unwrap_or_else(|| "index.json".to_string());
89
90 Ok(Self {
91 path,
92 index_name,
93 storage: Arc::new(RwLock::new(storage)),
94 config: Arc::new(RwLock::new(None)),
95 })
96 }
97
98 pub fn new<P: AsRef<Path>>(folder_path: P, index_name: Option<String>) -> Result<Self> {
101 let path = folder_path.as_ref().to_path_buf();
102 let index_name = index_name.unwrap_or_else(|| "index.json".to_string());
103
104 let storage = create_dummy_storage();
107
108 Ok(Self {
109 path,
110 index_name,
111 storage: Arc::new(RwLock::new(storage)),
112 config: Arc::new(RwLock::new(None)),
113 })
114 }
115
116 pub async fn create_index(&self, config: Option<CreateIndexConfig>) -> Result<()> {
118 let config = config.unwrap_or_default();
119
120 if config.delete_if_exists && self.is_index_created().await {
122 self.delete_index().await?;
123 }
124
125 let mut storage = self.storage.write().await;
127 storage.create_index(&config).await?;
128
129 *self.config.write().await = Some(config);
131
132 Ok(())
133 }
134
135 pub async fn is_index_created(&self) -> bool {
137 let storage = self.storage.read().await;
138 storage.exists().await
139 }
140
141 pub async fn insert_item(&self, item: impl Into<VectorItem>) -> Result<VectorItem> {
143 let mut item = item.into();
144
145 if item.id == Uuid::default() {
147 item.id = Uuid::new_v4();
148 }
149
150 let now = Utc::now();
152 item.created_at = now;
153 item.updated_at = now;
154
155 let mut storage = self.storage.write().await;
157 storage.insert_item(&item).await?;
158
159 Ok(item)
160 }
161
162 pub async fn update_item(&self, update: UpdateRequest) -> Result<UpdateResult> {
164 let mut storage = self.storage.write().await;
165
166 let mut item = storage.get_item(&update.id).await?
168 .ok_or(VectraError::ItemNotFound)?;
169
170 if let Some(vector) = update.vector {
172 item.vector = vector;
173 }
174
175 if let Some(metadata) = update.metadata {
176 merge_json(&mut item.metadata, metadata);
177 }
178
179 item.version += 1;
181 item.updated_at = Utc::now();
182
183 storage.update_item(&item).await?;
185
186 Ok(UpdateResult {
187 id: item.id,
188 version: item.version,
189 })
190 }
191
192 pub async fn upsert_item(&self, item: impl Into<VectorItem>) -> Result<VectorItem> {
194 let item = item.into();
195 let mut storage = self.storage.write().await;
196
197 if storage.get_item(&item.id).await?.is_some() {
198 storage.update_item(&item).await?;
200 } else {
201 storage.insert_item(&item).await?;
203 }
204
205 Ok(item)
206 }
207
208 pub async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
210 let storage = self.storage.read().await;
211 storage.get_item(id).await
212 }
213
214 pub async fn delete_item(&self, id: &Uuid) -> Result<()> {
216 let mut storage = self.storage.write().await;
217 storage.delete_item(id).await
218 }
219
220 pub async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
222 let storage = self.storage.read().await;
223 storage.list_items(options).await
224 }
225
226 pub async fn query_items(
228 &self,
229 vector: Vec<f32>,
230 top_k: Option<u32>,
231 filter: Option<serde_json::Value>,
232 ) -> Result<Vec<QueryResult>> {
233 self.query_items_extended(vector, None, top_k, filter).await
234 }
235
236 pub async fn query_items_extended(
238 &self,
239 vector: Vec<f32>,
240 text_query: Option<String>,
241 top_k: Option<u32>,
242 filter: Option<serde_json::Value>,
243 ) -> Result<Vec<QueryResult>> {
244 let storage = self.storage.read().await;
245
246 let query = Query {
247 vector: Some(vector),
248 text: text_query,
249 top_k: top_k.unwrap_or(10) as usize,
250 filter,
251 };
252
253 storage.query_items(&query).await
254 }
255
256 pub async fn begin_update(&self) -> Result<()> {
258 let mut storage = self.storage.write().await;
259 storage.begin_transaction().await
260 }
261
262 pub async fn end_update(&self) -> Result<()> {
264 let mut storage = self.storage.write().await;
265 storage.commit_transaction().await
266 }
267
268 pub async fn cancel_update(&self) -> Result<()> {
270 let mut storage = self.storage.write().await;
271 storage.rollback_transaction().await
272 }
273
274 pub async fn delete_index(&self) -> Result<()> {
276 let mut storage = self.storage.write().await;
277 storage.delete_index().await
278 }
279
280 pub async fn get_stats(&self) -> Result<IndexStats> {
282 let storage = self.storage.read().await;
283 storage.get_stats().await
284 }
285}
286
287fn merge_json(target: &mut serde_json::Value, source: serde_json::Value) {
289 if let (Some(target_obj), Some(source_obj)) = (target.as_object_mut(), source.as_object()) {
290 for (key, value) in source_obj {
291 target_obj.insert(key.clone(), value.clone());
292 }
293 } else {
294 *target = source;
295 }
296}
297
298struct DummyStorage;
300
301#[async_trait]
302impl StorageBackend for DummyStorage {
303 async fn exists(&self) -> bool { false }
304 async fn create_index(&mut self, _config: &CreateIndexConfig) -> Result<()> { Ok(()) }
305 async fn get_item(&self, _id: &uuid::Uuid) -> Result<Option<VectorItem>> { Ok(None) }
306 async fn insert_item(&mut self, _item: &VectorItem) -> Result<()> { Ok(()) }
307 async fn update_item(&mut self, _item: &VectorItem) -> Result<()> { Ok(()) }
308 async fn delete_item(&mut self, _id: &uuid::Uuid) -> Result<()> { Ok(()) }
309 async fn list_items(&self, _options: Option<ListOptions>) -> Result<Vec<VectorItem>> { Ok(Vec::new()) }
310 async fn query_items(&self, _query: &Query) -> Result<Vec<QueryResult>> { Ok(Vec::new()) }
311 async fn begin_transaction(&mut self) -> Result<()> { Ok(()) }
312 async fn commit_transaction(&mut self) -> Result<()> { Ok(()) }
313 async fn rollback_transaction(&mut self) -> Result<()> { Ok(()) }
314 async fn delete_index(&mut self) -> Result<()> { Ok(()) }
315 async fn get_stats(&self) -> Result<IndexStats> {
316 Ok(IndexStats {
317 items: 0,
318 size: 0,
319 dimensions: None,
320 distance_metric: DistanceMetric::Cosine,
321 })
322 }
323}
324
325fn create_dummy_storage() -> Box<dyn StorageBackend> {
326 Box::new(DummyStorage)
327}
328