// storage/object.rs
1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use opendal::{services, Operator};
13use serde::{Deserialize, Serialize};
14
15use crate::traits::{IndexStorage, IndexType, VectorStorage};
16
/// Serializable namespace metadata.
///
/// Persisted as JSON at `namespaces/{ns}/meta.json`; tracks the dimension
/// enforced for the namespace plus an approximate vector count.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Dimension pinned by the first upsert; `None` until a vector is written.
    dimension: Option<usize>,
    // Count maintained on upsert/delete. NOTE(review): updates are a
    // read-modify-write of this file and are not atomic across concurrent
    // writers, so the count can drift — confirm callers serialize writes.
    vector_count: usize,
    // Creation time, seconds since the UNIX epoch.
    created_at: u64,
    // Last-modified time, seconds since the UNIX epoch.
    updated_at: u64,
}
25
26/// Serializable vector data for storage
27#[derive(Debug, Clone, Serialize, Deserialize)]
28struct StoredVector {
29    id: String,
30    values: Vec<f32>,
31    metadata: Option<serde_json::Value>,
32}
33
34impl From<Vector> for StoredVector {
35    fn from(v: Vector) -> Self {
36        Self {
37            id: v.id,
38            values: v.values,
39            metadata: v.metadata,
40        }
41    }
42}
43
44impl From<StoredVector> for Vector {
45    fn from(v: StoredVector) -> Self {
46        Self {
47            id: v.id,
48            values: v.values,
49            metadata: v.metadata,
50            ttl_seconds: None,
51            expires_at: None,
52        }
53    }
54}
55
/// Object storage backend configuration.
///
/// Selects which OpenDAL service backs the store. Optional credential fields
/// left as `None` fall back to the service builder's own resolution
/// (environment variables, instance metadata, etc. — per OpenDAL defaults).
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage
    Filesystem {
        // Root directory under which all objects are stored.
        root: String,
    },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        // Custom endpoint for S3-compatible services (e.g. MinIO).
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    Azure {
        container: String,
        account_name: String,
        // Either an account key or a SAS token may be supplied; both optional.
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        bucket: String,
        // Path to a service-account credential file.
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
87
/// Object storage backend using OpenDAL.
///
/// Implements both `VectorStorage` and `IndexStorage` on top of a single
/// OpenDAL `Operator`; all data is laid out under the `namespaces/` prefix.
pub struct ObjectStorage {
    // Handle to the configured OpenDAL service (memory, fs, S3, Azure, GCS).
    operator: Operator,
}
92
93impl ObjectStorage {
94    /// Create a new object storage with the given configuration
95    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
96        let operator = Self::build_operator(&config)?;
97        Ok(Self { operator })
98    }
99
100    /// Create in-memory storage (for testing)
101    pub fn memory() -> Result<Self> {
102        Self::new(ObjectStorageConfig::Memory)
103    }
104
105    /// Create filesystem storage
106    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
107        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
108    }
109
110    /// Create S3 storage
111    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
112        Self::new(ObjectStorageConfig::S3 {
113            bucket: bucket.into(),
114            region: None,
115            endpoint: None,
116            access_key_id: None,
117            secret_access_key: None,
118        })
119    }
120
121    /// Create Azure Blob storage
122    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
123        Self::new(ObjectStorageConfig::Azure {
124            container: container.into(),
125            account_name: account_name.into(),
126            account_key: None,
127            sas_token: None,
128            endpoint: None,
129        })
130    }
131
132    /// Create Google Cloud Storage
133    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
134        Self::new(ObjectStorageConfig::Gcs {
135            bucket: bucket.into(),
136            credential_path: None,
137            endpoint: None,
138        })
139    }
140
141    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
142        match config {
143            ObjectStorageConfig::Memory => {
144                let builder = services::Memory::default();
145                Operator::new(builder)
146                    .map(|op| op.finish())
147                    .map_err(|e| DakeraError::Storage(e.to_string()))
148            }
149            ObjectStorageConfig::Filesystem { root } => {
150                let builder = services::Fs::default().root(root);
151                Operator::new(builder)
152                    .map(|op| op.finish())
153                    .map_err(|e| DakeraError::Storage(e.to_string()))
154            }
155            ObjectStorageConfig::S3 {
156                bucket,
157                region,
158                endpoint,
159                access_key_id,
160                secret_access_key,
161            } => {
162                let mut builder = services::S3::default().bucket(bucket);
163
164                if let Some(region) = region {
165                    builder = builder.region(region);
166                }
167                if let Some(endpoint) = endpoint {
168                    builder = builder.endpoint(endpoint);
169                }
170                if let Some(key) = access_key_id {
171                    builder = builder.access_key_id(key);
172                }
173                if let Some(secret) = secret_access_key {
174                    builder = builder.secret_access_key(secret);
175                }
176
177                Operator::new(builder)
178                    .map(|op| op.finish())
179                    .map_err(|e| DakeraError::Storage(e.to_string()))
180            }
181            ObjectStorageConfig::Azure {
182                container,
183                account_name,
184                account_key,
185                sas_token,
186                endpoint,
187            } => {
188                let mut builder = services::Azblob::default()
189                    .container(container)
190                    .account_name(account_name);
191
192                if let Some(key) = account_key {
193                    builder = builder.account_key(key);
194                }
195                if let Some(token) = sas_token {
196                    builder = builder.sas_token(token);
197                }
198                if let Some(endpoint) = endpoint {
199                    builder = builder.endpoint(endpoint);
200                }
201
202                Operator::new(builder)
203                    .map(|op| op.finish())
204                    .map_err(|e| DakeraError::Storage(e.to_string()))
205            }
206            ObjectStorageConfig::Gcs {
207                bucket,
208                credential_path,
209                endpoint,
210            } => {
211                let mut builder = services::Gcs::default().bucket(bucket);
212
213                if let Some(cred_path) = credential_path {
214                    builder = builder.credential_path(cred_path);
215                }
216                if let Some(endpoint) = endpoint {
217                    builder = builder.endpoint(endpoint);
218                }
219
220                Operator::new(builder)
221                    .map(|op| op.finish())
222                    .map_err(|e| DakeraError::Storage(e.to_string()))
223            }
224        }
225    }
226
227    /// Get the path for a vector
228    fn vector_path(namespace: &str, vector_id: &str) -> String {
229        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
230    }
231
232    /// Get the path for namespace metadata
233    fn namespace_meta_path(namespace: &str) -> String {
234        format!("namespaces/{}/meta.json", namespace)
235    }
236
237    /// Get the path prefix for all vectors in a namespace
238    fn namespace_vectors_prefix(namespace: &str) -> String {
239        format!("namespaces/{}/vectors/", namespace)
240    }
241
242    /// Read namespace metadata
243    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
244        let path = Self::namespace_meta_path(namespace);
245        match self.operator.read(&path).await {
246            Ok(data) => {
247                let bytes = data.to_vec();
248                if bytes.is_empty() {
249                    tracing::warn!(
250                        namespace = %namespace,
251                        path = %path,
252                        "Empty namespace metadata file detected, treating as missing"
253                    );
254                    return Ok(None);
255                }
256                match serde_json::from_slice(&bytes) {
257                    Ok(meta) => Ok(Some(meta)),
258                    Err(e) => {
259                        tracing::warn!(
260                            namespace = %namespace,
261                            path = %path,
262                            error = %e,
263                            bytes_len = bytes.len(),
264                            "Corrupted namespace metadata, treating as missing and will be recreated"
265                        );
266                        Ok(None)
267                    }
268                }
269            }
270            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
271            Err(e) => Err(DakeraError::Storage(e.to_string())),
272        }
273    }
274
275    /// Write namespace metadata
276    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
277        let path = Self::namespace_meta_path(namespace);
278        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
279        self.operator
280            .write(&path, data)
281            .await
282            .map_err(|e| DakeraError::Storage(e.to_string()))?;
283        Ok(())
284    }
285
286    /// Get current timestamp
287    fn now() -> u64 {
288        std::time::SystemTime::now()
289            .duration_since(std::time::UNIX_EPOCH)
290            .unwrap_or_default()
291            .as_secs()
292    }
293}
294
#[async_trait]
impl VectorStorage for ObjectStorage {
    /// Insert or overwrite `vectors`, enforcing a single dimension per
    /// namespace and keeping `NamespaceMetadata::vector_count` in sync.
    /// Returns the number of vectors written.
    ///
    /// NOTE(review): the metadata read-modify-write and the per-vector
    /// exists/write pair are not atomic; concurrent upserts can skew the
    /// count — confirm callers serialize writes per namespace.
    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
        if vectors.is_empty() {
            return Ok(0);
        }

