// storage/object.rs
1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{services, Operator};
14use serde::{Deserialize, Serialize};
15
16use crate::traits::{IndexStorage, IndexType, VectorStorage};
17
/// Fan-out width for batched object-store operations: the maximum number of
/// in-flight requests used by the `buffer_unordered` pipelines in `get`,
/// `get_all`, and `delete`.
const S3_CONCURRENT_READS: usize = 32;
19
/// Serializable namespace metadata.
///
/// Persisted as JSON at `namespaces/{ns}/meta.json` (see
/// `ObjectStorage::namespace_meta_path`). Field names are part of the
/// on-disk format — do not rename without a migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Vector dimensionality; `None` until the first vector is upserted.
    dimension: Option<usize>,
    // Number of vectors currently stored (maintained by upsert/delete).
    vector_count: usize,
    // Unix timestamps in seconds (see `ObjectStorage::now`).
    created_at: u64,
    updated_at: u64,
}
28
/// Serializable vector data for storage.
///
/// On-disk JSON form of a `Vector`. Note that the TTL fields
/// (`ttl_seconds`, `expires_at`) are intentionally NOT persisted here;
/// expiry is handled by the application layer and rehydrated vectors come
/// back with both TTL fields set to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
36
37impl From<Vector> for StoredVector {
38    fn from(v: Vector) -> Self {
39        Self {
40            id: v.id,
41            values: v.values,
42            metadata: v.metadata,
43        }
44    }
45}
46
47impl From<StoredVector> for Vector {
48    fn from(v: StoredVector) -> Self {
49        Self {
50            id: v.id,
51            values: v.values,
52            metadata: v.metadata,
53            ttl_seconds: None,
54            expires_at: None,
55        }
56    }
57}
58
/// Object storage backend configuration.
///
/// Selects which OpenDAL service backs [`ObjectStorage`]. Optional
/// credential/endpoint fields left as `None` are simply not set on the
/// OpenDAL builder (see `ObjectStorage::build_operator`), deferring to the
/// service's own resolution — presumably environment/instance credentials;
/// TODO confirm per backend.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage rooted at `root`
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, DigitalOcean Spaces, etc.);
    /// a custom `endpoint` is what points this at non-AWS stores
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage; authenticate with either `account_key` or
    /// `sas_token`
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage; `credential_path` points at a service-account
    /// credentials file
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
90
/// Object storage backend using OpenDAL.
///
/// `Clone` is derived so the storage can be shared across async tasks; the
/// concurrent read/delete paths clone `operator` into each spawned future.
#[derive(Clone)]
pub struct ObjectStorage {
    // The OpenDAL operator for the configured backend.
    operator: Operator,
}
96
97impl ObjectStorage {
98    /// Create a new object storage with the given configuration
99    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
100        let operator = Self::build_operator(&config)?;
101        Ok(Self { operator })
102    }
103
104    /// Create in-memory storage (for testing)
105    pub fn memory() -> Result<Self> {
106        Self::new(ObjectStorageConfig::Memory)
107    }
108
109    /// Create filesystem storage
110    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
111        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
112    }
113
114    /// Create S3 storage
115    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
116        Self::new(ObjectStorageConfig::S3 {
117            bucket: bucket.into(),
118            region: None,
119            endpoint: None,
120            access_key_id: None,
121            secret_access_key: None,
122        })
123    }
124
125    /// Create Azure Blob storage
126    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
127        Self::new(ObjectStorageConfig::Azure {
128            container: container.into(),
129            account_name: account_name.into(),
130            account_key: None,
131            sas_token: None,
132            endpoint: None,
133        })
134    }
135
136    /// Create Google Cloud Storage
137    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
138        Self::new(ObjectStorageConfig::Gcs {
139            bucket: bucket.into(),
140            credential_path: None,
141            endpoint: None,
142        })
143    }
144
145    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
146        match config {
147            ObjectStorageConfig::Memory => {
148                let builder = services::Memory::default();
149                Operator::new(builder)
150                    .map(|op| op.finish())
151                    .map_err(|e| DakeraError::Storage(e.to_string()))
152            }
153            ObjectStorageConfig::Filesystem { root } => {
154                let builder = services::Fs::default().root(root);
155                Operator::new(builder)
156                    .map(|op| op.finish())
157                    .map_err(|e| DakeraError::Storage(e.to_string()))
158            }
159            ObjectStorageConfig::S3 {
160                bucket,
161                region,
162                endpoint,
163                access_key_id,
164                secret_access_key,
165            } => {
166                let mut builder = services::S3::default().bucket(bucket);
167
168                if let Some(region) = region {
169                    builder = builder.region(region);
170                }
171                if let Some(endpoint) = endpoint {
172                    builder = builder.endpoint(endpoint);
173                }
174                if let Some(key) = access_key_id {
175                    builder = builder.access_key_id(key);
176                }
177                if let Some(secret) = secret_access_key {
178                    builder = builder.secret_access_key(secret);
179                }
180
181                Operator::new(builder)
182                    .map(|op| op.finish())
183                    .map_err(|e| DakeraError::Storage(e.to_string()))
184            }
185            ObjectStorageConfig::Azure {
186                container,
187                account_name,
188                account_key,
189                sas_token,
190                endpoint,
191            } => {
192                let mut builder = services::Azblob::default()
193                    .container(container)
194                    .account_name(account_name);
195
196                if let Some(key) = account_key {
197                    builder = builder.account_key(key);
198                }
199                if let Some(token) = sas_token {
200                    builder = builder.sas_token(token);
201                }
202                if let Some(endpoint) = endpoint {
203                    builder = builder.endpoint(endpoint);
204                }
205
206                Operator::new(builder)
207                    .map(|op| op.finish())
208                    .map_err(|e| DakeraError::Storage(e.to_string()))
209            }
210            ObjectStorageConfig::Gcs {
211                bucket,
212                credential_path,
213                endpoint,
214            } => {
215                let mut builder = services::Gcs::default().bucket(bucket);
216
217                if let Some(cred_path) = credential_path {
218                    builder = builder.credential_path(cred_path);
219                }
220                if let Some(endpoint) = endpoint {
221                    builder = builder.endpoint(endpoint);
222                }
223
224                Operator::new(builder)
225                    .map(|op| op.finish())
226                    .map_err(|e| DakeraError::Storage(e.to_string()))
227            }
228        }
229    }
230
231    /// Get the path for a vector
232    fn vector_path(namespace: &str, vector_id: &str) -> String {
233        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
234    }
235
236    /// Get the path for namespace metadata
237    fn namespace_meta_path(namespace: &str) -> String {
238        format!("namespaces/{}/meta.json", namespace)
239    }
240
241    /// Get the path prefix for all vectors in a namespace
242    fn namespace_vectors_prefix(namespace: &str) -> String {
243        format!("namespaces/{}/vectors/", namespace)
244    }
245
246    /// Read namespace metadata
247    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
248        let path = Self::namespace_meta_path(namespace);
249        match self.operator.read(&path).await {
250            Ok(data) => {
251                let bytes = data.to_vec();
252                if bytes.is_empty() {
253                    tracing::warn!(
254                        namespace = %namespace,
255                        path = %path,
256                        "Empty namespace metadata file detected, treating as missing"
257                    );
258                    return Ok(None);
259                }
260                match serde_json::from_slice(&bytes) {
261                    Ok(meta) => Ok(Some(meta)),
262                    Err(e) => {
263                        tracing::warn!(
264                            namespace = %namespace,
265                            path = %path,
266                            error = %e,
267                            bytes_len = bytes.len(),
268                            "Corrupted namespace metadata, treating as missing and will be recreated"
269                        );
270                        Ok(None)
271                    }
272                }
273            }
274            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
275            Err(e) => Err(DakeraError::Storage(e.to_string())),
276        }
277    }
278
279    /// Write namespace metadata
280    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
281        let path = Self::namespace_meta_path(namespace);
282        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
283        self.operator
284            .write(&path, data)
285            .await
286            .map_err(|e| DakeraError::Storage(e.to_string()))?;
287        Ok(())
288    }
289
290    /// Get current timestamp
291    fn now() -> u64 {
292        std::time::SystemTime::now()
293            .duration_since(std::time::UNIX_EPOCH)
294            .unwrap_or_default()
295            .as_secs()
296    }
297}
298
299#[async_trait]
300impl VectorStorage for ObjectStorage {
301    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
302        if vectors.is_empty() {
303            return Ok(0);
304        }
305
306        // Get or create namespace metadata
307        let mut meta = self
308            .read_namespace_meta(namespace)
309            .await?
310            .unwrap_or_else(|| NamespaceMetadata {
311                dimension: None,
312                vector_count: 0,
313                created_at: Self::now(),
314                updated_at: Self::now(),
315            });
316
317        // Validate dimensions
318        let first_dim = vectors[0].values.len();
319        if let Some(dim) = meta.dimension {
320            for v in &vectors {
321                if v.values.len() != dim {
322                    return Err(DakeraError::DimensionMismatch {
323                        expected: dim,
324                        actual: v.values.len(),
325                    });
326                }
327            }
328        } else {
329            meta.dimension = Some(first_dim);
330        }
331
332        let mut upserted = 0;
333        for vector in vectors {
334            let path = Self::vector_path(namespace, &vector.id);
335            let stored: StoredVector = vector.into();
336            let data =
337                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
338
339            // Check if this is an insert or update
340            let exists = self
341                .operator
342                .exists(&path)
343                .await
344                .map_err(|e| DakeraError::Storage(e.to_string()))?;
345
346            self.operator
347                .write(&path, data)
348                .await
349                .map_err(|e| DakeraError::Storage(e.to_string()))?;
350
351            if !exists {
352                meta.vector_count += 1;
353            }
354            upserted += 1;
355        }
356
357        meta.updated_at = Self::now();
358        self.write_namespace_meta(namespace, &meta).await?;
359
360        tracing::debug!(
361            namespace = namespace,
362            upserted = upserted,
363            "Upserted vectors to object storage"
364        );
365
366        Ok(upserted)
367    }
368
369    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
370        if ids.is_empty() {
371            return Ok(Vec::new());
372        }
373
374        let now = std::time::SystemTime::now()
375            .duration_since(std::time::UNIX_EPOCH)
376            .unwrap_or_default()
377            .as_secs();
378
379        let read_tasks: Vec<_> = ids
380            .iter()
381            .map(|id| {
382                let operator = self.operator.clone();
383                let path = Self::vector_path(namespace, id);
384                let id = id.clone();
385                async move {
386                    match operator.read(&path).await {
387                        Ok(data) => {
388                            let bytes = data.to_vec();
389                            if bytes.is_empty() {
390                                tracing::warn!(
391                                    vector_id = %id,
392                                    "Empty vector file detected, skipping"
393                                );
394                                return Ok(None);
395                            }
396                            match serde_json::from_slice::<StoredVector>(&bytes) {
397                                Ok(stored) => {
398                                    let vector: Vector = stored.into();
399                                    if !vector.is_expired_at(now) {
400                                        Ok(Some(vector))
401                                    } else {
402                                        Ok(None)
403                                    }
404                                }
405                                Err(e) => {
406                                    tracing::warn!(
407                                        vector_id = %id,
408                                        error = %e,
409                                        bytes_len = bytes.len(),
410                                        "Corrupted vector file detected, skipping"
411                                    );
412                                    Ok(None)
413                                }
414                            }
415                        }
416                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
417                        Err(e) => Err(DakeraError::Storage(e.to_string())),
418                    }
419                }
420            })
421            .collect();
422
423        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
424            .buffer_unordered(S3_CONCURRENT_READS)
425            .collect()
426            .await;
427
428        let mut vectors = Vec::with_capacity(ids.len());
429        for result in results {
430            if let Some(v) = result? {
431                vectors.push(v);
432            }
433        }
434        Ok(vectors)
435    }
436
437    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
438        let prefix = Self::namespace_vectors_prefix(namespace);
439
440        let entries = self
441            .operator
442            .list(&prefix)
443            .await
444            .map_err(|e| DakeraError::Storage(e.to_string()))?;
445
446        let json_paths: Vec<String> = entries
447            .into_iter()
448            .filter(|e| e.path().ends_with(".json"))
449            .map(|e| e.path().to_string())
450            .collect();
451
452        if json_paths.is_empty() {
453            return Ok(Vec::new());
454        }
455
456        let now = std::time::SystemTime::now()
457            .duration_since(std::time::UNIX_EPOCH)
458            .unwrap_or_default()
459            .as_secs();
460
461        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
462            let operator = self.operator.clone();
463            async move {
464                match operator.read(&path).await {
465                    Ok(data) => {
466                        let bytes = data.to_vec();
467                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
468                            let vector: Vector = stored.into();
469                            if !vector.is_expired_at(now) {
470                                return Some(vector);
471                            }
472                        }
473                        None
474                    }
475                    Err(e) => {
476                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
477                        None
478                    }
479                }
480            }
481        }))
482        .buffer_unordered(S3_CONCURRENT_READS)
483        .collect()
484        .await;
485
486        Ok(results.into_iter().flatten().collect())
487    }
488
489    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
490        if ids.is_empty() {
491            return Ok(0);
492        }
493
494        let delete_tasks: Vec<_> = ids
495            .iter()
496            .map(|id| {
497                let operator = self.operator.clone();
498                let path = Self::vector_path(namespace, id);
499                async move {
500                    let exists = operator
501                        .exists(&path)
502                        .await
503                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
504                    if exists {
505                        match operator.delete(&path).await {
506                            Ok(_) => Ok(true),
507                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
508                            Err(e) => Err(DakeraError::Storage(e.to_string())),
509                        }
510                    } else {
511                        Ok(false)
512                    }
513                }
514            })
515            .collect();
516
517        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
518            .buffer_unordered(S3_CONCURRENT_READS)
519            .collect()
520            .await;
521
522        let mut deleted = 0;
523        for result in results {
524            if result? {
525                deleted += 1;
526            }
527        }
528
529        // Update metadata
530        if deleted > 0 {
531            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
532                meta.vector_count = meta.vector_count.saturating_sub(deleted);
533                meta.updated_at = Self::now();
534                self.write_namespace_meta(namespace, &meta).await?;
535            }
536        }
537
538        Ok(deleted)
539    }
540
541    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
542        Ok(self.read_namespace_meta(namespace).await?.is_some())
543    }
544
545    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
546        if self.read_namespace_meta(namespace).await?.is_none() {
547            let meta = NamespaceMetadata {
548                dimension: None,
549                vector_count: 0,
550                created_at: Self::now(),
551                updated_at: Self::now(),
552            };
553            self.write_namespace_meta(namespace, &meta).await?;
554        }
555        Ok(())
556    }
557
558    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
559        Ok(self
560            .read_namespace_meta(namespace)
561            .await?
562            .and_then(|m| m.dimension))
563    }
564
565    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
566        Ok(self
567            .read_namespace_meta(namespace)
568            .await?
569            .map(|m| m.vector_count)
570            .unwrap_or(0))
571    }
572
573    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
574        let entries = self
575            .operator
576            .list("namespaces/")
577            .await
578            .map_err(|e| DakeraError::Storage(e.to_string()))?;
579
580        let mut namespaces = Vec::new();
581        for entry in entries {
582            let path = entry.path();
583            // Extract namespace name from path like "namespaces/myns/"
584            if let Some(ns) = path.strip_prefix("namespaces/") {
585                let ns = ns.trim_end_matches('/');
586                if !ns.is_empty() && !ns.contains('/') {
587                    // Only include namespaces that actually have metadata (meta.json).
588                    // This filters out ghost directory entries left behind after
589                    // namespace deletion on backends where empty directories persist
590                    // (e.g., local filesystem, some S3-compatible stores).
591                    if self.read_namespace_meta(ns).await?.is_some() {
592                        namespaces.push(ns.to_string());
593                    }
594                }
595            }
596        }
597
598        Ok(namespaces)
599    }
600
601    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
602        // Check if namespace exists
603        if !self.namespace_exists(namespace).await? {
604            return Ok(false);
605        }
606
607        // Recursively remove everything under the namespace prefix.
608        // This deletes vectors, metadata, indexes, AND directory entries,
609        // preventing ghost namespaces from appearing in list_namespaces().
610        let prefix = format!("namespaces/{}/", namespace);
611        self.operator
612            .remove_all(&prefix)
613            .await
614            .map_err(|e| DakeraError::Storage(e.to_string()))?;
615
616        tracing::debug!(
617            namespace = namespace,
618            "Deleted namespace from object storage"
619        );
620
621        Ok(true)
622    }
623
624    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
625        // Object storage doesn't track TTL internally - cleanup handled by application layer
626        // StoredVector doesn't persist TTL fields, so nothing to clean up here
627        Ok(0)
628    }
629
630    async fn cleanup_all_expired(&self) -> Result<usize> {
631        // Object storage doesn't track TTL internally - cleanup handled by application layer
632        Ok(0)
633    }
634}
635
636#[async_trait]
637impl IndexStorage for ObjectStorage {
638    async fn save_index(
639        &self,
640        namespace: &NamespaceId,
641        index_type: IndexType,
642        data: Vec<u8>,
643    ) -> Result<()> {
644        let path = format!(
645            "namespaces/{}/indexes/{}.bin",
646            namespace,
647            index_type.as_str()
648        );
649        self.operator
650            .write(&path, data)
651            .await
652            .map_err(|e| DakeraError::Storage(e.to_string()))?;
653
654        tracing::debug!(
655            namespace = namespace,
656            index_type = index_type.as_str(),
657            "Saved index to object storage"
658        );
659        Ok(())
660    }
661
662    async fn load_index(
663        &self,
664        namespace: &NamespaceId,
665        index_type: IndexType,
666    ) -> Result<Option<Vec<u8>>> {
667        let path = format!(
668            "namespaces/{}/indexes/{}.bin",
669            namespace,
670            index_type.as_str()
671        );
672        match self.operator.read(&path).await {
673            Ok(data) => {
674                tracing::debug!(
675                    namespace = namespace,
676                    index_type = index_type.as_str(),
677                    size = data.len(),
678                    "Loaded index from object storage"
679                );
680                Ok(Some(data.to_vec()))
681            }
682            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
683            Err(e) => Err(DakeraError::Storage(e.to_string())),
684        }
685    }
686
687    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
688        let path = format!(
689            "namespaces/{}/indexes/{}.bin",
690            namespace,
691            index_type.as_str()
692        );
693        let exists = self
694            .operator
695            .exists(&path)
696            .await
697            .map_err(|e| DakeraError::Storage(e.to_string()))?;
698
699        if exists {
700            self.operator
701                .delete(&path)
702                .await
703                .map_err(|e| DakeraError::Storage(e.to_string()))?;
704            tracing::debug!(
705                namespace = namespace,
706                index_type = index_type.as_str(),
707                "Deleted index from object storage"
708            );
709        }
710        Ok(exists)
711    }
712
713    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
714        let path = format!(
715            "namespaces/{}/indexes/{}.bin",
716            namespace,
717            index_type.as_str()
718        );
719        self.operator
720            .exists(&path)
721            .await
722            .map_err(|e| DakeraError::Storage(e.to_string()))
723    }
724
725    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
726        let prefix = format!("namespaces/{}/indexes/", namespace);
727        let entries = self
728            .operator
729            .list(&prefix)
730            .await
731            .map_err(|e| DakeraError::Storage(e.to_string()))?;
732
733        let mut indexes = Vec::new();
734        for entry in entries {
735            let path = entry.path();
736            if path.ends_with(".bin") {
737                // Extract index type from filename
738                if let Some(filename) = path.strip_prefix(&prefix) {
739                    let name = filename.trim_end_matches(".bin");
740                    match name {
741                        "hnsw" => indexes.push(IndexType::Hnsw),
742                        "pq" => indexes.push(IndexType::Pq),
743                        "ivf" => indexes.push(IndexType::Ivf),
744                        "spfresh" => indexes.push(IndexType::SpFresh),
745                        "fulltext" => indexes.push(IndexType::FullText),
746                        _ => {} // Ignore unknown index types
747                    }
748                }
749            }
750        }
751
752        Ok(indexes)
753    }
754}
755
756/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
757/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
758pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
759    ObjectStorage::build_operator(config)
760}
761
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a test vector with no TTL.
    fn make_vector(id: &str, values: Vec<f32>, metadata: Option<serde_json::Value>) -> Vector {
        Vector {
            id: id.to_string(),
            values,
            metadata,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    #[tokio::test]
    async fn test_object_storage_memory() {
        let store = ObjectStorage::memory().unwrap();
        let ns = "test".to_string();

        // Namespace lifecycle.
        store.ensure_namespace(&ns).await.unwrap();
        assert!(store.namespace_exists(&ns).await.unwrap());

        // Insert two vectors, one carrying metadata.
        let batch = vec![
            make_vector("v1", vec![1.0, 2.0, 3.0], None),
            make_vector(
                "v2",
                vec![4.0, 5.0, 6.0],
                Some(serde_json::json!({"key": "value"})),
            ),
        ];
        assert_eq!(store.upsert(&ns, batch).await.unwrap(), 2);

        // Point lookup round-trips the data.
        let fetched = store.get(&ns, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].id, "v1");
        assert_eq!(fetched[0].values, vec![1.0, 2.0, 3.0]);

        // Full scan and maintained count agree.
        assert_eq!(store.get_all(&ns).await.unwrap().len(), 2);
        assert_eq!(store.count(&ns).await.unwrap(), 2);

        // Delete removes the object and decrements the count.
        assert_eq!(store.delete(&ns, &["v1".to_string()]).await.unwrap(), 1);
        assert!(store
            .get(&ns, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(store.count(&ns).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let store = ObjectStorage::memory().unwrap();
        let ns = "test".to_string();
        store.ensure_namespace(&ns).await.unwrap();

        // The first vector fixes the namespace dimension at 3.
        let first = vec![make_vector("v1", vec![1.0, 2.0, 3.0], None)];
        store.upsert(&ns, first).await.unwrap();

        // A 2-dimensional vector must be rejected.
        let wrong = vec![make_vector("v2", vec![1.0, 2.0], None)];
        assert!(store.upsert(&ns, wrong).await.is_err());
    }

    #[tokio::test]
    async fn test_object_storage_upsert() {
        let store = ObjectStorage::memory().unwrap();
        let ns = "test".to_string();
        store.ensure_namespace(&ns).await.unwrap();

        // Initial insert.
        store
            .upsert(&ns, vec![make_vector("v1", vec![1.0, 2.0], None)])
            .await
            .unwrap();

        // Upsert with the same id overwrites in place.
        store
            .upsert(&ns, vec![make_vector("v1", vec![3.0, 4.0], None)])
            .await
            .unwrap();

        // The updated values are returned...
        let fetched = store.get(&ns, &["v1".to_string()]).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].values, vec![3.0, 4.0]);

        // ...and the count was not double-incremented.
        assert_eq!(store.count(&ns).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_index_storage() {
        let store = ObjectStorage::memory().unwrap();
        let ns = "test_index".to_string();

        // Nothing saved yet.
        assert!(!store.index_exists(&ns, IndexType::Hnsw).await.unwrap());

        // Save an HNSW blob and verify existence is type-specific.
        let hnsw_blob = b"fake hnsw index data for testing".to_vec();
        store
            .save_index(&ns, IndexType::Hnsw, hnsw_blob.clone())
            .await
            .unwrap();
        assert!(store.index_exists(&ns, IndexType::Hnsw).await.unwrap());
        assert!(!store.index_exists(&ns, IndexType::Pq).await.unwrap());

        // Round-trip the bytes.
        let reloaded = store.load_index(&ns, IndexType::Hnsw).await.unwrap();
        assert!(reloaded.is_some());
        assert_eq!(reloaded.unwrap(), hnsw_blob);

        // A second index type shows up in the listing alongside the first.
        store
            .save_index(&ns, IndexType::Pq, b"fake pq index data".to_vec())
            .await
            .unwrap();
        let listed = store.list_indexes(&ns).await.unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.contains(&IndexType::Hnsw));
        assert!(listed.contains(&IndexType::Pq));

        // Deleting reports prior existence and removes the blob.
        assert!(store.delete_index(&ns, IndexType::Hnsw).await.unwrap());
        assert!(!store.index_exists(&ns, IndexType::Hnsw).await.unwrap());

        // Deleting again reports nothing was there.
        assert!(!store.delete_index(&ns, IndexType::Hnsw).await.unwrap());

        // Loading a deleted index yields None.
        assert!(store
            .load_index(&ns, IndexType::Hnsw)
            .await
            .unwrap()
            .is_none());
    }
}