// storage/object.rs
1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use opendal::{services, Operator};
13use serde::{Deserialize, Serialize};
14
15use crate::traits::{IndexStorage, IndexType, VectorStorage};
16
/// Serializable namespace metadata
///
/// Persisted as JSON at `namespaces/{namespace}/meta.json` and used as the
/// source of truth for namespace existence, dimension, and vector count.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Vector dimensionality; `None` until the first vector is upserted.
    dimension: Option<usize>,
    // Number of vectors currently stored in the namespace.
    vector_count: usize,
    // Creation time, seconds since the Unix epoch.
    created_at: u64,
    // Last modification time, seconds since the Unix epoch.
    updated_at: u64,
}
25
/// Serializable vector data for storage
///
/// On-disk JSON representation of a `Vector`. The TTL bookkeeping fields
/// (`ttl_seconds` / `expires_at`) are intentionally not persisted here;
/// TTL cleanup is handled by the application layer (see `cleanup_expired`
/// in the `VectorStorage` impl below).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
33
34impl From<Vector> for StoredVector {
35    fn from(v: Vector) -> Self {
36        Self {
37            id: v.id,
38            values: v.values,
39            metadata: v.metadata,
40        }
41    }
42}
43
44impl From<StoredVector> for Vector {
45    fn from(v: StoredVector) -> Self {
46        Self {
47            id: v.id,
48            values: v.values,
49            metadata: v.metadata,
50            ttl_seconds: None,
51            expires_at: None,
52        }
53    }
54}
55
/// Object storage backend configuration
///
/// Selects the OpenDAL service used by [`ObjectStorage`]. All credential
/// fields are optional; when `None` they are simply not set on the builder
/// (see `ObjectStorage::build_operator`).
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage rooted at `root`
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        /// Custom endpoint, e.g. for MinIO or DigitalOcean Spaces
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    Azure {
        container: String,
        account_name: String,
        /// Shared-key auth; alternative to `sas_token`
        account_key: Option<String>,
        /// SAS-token auth; alternative to `account_key`
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        bucket: String,
        /// Path to a service-account credential file
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
87
/// Object storage backend using OpenDAL
///
/// On-disk layout (paths relative to the operator root):
/// - `namespaces/{ns}/meta.json`          — namespace metadata
/// - `namespaces/{ns}/vectors/{id}.json`  — one JSON file per vector
/// - `namespaces/{ns}/indexes/{type}.bin` — serialized index blobs
///
// NOTE(review): `Clone` is derived; assumes OpenDAL `Operator` clones are
// cheap handle copies — confirm against the opendal version in use.
#[derive(Clone)]
pub struct ObjectStorage {
    operator: Operator,
}
93
impl ObjectStorage {
    /// Create a new object storage with the given configuration
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Create in-memory storage (for testing)
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Create filesystem storage rooted at `root`
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Create S3 storage for `bucket` with all optional settings left unset
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Create Azure Blob storage with all optional settings left unset
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Create Google Cloud Storage with all optional settings left unset
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Build an OpenDAL `Operator` for the given configuration.
    ///
    /// Optional fields are only applied to the service builder when they are
    /// `Some`, so OpenDAL's own defaults apply otherwise. Any builder error
    /// is mapped into `DakeraError::Storage`.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                // Either shared-key or SAS-token auth may be provided;
                // both are forwarded if both are set.
                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Get the path for a vector
    // NOTE(review): namespace and vector id are interpolated verbatim; an id
    // containing '/' would nest under extra directories — confirm ids are
    // validated/sanitized upstream.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Get the path for namespace metadata
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Get the path prefix for all vectors in a namespace
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Read namespace metadata.
    ///
    /// Returns `Ok(None)` when the file is missing, empty, or corrupted —
    /// empty/corrupted files are logged and treated as missing so the
    /// metadata can be recreated on the next write.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Write namespace metadata as JSON (overwrites any existing file)
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        Ok(())
    }

    /// Current time in seconds since the Unix epoch.
    // Falls back to 0 if the system clock is before the epoch.
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
295
296#[async_trait]
297impl VectorStorage for ObjectStorage {
298    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
299        if vectors.is_empty() {
300            return Ok(0);
301        }
302
303        // Get or create namespace metadata
304        let mut meta = self
305            .read_namespace_meta(namespace)
306            .await?
307            .unwrap_or_else(|| NamespaceMetadata {
308                dimension: None,
309                vector_count: 0,
310                created_at: Self::now(),
311                updated_at: Self::now(),
312            });
313
314        // Validate dimensions
315        let first_dim = vectors[0].values.len();
316        if let Some(dim) = meta.dimension {
317            for v in &vectors {
318                if v.values.len() != dim {
319                    return Err(DakeraError::DimensionMismatch {
320                        expected: dim,
321                        actual: v.values.len(),
322                    });
323                }
324            }
325        } else {
326            meta.dimension = Some(first_dim);
327        }
328
329        let mut upserted = 0;
330        for vector in vectors {
331            let path = Self::vector_path(namespace, &vector.id);
332            let stored: StoredVector = vector.into();
333            let data =
334                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
335
336            // Check if this is an insert or update
337            let exists = self
338                .operator
339                .exists(&path)
340                .await
341                .map_err(|e| DakeraError::Storage(e.to_string()))?;
342
343            self.operator
344                .write(&path, data)
345                .await
346                .map_err(|e| DakeraError::Storage(e.to_string()))?;
347
348            if !exists {
349                meta.vector_count += 1;
350            }
351            upserted += 1;
352        }
353
354        meta.updated_at = Self::now();
355        self.write_namespace_meta(namespace, &meta).await?;
356
357        tracing::debug!(
358            namespace = namespace,
359            upserted = upserted,
360            "Upserted vectors to object storage"
361        );
362
363        Ok(upserted)
364    }
365
366    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
367        let mut vectors = Vec::new();
368        let now = std::time::SystemTime::now()
369            .duration_since(std::time::UNIX_EPOCH)
370            .unwrap_or_default()
371            .as_secs();
372        for id in ids {
373            let path = Self::vector_path(namespace, id);
374            match self.operator.read(&path).await {
375                Ok(data) => {
376                    let bytes = data.to_vec();
377                    if bytes.is_empty() {
378                        tracing::warn!(
379                            namespace = %namespace,
380                            vector_id = %id,
381                            "Empty vector file detected, skipping"
382                        );
383                        continue;
384                    }
385                    match serde_json::from_slice::<StoredVector>(&bytes) {
386                        Ok(stored) => {
387                            let vector: Vector = stored.into();
388                            // Filter out expired vectors
389                            if !vector.is_expired_at(now) {
390                                vectors.push(vector);
391                            }
392                        }
393                        Err(e) => {
394                            tracing::warn!(
395                                namespace = %namespace,
396                                vector_id = %id,
397                                error = %e,
398                                bytes_len = bytes.len(),
399                                "Corrupted vector file detected, skipping"
400                            );
401                        }
402                    }
403                }
404                Err(e) if e.kind() == opendal::ErrorKind::NotFound => {
405                    // Skip missing vectors
406                }
407                Err(e) => return Err(DakeraError::Storage(e.to_string())),
408            }
409        }
410        Ok(vectors)
411    }
412
413    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
414        let prefix = Self::namespace_vectors_prefix(namespace);
415
416        let entries = self
417            .operator
418            .list(&prefix)
419            .await
420            .map_err(|e| DakeraError::Storage(e.to_string()))?;
421
422        let now = std::time::SystemTime::now()
423            .duration_since(std::time::UNIX_EPOCH)
424            .unwrap_or_default()
425            .as_secs();
426        let mut vectors = Vec::new();
427        for entry in entries {
428            let path = entry.path();
429            if path.ends_with(".json") {
430                match self.operator.read(path).await {
431                    Ok(data) => {
432                        let bytes = data.to_vec();
433                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
434                            let vector: Vector = stored.into();
435                            // Filter out expired vectors
436                            if !vector.is_expired_at(now) {
437                                vectors.push(vector);
438                            }
439                        }
440                    }
441                    Err(e) => {
442                        tracing::warn!(path = path, error = %e, "Failed to read vector");
443                    }
444                }
445            }
446        }
447
448        Ok(vectors)
449    }
450
451    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
452        let mut deleted = 0;
453
454        for id in ids {
455            let path = Self::vector_path(namespace, id);
456            // Check if the vector exists before deleting (OpenDAL delete is idempotent)
457            let exists = self
458                .operator
459                .exists(&path)
460                .await
461                .map_err(|e| DakeraError::Storage(e.to_string()))?;
462
463            if exists {
464                match self.operator.delete(&path).await {
465                    Ok(_) => deleted += 1,
466                    Err(e) if e.kind() == opendal::ErrorKind::NotFound => {}
467                    Err(e) => return Err(DakeraError::Storage(e.to_string())),
468                }
469            }
470        }
471
472        // Update metadata
473        if deleted > 0 {
474            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
475                meta.vector_count = meta.vector_count.saturating_sub(deleted);
476                meta.updated_at = Self::now();
477                self.write_namespace_meta(namespace, &meta).await?;
478            }
479        }
480
481        Ok(deleted)
482    }
483
484    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
485        Ok(self.read_namespace_meta(namespace).await?.is_some())
486    }
487
488    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
489        if self.read_namespace_meta(namespace).await?.is_none() {
490            let meta = NamespaceMetadata {
491                dimension: None,
492                vector_count: 0,
493                created_at: Self::now(),
494                updated_at: Self::now(),
495            };
496            self.write_namespace_meta(namespace, &meta).await?;
497        }
498        Ok(())
499    }
500
501    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
502        Ok(self
503            .read_namespace_meta(namespace)
504            .await?
505            .and_then(|m| m.dimension))
506    }
507
508    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
509        Ok(self
510            .read_namespace_meta(namespace)
511            .await?
512            .map(|m| m.vector_count)
513            .unwrap_or(0))
514    }
515
516    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
517        let entries = self
518            .operator
519            .list("namespaces/")
520            .await
521            .map_err(|e| DakeraError::Storage(e.to_string()))?;
522
523        let mut namespaces = Vec::new();
524        for entry in entries {
525            let path = entry.path();
526            // Extract namespace name from path like "namespaces/myns/"
527            if let Some(ns) = path.strip_prefix("namespaces/") {
528                let ns = ns.trim_end_matches('/');
529                if !ns.is_empty() && !ns.contains('/') {
530                    // Only include namespaces that actually have metadata (meta.json).
531                    // This filters out ghost directory entries left behind after
532                    // namespace deletion on backends where empty directories persist
533                    // (e.g., local filesystem, some S3-compatible stores).
534                    if self.read_namespace_meta(ns).await?.is_some() {
535                        namespaces.push(ns.to_string());
536                    }
537                }
538            }
539        }
540
541        Ok(namespaces)
542    }
543
544    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
545        // Check if namespace exists
546        if !self.namespace_exists(namespace).await? {
547            return Ok(false);
548        }
549
550        // Recursively remove everything under the namespace prefix.
551        // This deletes vectors, metadata, indexes, AND directory entries,
552        // preventing ghost namespaces from appearing in list_namespaces().
553        let prefix = format!("namespaces/{}/", namespace);
554        self.operator
555            .remove_all(&prefix)
556            .await
557            .map_err(|e| DakeraError::Storage(e.to_string()))?;
558
559        tracing::debug!(
560            namespace = namespace,
561            "Deleted namespace from object storage"
562        );
563
564        Ok(true)
565    }
566
567    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
568        // Object storage doesn't track TTL internally - cleanup handled by application layer
569        // StoredVector doesn't persist TTL fields, so nothing to clean up here
570        Ok(0)
571    }
572
573    async fn cleanup_all_expired(&self) -> Result<usize> {
574        // Object storage doesn't track TTL internally - cleanup handled by application layer
575        Ok(0)
576    }
577}
578
579#[async_trait]
580impl IndexStorage for ObjectStorage {
581    async fn save_index(
582        &self,
583        namespace: &NamespaceId,
584        index_type: IndexType,
585        data: Vec<u8>,
586    ) -> Result<()> {
587        let path = format!(
588            "namespaces/{}/indexes/{}.bin",
589            namespace,
590            index_type.as_str()
591        );
592        self.operator
593            .write(&path, data)
594            .await
595            .map_err(|e| DakeraError::Storage(e.to_string()))?;
596
597        tracing::debug!(
598            namespace = namespace,
599            index_type = index_type.as_str(),
600            "Saved index to object storage"
601        );
602        Ok(())
603    }
604
605    async fn load_index(
606        &self,
607        namespace: &NamespaceId,
608        index_type: IndexType,
609    ) -> Result<Option<Vec<u8>>> {
610        let path = format!(
611            "namespaces/{}/indexes/{}.bin",
612            namespace,
613            index_type.as_str()
614        );
615        match self.operator.read(&path).await {
616            Ok(data) => {
617                tracing::debug!(
618                    namespace = namespace,
619                    index_type = index_type.as_str(),
620                    size = data.len(),
621                    "Loaded index from object storage"
622                );
623                Ok(Some(data.to_vec()))
624            }
625            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
626            Err(e) => Err(DakeraError::Storage(e.to_string())),
627        }
628    }
629
630    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
631        let path = format!(
632            "namespaces/{}/indexes/{}.bin",
633            namespace,
634            index_type.as_str()
635        );
636        let exists = self
637            .operator
638            .exists(&path)
639            .await
640            .map_err(|e| DakeraError::Storage(e.to_string()))?;
641
642        if exists {
643            self.operator
644                .delete(&path)
645                .await
646                .map_err(|e| DakeraError::Storage(e.to_string()))?;
647            tracing::debug!(
648                namespace = namespace,
649                index_type = index_type.as_str(),
650                "Deleted index from object storage"
651            );
652        }
653        Ok(exists)
654    }
655
656    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
657        let path = format!(
658            "namespaces/{}/indexes/{}.bin",
659            namespace,
660            index_type.as_str()
661        );
662        self.operator
663            .exists(&path)
664            .await
665            .map_err(|e| DakeraError::Storage(e.to_string()))
666    }
667
668    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
669        let prefix = format!("namespaces/{}/indexes/", namespace);
670        let entries = self
671            .operator
672            .list(&prefix)
673            .await
674            .map_err(|e| DakeraError::Storage(e.to_string()))?;
675
676        let mut indexes = Vec::new();
677        for entry in entries {
678            let path = entry.path();
679            if path.ends_with(".bin") {
680                // Extract index type from filename
681                if let Some(filename) = path.strip_prefix(&prefix) {
682                    let name = filename.trim_end_matches(".bin");
683                    match name {
684                        "hnsw" => indexes.push(IndexType::Hnsw),
685                        "pq" => indexes.push(IndexType::Pq),
686                        "ivf" => indexes.push(IndexType::Ivf),
687                        "spfresh" => indexes.push(IndexType::SpFresh),
688                        "fulltext" => indexes.push(IndexType::FullText),
689                        _ => {} // Ignore unknown index types
690                    }
691                }
692            }
693        }
694
695        Ok(indexes)
696    }
697}
698
699/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
700/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
701pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
702    ObjectStorage::build_operator(config)
703}
704
#[cfg(test)]
mod tests {
    //! End-to-end tests of the vector and index storage traits against the
    //! in-memory OpenDAL backend.
    use super::*;

    // Happy-path CRUD: ensure/exists, upsert, get, get_all, count, delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Ensure namespace
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Insert vectors
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Get single vector
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Get all vectors
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        // Count
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Delete one vector; count should reflect the removal.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // A vector whose dimension disagrees with the namespace's recorded
    // dimension must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert first vector (fixes the namespace dimension at 3)
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Try to insert vector with different dimension
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], // Wrong dimension
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    // Re-upserting an existing id overwrites its values without growing
    // the namespace vector count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Upsert (update)
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        // Verify update
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // Count should still be 1
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // Full IndexStorage lifecycle: save, exists, load, list, delete,
    // and the not-found paths for delete/load.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Initially no indexes
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save an index
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Check it exists (and that the check is type-specific)
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Load it back
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        // Save another index type
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // List indexes
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // Delete index
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Delete non-existent
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Load non-existent
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}