1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
/// Process-wide sequence number appended (together with a seconds timestamp)
/// to temporary object names used by the write-then-rename path, so that
/// concurrent writes from this process never share a temp path.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Maximum number of object-store operations a batch call keeps in flight at
/// once (the width passed to `StreamExt::buffer_unordered`).
///
/// Read from `DAKERA_S3_CONCURRENT_OPS` (surrounding whitespace ignored).
/// Falls back to 16 when the variable is unset, is not an unsigned integer,
/// or is zero. Zero is rejected because, depending on the `futures-util`
/// version, `buffer_unordered(0)` either removes the limit entirely or never
/// starts any future, and neither is what the setting is meant to express.
fn s3_concurrent_ops() -> usize {
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(16)
}
28
/// Per-namespace bookkeeping, persisted as JSON at `namespaces/{ns}/meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimension; `None` until the first upsert records one.
    dimension: Option<usize>,
    /// Number of distinct vector ids, maintained incrementally by upsert/delete.
    vector_count: usize,
    /// Creation time in whole seconds since the Unix epoch.
    created_at: u64,
    /// Time of the last metadata update in whole seconds since the Unix epoch.
    updated_at: u64,
}
37
/// JSON representation of one vector, stored at
/// `namespaces/{ns}/vectors/{id}.json`.
///
/// NOTE(review): the TTL fields of `Vector` (`ttl_seconds`, `expires_at`) are
/// not part of this format, so expiry information is lost on persistence.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    /// Vector id; also used as the object's file name.
    id: String,
    /// Vector components.
    values: Vec<f32>,
    /// Optional user metadata attached to the vector.
    metadata: Option<serde_json::Value>,
}
45
46impl From<Vector> for StoredVector {
47 fn from(v: Vector) -> Self {
48 Self {
49 id: v.id,
50 values: v.values,
51 metadata: v.metadata,
52 }
53 }
54}
55
56impl From<StoredVector> for Vector {
57 fn from(v: StoredVector) -> Self {
58 Self {
59 id: v.id,
60 values: v.values,
61 metadata: v.metadata,
62 ttl_seconds: None,
63 expires_at: None,
64 }
65 }
66}
67
/// Backend selection and connection settings for `ObjectStorage`.
///
/// `Debug` is implemented by hand rather than derived so that credentials
/// (`access_key_id`, `secret_access_key`, `account_key`, `sas_token`) never
/// appear in logs. When set they render as `Some("<redacted>")`, so it is
/// still visible whether a credential was configured.
#[derive(Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory backend (the default).
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 or an S3-compatible service.
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage.
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}

