1use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{services, Operator};
14use serde::{Deserialize, Serialize};
15
16use crate::traits::{IndexStorage, IndexType, VectorStorage};
17
/// Upper bound on in-flight per-object requests when fanning out reads or
/// deletes with `buffer_unordered` (see `get`, `get_all`, and `delete`).
const S3_CONCURRENT_READS: usize = 32;
19
/// Per-namespace bookkeeping document, persisted as JSON at
/// `namespaces/<ns>/meta.json` (see `namespace_meta_path`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimensionality; `None` until the first upsert fixes it.
    dimension: Option<usize>,
    /// Number of distinct vector objects currently stored in the namespace.
    vector_count: usize,
    /// Creation time, whole seconds since the Unix epoch.
    created_at: u64,
    /// Last modification time, whole seconds since the Unix epoch.
    updated_at: u64,
}
28
/// On-disk JSON representation of a vector. Note that the runtime `Vector`
/// TTL fields (`ttl_seconds`, `expires_at`) are NOT part of this struct and
/// therefore are not persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
36
37impl From<Vector> for StoredVector {
38 fn from(v: Vector) -> Self {
39 Self {
40 id: v.id,
41 values: v.values,
42 metadata: v.metadata,
43 }
44 }
45}
46
47impl From<StoredVector> for Vector {
48 fn from(v: StoredVector) -> Self {
49 Self {
50 id: v.id,
51 values: v.values,
52 metadata: v.metadata,
53 ttl_seconds: None,
54 expires_at: None,
55 }
56 }
57}
58
/// Backend selection for [`ObjectStorage`]. Defaults to the in-memory store.
///
/// For every backend, `None` optional fields are simply not applied to the
/// OpenDAL builder (see `build_operator`), leaving OpenDAL's own defaults
/// in effect.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// Ephemeral in-memory store; the default variant.
    #[default]
    Memory,
    /// Local filesystem rooted at `root`.
    Filesystem { root: String },
    /// Amazon S3 or an S3-compatible service (set `endpoint` for the latter).
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage container; authenticate via `account_key` or
    /// `sas_token`.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage bucket.
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
90
/// Vector and index storage backed by any OpenDAL-supported object store.
#[derive(Clone)]
pub struct ObjectStorage {
    // Configured OpenDAL operator handling all object I/O; cloned per
    // concurrent task in the read/delete fan-out paths.
    operator: Operator,
}
96
97impl ObjectStorage {
98 pub fn new(config: ObjectStorageConfig) -> Result<Self> {
100 let operator = Self::build_operator(&config)?;
101 Ok(Self { operator })
102 }
103
104 pub fn memory() -> Result<Self> {
106 Self::new(ObjectStorageConfig::Memory)
107 }
108
109 pub fn filesystem(root: impl Into<String>) -> Result<Self> {
111 Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
112 }
113
114 pub fn s3(bucket: impl Into<String>) -> Result<Self> {
116 Self::new(ObjectStorageConfig::S3 {
117 bucket: bucket.into(),
118 region: None,
119 endpoint: None,
120 access_key_id: None,
121 secret_access_key: None,
122 })
123 }
124
125 pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
127 Self::new(ObjectStorageConfig::Azure {
128 container: container.into(),
129 account_name: account_name.into(),
130 account_key: None,
131 sas_token: None,
132 endpoint: None,
133 })
134 }
135
136 pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
138 Self::new(ObjectStorageConfig::Gcs {
139 bucket: bucket.into(),
140 credential_path: None,
141 endpoint: None,
142 })
143 }
144
145 pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
146 match config {
147 ObjectStorageConfig::Memory => {
148 let builder = services::Memory::default();
149 Operator::new(builder)
150 .map(|op| op.finish())
151 .map_err(|e| DakeraError::Storage(e.to_string()))
152 }
153 ObjectStorageConfig::Filesystem { root } => {
154 let builder = services::Fs::default().root(root);
155 Operator::new(builder)
156 .map(|op| op.finish())
157 .map_err(|e| DakeraError::Storage(e.to_string()))
158 }
159 ObjectStorageConfig::S3 {
160 bucket,
161 region,
162 endpoint,
163 access_key_id,
164 secret_access_key,
165 } => {
166 let mut builder = services::S3::default().bucket(bucket);
167
168 if let Some(region) = region {
169 builder = builder.region(region);
170 }
171 if let Some(endpoint) = endpoint {
172 builder = builder.endpoint(endpoint);
173 }
174 if let Some(key) = access_key_id {
175 builder = builder.access_key_id(key);
176 }
177 if let Some(secret) = secret_access_key {
178 builder = builder.secret_access_key(secret);
179 }
180
181 Operator::new(builder)
182 .map(|op| op.finish())
183 .map_err(|e| DakeraError::Storage(e.to_string()))
184 }
185 ObjectStorageConfig::Azure {
186 container,
187 account_name,
188 account_key,
189 sas_token,
190 endpoint,
191 } => {
192 let mut builder = services::Azblob::default()
193 .container(container)
194 .account_name(account_name);
195
196 if let Some(key) = account_key {
197 builder = builder.account_key(key);
198 }
199 if let Some(token) = sas_token {
200 builder = builder.sas_token(token);
201 }
202 if let Some(endpoint) = endpoint {
203 builder = builder.endpoint(endpoint);
204 }
205
206 Operator::new(builder)
207 .map(|op| op.finish())
208 .map_err(|e| DakeraError::Storage(e.to_string()))
209 }
210 ObjectStorageConfig::Gcs {
211 bucket,
212 credential_path,
213 endpoint,
214 } => {
215 let mut builder = services::Gcs::default().bucket(bucket);
216
217 if let Some(cred_path) = credential_path {
218 builder = builder.credential_path(cred_path);
219 }
220 if let Some(endpoint) = endpoint {
221 builder = builder.endpoint(endpoint);
222 }
223
224 Operator::new(builder)
225 .map(|op| op.finish())
226 .map_err(|e| DakeraError::Storage(e.to_string()))
227 }
228 }
229 }
230
231 fn vector_path(namespace: &str, vector_id: &str) -> String {
233 format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
234 }
235
236 fn namespace_meta_path(namespace: &str) -> String {
238 format!("namespaces/{}/meta.json", namespace)
239 }
240
241 fn namespace_vectors_prefix(namespace: &str) -> String {
243 format!("namespaces/{}/vectors/", namespace)
244 }
245
246 async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
248 let path = Self::namespace_meta_path(namespace);
249 match self.operator.read(&path).await {
250 Ok(data) => {
251 let bytes = data.to_vec();
252 if bytes.is_empty() {
253 tracing::warn!(
254 namespace = %namespace,
255 path = %path,
256 "Empty namespace metadata file detected, treating as missing"
257 );
258 return Ok(None);
259 }
260 match serde_json::from_slice(&bytes) {
261 Ok(meta) => Ok(Some(meta)),
262 Err(e) => {
263 tracing::warn!(
264 namespace = %namespace,
265 path = %path,
266 error = %e,
267 bytes_len = bytes.len(),
268 "Corrupted namespace metadata, treating as missing and will be recreated"
269 );
270 Ok(None)
271 }
272 }
273 }
274 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
275 Err(e) => Err(DakeraError::Storage(e.to_string())),
276 }
277 }
278
279 async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
281 let path = Self::namespace_meta_path(namespace);
282 let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
283 self.operator
284 .write(&path, data)
285 .await
286 .map_err(|e| DakeraError::Storage(e.to_string()))?;
287 Ok(())
288 }
289
290 fn now() -> u64 {
292 std::time::SystemTime::now()
293 .duration_since(std::time::UNIX_EPOCH)
294 .unwrap_or_default()
295 .as_secs()
296 }
297}
298
299#[async_trait]
300impl VectorStorage for ObjectStorage {
301 async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
302 if vectors.is_empty() {
303 return Ok(0);
304 }
305
306 let mut meta = self
308 .read_namespace_meta(namespace)
309 .await?
310 .unwrap_or_else(|| NamespaceMetadata {
311 dimension: None,
312 vector_count: 0,
313 created_at: Self::now(),
314 updated_at: Self::now(),
315 });
316
317 let first_dim = vectors[0].values.len();
319 if let Some(dim) = meta.dimension {
320 for v in &vectors {
321 if v.values.len() != dim {
322 return Err(DakeraError::DimensionMismatch {
323 expected: dim,
324 actual: v.values.len(),
325 });
326 }
327 }
328 } else {
329 meta.dimension = Some(first_dim);
330 }
331
332 let mut upserted = 0;
333 for vector in vectors {
334 let path = Self::vector_path(namespace, &vector.id);
335 let stored: StoredVector = vector.into();
336 let data =
337 serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
338
339 let exists = self
341 .operator
342 .exists(&path)
343 .await
344 .map_err(|e| DakeraError::Storage(e.to_string()))?;
345
346 self.operator
347 .write(&path, data)
348 .await
349 .map_err(|e| DakeraError::Storage(e.to_string()))?;
350
351 if !exists {
352 meta.vector_count += 1;
353 }
354 upserted += 1;
355 }
356
357 meta.updated_at = Self::now();
358 self.write_namespace_meta(namespace, &meta).await?;
359
360 tracing::debug!(
361 namespace = namespace,
362 upserted = upserted,
363 "Upserted vectors to object storage"
364 );
365
366 Ok(upserted)
367 }
368
369 async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
370 if ids.is_empty() {
371 return Ok(Vec::new());
372 }
373
374 let now = std::time::SystemTime::now()
375 .duration_since(std::time::UNIX_EPOCH)
376 .unwrap_or_default()
377 .as_secs();
378
379 let read_tasks: Vec<_> = ids
380 .iter()
381 .map(|id| {
382 let operator = self.operator.clone();
383 let path = Self::vector_path(namespace, id);
384 let id = id.clone();
385 async move {
386 match operator.read(&path).await {
387 Ok(data) => {
388 let bytes = data.to_vec();
389 if bytes.is_empty() {
390 tracing::warn!(
391 vector_id = %id,
392 "Empty vector file detected, skipping"
393 );
394 return Ok(None);
395 }
396 match serde_json::from_slice::<StoredVector>(&bytes) {
397 Ok(stored) => {
398 let vector: Vector = stored.into();
399 if !vector.is_expired_at(now) {
400 Ok(Some(vector))
401 } else {
402 Ok(None)
403 }
404 }
405 Err(e) => {
406 tracing::warn!(
407 vector_id = %id,
408 error = %e,
409 bytes_len = bytes.len(),
410 "Corrupted vector file detected, skipping"
411 );
412 Ok(None)
413 }
414 }
415 }
416 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
417 Err(e) => Err(DakeraError::Storage(e.to_string())),
418 }
419 }
420 })
421 .collect();
422
423 let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
424 .buffer_unordered(S3_CONCURRENT_READS)
425 .collect()
426 .await;
427
428 let mut vectors = Vec::with_capacity(ids.len());
429 for result in results {
430 if let Some(v) = result? {
431 vectors.push(v);
432 }
433 }
434 Ok(vectors)
435 }
436
437 async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
438 let prefix = Self::namespace_vectors_prefix(namespace);
439
440 let entries = self
441 .operator
442 .list(&prefix)
443 .await
444 .map_err(|e| DakeraError::Storage(e.to_string()))?;
445
446 let json_paths: Vec<String> = entries
447 .into_iter()
448 .filter(|e| e.path().ends_with(".json"))
449 .map(|e| e.path().to_string())
450 .collect();
451
452 if json_paths.is_empty() {
453 return Ok(Vec::new());
454 }
455
456 let now = std::time::SystemTime::now()
457 .duration_since(std::time::UNIX_EPOCH)
458 .unwrap_or_default()
459 .as_secs();
460
461 let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
462 let operator = self.operator.clone();
463 async move {
464 match operator.read(&path).await {
465 Ok(data) => {
466 let bytes = data.to_vec();
467 if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
468 let vector: Vector = stored.into();
469 if !vector.is_expired_at(now) {
470 return Some(vector);
471 }
472 }
473 None
474 }
475 Err(e) => {
476 tracing::warn!(path = %path, error = %e, "Failed to read vector");
477 None
478 }
479 }
480 }
481 }))
482 .buffer_unordered(S3_CONCURRENT_READS)
483 .collect()
484 .await;
485
486 Ok(results.into_iter().flatten().collect())
487 }
488
489 async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
490 if ids.is_empty() {
491 return Ok(0);
492 }
493
494 let delete_tasks: Vec<_> = ids
495 .iter()
496 .map(|id| {
497 let operator = self.operator.clone();
498 let path = Self::vector_path(namespace, id);
499 async move {
500 let exists = operator
501 .exists(&path)
502 .await
503 .map_err(|e| DakeraError::Storage(e.to_string()))?;
504 if exists {
505 match operator.delete(&path).await {
506 Ok(_) => Ok(true),
507 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
508 Err(e) => Err(DakeraError::Storage(e.to_string())),
509 }
510 } else {
511 Ok(false)
512 }
513 }
514 })
515 .collect();
516
517 let results: Vec<Result<bool>> = stream::iter(delete_tasks)
518 .buffer_unordered(S3_CONCURRENT_READS)
519 .collect()
520 .await;
521
522 let mut deleted = 0;
523 for result in results {
524 if result? {
525 deleted += 1;
526 }
527 }
528
529 if deleted > 0 {
531 if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
532 meta.vector_count = meta.vector_count.saturating_sub(deleted);
533 meta.updated_at = Self::now();
534 self.write_namespace_meta(namespace, &meta).await?;
535 }
536 }
537
538 Ok(deleted)
539 }
540
541 async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
542 Ok(self.read_namespace_meta(namespace).await?.is_some())
543 }
544
545 async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
546 if self.read_namespace_meta(namespace).await?.is_none() {
547 let meta = NamespaceMetadata {
548 dimension: None,
549 vector_count: 0,
550 created_at: Self::now(),
551 updated_at: Self::now(),
552 };
553 self.write_namespace_meta(namespace, &meta).await?;
554 }
555 Ok(())
556 }
557
558 async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
559 Ok(self
560 .read_namespace_meta(namespace)
561 .await?
562 .and_then(|m| m.dimension))
563 }
564
565 async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
566 Ok(self
567 .read_namespace_meta(namespace)
568 .await?
569 .map(|m| m.vector_count)
570 .unwrap_or(0))
571 }
572
573 async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
574 let entries = self
575 .operator
576 .list("namespaces/")
577 .await
578 .map_err(|e| DakeraError::Storage(e.to_string()))?;
579
580 let mut namespaces = Vec::new();
581 for entry in entries {
582 let path = entry.path();
583 if let Some(ns) = path.strip_prefix("namespaces/") {
585 let ns = ns.trim_end_matches('/');
586 if !ns.is_empty() && !ns.contains('/') {
587 if self.read_namespace_meta(ns).await?.is_some() {
592 namespaces.push(ns.to_string());
593 }
594 }
595 }
596 }
597
598 Ok(namespaces)
599 }
600
601 async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
602 if !self.namespace_exists(namespace).await? {
604 return Ok(false);
605 }
606
607 let prefix = format!("namespaces/{}/", namespace);
611 self.operator
612 .remove_all(&prefix)
613 .await
614 .map_err(|e| DakeraError::Storage(e.to_string()))?;
615
616 tracing::debug!(
617 namespace = namespace,
618 "Deleted namespace from object storage"
619 );
620
621 Ok(true)
622 }
623
624 async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
625 Ok(0)
628 }
629
630 async fn cleanup_all_expired(&self) -> Result<usize> {
631 Ok(0)
633 }
634}
635
636#[async_trait]
637impl IndexStorage for ObjectStorage {
638 async fn save_index(
639 &self,
640 namespace: &NamespaceId,
641 index_type: IndexType,
642 data: Vec<u8>,
643 ) -> Result<()> {
644 let path = format!(
645 "namespaces/{}/indexes/{}.bin",
646 namespace,
647 index_type.as_str()
648 );
649 self.operator
650 .write(&path, data)
651 .await
652 .map_err(|e| DakeraError::Storage(e.to_string()))?;
653
654 tracing::debug!(
655 namespace = namespace,
656 index_type = index_type.as_str(),
657 "Saved index to object storage"
658 );
659 Ok(())
660 }
661
662 async fn load_index(
663 &self,
664 namespace: &NamespaceId,
665 index_type: IndexType,
666 ) -> Result<Option<Vec<u8>>> {
667 let path = format!(
668 "namespaces/{}/indexes/{}.bin",
669 namespace,
670 index_type.as_str()
671 );
672 match self.operator.read(&path).await {
673 Ok(data) => {
674 tracing::debug!(
675 namespace = namespace,
676 index_type = index_type.as_str(),
677 size = data.len(),
678 "Loaded index from object storage"
679 );
680 Ok(Some(data.to_vec()))
681 }
682 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
683 Err(e) => Err(DakeraError::Storage(e.to_string())),
684 }
685 }
686
687 async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
688 let path = format!(
689 "namespaces/{}/indexes/{}.bin",
690 namespace,
691 index_type.as_str()
692 );
693 let exists = self
694 .operator
695 .exists(&path)
696 .await
697 .map_err(|e| DakeraError::Storage(e.to_string()))?;
698
699 if exists {
700 self.operator
701 .delete(&path)
702 .await
703 .map_err(|e| DakeraError::Storage(e.to_string()))?;
704 tracing::debug!(
705 namespace = namespace,
706 index_type = index_type.as_str(),
707 "Deleted index from object storage"
708 );
709 }
710 Ok(exists)
711 }
712
713 async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
714 let path = format!(
715 "namespaces/{}/indexes/{}.bin",
716 namespace,
717 index_type.as_str()
718 );
719 self.operator
720 .exists(&path)
721 .await
722 .map_err(|e| DakeraError::Storage(e.to_string()))
723 }
724
725 async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
726 let prefix = format!("namespaces/{}/indexes/", namespace);
727 let entries = self
728 .operator
729 .list(&prefix)
730 .await
731 .map_err(|e| DakeraError::Storage(e.to_string()))?;
732
733 let mut indexes = Vec::new();
734 for entry in entries {
735 let path = entry.path();
736 if path.ends_with(".bin") {
737 if let Some(filename) = path.strip_prefix(&prefix) {
739 let name = filename.trim_end_matches(".bin");
740 match name {
741 "hnsw" => indexes.push(IndexType::Hnsw),
742 "pq" => indexes.push(IndexType::Pq),
743 "ivf" => indexes.push(IndexType::Ivf),
744 "spfresh" => indexes.push(IndexType::SpFresh),
745 "fulltext" => indexes.push(IndexType::FullText),
746 _ => {} }
748 }
749 }
750 }
751
752 Ok(indexes)
753 }
754}
755
/// Convenience wrapper around [`ObjectStorage::build_operator`] for callers
/// that only need the raw OpenDAL [`Operator`].
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
761
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end smoke test of the VectorStorage impl on the in-memory
    // backend: namespace lifecycle, upsert, point get, full scan, count,
    // and delete with count bookkeeping.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Point lookup round-trips id and values.
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting one vector removes it and decrements the count.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // A second upsert whose dimension differs from the namespace's recorded
    // dimension must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    // Re-upserting an existing id overwrites the values and does not
    // increase the namespace vector count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // IndexStorage round-trip: save, existence checks per kind, load,
    // listing, and idempotent delete.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Deleting an already-deleted index reports false.
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}