        // Get or create namespace metadata (first upsert creates it implicitly)
        let mut meta = self
            .read_namespace_meta(namespace)
            .await?
            .unwrap_or_else(|| NamespaceMetadata {
                dimension: None,
                vector_count: 0,
                created_at: Self::now(),
                updated_at: Self::now(),
            });

        // Validate dimensions: an existing namespace dimension must be matched
        // by every vector in the batch; otherwise the first vector pins it.
        let first_dim = vectors[0].values.len();
        if let Some(dim) = meta.dimension {
            for v in &vectors {
                if v.values.len() != dim {
                    return Err(DakeraError::DimensionMismatch {
                        expected: dim,
                        actual: v.values.len(),
                    });
                }
            }
        } else {
            meta.dimension = Some(first_dim);
        }

        let mut upserted = 0;
        for vector in vectors {
            let path = Self::vector_path(namespace, &vector.id);
            let stored: StoredVector = vector.into();
            let data =
                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;

            // Check if this is an insert or update — only true inserts bump
            // the namespace vector count.
            let exists = self
                .operator
                .exists(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;

            self.operator
                .write(&path, data)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;

            if !exists {
                meta.vector_count += 1;
            }
            upserted += 1;
        }

        meta.updated_at = Self::now();
        self.write_namespace_meta(namespace, &meta).await?;

        tracing::debug!(
            namespace = namespace,
            upserted = upserted,
            "Upserted vectors to object storage"
        );

        Ok(upserted)
    }

