1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
/// Process-wide sequence number appended to temp-object names so that two
/// concurrent atomic writes in the same second never collide (see
/// `ObjectStorage::write_atomic`).
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Maximum number of in-flight object-store operations for batched
/// reads/deletes (passed to `buffer_unordered`).
///
/// Configurable via the `DAKERA_S3_CONCURRENT_OPS` environment variable;
/// defaults to 16. Unparseable values AND zero fall back to the default:
/// `buffer_unordered(0)` would buffer no futures and the stream would
/// never make progress.
fn s3_concurrent_ops() -> usize {
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|&n| n > 0)
        .unwrap_or(16)
}
28
/// Per-namespace bookkeeping document, stored at `namespaces/{ns}/meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimensionality, pinned by the first upsert; `None` until then.
    dimension: Option<usize>,
    /// Number of vector objects currently stored in the namespace.
    vector_count: usize,
    /// Unix timestamp (seconds) when the namespace metadata was first written.
    created_at: u64,
    /// Unix timestamp (seconds) of the most recent metadata update.
    updated_at: u64,
}
37
/// On-disk JSON representation of one vector
/// (`namespaces/{ns}/vectors/{id}.json`).
///
/// TTL fields (`ttl_seconds` / `expires_at`) are not persisted here; they are
/// reset to `None` when converting back to [`Vector`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
45
46impl From<Vector> for StoredVector {
47 fn from(v: Vector) -> Self {
48 Self {
49 id: v.id,
50 values: v.values,
51 metadata: v.metadata,
52 }
53 }
54}
55
56impl From<StoredVector> for Vector {
57 fn from(v: StoredVector) -> Self {
58 Self {
59 id: v.id,
60 values: v.values,
61 metadata: v.metadata,
62 ttl_seconds: None,
63 expires_at: None,
64 }
65 }
66}
67
/// Backend selection for [`ObjectStorage`]. Defaults to the in-memory backend.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory store; contents live only for the process lifetime.
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 (or S3-compatible endpoint). Optional fields left as `None`
    /// are resolved by OpenDAL's own defaults.
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage. Either `account_key` or `sas_token` may be used
    /// for authentication; both are optional here.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage.
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
99
/// Vector and index storage backed by an OpenDAL [`Operator`], so the same
/// code paths work against memory, local filesystem, S3, Azure Blob, or GCS.
#[derive(Clone)]
pub struct ObjectStorage {
    // Cloned per concurrent task in the batched read/delete paths.
    operator: Operator,
}
105
impl ObjectStorage {
    /// Creates a storage instance backed by the operator described by `config`.
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Convenience constructor: in-memory backend (useful for tests).
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Convenience constructor: local-filesystem backend rooted at `root`.
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Convenience constructor: S3 backend with all optional settings unset
    /// (region/endpoint/credential resolution is left to OpenDAL).
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Convenience constructor: Azure Blob backend with no credentials or
    /// custom endpoint configured.
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Convenience constructor: GCS backend with no credential path or
    /// custom endpoint configured.
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Translates an [`ObjectStorageConfig`] into a finished OpenDAL
    /// [`Operator`]. Every backend except `Memory` is wrapped in the retry
    /// layer from [`Self::retry_layer`].
    ///
    /// # Errors
    /// Returns [`DakeraError::Storage`] when the underlying service builder
    /// rejects the configuration.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                // Only explicitly-provided options are forwarded; unset ones
                // are left for OpenDAL to resolve.
                let mut builder = services::S3::default().bucket(bucket);

                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Retry policy shared by all non-memory backends: exponential backoff
    /// (factor 2.0) with jitter.
    ///
    /// Tunable via environment variables, each silently falling back to its
    /// default when missing or unparseable:
    /// - `DAKERA_S3_MAX_RETRIES` (default 3)
    /// - `DAKERA_S3_RETRY_MIN_DELAY_MS` (default 500)
    /// - `DAKERA_S3_RETRY_MAX_DELAY_SECS` (default 60)
    fn retry_layer() -> RetryLayer {
        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(3);
        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(500);
        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(60);

        tracing::info!(
            max_times,
            min_delay_ms,
            max_delay_secs,
            "S3 retry layer configured"
        );

        RetryLayer::new()
            .with_max_times(max_times)
            .with_min_delay(Duration::from_millis(min_delay_ms))
            .with_max_delay(Duration::from_secs(max_delay_secs))
            .with_jitter()
            .with_factor(2.0)
    }

