// vectrust_storage/legacy.rs
use std::io::ErrorKind;
use std::path::{Path, PathBuf};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::fs;
use uuid::Uuid;
use vectrust_core::*;

8pub struct LegacyStorage {
10 path: PathBuf,
11 index_name: String,
12 cache: tokio::sync::RwLock<Option<LegacyIndexFile>>,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct LegacyIndexFile {
18 pub version: u32,
19 pub metadata_config: MetadataConfig,
20 pub items: Vec<VectorItem>,
21}
22
23impl LegacyStorage {
24 pub fn new(path: &Path, index_name: &str) -> Result<Self> {
25 Ok(Self {
26 path: path.to_path_buf(),
27 index_name: index_name.to_string(),
28 cache: tokio::sync::RwLock::new(None),
29 })
30 }
31
32 async fn perform_vector_search(&self, items: &[VectorItem], query_vector: &[f32], top_k: usize) -> Result<Vec<QueryResult>> {
33 let mut results = self.compute_similarity_scores(items, query_vector)?;
34
35 results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
37 results.truncate(top_k);
38
39 self.load_results_metadata(&mut results).await?;
41
42 Ok(results)
43 }
44
45 fn compute_similarity_scores(&self, items: &[VectorItem], query_vector: &[f32]) -> Result<Vec<QueryResult>> {
46 let mut results = Vec::new();
47
48 for item in items {
49 if let Some(result) = Self::evaluate_item_similarity(item, query_vector) {
50 results.push(result);
51 }
52 }
53
54 Ok(results)
55 }
56
57 fn evaluate_item_similarity(item: &VectorItem, query_vector: &[f32]) -> Option<QueryResult> {
58 if item.deleted {
60 return None;
61 }
62
63 if !VectorOps::compatible_dimensions(query_vector, &item.vector) {
65 return None;
66 }
67
68 let similarity = VectorOps::calculate_similarity(
70 query_vector,
71 &item.vector,
72 &DistanceMetric::Cosine
73 );
74
75 if similarity.is_finite() {
77 Some(QueryResult {
78 item: item.clone(),
79 score: similarity,
80 })
81 } else {
82 None
83 }
84 }
85
86 async fn load_results_metadata(&self, results: &mut Vec<QueryResult>) -> Result<()> {
87 for result in results {
88 if let Some(external_metadata) = self.load_metadata(&result.item.id).await? {
89 result.item.metadata = external_metadata;
90 }
91 }
92 Ok(())
93 }
94
95 fn index_path(&self) -> PathBuf {
96 self.path.join(&self.index_name)
97 }
98
99 async fn load_index(&self) -> Result<LegacyIndexFile> {
100 {
102 let cache = self.cache.read().await;
103 if let Some(ref index) = *cache {
104 return Ok(index.clone());
105 }
106 }
107
108 let path = self.index_path();
110 if !path.exists() {
111 return Err(VectraError::IndexNotFound {
112 path: path.to_string_lossy().to_string()
113 });
114 }
115
116 let content = fs::read_to_string(&path).await?;
117 let index: LegacyIndexFile = serde_json::from_str(&content)?;
118
119 {
121 let mut cache = self.cache.write().await;
122 *cache = Some(index.clone());
123 }
124
125 Ok(index)
126 }
127
128 async fn save_index(&self, index: &LegacyIndexFile) -> Result<()> {
129 let path = self.index_path();
130
131 if let Some(parent) = path.parent() {
133 fs::create_dir_all(parent).await?;
134 }
135
136 let temp_path = path.with_extension("tmp");
138 let content = serde_json::to_string_pretty(index)?;
139 fs::write(&temp_path, content).await?;
140 fs::rename(&temp_path, &path).await?;
141
142 {
144 let mut cache = self.cache.write().await;
145 *cache = Some(index.clone());
146 }
147
148 Ok(())
149 }
150
151 async fn load_metadata(&self, id: &Uuid) -> Result<Option<serde_json::Value>> {
152 let metadata_path = self.path.join(format!("{}.json", id));
153
154 if !metadata_path.exists() {
155 return Ok(None);
156 }
157
158 let content = fs::read_to_string(metadata_path).await?;
159 let metadata: serde_json::Value = serde_json::from_str(&content)?;
160 Ok(Some(metadata))
161 }
162
163 async fn save_metadata(&self, id: &Uuid, metadata: &serde_json::Value) -> Result<()> {
164 let metadata_path = self.path.join(format!("{}.json", id));
165 let content = serde_json::to_string_pretty(metadata)?;
166 fs::write(metadata_path, content).await?;
167 Ok(())
168 }
169
170 async fn delete_metadata(&self, id: &Uuid) -> Result<()> {
171 let metadata_path = self.path.join(format!("{}.json", id));
172 if metadata_path.exists() {
173 fs::remove_file(metadata_path).await?;
174 }
175 Ok(())
176 }
177}
178
179#[async_trait]
180impl StorageBackend for LegacyStorage {
181 async fn exists(&self) -> bool {
182 self.index_path().exists()
183 }
184
185 async fn create_index(&mut self, config: &CreateIndexConfig) -> Result<()> {
186 let index_path = self.index_path();
187
188 if index_path.exists() && !config.delete_if_exists {
189 return Err(VectraError::IndexAlreadyExists {
190 path: index_path.to_string_lossy().to_string()
191 });
192 }
193
194 let index = LegacyIndexFile {
195 version: config.version,
196 metadata_config: config.metadata_config.clone(),
197 items: Vec::new(),
198 };
199
200 self.save_index(&index).await?;
201 Ok(())
202 }
203
204 async fn get_item(&self, id: &Uuid) -> Result<Option<VectorItem>> {
205 let index = self.load_index().await?;
206
207 let item = index.items.iter().find(|item| &item.id == id);
209
210 if let Some(mut item) = item.cloned() {
211 if let Some(external_metadata) = self.load_metadata(id).await? {
213 item.metadata = external_metadata;
214 }
215 Ok(Some(item))
216 } else {
217 Ok(None)
218 }
219 }
220
221 async fn insert_item(&mut self, item: &VectorItem) -> Result<()> {
222 let mut index = self.load_index().await?;
223
224 if index.items.iter().any(|existing| existing.id == item.id) {
226 return Err(VectraError::Storage {
227 message: format!("Item with ID {} already exists", item.id)
228 });
229 }
230
231 let mut item_to_store = item.clone();
233 let metadata_size = serde_json::to_string(&item.metadata)?.len();
234
235 if metadata_size > 1024 {
236 self.save_metadata(&item.id, &item.metadata).await?;
238 item_to_store.metadata = serde_json::Value::Object(serde_json::Map::new());
239 }
240
241 index.items.push(item_to_store);
243 self.save_index(&index).await?;
244
245 Ok(())
246 }
247
248 async fn update_item(&mut self, item: &VectorItem) -> Result<()> {
249 let mut index = self.load_index().await?;
250
251 let position = index.items.iter().position(|existing| existing.id == item.id)
253 .ok_or(VectraError::ItemNotFound)?;
254
255 let mut item_to_store = item.clone();
257 let metadata_size = serde_json::to_string(&item.metadata)?.len();
258
259 if metadata_size > 1024 {
260 self.save_metadata(&item.id, &item.metadata).await?;
261 item_to_store.metadata = serde_json::Value::Object(serde_json::Map::new());
262 } else {
263 self.delete_metadata(&item.id).await?;
265 }
266
267 index.items[position] = item_to_store;
268 self.save_index(&index).await?;
269
270 Ok(())
271 }
272
273 async fn delete_item(&mut self, id: &Uuid) -> Result<()> {
274 let mut index = self.load_index().await?;
275
276 let original_len = index.items.len();
278 index.items.retain(|item| &item.id != id);
279
280 if index.items.len() == original_len {
281 return Err(VectraError::ItemNotFound);
282 }
283
284 self.delete_metadata(id).await?;
286
287 self.save_index(&index).await?;
288 Ok(())
289 }
290
291 async fn list_items(&self, options: Option<ListOptions>) -> Result<Vec<VectorItem>> {
292 let index = self.load_index().await?;
293 let mut items = index.items.clone();
294
295 for item in &mut items {
297 if let Some(external_metadata) = self.load_metadata(&item.id).await? {
298 item.metadata = external_metadata;
299 }
300 }
301
302 if let Some(opts) = options {
304 let offset = opts.offset.unwrap_or(0);
305 let limit = opts.limit.unwrap_or(items.len());
306
307 if offset < items.len() {
308 let end = std::cmp::min(offset + limit, items.len());
309 items = items[offset..end].to_vec();
310 } else {
311 items.clear();
312 }
313 }
314
315 Ok(items)
316 }
317
318 async fn query_items(&self, query: &Query) -> Result<Vec<QueryResult>> {
319 let index = self.load_index().await?;
320
321 if let Some(ref query_vector) = query.vector {
322 let results = self.perform_vector_search(&index.items, query_vector, query.top_k).await?;
323 Ok(results)
324 } else if let Some(ref _text_query) = query.text {
325 Ok(Vec::new())
328 } else {
329 Ok(Vec::new())
331 }
332 }
333
334 async fn begin_transaction(&mut self) -> Result<()> {
335 Ok(())
337 }
338
339 async fn commit_transaction(&mut self) -> Result<()> {
340 Ok(())
342 }
343
344 async fn rollback_transaction(&mut self) -> Result<()> {
345 Ok(())
347 }
348
349 async fn delete_index(&mut self) -> Result<()> {
350 let index_path = self.index_path();
351
352 if index_path.exists() {
353 fs::remove_file(&index_path).await?;
355
356 let mut dir = fs::read_dir(&self.path).await?;
358 let mut metadata_files = Vec::new();
359
360 while let Some(entry) = dir.next_entry().await? {
361 let path = entry.path();
362 if let Some(ext) = path.extension() {
363 if ext == "json" && path != index_path {
364 if let Some(stem) = path.file_stem() {
366 if let Some(stem_str) = stem.to_str() {
367 if Uuid::parse_str(stem_str).is_ok() {
368 metadata_files.push(path);
369 }
370 }
371 }
372 }
373 }
374 }
375
376 for metadata_file in metadata_files {
378 fs::remove_file(metadata_file).await?;
379 }
380 }
381
382 {
384 let mut cache = self.cache.write().await;
385 *cache = None;
386 }
387
388 Ok(())
389 }
390
391 async fn get_stats(&self) -> Result<IndexStats> {
392 if !self.exists().await {
393 return Ok(IndexStats {
394 items: 0,
395 size: 0,
396 dimensions: None,
397 distance_metric: DistanceMetric::Cosine,
398 });
399 }
400
401 let index = self.load_index().await?;
402 let dimensions = index.items.first().map(|item| item.vector.len());
403
404 let index_size = fs::metadata(self.index_path()).await?.len();
406
407 Ok(IndexStats {
408 items: index.items.len(),
409 size: index_size,
410 dimensions,
411 distance_metric: DistanceMetric::Cosine, })
413 }
414}
415