1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16
17use crate::traits::{IndexStorage, IndexType, VectorStorage};
18
/// Maximum number of concurrent object-store operations used by the batched
/// read/delete paths. Overridable via the `DAKERA_S3_CONCURRENT_OPS`
/// environment variable; falls back to 16 when unset or unparsable.
fn s3_concurrent_ops() -> usize {
    match std::env::var("DAKERA_S3_CONCURRENT_OPS") {
        Ok(raw) => raw.parse().unwrap_or(16),
        Err(_) => 16,
    }
}
25
/// Per-namespace bookkeeping persisted at `namespaces/<ns>/meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimensionality, fixed by the first upsert; `None` until then.
    dimension: Option<usize>,
    /// Count of distinct vector ids believed to exist in the namespace.
    /// Maintained best-effort by upsert/delete, not recomputed from listings.
    vector_count: usize,
    /// Unix seconds when the namespace metadata was first created.
    created_at: u64,
    /// Unix seconds of the last upsert/delete that touched the namespace.
    updated_at: u64,
}
34
/// On-disk JSON representation of a vector at `namespaces/<ns>/vectors/<id>.json`.
/// Intentionally omits the runtime TTL fields of `Vector` (see the `From` impls).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
42
43impl From<Vector> for StoredVector {
44 fn from(v: Vector) -> Self {
45 Self {
46 id: v.id,
47 values: v.values,
48 metadata: v.metadata,
49 }
50 }
51}
52
53impl From<StoredVector> for Vector {
54 fn from(v: StoredVector) -> Self {
55 Self {
56 id: v.id,
57 values: v.values,
58 metadata: v.metadata,
59 ttl_seconds: None,
60 expires_at: None,
61 }
62 }
63}
64
/// Backend selection for [`ObjectStorage`]. Defaults to the in-memory backend.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory store; data is lost on drop. Intended for tests.
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 (or S3-compatible endpoint). Optional fields fall back to
    /// the opendal/SDK credential and region discovery chain when `None`.
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage. Authenticate with either `account_key` or
    /// `sas_token`; `endpoint` overrides the default account endpoint.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage. `credential_path` points at a service-account
    /// JSON file; default discovery is used when `None`.
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
96
/// Vector and index storage backed by an opendal [`Operator`].
/// Cloning is cheap: `Operator` is internally reference-counted.
#[derive(Clone)]
pub struct ObjectStorage {
    operator: Operator,
}
102
impl ObjectStorage {
    /// Build storage from an explicit backend configuration.
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Convenience constructor: in-memory backend (tests / ephemeral use).
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Convenience constructor: local-filesystem backend rooted at `root`.
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Convenience constructor: S3 backend with default region/endpoint and
    /// credentials resolved by the environment.
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Convenience constructor: Azure Blob backend with credentials resolved
    /// by the environment.
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Convenience constructor: GCS backend with default credential discovery.
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Construct the opendal [`Operator`] for `config`.
    ///
    /// Every backend except `Memory` is wrapped in [`Self::retry_layer`] so
    /// transient failures are retried with exponential backoff. All opendal
    /// errors are flattened into `DakeraError::Storage`.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                // Only override what the caller supplied; everything else is
                // left to opendal's default discovery.
                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Retry layer applied to every non-memory backend.
    ///
    /// NOTE(review): the env vars are named `DAKERA_S3_*` but this layer is
    /// applied to filesystem/Azure/GCS backends as well — the naming is
    /// historical, the settings are backend-agnostic.
    fn retry_layer() -> RetryLayer {
        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(3);
        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(500);
        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(60);

        tracing::info!(
            max_times,
            min_delay_ms,
            max_delay_secs,
            "S3 retry layer configured"
        );

        // Exponential backoff (factor 2) with jitter between min and max delay.
        RetryLayer::new()
            .with_max_times(max_times)
            .with_min_delay(Duration::from_millis(min_delay_ms))
            .with_max_delay(Duration::from_secs(max_delay_secs))
            .with_jitter()
            .with_factor(2.0)
    }