    /// Fetch the given ids. Missing, empty, or corrupted records are skipped
    /// (the latter two with a warning) rather than failing the whole call, and
    /// vectors whose TTL has already lapsed are filtered out.
    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
        let mut vectors = Vec::new();
        // One timestamp for the whole batch so expiry is evaluated consistently.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        for id in ids {
            let path = Self::vector_path(namespace, id);
            match self.operator.read(&path).await {
                Ok(data) => {
                    let bytes = data.to_vec();
                    if bytes.is_empty() {
                        tracing::warn!(
                            namespace = %namespace,
                            vector_id = %id,
                            "Empty vector file detected, skipping"
                        );
                        continue;
                    }
                    match serde_json::from_slice::<StoredVector>(&bytes) {
                        Ok(stored) => {
                            let vector: Vector = stored.into();
                            // Filter out expired vectors
                            if !vector.is_expired_at(now) {
                                vectors.push(vector);
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                namespace = %namespace,
                                vector_id = %id,
                                error = %e,
                                bytes_len = bytes.len(),
                                "Corrupted vector file detected, skipping"
                            );
                        }
                    }
                }
                Err(e) if e.kind() == opendal::ErrorKind::NotFound => {
                    // Skip missing vectors
                }
                Err(e) => return Err(DakeraError::Storage(e.to_string())),
            }
        }
        Ok(vectors)
    }

    /// List and load every vector in the namespace, skipping unreadable or
    /// corrupted objects (logged) and filtering out expired vectors.
    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
        let prefix = Self::namespace_vectors_prefix(namespace);

        let entries = self
            .operator
            .list(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let mut vectors = Vec::new();
        for entry in entries {
            let path = entry.path();
            // Only vector records are .json; skips directory placeholders etc.
            if path.ends_with(".json") {
                match self.operator.read(path).await {
                    Ok(data) => {
                        let bytes = data.to_vec();
                        // Corrupted records are silently dropped here (unlike
                        // `get`, which logs them) — intentional best-effort scan.
                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
                            let vector: Vector = stored.into();
                            // Filter out expired vectors
                            if !vector.is_expired_at(now) {
                                vectors.push(vector);
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!(path = path, error = %e, "Failed to read vector");
                    }
                }
            }
        }

        Ok(vectors)
    }

    /// Delete the given ids, returning how many actually existed. The
    /// namespace vector count is decremented accordingly (saturating).
    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
        let mut deleted = 0;

        for id in ids {
            let path = Self::vector_path(namespace, id);
            // Check if the vector exists before deleting (OpenDAL delete is idempotent)
            let exists = self
                .operator
                .exists(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;

            if exists {
                match self.operator.delete(&path).await {
                    Ok(_) => deleted += 1,
                    // Raced with another deleter: treat as already gone.
                    Err(e) if e.kind() == opendal::ErrorKind::NotFound => {}
                    Err(e) => return Err(DakeraError::Storage(e.to_string())),
                }
            }
        }

        // Update metadata only when something was removed; missing metadata is
        // left missing rather than recreated.
        if deleted > 0 {
            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
                meta.vector_count = meta.vector_count.saturating_sub(deleted);
                meta.updated_at = Self::now();
                self.write_namespace_meta(namespace, &meta).await?;
            }
        }

        Ok(deleted)
    }

    /// A namespace "exists" iff its meta.json is present and readable.
    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
        Ok(self.read_namespace_meta(namespace).await?.is_some())
    }

    /// Create empty namespace metadata if none exists yet (idempotent).
    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
        if self.read_namespace_meta(namespace).await?.is_none() {
            let meta = NamespaceMetadata {
                dimension: None,
                vector_count: 0,
                created_at: Self::now(),
                updated_at: Self::now(),
            };
            self.write_namespace_meta(namespace, &meta).await?;
        }
        Ok(())
    }

    /// Dimension pinned for the namespace; `None` if the namespace is missing
    /// or has never had a vector upserted.
    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
        Ok(self
            .read_namespace_meta(namespace)
            .await?
            .and_then(|m| m.dimension))
    }

    /// Tracked vector count from metadata (0 for a missing namespace).
    /// May include vectors whose TTL has lapsed but were never cleaned up.
    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
        Ok(self
            .read_namespace_meta(namespace)
            .await?
            .map(|m| m.vector_count)
            .unwrap_or(0))
    }

    /// List namespaces that have real metadata, filtering out ghost
    /// directory entries left behind by deletions on some backends.
    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
        let entries = self
            .operator
            .list("namespaces/")
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let mut namespaces = Vec::new();
        for entry in entries {
            let path = entry.path();
            // Extract namespace name from path like "namespaces/myns/"
            if let Some(ns) = path.strip_prefix("namespaces/") {
                let ns = ns.trim_end_matches('/');
                if !ns.is_empty() && !ns.contains('/') {
                    // Only include namespaces that actually have metadata (meta.json).
                    // This filters out ghost directory entries left behind after
                    // namespace deletion on backends where empty directories persist
                    // (e.g., local filesystem, some S3-compatible stores).
                    if self.read_namespace_meta(ns).await?.is_some() {
                        namespaces.push(ns.to_string());
                    }
                }
            }
        }

        Ok(namespaces)
    }

    /// Remove a namespace and everything under it. Returns `false` if the
    /// namespace did not exist (no metadata).
    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
        // Check if namespace exists
        if !self.namespace_exists(namespace).await? {
            return Ok(false);
        }

        // Recursively remove everything under the namespace prefix.
        // This deletes vectors, metadata, indexes, AND directory entries,
        // preventing ghost namespaces from appearing in list_namespaces().
        let prefix = format!("namespaces/{}/", namespace);
        self.operator
            .remove_all(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        tracing::debug!(
            namespace = namespace,
            "Deleted namespace from object storage"
        );

        Ok(true)
    }

    /// TTL cleanup is delegated to the application layer; this backend does
    /// not scan stored objects for expired entries, so this is a no-op.
    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
        // Read paths already filter expired vectors; bulk removal of expired
        // records is handled above this layer.
        Ok(0)
    }

    /// No-op for the same reason as `cleanup_expired`.
    async fn cleanup_all_expired(&self) -> Result<usize> {
        // Object storage doesn't track TTL internally - cleanup handled by application layer
        Ok(0)
    }
}
577
578#[async_trait]
579impl IndexStorage for ObjectStorage {
580    async fn save_index(
581        &self,
582        namespace: &NamespaceId,
583        index_type: IndexType,
584        data: Vec<u8>,
585    ) -> Result<()> {
586        let path = format!(
587            "namespaces/{}/indexes/{}.bin",
588            namespace,
589            index_type.as_str()
590        );
591        self.operator
592            .write(&path, data)
593            .await
594            .map_err(|e| DakeraError::Storage(e.to_string()))?;
595
596        tracing::debug!(
597            namespace = namespace,
598            index_type = index_type.as_str(),
599            "Saved index to object storage"
600        );
601        Ok(())
602    }
603
604    async fn load_index(
605        &self,
606        namespace: &NamespaceId,
607        index_type: IndexType,
608    ) -> Result<Option<Vec<u8>>> {
609        let path = format!(
610            "namespaces/{}/indexes/{}.bin",
611            namespace,
612            index_type.as_str()
613        );
614        match self.operator.read(&path).await {
615            Ok(data) => {
616                tracing::debug!(
617                    namespace = namespace,
618                    index_type = index_type.as_str(),
619                    size = data.len(),
620                    "Loaded index from object storage"
621                );
622                Ok(Some(data.to_vec()))
623            }
624            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
625            Err(e) => Err(DakeraError::Storage(e.to_string())),
626        }
627    }
628
629    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
630        let path = format!(
631            "namespaces/{}/indexes/{}.bin",
632            namespace,
633            index_type.as_str()
634        );
635        let exists = self
636            .operator
637            .exists(&path)
638            .await
639            .map_err(|e| DakeraError::Storage(e.to_string()))?;
640
641        if exists {
642            self.operator
643                .delete(&path)
644                .await
645                .map_err(|e| DakeraError::Storage(e.to_string()))?;
646            tracing::debug!(
647                namespace = namespace,
648                index_type = index_type.as_str(),
649                "Deleted index from object storage"
650            );
651        }
652        Ok(exists)
653    }
654
655    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
656        let path = format!(
657            "namespaces/{}/indexes/{}.bin",
658            namespace,
659            index_type.as_str()
660        );
661        self.operator
662            .exists(&path)
663            .await
664            .map_err(|e| DakeraError::Storage(e.to_string()))
665    }
666
667    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
668        let prefix = format!("namespaces/{}/indexes/", namespace);
669        let entries = self
670            .operator
671            .list(&prefix)
672            .await
673            .map_err(|e| DakeraError::Storage(e.to_string()))?;
674
675        let mut indexes = Vec::new();
676        for entry in entries {
677            let path = entry.path();
678            if path.ends_with(".bin") {
679                // Extract index type from filename
680                if let Some(filename) = path.strip_prefix(&prefix) {
681                    let name = filename.trim_end_matches(".bin");
682                    match name {
683                        "hnsw" => indexes.push(IndexType::Hnsw),
684                        "pq" => indexes.push(IndexType::Pq),
685                        "ivf" => indexes.push(IndexType::Ivf),
686                        "spfresh" => indexes.push(IndexType::SpFresh),
687                        "fulltext" => indexes.push(IndexType::FullText),
688                        _ => {} // Ignore unknown index types
689                    }
690                }
691            }
692        }
693
694        Ok(indexes)
695    }
696}
697
/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
///
/// Thin free-function wrapper around `ObjectStorage::build_operator`; builder
/// failures surface as `DakeraError::Storage`.
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test vector with no TTL.
    fn sample(id: &str, values: Vec<f32>, metadata: Option<serde_json::Value>) -> Vector {
        Vector {
            id: id.to_string(),
            values,
            metadata,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Namespace creation is visible immediately.
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Insert two vectors, one with metadata.
        let batch = vec![
            sample("v1", vec![1.0, 2.0, 3.0], None),
            sample(
                "v2",
                vec![4.0, 5.0, 6.0],
                Some(serde_json::json!({"key": "value"})),
            ),
        ];
        assert_eq!(storage.upsert(&namespace, batch).await.unwrap(), 2);

        // Point lookup returns the stored values.
        let fetched = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].id, "v1");
        assert_eq!(fetched[0].values, vec![1.0, 2.0, 3.0]);

        // Full scan and count agree.
        assert_eq!(storage.get_all(&namespace).await.unwrap().len(), 2);
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting one vector removes it and decrements the count.
        let removed = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(removed, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // First upsert pins the namespace to 3 dimensions.
        storage
            .upsert(&namespace, vec![sample("v1", vec![1.0, 2.0, 3.0], None)])
            .await
            .unwrap();

        // A 2-dimensional vector must now be rejected.
        let outcome = storage
            .upsert(&namespace, vec![sample("v2", vec![1.0, 2.0], None)])
            .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Initial insert.
        storage
            .upsert(&namespace, vec![sample("v1", vec![1.0, 2.0], None)])
            .await
            .unwrap();

        // Upsert of the same id overwrites in place.
        storage
            .upsert(&namespace, vec![sample("v1", vec![3.0, 4.0], None)])
            .await
            .unwrap();

        // The update is visible and did not create a second record.
        let fetched = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].values, vec![3.0, 4.0]);
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // A fresh namespace has no indexes.
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save a blob and round-trip it.
        let hnsw_blob = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, hnsw_blob.clone())
            .await
            .unwrap();

        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        let reloaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert_eq!(reloaded, Some(hnsw_blob));

        // A second index type shows up in the listing alongside the first.
        storage
            .save_index(&namespace, IndexType::Pq, b"fake pq index data".to_vec())
            .await
            .unwrap();
        let listed = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.contains(&IndexType::Hnsw));
        assert!(listed.contains(&IndexType::Pq));

        // Deleting reports accurately; repeat deletes are no-ops.
        assert!(storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Loading a deleted index yields None.
        assert!(storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap()
            .is_none());
    }
}