1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16
17use crate::traits::{IndexStorage, IndexType, VectorStorage};
18
/// Maximum number of object-store operations issued concurrently by the
/// batched read/delete paths (`buffer_unordered` fan-out).
///
/// Overridable via the `DAKERA_S3_CONCURRENT_OPS` environment variable;
/// an unset or unparsable value falls back to 16.
fn s3_concurrent_ops() -> usize {
    match std::env::var("DAKERA_S3_CONCURRENT_OPS") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(16),
        Err(_) => 16,
    }
}
25
/// Per-namespace bookkeeping persisted as JSON at `namespaces/{ns}/meta.json`.
///
/// NOTE(review): this record is updated with a read-modify-write (no
/// conditional put), so concurrent writers to the same namespace can race —
/// confirm whether single-writer access is an invariant of the deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimensionality; `None` until the first upsert fixes it.
    dimension: Option<usize>,
    /// Approximate number of stored vectors (incremented on create,
    /// saturating-decremented on delete).
    vector_count: usize,
    /// Unix seconds at namespace creation.
    created_at: u64,
    /// Unix seconds of the last metadata-touching operation.
    updated_at: u64,
}
34
/// On-disk JSON representation of a vector
/// (`namespaces/{ns}/vectors/{id}.json`).
///
/// Deliberately omits the TTL fields of [`Vector`]; expiry is evaluated at
/// read time against `expires_at`, which is dropped here, so reloaded
/// vectors come back with no TTL (see the `From` conversions below).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    /// Arbitrary caller-supplied JSON metadata.
    metadata: Option<serde_json::Value>,
}
42
43impl From<Vector> for StoredVector {
44 fn from(v: Vector) -> Self {
45 Self {
46 id: v.id,
47 values: v.values,
48 metadata: v.metadata,
49 }
50 }
51}
52
53impl From<StoredVector> for Vector {
54 fn from(v: StoredVector) -> Self {
55 Self {
56 id: v.id,
57 values: v.values,
58 metadata: v.metadata,
59 ttl_seconds: None,
60 expires_at: None,
61 }
62 }
63}
64
/// Backend selection for [`ObjectStorage`].
///
/// Optional credential fields left as `None` defer to whatever discovery
/// the underlying OpenDAL service builder performs.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-process memory store (no retry layer); default, used in tests.
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 or any S3-compatible endpoint.
    S3 {
        bucket: String,
        region: Option<String>,
        /// Custom endpoint for S3-compatible stores (e.g. MinIO).
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage; authenticate with either an account key or a
    /// SAS token.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage.
    Gcs {
        bucket: String,
        /// Path to a service-account credential file.
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
96
/// Vector and index persistence on top of an OpenDAL [`Operator`].
///
/// `Clone` is derived so the operator can be handed to each concurrent task
/// in the batched read/delete paths (see `get` / `delete`).
#[derive(Clone)]
pub struct ObjectStorage {
    /// All I/O goes through this operator; wrapped in a retry layer for
    /// every backend except `Memory` (see `build_operator`).
    operator: Operator,
}
102
impl ObjectStorage {
    /// Builds a storage handle from an explicit backend configuration.
    ///
    /// # Errors
    /// Returns [`DakeraError::Storage`] if the OpenDAL operator cannot be
    /// constructed from `config`.
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Convenience constructor for the in-memory backend.
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Convenience constructor for a local-filesystem backend rooted at `root`.
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// S3 backend with only a bucket configured; region, endpoint and
    /// credentials are left for the service builder's own discovery.
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Azure Blob backend with no explicit key or SAS token configured.
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// GCS backend with no explicit credential file configured.
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Translates a configuration into a ready [`Operator`].
    ///
    /// Every backend except `Memory` is wrapped in the exponential-backoff
    /// retry layer from [`Self::retry_layer`]. Builder failures are mapped
    /// to [`DakeraError::Storage`].
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                // Only override builder defaults for fields the caller set.
                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Retry layer (exponential backoff with jitter, factor 2.0) applied to
    /// all non-memory backends.
    ///
    /// Tunable via environment variables; unparsable values silently fall
    /// back to the defaults:
    /// - `DAKERA_S3_MAX_RETRIES` (default 10)
    /// - `DAKERA_S3_RETRY_MIN_DELAY_MS` (default 500)
    /// - `DAKERA_S3_RETRY_MAX_DELAY_SECS` (default 60)
    ///
    /// NOTE(review): despite the `S3` naming, these knobs and the log line
    /// below also apply to the filesystem, Azure and GCS backends.
    fn retry_layer() -> RetryLayer {
        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(10);
        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(500);
        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(60);

        tracing::info!(
            max_times,
            min_delay_ms,
            max_delay_secs,
            "S3 retry layer configured"
        );

        RetryLayer::new()
            .with_max_times(max_times)
            .with_min_delay(Duration::from_millis(min_delay_ms))
            .with_max_delay(Duration::from_secs(max_delay_secs))
            .with_jitter()
            .with_factor(2.0)
    }

    /// Object key for a single vector's JSON document.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Object key for a namespace's metadata document.
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Listing prefix under which all of a namespace's vectors live.
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Loads the namespace metadata document.
    ///
    /// Returns `Ok(None)` when the document is missing, empty, or fails to
    /// deserialize — corruption is logged and treated as "missing" so the
    /// metadata can be recreated by the next write.
    ///
    /// # Errors
    /// Propagates any storage error other than `NotFound`.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Serializes and writes the namespace metadata document (full overwrite;
    /// last writer wins).
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        Ok(())
    }

    /// Current Unix time in seconds; a clock before the epoch yields 0.
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
333
334#[async_trait]
335impl VectorStorage for ObjectStorage {
336 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
337 if vectors.is_empty() {
338 return Ok(0);
339 }
340
341 let mut meta = self
343 .read_namespace_meta(namespace)
344 .await?
345 .unwrap_or_else(|| NamespaceMetadata {
346 dimension: None,
347 vector_count: 0,
348 created_at: Self::now(),
349 updated_at: Self::now(),
350 });
351
352 let first_dim = vectors[0].values.len();
354 if let Some(dim) = meta.dimension {
355 for v in &vectors {
356 if v.values.len() != dim {
357 return Err(DakeraError::DimensionMismatch {
358 expected: dim,
359 actual: v.values.len(),
360 });
361 }
362 }
363 } else {
364 meta.dimension = Some(first_dim);
365 }
366
367 let mut upserted = 0;
368 for vector in vectors {
369 let path = Self::vector_path(namespace, &vector.id);
370 let stored: StoredVector = vector.into();
371 let data =
372 serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
373
374 let exists = self
376 .operator
377 .exists(&path)
378 .await
379 .map_err(|e| DakeraError::Storage(e.to_string()))?;
380
381 self.operator
382 .write(&path, data)
383 .await
384 .map_err(|e| DakeraError::Storage(e.to_string()))?;
385
386 if !exists {
387 meta.vector_count += 1;
388 }
389 upserted += 1;
390 }
391
392 meta.updated_at = Self::now();
393 self.write_namespace_meta(namespace, &meta).await?;
394
395 tracing::debug!(
396 namespace = namespace,
397 upserted = upserted,
398 "Upserted vectors to object storage"
399 );
400
401 Ok(upserted)
402 }
403
404 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
405 if ids.is_empty() {
406 return Ok(Vec::new());
407 }
408
409 let now = std::time::SystemTime::now()
410 .duration_since(std::time::UNIX_EPOCH)
411 .unwrap_or_default()
412 .as_secs();
413
414 let read_tasks: Vec<_> = ids
415 .iter()
416 .map(|id| {
417 let operator = self.operator.clone();
418 let path = Self::vector_path(namespace, id);
419 let id = id.clone();
420 async move {
421 match operator.read(&path).await {
422 Ok(data) => {
423 let bytes = data.to_vec();
424 if bytes.is_empty() {
425 tracing::warn!(
426 vector_id = %id,
427 "Empty vector file detected, skipping"
428 );
429 return Ok(None);
430 }
431 match serde_json::from_slice::<StoredVector>(&bytes) {
432 Ok(stored) => {
433 let vector: Vector = stored.into();
434 if !vector.is_expired_at(now) {
435 Ok(Some(vector))
436 } else {
437 Ok(None)
438 }
439 }
440 Err(e) => {
441 tracing::warn!(
442 vector_id = %id,
443 error = %e,
444 bytes_len = bytes.len(),
445 "Corrupted vector file detected, skipping"
446 );
447 Ok(None)
448 }
449 }
450 }
451 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
452 Err(e) => Err(DakeraError::Storage(e.to_string())),
453 }
454 }
455 })
456 .collect();
457
458 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
459 .buffer_unordered(s3_concurrent_ops())
460 .collect()
461 .await;
462
463 let mut vectors = Vec::with_capacity(ids.len());
464 for result in results {
465 if let Some(v) = result? {
466 vectors.push(v);
467 }
468 }
469 Ok(vectors)
470 }
471
472 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
473 let prefix = Self::namespace_vectors_prefix(namespace);
474
475 let entries = self
476 .operator
477 .list(&prefix)
478 .await
479 .map_err(|e| DakeraError::Storage(e.to_string()))?;
480
481 let json_paths: Vec<String> = entries
482 .into_iter()
483 .filter(|e| e.path().ends_with(".json"))
484 .map(|e| e.path().to_string())
485 .collect();
486
487 if json_paths.is_empty() {
488 return Ok(Vec::new());
489 }
490
491 let now = std::time::SystemTime::now()
492 .duration_since(std::time::UNIX_EPOCH)
493 .unwrap_or_default()
494 .as_secs();
495
496 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
497 let operator = self.operator.clone();
498 async move {
499 match operator.read(&path).await {
500 Ok(data) => {
501 let bytes = data.to_vec();
502 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
503 let vector: Vector = stored.into();
504 if !vector.is_expired_at(now) {
505 return Some(vector);
506 }
507 }
508 None
509 }
510 Err(e) => {
511 tracing::warn!(path = %path, error = %e, "Failed to read vector");
512 None
513 }
514 }
515 }
516 }))
517 .buffer_unordered(s3_concurrent_ops())
518 .collect()
519 .await;
520
521 Ok(results.into_iter().flatten().collect())
522 }
523
524 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
525 if ids.is_empty() {
526 return Ok(0);
527 }
528
529 let delete_tasks: Vec<_> = ids
530 .iter()
531 .map(|id| {
532 let operator = self.operator.clone();
533 let path = Self::vector_path(namespace, id);
534 async move {
535 let exists = operator
536 .exists(&path)
537 .await
538 .map_err(|e| DakeraError::Storage(e.to_string()))?;
539 if exists {
540 match operator.delete(&path).await {
541 Ok(_) => Ok(true),
542 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
543 Err(e) => Err(DakeraError::Storage(e.to_string())),
544 }
545 } else {
546 Ok(false)
547 }
548 }
549 })
550 .collect();
551
552 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
553 .buffer_unordered(s3_concurrent_ops())
554 .collect()
555 .await;
556
557 let mut deleted = 0;
558 for result in results {
559 if result? {
560 deleted += 1;
561 }
562 }
563
564 if deleted > 0 {
566 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
567 meta.vector_count = meta.vector_count.saturating_sub(deleted);
568 meta.updated_at = Self::now();
569 self.write_namespace_meta(namespace, &meta).await?;
570 }
571 }
572
573 Ok(deleted)
574 }
575
576 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
577 Ok(self.read_namespace_meta(namespace).await?.is_some())
578 }
579
580 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
581 if self.read_namespace_meta(namespace).await?.is_none() {
582 let meta = NamespaceMetadata {
583 dimension: None,
584 vector_count: 0,
585 created_at: Self::now(),
586 updated_at: Self::now(),
587 };
588 self.write_namespace_meta(namespace, &meta).await?;
589 }
590 Ok(())
591 }
592
593 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
594 Ok(self
595 .read_namespace_meta(namespace)
596 .await?
597 .and_then(|m| m.dimension))
598 }
599
600 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
601 Ok(self
602 .read_namespace_meta(namespace)
603 .await?
604 .map(|m| m.vector_count)
605 .unwrap_or(0))
606 }
607
608 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
609 let entries = self
610 .operator
611 .list("namespaces/")
612 .await
613 .map_err(|e| DakeraError::Storage(e.to_string()))?;
614
615 let mut namespaces = Vec::new();
616 for entry in entries {
617 let path = entry.path();
618 if let Some(ns) = path.strip_prefix("namespaces/") {
620 let ns = ns.trim_end_matches('/');
621 if !ns.is_empty() && !ns.contains('/') {
622 if self.read_namespace_meta(ns).await?.is_some() {
627 namespaces.push(ns.to_string());
628 }
629 }
630 }
631 }
632
633 Ok(namespaces)
634 }
635
636 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
637 if !self.namespace_exists(namespace).await? {
639 return Ok(false);
640 }
641
642 let prefix = format!("namespaces/{}/", namespace);
646 self.operator
647 .delete_with(&prefix)
648 .recursive(true)
649 .await
650 .map_err(|e| DakeraError::Storage(e.to_string()))?;
651
652 tracing::debug!(
653 namespace = namespace,
654 "Deleted namespace from object storage"
655 );
656
657 Ok(true)
658 }
659
660 async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
661 Ok(0)
664 }
665
666 async fn cleanup_all_expired(&self) -> Result<usize> {
667 Ok(0)
669 }
670}
671
672#[async_trait]
673impl IndexStorage for ObjectStorage {
674 async fn save_index(
675 &self,
676 namespace: &NamespaceId,
677 index_type: IndexType,
678 data: Vec<u8>,
679 ) -> Result<()> {
680 let path = format!(
681 "namespaces/{}/indexes/{}.bin",
682 namespace,
683 index_type.as_str()
684 );
685 self.operator
686 .write(&path, data)
687 .await
688 .map_err(|e| DakeraError::Storage(e.to_string()))?;
689
690 tracing::debug!(
691 namespace = namespace,
692 index_type = index_type.as_str(),
693 "Saved index to object storage"
694 );
695 Ok(())
696 }
697
698 async fn load_index(
699 &self,
700 namespace: &NamespaceId,
701 index_type: IndexType,
702 ) -> Result<Option<Vec<u8>>> {
703 let path = format!(
704 "namespaces/{}/indexes/{}.bin",
705 namespace,
706 index_type.as_str()
707 );
708 match self.operator.read(&path).await {
709 Ok(data) => {
710 tracing::debug!(
711 namespace = namespace,
712 index_type = index_type.as_str(),
713 size = data.len(),
714 "Loaded index from object storage"
715 );
716 Ok(Some(data.to_vec()))
717 }
718 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
719 Err(e) => Err(DakeraError::Storage(e.to_string())),
720 }
721 }
722
723 async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
724 let path = format!(
725 "namespaces/{}/indexes/{}.bin",
726 namespace,
727 index_type.as_str()
728 );
729 let exists = self
730 .operator
731 .exists(&path)
732 .await
733 .map_err(|e| DakeraError::Storage(e.to_string()))?;
734
735 if exists {
736 self.operator
737 .delete(&path)
738 .await
739 .map_err(|e| DakeraError::Storage(e.to_string()))?;
740 tracing::debug!(
741 namespace = namespace,
742 index_type = index_type.as_str(),
743 "Deleted index from object storage"
744 );
745 }
746 Ok(exists)
747 }
748
749 async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
750 let path = format!(
751 "namespaces/{}/indexes/{}.bin",
752 namespace,
753 index_type.as_str()
754 );
755 self.operator
756 .exists(&path)
757 .await
758 .map_err(|e| DakeraError::Storage(e.to_string()))
759 }
760
761 async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
762 let prefix = format!("namespaces/{}/indexes/", namespace);
763 let entries = self
764 .operator
765 .list(&prefix)
766 .await
767 .map_err(|e| DakeraError::Storage(e.to_string()))?;
768
769 let mut indexes = Vec::new();
770 for entry in entries {
771 let path = entry.path();
772 if path.ends_with(".bin") {
773 if let Some(filename) = path.strip_prefix(&prefix) {
775 let name = filename.trim_end_matches(".bin");
776 match name {
777 "hnsw" => indexes.push(IndexType::Hnsw),
778 "pq" => indexes.push(IndexType::Pq),
779 "ivf" => indexes.push(IndexType::Ivf),
780 "spfresh" => indexes.push(IndexType::SpFresh),
781 "fulltext" => indexes.push(IndexType::FullText),
782 _ => {} }
784 }
785 }
786 }
787
788 Ok(indexes)
789 }
790}
791
792pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
795 ObjectStorage::build_operator(config)
796}
797
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end CRUD smoke test against the in-memory backend:
    /// namespace lifecycle, upsert, point get, full scan, count, delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Namespace creation is observable via metadata.
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Point lookup round-trips the stored values unchanged.
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Full scan sees both vectors.
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting one vector removes the object and decrements the counter.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// The namespace dimension is fixed by the first upsert; a later upsert
    /// with a different dimension must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // First upsert pins the namespace to dimension 3.
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // A 2-dimensional vector must now fail.
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    /// Re-upserting an existing id overwrites the values without inflating
    /// the namespace vector count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Same id, new values: an overwrite, not an insert.
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // Counter stays at 1 because only one distinct id was ever stored.
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// Exercises the IndexStorage trait: save, exists, load, list, and
    /// delete (including deleting an already-deleted index).
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Nothing stored yet.
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Existence is per index type.
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Load round-trips the exact bytes.
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // Listing sees both stored index types.
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // First delete reports true and removes the blob.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Second delete of the same index reports false.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Loading a deleted index yields None, not an error.
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}