impl std::fmt::Debug for ObjectStorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reveal only whether a secret is present, never its value.
        fn redact(secret: &Option<String>) -> Option<&'static str> {
            secret.as_ref().map(|_| "<redacted>")
        }

        match self {
            Self::Memory => f.write_str("Memory"),
            Self::Filesystem { root } => f.debug_struct("Filesystem").field("root", root).finish(),
            Self::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => f
                .debug_struct("S3")
                .field("bucket", bucket)
                .field("region", region)
                .field("endpoint", endpoint)
                .field("access_key_id", &redact(access_key_id))
                .field("secret_access_key", &redact(secret_access_key))
                .finish(),
            Self::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => f
                .debug_struct("Azure")
                .field("container", container)
                .field("account_name", account_name)
                .field("account_key", &redact(account_key))
                .field("sas_token", &redact(sas_token))
                .field("endpoint", endpoint)
                .finish(),
            Self::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => f
                .debug_struct("Gcs")
                .field("bucket", bucket)
                .field("credential_path", credential_path)
                .field("endpoint", endpoint)
                .finish(),
        }
    }
}
99
/// Vector and index storage on top of an OpenDAL [`Operator`], usable with the
/// memory, local filesystem, S3, Azure Blob and GCS backends.
#[derive(Clone)]
pub struct ObjectStorage {
    /// Operator built from an `ObjectStorageConfig`; every backend except
    /// `Memory` is wrapped in a retry layer.
    operator: Operator,
}
105
106impl ObjectStorage {
107 pub fn new(config: ObjectStorageConfig) -> Result<Self> {
109 let operator = Self::build_operator(&config)?;
110 Ok(Self { operator })
111 }
112
113 pub fn memory() -> Result<Self> {
115 Self::new(ObjectStorageConfig::Memory)
116 }
117
118 pub fn filesystem(root: impl Into<String>) -> Result<Self> {
120 Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
121 }
122
123 pub fn s3(bucket: impl Into<String>) -> Result<Self> {
125 Self::new(ObjectStorageConfig::S3 {
126 bucket: bucket.into(),
127 region: None,
128 endpoint: None,
129 access_key_id: None,
130 secret_access_key: None,
131 })
132 }
133
134 pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
136 Self::new(ObjectStorageConfig::Azure {
137 container: container.into(),
138 account_name: account_name.into(),
139 account_key: None,
140 sas_token: None,
141 endpoint: None,
142 })
143 }
144
145 pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
147 Self::new(ObjectStorageConfig::Gcs {
148 bucket: bucket.into(),
149 credential_path: None,
150 endpoint: None,
151 })
152 }
153
154 pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
155 match config {
156 ObjectStorageConfig::Memory => {
157 let builder = services::Memory::default();
158 Operator::new(builder)
159 .map(|op| op.finish())
160 .map_err(|e| DakeraError::Storage(e.to_string()))
161 }
162 ObjectStorageConfig::Filesystem { root } => {
163 let builder = services::Fs::default().root(root);
164 Operator::new(builder)
165 .map(|op| op.layer(Self::retry_layer()).finish())
166 .map_err(|e| DakeraError::Storage(e.to_string()))
167 }
168 ObjectStorageConfig::S3 {
169 bucket,
170 region,
171 endpoint,
172 access_key_id,
173 secret_access_key,
174 } => {
175 let mut builder = services::S3::default().bucket(bucket);
176
177 if let Some(region) = region {
178 builder = builder.region(region);
179 }
180 if let Some(endpoint) = endpoint {
181 builder = builder.endpoint(endpoint);
182 }
183 if let Some(key) = access_key_id {
184 builder = builder.access_key_id(key);
185 }
186 if let Some(secret) = secret_access_key {
187 builder = builder.secret_access_key(secret);
188 }
189
190 Operator::new(builder)
191 .map(|op| op.layer(Self::retry_layer()).finish())
192 .map_err(|e| DakeraError::Storage(e.to_string()))
193 }
194 ObjectStorageConfig::Azure {
195 container,
196 account_name,
197 account_key,
198 sas_token,
199 endpoint,
200 } => {
201 let mut builder = services::Azblob::default()
202 .container(container)
203 .account_name(account_name);
204
205 if let Some(key) = account_key {
206 builder = builder.account_key(key);
207 }
208 if let Some(token) = sas_token {
209 builder = builder.sas_token(token);
210 }
211 if let Some(endpoint) = endpoint {
212 builder = builder.endpoint(endpoint);
213 }
214
215 Operator::new(builder)
216 .map(|op| op.layer(Self::retry_layer()).finish())
217 .map_err(|e| DakeraError::Storage(e.to_string()))
218 }
219 ObjectStorageConfig::Gcs {
220 bucket,
221 credential_path,
222 endpoint,
223 } => {
224 let mut builder = services::Gcs::default().bucket(bucket);
225
226 if let Some(cred_path) = credential_path {
227 builder = builder.credential_path(cred_path);
228 }
229 if let Some(endpoint) = endpoint {
230 builder = builder.endpoint(endpoint);
231 }
232
233 Operator::new(builder)
234 .map(|op| op.layer(Self::retry_layer()).finish())
235 .map_err(|e| DakeraError::Storage(e.to_string()))
236 }
237 }
238 }
239
240 fn retry_layer() -> RetryLayer {
241 let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
245 .ok()
246 .and_then(|v| v.parse().ok())
247 .unwrap_or(3);
248 let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
249 .ok()
250 .and_then(|v| v.parse().ok())
251 .unwrap_or(500);
252 let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
253 .ok()
254 .and_then(|v| v.parse().ok())
255 .unwrap_or(60);
256
257 tracing::info!(
258 max_times,
259 min_delay_ms,
260 max_delay_secs,
261 "S3 retry layer configured"
262 );
263
264 RetryLayer::new()
265 .with_max_times(max_times)
266 .with_min_delay(Duration::from_millis(min_delay_ms))
267 .with_max_delay(Duration::from_secs(max_delay_secs))
268 .with_jitter()
269 .with_factor(2.0)
270 }
271
272 fn vector_path(namespace: &str, vector_id: &str) -> String {
274 format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
275 }
276
277 fn namespace_meta_path(namespace: &str) -> String {
279 format!("namespaces/{}/meta.json", namespace)
280 }
281
282 fn namespace_vectors_prefix(namespace: &str) -> String {
284 format!("namespaces/{}/vectors/", namespace)
285 }
286
287 async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
289 let path = Self::namespace_meta_path(namespace);
290 match self.operator.read(&path).await {
291 Ok(data) => {
292 let bytes = data.to_vec();
293 if bytes.is_empty() {
294 tracing::warn!(
295 namespace = %namespace,
296 path = %path,
297 "Empty namespace metadata file detected, treating as missing"
298 );
299 return Ok(None);
300 }
301 match serde_json::from_slice(&bytes) {
302 Ok(meta) => Ok(Some(meta)),
303 Err(e) => {
304 tracing::warn!(
305 namespace = %namespace,
306 path = %path,
307 error = %e,
308 bytes_len = bytes.len(),
309 "Corrupted namespace metadata, treating as missing and will be recreated"
310 );
311 Ok(None)
312 }
313 }
314 }
315 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
316 Err(e) => Err(DakeraError::Storage(e.to_string())),
317 }
318 }
319
320 async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
324 let path = Self::namespace_meta_path(namespace);
325 let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
326 self.write_atomic(&path, data).await
327 }
328
329 async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
337 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
338 let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
339 let rename_result = async {
340 self.operator
341 .write(&tmp_path, data.clone())
342 .await
343 .map_err(|e| DakeraError::Storage(e.to_string()))?;
344 self.operator
345 .rename(&tmp_path, path)
346 .await
347 .map_err(|e| DakeraError::Storage(e.to_string()))?;
348 Ok::<(), DakeraError>(())
349 }
350 .await;
351 if let Err(e) = rename_result {
352 tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
354 let _ = self.operator.delete(&tmp_path).await;
355 self.operator
356 .write(path, data)
357 .await
358 .map_err(|e| DakeraError::Storage(e.to_string()))?;
359 }
360 Ok(())
361 }
362
363 fn now() -> u64 {
365 std::time::SystemTime::now()
366 .duration_since(std::time::UNIX_EPOCH)
367 .unwrap_or_default()
368 .as_secs()
369 }
370}
371
372#[async_trait]
373impl VectorStorage for ObjectStorage {
374 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
375 if vectors.is_empty() {
376 return Ok(0);
377 }
378
379 let mut meta = self
381 .read_namespace_meta(namespace)
382 .await?
383 .unwrap_or_else(|| NamespaceMetadata {
384 dimension: None,
385 vector_count: 0,
386 created_at: Self::now(),
387 updated_at: Self::now(),
388 });
389
390 let first_dim = vectors[0].values.len();
392 if let Some(dim) = meta.dimension {
393 for v in &vectors {
394 if v.values.len() != dim {
395 return Err(DakeraError::DimensionMismatch {
396 expected: dim,
397 actual: v.values.len(),
398 });
399 }
400 }
401 } else {
402 meta.dimension = Some(first_dim);
403 }
404
405 let total = vectors.len();
408 let op = self.operator.clone();
409 let ns = namespace.clone();
410
411 let results: Vec<Result<bool>> = stream::iter(vectors)
412 .map(|vector| {
413 let op = op.clone();
414 let ns = ns.clone();
415 async move {
416 let path = ObjectStorage::vector_path(&ns, &vector.id);
417 let stored: StoredVector = vector.into();
418 let data = serde_json::to_vec(&stored)
419 .map_err(|e| DakeraError::Storage(e.to_string()))?;
420
421 let exists = op
422 .exists(&path)
423 .await
424 .map_err(|e| DakeraError::Storage(e.to_string()))?;
425
426 let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
428 let now_secs = std::time::SystemTime::now()
429 .duration_since(std::time::UNIX_EPOCH)
430 .unwrap_or_default()
431 .as_secs();
432 let tmp_path = format!("{}.tmp.{}.{}", path, now_secs, seq);
433 let rename_result = async {
434 op.write(&tmp_path, data.clone())
435 .await
436 .map_err(|e| DakeraError::Storage(e.to_string()))?;
437 op.rename(&tmp_path, &path)
438 .await
439 .map_err(|e| DakeraError::Storage(e.to_string()))?;
440 Ok::<(), DakeraError>(())
441 }
442 .await;
443 if let Err(e) = rename_result {
444 tracing::debug!(
445 path = %path,
446 error = %e,
447 "atomic rename failed, falling back to direct write"
448 );
449 let _ = op.delete(&tmp_path).await;
450 op.write(&path, data)
451 .await
452 .map_err(|e| DakeraError::Storage(e.to_string()))?;
453 }
454
455 Ok::<bool, DakeraError>(!exists)
456 }
457 })
458 .buffer_unordered(s3_concurrent_ops())
459 .collect()
460 .await;
461
462 let mut new_inserts = 0usize;
463 for r in results {
464 if r? {
465 new_inserts += 1;
466 }
467 }
468
469 meta.vector_count += new_inserts;
470 meta.updated_at = Self::now();
471 self.write_namespace_meta(namespace, &meta).await?;
472
473 tracing::debug!(
474 namespace = namespace,
475 upserted = total,
476 "Upserted vectors to object storage"
477 );
478
479 Ok(total)
480 }
481
482 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
483 if ids.is_empty() {
484 return Ok(Vec::new());
485 }
486
487 let now = std::time::SystemTime::now()
488 .duration_since(std::time::UNIX_EPOCH)
489 .unwrap_or_default()
490 .as_secs();
491
492 let read_tasks: Vec<_> = ids
493 .iter()
494 .map(|id| {
495 let operator = self.operator.clone();
496 let path = Self::vector_path(namespace, id);
497 let id = id.clone();
498 async move {
499 match operator.read(&path).await {
500 Ok(data) => {
501 let bytes = data.to_vec();
502 if bytes.is_empty() {
503 tracing::warn!(
504 vector_id = %id,
505 "Empty vector file detected, skipping"
506 );
507 return Ok(None);
508 }
509 match serde_json::from_slice::<StoredVector>(&bytes) {
510 Ok(stored) => {
511 let vector: Vector = stored.into();
512 if !vector.is_expired_at(now) {
513 Ok(Some(vector))
514 } else {
515 Ok(None)
516 }
517 }
518 Err(e) => {
519 tracing::warn!(
520 vector_id = %id,
521 error = %e,
522 bytes_len = bytes.len(),
523 "Corrupted vector file detected, skipping"
524 );
525 Ok(None)
526 }
527 }
528 }
529 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
530 Err(e) => Err(DakeraError::Storage(e.to_string())),
531 }
532 }
533 })
534 .collect();
535
536 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
537 .buffer_unordered(s3_concurrent_ops())
538 .collect()
539 .await;
540
541 let mut vectors = Vec::with_capacity(ids.len());
542 for result in results {
543 if let Some(v) = result? {
544 vectors.push(v);
545 }
546 }
547 Ok(vectors)
548 }
549
550 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
551 let prefix = Self::namespace_vectors_prefix(namespace);
552
553 let entries = self
554 .operator
555 .list(&prefix)
556 .await
557 .map_err(|e| DakeraError::Storage(e.to_string()))?;
558
559 let json_paths: Vec<String> = entries
560 .into_iter()
561 .filter(|e| e.path().ends_with(".json"))
562 .map(|e| e.path().to_string())
563 .collect();
564
565 if json_paths.is_empty() {
566 return Ok(Vec::new());
567 }
568
569 let now = std::time::SystemTime::now()
570 .duration_since(std::time::UNIX_EPOCH)
571 .unwrap_or_default()
572 .as_secs();
573
574 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
575 let operator = self.operator.clone();
576 async move {
577 match operator.read(&path).await {
578 Ok(data) => {
579 let bytes = data.to_vec();
580 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
581 let vector: Vector = stored.into();
582 if !vector.is_expired_at(now) {
583 return Some(vector);
584 }
585 }
586 None
587 }
588 Err(e) => {
589 tracing::warn!(path = %path, error = %e, "Failed to read vector");
590 None
591 }
592 }
593 }
594 }))
595 .buffer_unordered(s3_concurrent_ops())
596 .collect()
597 .await;
598
599 Ok(results.into_iter().flatten().collect())
600 }
601
602 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
603 if ids.is_empty() {
604 return Ok(0);
605 }
606
607 let delete_tasks: Vec<_> = ids
608 .iter()
609 .map(|id| {
610 let operator = self.operator.clone();
611 let path = Self::vector_path(namespace, id);
612 async move {
613 let exists = operator
614 .exists(&path)
615 .await
616 .map_err(|e| DakeraError::Storage(e.to_string()))?;
617 if exists {
618 match operator.delete(&path).await {
619 Ok(_) => Ok(true),
620 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
621 Err(e) => Err(DakeraError::Storage(e.to_string())),
622 }
623 } else {
624 Ok(false)
625 }
626 }
627 })
628 .collect();
629
630 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
631 .buffer_unordered(s3_concurrent_ops())
632 .collect()
633 .await;
634
635 let mut deleted = 0;
636 for result in results {
637 if result? {
638 deleted += 1;
639 }
640 }
641
642 if deleted > 0 {
644 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
645 meta.vector_count = meta.vector_count.saturating_sub(deleted);
646 meta.updated_at = Self::now();
647 self.write_namespace_meta(namespace, &meta).await?;
648 }
649 }
650
651 Ok(deleted)
652 }
653
654 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
655 Ok(self.read_namespace_meta(namespace).await?.is_some())
656 }
657
658 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
659 if self.read_namespace_meta(namespace).await?.is_none() {
660 let meta = NamespaceMetadata {
661 dimension: None,
662 vector_count: 0,
663 created_at: Self::now(),
664 updated_at: Self::now(),
665 };
666 self.write_namespace_meta(namespace, &meta).await?;
667 }
668 Ok(())
669 }
670
671 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
672 Ok(self
673 .read_namespace_meta(namespace)
674 .await?
675 .and_then(|m| m.dimension))
676 }
677
678 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
679 Ok(self
680 .read_namespace_meta(namespace)
681 .await?
682 .map(|m| m.vector_count)
683 .unwrap_or(0))
684 }
685
686 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
687 let entries = self
688 .operator
689 .list("namespaces/")
690 .await
691 .map_err(|e| DakeraError::Storage(e.to_string()))?;
692
693 let mut namespaces = Vec::new();
694 for entry in entries {
695 let path = entry.path();
696 if let Some(ns) = path.strip_prefix("namespaces/") {
698 let ns = ns.trim_end_matches('/');
699 if !ns.is_empty() && !ns.contains('/') {
700 if self.read_namespace_meta(ns).await?.is_some() {
705 namespaces.push(ns.to_string());
706 }
707 }
708 }
709 }
710
711 Ok(namespaces)
712 }
713
714 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
715 if !self.namespace_exists(namespace).await? {
717 return Ok(false);
718 }
719
720 let prefix = format!("namespaces/{}/", namespace);
724 self.operator
725 .delete_with(&prefix)
726 .recursive(true)
727 .await
728 .map_err(|e| DakeraError::Storage(e.to_string()))?;
729
730 tracing::debug!(
731 namespace = namespace,
732 "Deleted namespace from object storage"
733 );
734
735 Ok(true)
736 }
737
738 async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
739 Ok(0)
742 }
743
744 async fn cleanup_all_expired(&self) -> Result<usize> {
745 Ok(0)
747 }
748}
749
750#[async_trait]
751impl IndexStorage for ObjectStorage {
752 async fn save_index(
753 &self,
754 namespace: &NamespaceId,
755 index_type: IndexType,
756 data: Vec<u8>,
757 ) -> Result<()> {
758 let path = format!(
759 "namespaces/{}/indexes/{}.bin",
760 namespace,
761 index_type.as_str()
762 );
763 self.operator
764 .write(&path, data)
765 .await
766 .map_err(|e| DakeraError::Storage(e.to_string()))?;
767
768 tracing::debug!(
769 namespace = namespace,
770 index_type = index_type.as_str(),
771 "Saved index to object storage"
772 );
773 Ok(())
774 }
775
776 async fn load_index(
777 &self,
778 namespace: &NamespaceId,
779 index_type: IndexType,
780 ) -> Result<Option<Vec<u8>>> {
781 let path = format!(
782 "namespaces/{}/indexes/{}.bin",
783 namespace,
784 index_type.as_str()
785 );
786 match self.operator.read(&path).await {
787 Ok(data) => {
788 tracing::debug!(
789 namespace = namespace,
790 index_type = index_type.as_str(),
791 size = data.len(),
792 "Loaded index from object storage"
793 );
794 Ok(Some(data.to_vec()))
795 }
796 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
797 Err(e) => Err(DakeraError::Storage(e.to_string())),
798 }
799 }
800
801 async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
802 let path = format!(
803 "namespaces/{}/indexes/{}.bin",
804 namespace,
805 index_type.as_str()
806 );
807 let exists = self
808 .operator
809 .exists(&path)
810 .await
811 .map_err(|e| DakeraError::Storage(e.to_string()))?;
812
813 if exists {
814 self.operator
815 .delete(&path)
816 .await
817 .map_err(|e| DakeraError::Storage(e.to_string()))?;
818 tracing::debug!(
819 namespace = namespace,
820 index_type = index_type.as_str(),
821 "Deleted index from object storage"
822 );
823 }
824 Ok(exists)
825 }
826
827 async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
828 let path = format!(
829 "namespaces/{}/indexes/{}.bin",
830 namespace,
831 index_type.as_str()
832 );
833 self.operator
834 .exists(&path)
835 .await
836 .map_err(|e| DakeraError::Storage(e.to_string()))
837 }
838
839 async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
840 let prefix = format!("namespaces/{}/indexes/", namespace);
841 let entries = self
842 .operator
843 .list(&prefix)
844 .await
845 .map_err(|e| DakeraError::Storage(e.to_string()))?;
846
847 let mut indexes = Vec::new();
848 for entry in entries {
849 let path = entry.path();
850 if path.ends_with(".bin") {
851 if let Some(filename) = path.strip_prefix(&prefix) {
853 let name = filename.trim_end_matches(".bin");
854 match name {
855 "hnsw" => indexes.push(IndexType::Hnsw),
856 "pq" => indexes.push(IndexType::Pq),
857 "ivf" => indexes.push(IndexType::Ivf),
858 "spfresh" => indexes.push(IndexType::SpFresh),
859 "fulltext" => indexes.push(IndexType::FullText),
860 _ => {} }
862 }
863 }
864 }
865
866 Ok(indexes)
867 }
868}
869
870pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
873 ObjectStorage::build_operator(config)
874}
875
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a TTL-free vector from explicit parts.
    fn vector_with(id: &str, values: Vec<f32>, metadata: Option<serde_json::Value>) -> Vector {
        Vector {
            id: id.to_string(),
            values,
            metadata,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    // Builds a metadata-free vector of `dim` components, all 0.1.
    fn make_vector(id: &str, dim: usize) -> Vector {
        vector_with(id, vec![0.1; dim], None)
    }

    // Fresh in-memory storage with `ns` already created.
    async fn storage_with_namespace(ns: &NamespaceId) -> ObjectStorage {
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(ns).await.unwrap();
        storage
    }

    #[tokio::test]
    async fn test_object_storage_memory() {
        let namespace = "test".to_string();
        let storage = storage_with_namespace(&namespace).await;
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        let batch = vec![
            vector_with("v1", vec![1.0, 2.0, 3.0], None),
            vector_with(
                "v2",
                vec![4.0, 5.0, 6.0],
                Some(serde_json::json!({"key": "value"})),
            ),
        ];
        assert_eq!(storage.upsert(&namespace, batch).await.unwrap(), 2);

        let only_v1 = ["v1".to_string()];
        let fetched = storage.get(&namespace, &only_v1).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].id, "v1");
        assert_eq!(fetched[0].values, vec![1.0, 2.0, 3.0]);

        assert_eq!(storage.get_all(&namespace).await.unwrap().len(), 2);
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        assert_eq!(storage.delete(&namespace, &only_v1).await.unwrap(), 1);
        assert!(storage.get(&namespace, &only_v1).await.unwrap().is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let namespace = "test".to_string();
        let storage = storage_with_namespace(&namespace).await;

        storage
            .upsert(&namespace, vec![vector_with("v1", vec![1.0, 2.0, 3.0], None)])
            .await
            .unwrap();

        let outcome = storage
            .upsert(&namespace, vec![vector_with("v2", vec![1.0, 2.0], None)])
            .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_object_storage_upsert() {
        let namespace = "test".to_string();
        let storage = storage_with_namespace(&namespace).await;

        storage
            .upsert(&namespace, vec![vector_with("v1", vec![1.0, 2.0], None)])
            .await
            .unwrap();
        storage
            .upsert(&namespace, vec![vector_with("v1", vec![3.0, 4.0], None)])
            .await
            .unwrap();

        let fetched = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].values, vec![3.0, 4.0]);

        // Overwriting an existing id must not bump the count.
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        let hnsw_bytes = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, hnsw_bytes.clone())
            .await
            .unwrap();

        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        assert_eq!(
            storage
                .load_index(&namespace, IndexType::Hnsw)
                .await
                .unwrap(),
            Some(hnsw_bytes)
        );

        storage
            .save_index(&namespace, IndexType::Pq, b"fake pq index data".to_vec())
            .await
            .unwrap();

        let listed = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.contains(&IndexType::Hnsw));
        assert!(listed.contains(&IndexType::Pq));

        assert!(storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // A second delete reports that nothing was there.
        assert!(!storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        assert!(storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_upsert_batch_parallel_all_new() {
        let ns = "batch_all_new".to_string();
        let storage = storage_with_namespace(&ns).await;

        let batch: Vec<Vector> = (0..50).map(|i| make_vector(&format!("v{i}"), 4)).collect();
        assert_eq!(storage.upsert(&ns, batch).await.unwrap(), 50);
        assert_eq!(storage.count(&ns).await.unwrap(), 50);
    }

    #[tokio::test]
    async fn test_upsert_batch_parallel_idempotent_count() {
        let ns = "batch_idempotent".to_string();
        let storage = storage_with_namespace(&ns).await;

        let batch: Vec<Vector> = (0..10).map(|i| make_vector(&format!("v{i}"), 4)).collect();
        storage.upsert(&ns, batch.clone()).await.unwrap();
        // Re-upserting the same ids must leave the count unchanged.
        storage.upsert(&ns, batch).await.unwrap();

        assert_eq!(storage.count(&ns).await.unwrap(), 10);
    }

    #[tokio::test]
    async fn test_upsert_batch_parallel_empty() {
        let ns = "batch_empty".to_string();
        let storage = storage_with_namespace(&ns).await;

        assert_eq!(storage.upsert(&ns, vec![]).await.unwrap(), 0);
        assert_eq!(storage.count(&ns).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_upsert_batch_parallel_large_batch() {
        let ns = "batch_large".to_string();
        let storage = storage_with_namespace(&ns).await;

        let batch: Vec<Vector> = (0..200).map(|i| make_vector(&format!("v{i}"), 8)).collect();
        assert_eq!(storage.upsert(&ns, batch).await.unwrap(), 200);
        assert_eq!(storage.count(&ns).await.unwrap(), 200);
    }
}