    /// Object key for a single vector: `namespaces/<ns>/vectors/<id>.json`.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Object key for a namespace's metadata file.
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Listing prefix under which all of a namespace's vectors live.
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Read a namespace's metadata.
    ///
    /// Returns `Ok(None)` when the file is missing, empty, or fails to parse —
    /// corrupt metadata is deliberately treated as absent (logged) so the next
    /// upsert recreates it instead of wedging the namespace.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Serialize and (over)write a namespace's metadata file.
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        Ok(())
    }

    /// Current Unix time in seconds (0 if the clock is before the epoch).
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
336
337#[async_trait]
338impl VectorStorage for ObjectStorage {
339 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
340 if vectors.is_empty() {
341 return Ok(0);
342 }
343
344 let mut meta = self
346 .read_namespace_meta(namespace)
347 .await?
348 .unwrap_or_else(|| NamespaceMetadata {
349 dimension: None,
350 vector_count: 0,
351 created_at: Self::now(),
352 updated_at: Self::now(),
353 });
354
355 let first_dim = vectors[0].values.len();
357 if let Some(dim) = meta.dimension {
358 for v in &vectors {
359 if v.values.len() != dim {
360 return Err(DakeraError::DimensionMismatch {
361 expected: dim,
362 actual: v.values.len(),
363 });
364 }
365 }
366 } else {
367 meta.dimension = Some(first_dim);
368 }
369
370 let mut upserted = 0;
371 for vector in vectors {
372 let path = Self::vector_path(namespace, &vector.id);
373 let stored: StoredVector = vector.into();
374 let data =
375 serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
376
377 let exists = self
379 .operator
380 .exists(&path)
381 .await
382 .map_err(|e| DakeraError::Storage(e.to_string()))?;
383
384 self.operator
385 .write(&path, data)
386 .await
387 .map_err(|e| DakeraError::Storage(e.to_string()))?;
388
389 if !exists {
390 meta.vector_count += 1;
391 }
392 upserted += 1;
393 }
394
395 meta.updated_at = Self::now();
396 self.write_namespace_meta(namespace, &meta).await?;
397
398 tracing::debug!(
399 namespace = namespace,
400 upserted = upserted,
401 "Upserted vectors to object storage"
402 );
403
404 Ok(upserted)
405 }
406
407 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
408 if ids.is_empty() {
409 return Ok(Vec::new());
410 }
411
412 let now = std::time::SystemTime::now()
413 .duration_since(std::time::UNIX_EPOCH)
414 .unwrap_or_default()
415 .as_secs();
416
417 let read_tasks: Vec<_> = ids
418 .iter()
419 .map(|id| {
420 let operator = self.operator.clone();
421 let path = Self::vector_path(namespace, id);
422 let id = id.clone();
423 async move {
424 match operator.read(&path).await {
425 Ok(data) => {
426 let bytes = data.to_vec();
427 if bytes.is_empty() {
428 tracing::warn!(
429 vector_id = %id,
430 "Empty vector file detected, skipping"
431 );
432 return Ok(None);
433 }
434 match serde_json::from_slice::<StoredVector>(&bytes) {
435 Ok(stored) => {
436 let vector: Vector = stored.into();
437 if !vector.is_expired_at(now) {
438 Ok(Some(vector))
439 } else {
440 Ok(None)
441 }
442 }
443 Err(e) => {
444 tracing::warn!(
445 vector_id = %id,
446 error = %e,
447 bytes_len = bytes.len(),
448 "Corrupted vector file detected, skipping"
449 );
450 Ok(None)
451 }
452 }
453 }
454 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
455 Err(e) => Err(DakeraError::Storage(e.to_string())),
456 }
457 }
458 })
459 .collect();
460
461 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
462 .buffer_unordered(s3_concurrent_ops())
463 .collect()
464 .await;
465
466 let mut vectors = Vec::with_capacity(ids.len());
467 for result in results {
468 if let Some(v) = result? {
469 vectors.push(v);
470 }
471 }
472 Ok(vectors)
473 }
474
475 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
476 let prefix = Self::namespace_vectors_prefix(namespace);
477
478 let entries = self
479 .operator
480 .list(&prefix)
481 .await
482 .map_err(|e| DakeraError::Storage(e.to_string()))?;
483
484 let json_paths: Vec<String> = entries
485 .into_iter()
486 .filter(|e| e.path().ends_with(".json"))
487 .map(|e| e.path().to_string())
488 .collect();
489
490 if json_paths.is_empty() {
491 return Ok(Vec::new());
492 }
493
494 let now = std::time::SystemTime::now()
495 .duration_since(std::time::UNIX_EPOCH)
496 .unwrap_or_default()
497 .as_secs();
498
499 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
500 let operator = self.operator.clone();
501 async move {
502 match operator.read(&path).await {
503 Ok(data) => {
504 let bytes = data.to_vec();
505 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
506 let vector: Vector = stored.into();
507 if !vector.is_expired_at(now) {
508 return Some(vector);
509 }
510 }
511 None
512 }
513 Err(e) => {
514 tracing::warn!(path = %path, error = %e, "Failed to read vector");
515 None
516 }
517 }
518 }
519 }))
520 .buffer_unordered(s3_concurrent_ops())
521 .collect()
522 .await;
523
524 Ok(results.into_iter().flatten().collect())
525 }
526
527 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
528 if ids.is_empty() {
529 return Ok(0);
530 }
531
532 let delete_tasks: Vec<_> = ids
533 .iter()
534 .map(|id| {
535 let operator = self.operator.clone();
536 let path = Self::vector_path(namespace, id);
537 async move {
538 let exists = operator
539 .exists(&path)
540 .await
541 .map_err(|e| DakeraError::Storage(e.to_string()))?;
542 if exists {
543 match operator.delete(&path).await {
544 Ok(_) => Ok(true),
545 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
546 Err(e) => Err(DakeraError::Storage(e.to_string())),
547 }
548 } else {
549 Ok(false)
550 }
551 }
552 })
553 .collect();
554
555 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
556 .buffer_unordered(s3_concurrent_ops())
557 .collect()
558 .await;
559
560 let mut deleted = 0;
561 for result in results {
562 if result? {
563 deleted += 1;
564 }
565 }
566
567 if deleted > 0 {
569 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
570 meta.vector_count = meta.vector_count.saturating_sub(deleted);
571 meta.updated_at = Self::now();
572 self.write_namespace_meta(namespace, &meta).await?;
573 }
574 }
575
576 Ok(deleted)
577 }
578
579 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
580 Ok(self.read_namespace_meta(namespace).await?.is_some())
581 }
582
583 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
584 if self.read_namespace_meta(namespace).await?.is_none() {
585 let meta = NamespaceMetadata {
586 dimension: None,
587 vector_count: 0,
588 created_at: Self::now(),
589 updated_at: Self::now(),
590 };
591 self.write_namespace_meta(namespace, &meta).await?;
592 }
593 Ok(())
594 }
595
596 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
597 Ok(self
598 .read_namespace_meta(namespace)
599 .await?
600 .and_then(|m| m.dimension))
601 }
602
603 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
604 Ok(self
605 .read_namespace_meta(namespace)
606 .await?
607 .map(|m| m.vector_count)
608 .unwrap_or(0))
609 }
610
611 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
612 let entries = self
613 .operator
614 .list("namespaces/")
615 .await
616 .map_err(|e| DakeraError::Storage(e.to_string()))?;
617
618 let mut namespaces = Vec::new();
619 for entry in entries {
620 let path = entry.path();
621 if let Some(ns) = path.strip_prefix("namespaces/") {
623 let ns = ns.trim_end_matches('/');
624 if !ns.is_empty() && !ns.contains('/') {
625 if self.read_namespace_meta(ns).await?.is_some() {
630 namespaces.push(ns.to_string());
631 }
632 }
633 }
634 }
635
636 Ok(namespaces)
637 }
638
639 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
640 if !self.namespace_exists(namespace).await? {
642 return Ok(false);
643 }
644
645 let prefix = format!("namespaces/{}/", namespace);
649 self.operator
650 .delete_with(&prefix)
651 .recursive(true)
652 .await
653 .map_err(|e| DakeraError::Storage(e.to_string()))?;
654
655 tracing::debug!(
656 namespace = namespace,
657 "Deleted namespace from object storage"
658 );
659
660 Ok(true)
661 }
662
663 async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
664 Ok(0)
667 }
668
669 async fn cleanup_all_expired(&self) -> Result<usize> {
670 Ok(0)
672 }
673}
674
#[async_trait]
impl IndexStorage for ObjectStorage {
    /// Persist an opaque index blob at `namespaces/<ns>/indexes/<type>.bin`,
    /// overwriting any previous version.
    async fn save_index(
        &self,
        namespace: &NamespaceId,
        index_type: IndexType,
        data: Vec<u8>,
    ) -> Result<()> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        tracing::debug!(
            namespace = namespace,
            index_type = index_type.as_str(),
            "Saved index to object storage"
        );
        Ok(())
    }

    /// Load an index blob; `Ok(None)` if it has never been saved.
    async fn load_index(
        &self,
        namespace: &NamespaceId,
        index_type: IndexType,
    ) -> Result<Option<Vec<u8>>> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        match self.operator.read(&path).await {
            Ok(data) => {
                tracing::debug!(
                    namespace = namespace,
                    index_type = index_type.as_str(),
                    size = data.len(),
                    "Loaded index from object storage"
                );
                Ok(Some(data.to_vec()))
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Delete an index blob. Returns whether it existed beforehand.
    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        let exists = self
            .operator
            .exists(&path)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        if exists {
            self.operator
                .delete(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            tracing::debug!(
                namespace = namespace,
                index_type = index_type.as_str(),
                "Deleted index from object storage"
            );
        }
        Ok(exists)
    }

    /// Cheap existence check for an index blob (no read).
    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        self.operator
            .exists(&path)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))
    }

    /// Enumerate saved indexes by listing `namespaces/<ns>/indexes/` and
    /// mapping known `<name>.bin` filenames back to `IndexType`; unknown
    /// filenames are silently ignored (forward compatibility).
    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
        let prefix = format!("namespaces/{}/indexes/", namespace);
        let entries = self
            .operator
            .list(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let mut indexes = Vec::new();
        for entry in entries {
            let path = entry.path();
            if path.ends_with(".bin") {
                if let Some(filename) = path.strip_prefix(&prefix) {
                    let name = filename.trim_end_matches(".bin");
                    match name {
                        "hnsw" => indexes.push(IndexType::Hnsw),
                        "pq" => indexes.push(IndexType::Pq),
                        "ivf" => indexes.push(IndexType::Ivf),
                        "spfresh" => indexes.push(IndexType::SpFresh),
                        "fulltext" => indexes.push(IndexType::FullText),
                        _ => {}
                    }
                }
            }
        }

        Ok(indexes)
    }
}
794
/// Free-function convenience wrapper around [`ObjectStorage::build_operator`]
/// for callers that only need a raw opendal [`Operator`].
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
800
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end CRUD against the in-memory backend: ensure/exists, upsert,
    /// get, get_all, count, delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Point lookup round-trips id and values.
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting one vector removes it and decrements the count.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// A second upsert with a different dimension must be rejected once the
    /// namespace dimension is fixed.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    /// Upserting an existing id overwrites values without growing the count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// Index blob lifecycle: save, exists, load, list, delete (idempotent).
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Round-trip: loaded bytes match what was saved.
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Second delete of the same index reports false (already gone).
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}
999}