    /// Object key for a single vector: `namespaces/{ns}/vectors/{id}.json`.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Object key for the namespace metadata document.
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Key prefix under which all of a namespace's vector objects live.
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Reads the namespace metadata document.
    ///
    /// Returns `Ok(None)` when the document is missing, empty, or fails to
    /// deserialize — empty/corrupted metadata is deliberately treated as
    /// absent (with a warning) so the namespace can be recreated, rather than
    /// wedging every operation on it.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Serializes `meta` and writes it atomically to the namespace's
    /// metadata key.
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.write_atomic(&path, data).await
    }

    /// Best-effort atomic write: the data is first written to a unique temp
    /// key (`{path}.tmp.{unix_secs}.{seq}`) and then renamed over `path`.
    ///
    /// If either step fails (e.g. the backend does not support rename), the
    /// temp object is deleted best-effort and the data is written directly to
    /// `path` instead — so the operation degrades to a plain (non-atomic)
    /// write rather than failing outright.
    async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
        // Second+counter suffix keeps concurrent writers on distinct temp keys.
        let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
        let rename_result = async {
            self.operator
                .write(&tmp_path, data.clone())
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            self.operator
                .rename(&tmp_path, path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            Ok::<(), DakeraError>(())
        }
        .await;
        if let Err(e) = rename_result {
            tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
            // Best-effort cleanup of the orphaned temp object; failure ignored.
            let _ = self.operator.delete(&tmp_path).await;
            self.operator
                .write(path, data)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
        }
        Ok(())
    }

    /// Current Unix time in whole seconds; clamps to 0 if the system clock is
    /// before the epoch.
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
371
372#[async_trait]
373impl VectorStorage for ObjectStorage {
374 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
375 if vectors.is_empty() {
376 return Ok(0);
377 }
378
379 let mut meta = self
381 .read_namespace_meta(namespace)
382 .await?
383 .unwrap_or_else(|| NamespaceMetadata {
384 dimension: None,
385 vector_count: 0,
386 created_at: Self::now(),
387 updated_at: Self::now(),
388 });
389
390 let first_dim = vectors[0].values.len();
392 if let Some(dim) = meta.dimension {
393 for v in &vectors {
394 if v.values.len() != dim {
395 return Err(DakeraError::DimensionMismatch {
396 expected: dim,
397 actual: v.values.len(),
398 });
399 }
400 }
401 } else {
402 meta.dimension = Some(first_dim);
403 }
404
405 let mut upserted = 0;
406 for vector in vectors {
407 let path = Self::vector_path(namespace, &vector.id);
408 let stored: StoredVector = vector.into();
409 let data =
410 serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
411
412 let exists = self
414 .operator
415 .exists(&path)
416 .await
417 .map_err(|e| DakeraError::Storage(e.to_string()))?;
418
419 self.write_atomic(&path, data).await?;
420
421 if !exists {
422 meta.vector_count += 1;
423 }
424 upserted += 1;
425 }
426
427 meta.updated_at = Self::now();
428 self.write_namespace_meta(namespace, &meta).await?;
429
430 tracing::debug!(
431 namespace = namespace,
432 upserted = upserted,
433 "Upserted vectors to object storage"
434 );
435
436 Ok(upserted)
437 }
438
439 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
440 if ids.is_empty() {
441 return Ok(Vec::new());
442 }
443
444 let now = std::time::SystemTime::now()
445 .duration_since(std::time::UNIX_EPOCH)
446 .unwrap_or_default()
447 .as_secs();
448
449 let read_tasks: Vec<_> = ids
450 .iter()
451 .map(|id| {
452 let operator = self.operator.clone();
453 let path = Self::vector_path(namespace, id);
454 let id = id.clone();
455 async move {
456 match operator.read(&path).await {
457 Ok(data) => {
458 let bytes = data.to_vec();
459 if bytes.is_empty() {
460 tracing::warn!(
461 vector_id = %id,
462 "Empty vector file detected, skipping"
463 );
464 return Ok(None);
465 }
466 match serde_json::from_slice::<StoredVector>(&bytes) {
467 Ok(stored) => {
468 let vector: Vector = stored.into();
469 if !vector.is_expired_at(now) {
470 Ok(Some(vector))
471 } else {
472 Ok(None)
473 }
474 }
475 Err(e) => {
476 tracing::warn!(
477 vector_id = %id,
478 error = %e,
479 bytes_len = bytes.len(),
480 "Corrupted vector file detected, skipping"
481 );
482 Ok(None)
483 }
484 }
485 }
486 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
487 Err(e) => Err(DakeraError::Storage(e.to_string())),
488 }
489 }
490 })
491 .collect();
492
493 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
494 .buffer_unordered(s3_concurrent_ops())
495 .collect()
496 .await;
497
498 let mut vectors = Vec::with_capacity(ids.len());
499 for result in results {
500 if let Some(v) = result? {
501 vectors.push(v);
502 }
503 }
504 Ok(vectors)
505 }
506
507 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
508 let prefix = Self::namespace_vectors_prefix(namespace);
509
510 let entries = self
511 .operator
512 .list(&prefix)
513 .await
514 .map_err(|e| DakeraError::Storage(e.to_string()))?;
515
516 let json_paths: Vec<String> = entries
517 .into_iter()
518 .filter(|e| e.path().ends_with(".json"))
519 .map(|e| e.path().to_string())
520 .collect();
521
522 if json_paths.is_empty() {
523 return Ok(Vec::new());
524 }
525
526 let now = std::time::SystemTime::now()
527 .duration_since(std::time::UNIX_EPOCH)
528 .unwrap_or_default()
529 .as_secs();
530
531 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
532 let operator = self.operator.clone();
533 async move {
534 match operator.read(&path).await {
535 Ok(data) => {
536 let bytes = data.to_vec();
537 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
538 let vector: Vector = stored.into();
539 if !vector.is_expired_at(now) {
540 return Some(vector);
541 }
542 }
543 None
544 }
545 Err(e) => {
546 tracing::warn!(path = %path, error = %e, "Failed to read vector");
547 None
548 }
549 }
550 }
551 }))
552 .buffer_unordered(s3_concurrent_ops())
553 .collect()
554 .await;
555
556 Ok(results.into_iter().flatten().collect())
557 }
558
559 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
560 if ids.is_empty() {
561 return Ok(0);
562 }
563
564 let delete_tasks: Vec<_> = ids
565 .iter()
566 .map(|id| {
567 let operator = self.operator.clone();
568 let path = Self::vector_path(namespace, id);
569 async move {
570 let exists = operator
571 .exists(&path)
572 .await
573 .map_err(|e| DakeraError::Storage(e.to_string()))?;
574 if exists {
575 match operator.delete(&path).await {
576 Ok(_) => Ok(true),
577 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
578 Err(e) => Err(DakeraError::Storage(e.to_string())),
579 }
580 } else {
581 Ok(false)
582 }
583 }
584 })
585 .collect();
586
587 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
588 .buffer_unordered(s3_concurrent_ops())
589 .collect()
590 .await;
591
592 let mut deleted = 0;
593 for result in results {
594 if result? {
595 deleted += 1;
596 }
597 }
598
599 if deleted > 0 {
601 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
602 meta.vector_count = meta.vector_count.saturating_sub(deleted);
603 meta.updated_at = Self::now();
604 self.write_namespace_meta(namespace, &meta).await?;
605 }
606 }
607
608 Ok(deleted)
609 }
610
611 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
612 Ok(self.read_namespace_meta(namespace).await?.is_some())
613 }
614
615 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
616 if self.read_namespace_meta(namespace).await?.is_none() {
617 let meta = NamespaceMetadata {
618 dimension: None,
619 vector_count: 0,
620 created_at: Self::now(),
621 updated_at: Self::now(),
622 };
623 self.write_namespace_meta(namespace, &meta).await?;
624 }
625 Ok(())
626 }
627
628 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
629 Ok(self
630 .read_namespace_meta(namespace)
631 .await?
632 .and_then(|m| m.dimension))
633 }
634
635 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
636 Ok(self
637 .read_namespace_meta(namespace)
638 .await?
639 .map(|m| m.vector_count)
640 .unwrap_or(0))
641 }
642
643 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
644 let entries = self
645 .operator
646 .list("namespaces/")
647 .await
648 .map_err(|e| DakeraError::Storage(e.to_string()))?;
649
650 let mut namespaces = Vec::new();
651 for entry in entries {
652 let path = entry.path();
653 if let Some(ns) = path.strip_prefix("namespaces/") {
655 let ns = ns.trim_end_matches('/');
656 if !ns.is_empty() && !ns.contains('/') {
657 if self.read_namespace_meta(ns).await?.is_some() {
662 namespaces.push(ns.to_string());
663 }
664 }
665 }
666 }
667
668 Ok(namespaces)
669 }
670
671 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
672 if !self.namespace_exists(namespace).await? {
674 return Ok(false);
675 }
676
677 let prefix = format!("namespaces/{}/", namespace);
681 self.operator
682 .delete_with(&prefix)
683 .recursive(true)
684 .await
685 .map_err(|e| DakeraError::Storage(e.to_string()))?;
686
687 tracing::debug!(
688 namespace = namespace,
689 "Deleted namespace from object storage"
690 );
691
692 Ok(true)
693 }
694
695 async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
696 Ok(0)
699 }
700
701 async fn cleanup_all_expired(&self) -> Result<usize> {
702 Ok(0)
704 }
705}
706
707#[async_trait]
708impl IndexStorage for ObjectStorage {
709 async fn save_index(
710 &self,
711 namespace: &NamespaceId,
712 index_type: IndexType,
713 data: Vec<u8>,
714 ) -> Result<()> {
715 let path = format!(
716 "namespaces/{}/indexes/{}.bin",
717 namespace,
718 index_type.as_str()
719 );
720 self.operator
721 .write(&path, data)
722 .await
723 .map_err(|e| DakeraError::Storage(e.to_string()))?;
724
725 tracing::debug!(
726 namespace = namespace,
727 index_type = index_type.as_str(),
728 "Saved index to object storage"
729 );
730 Ok(())
731 }
732
733 async fn load_index(
734 &self,
735 namespace: &NamespaceId,
736 index_type: IndexType,
737 ) -> Result<Option<Vec<u8>>> {
738 let path = format!(
739 "namespaces/{}/indexes/{}.bin",
740 namespace,
741 index_type.as_str()
742 );
743 match self.operator.read(&path).await {
744 Ok(data) => {
745 tracing::debug!(
746 namespace = namespace,
747 index_type = index_type.as_str(),
748 size = data.len(),
749 "Loaded index from object storage"
750 );
751 Ok(Some(data.to_vec()))
752 }
753 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
754 Err(e) => Err(DakeraError::Storage(e.to_string())),
755 }
756 }
757
758 async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
759 let path = format!(
760 "namespaces/{}/indexes/{}.bin",
761 namespace,
762 index_type.as_str()
763 );
764 let exists = self
765 .operator
766 .exists(&path)
767 .await
768 .map_err(|e| DakeraError::Storage(e.to_string()))?;
769
770 if exists {
771 self.operator
772 .delete(&path)
773 .await
774 .map_err(|e| DakeraError::Storage(e.to_string()))?;
775 tracing::debug!(
776 namespace = namespace,
777 index_type = index_type.as_str(),
778 "Deleted index from object storage"
779 );
780 }
781 Ok(exists)
782 }
783
784 async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
785 let path = format!(
786 "namespaces/{}/indexes/{}.bin",
787 namespace,
788 index_type.as_str()
789 );
790 self.operator
791 .exists(&path)
792 .await
793 .map_err(|e| DakeraError::Storage(e.to_string()))
794 }
795
796 async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
797 let prefix = format!("namespaces/{}/indexes/", namespace);
798 let entries = self
799 .operator
800 .list(&prefix)
801 .await
802 .map_err(|e| DakeraError::Storage(e.to_string()))?;
803
804 let mut indexes = Vec::new();
805 for entry in entries {
806 let path = entry.path();
807 if path.ends_with(".bin") {
808 if let Some(filename) = path.strip_prefix(&prefix) {
810 let name = filename.trim_end_matches(".bin");
811 match name {
812 "hnsw" => indexes.push(IndexType::Hnsw),
813 "pq" => indexes.push(IndexType::Pq),
814 "ivf" => indexes.push(IndexType::Ivf),
815 "spfresh" => indexes.push(IndexType::SpFresh),
816 "fulltext" => indexes.push(IndexType::FullText),
817 _ => {} }
819 }
820 }
821 }
822
823 Ok(indexes)
824 }
825}
826
/// Builds a bare OpenDAL [`Operator`] for `config` without wrapping it in an
/// [`ObjectStorage`] — for callers that need direct operator access.
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
832
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end CRUD cycle against the in-memory backend: ensure/exists,
    /// upsert, get, get_all, count, delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Targeted read round-trips the stored values.
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting one vector removes it and decrements the tracked count.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// Once a namespace's dimension is pinned by the first upsert, a vector
    /// with a different dimension must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    /// Re-upserting an existing id overwrites values and does not inflate
    /// the vector count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// Index lifecycle: save/load/exists/list/delete, including the
    /// idempotent second delete and load-after-delete returning None.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Second delete is a no-op and reports false.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}
1031}