// storage/object.rs — Object storage backend (OpenDAL)
1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16
17use crate::traits::{IndexStorage, IndexType, VectorStorage};
18
/// Maximum number of concurrent object-store operations used by the batched
/// read/delete paths (`buffer_unordered` call sites in the trait impls).
///
/// Configurable via the `DAKERA_S3_CONCURRENT_OPS` environment variable;
/// defaults to 16. Unparseable values are ignored, and a configured value of
/// 0 is rejected (falling back to the default) because `buffer_unordered(0)`
/// would never poll any future and stall the stream forever.
fn s3_concurrent_ops() -> usize {
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|&n| n > 0)
        .unwrap_or(16)
}
25
/// Serializable namespace metadata
///
/// Persisted as JSON at `namespaces/{ns}/meta.json` (see
/// `ObjectStorage::namespace_meta_path`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Vector dimensionality; None until the first vector is upserted.
    dimension: Option<usize>,
    // Number of vectors currently stored in the namespace.
    vector_count: usize,
    // Unix timestamps (seconds) for creation and last modification.
    created_at: u64,
    updated_at: u64,
}
34
/// Serializable vector data for storage
///
/// Persisted as JSON at `namespaces/{ns}/vectors/{id}.json`. The domain
/// `Vector`'s TTL fields (`ttl_seconds`, `expires_at`) are intentionally
/// not persisted; they come back as `None` on conversion to `Vector`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
42
43impl From<Vector> for StoredVector {
44    fn from(v: Vector) -> Self {
45        Self {
46            id: v.id,
47            values: v.values,
48            metadata: v.metadata,
49        }
50    }
51}
52
53impl From<StoredVector> for Vector {
54    fn from(v: StoredVector) -> Self {
55        Self {
56            id: v.id,
57            values: v.values,
58            metadata: v.metadata,
59            ttl_seconds: None,
60            expires_at: None,
61        }
62    }
63}
64
/// Object storage backend configuration
///
/// Selects which OpenDAL service backs the storage; see
/// `ObjectStorage::build_operator` for how each variant is wired up.
/// For all `Option` fields, `None` means the corresponding builder option
/// is simply not set (the backend's own defaults apply).
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage rooted at `root`
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        /// Custom endpoint for S3-compatible services (MinIO, Spaces, ...).
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    ///
    /// Authenticates with `account_key` and/or `sas_token`; only the
    /// `Some` values are passed to the builder.
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        bucket: String,
        /// Path to a credential file, forwarded to the GCS builder.
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
96
/// Object storage backend using OpenDAL
///
/// All data is laid out under a `namespaces/` prefix:
/// - `namespaces/{ns}/meta.json`          — namespace metadata
/// - `namespaces/{ns}/vectors/{id}.json`  — individual vectors
/// - `namespaces/{ns}/indexes/{type}.bin` — serialized indexes
#[derive(Clone)]
pub struct ObjectStorage {
    // The configured OpenDAL operator (memory, fs, s3, azblob, or gcs).
    operator: Operator,
}
102
impl ObjectStorage {
    /// Create a new object storage with the given configuration
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Create in-memory storage (for testing)
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Create filesystem storage
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Create S3 storage (region, endpoint, and credentials left unset)
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Create Azure Blob storage (no credentials or endpoint set)
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Create Google Cloud Storage (no credential path or endpoint set)
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Build an OpenDAL `Operator` for the given configuration.
    ///
    /// Every backend except `Memory` is wrapped in the retry layer from
    /// `Self::retry_layer`. Builder errors are mapped to
    /// `DakeraError::Storage`.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                // No retry layer: in-memory operations have no transient failures.
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                // Only set the options the caller provided; `None` leaves the
                // builder's own defaults in place.
                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Build the retry layer: exponential backoff (factor 2.0) with jitter.
    ///
    /// Tunable via environment variables:
    /// - `DAKERA_S3_MAX_RETRIES` (default 10)
    /// - `DAKERA_S3_RETRY_MIN_DELAY_MS` (default 500)
    /// - `DAKERA_S3_RETRY_MAX_DELAY_SECS` (default 60)
    ///
    /// Unparseable values silently fall back to the defaults.
    fn retry_layer() -> RetryLayer {
        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(10);
        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(500);
        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(60);

        tracing::info!(
            max_times,
            min_delay_ms,
            max_delay_secs,
            "S3 retry layer configured"
        );

        RetryLayer::new()
            .with_max_times(max_times)
            .with_min_delay(Duration::from_millis(min_delay_ms))
            .with_max_delay(Duration::from_secs(max_delay_secs))
            .with_jitter()
            .with_factor(2.0)
    }

    /// Get the path for a vector: `namespaces/{ns}/vectors/{id}.json`
    // NOTE(review): vector_id is interpolated verbatim; an id containing '/'
    // would nest under extra path segments — confirm callers validate ids.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Get the path for namespace metadata: `namespaces/{ns}/meta.json`
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Get the path prefix for all vectors in a namespace
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Read namespace metadata.
    ///
    /// Returns `Ok(None)` when the metadata file is missing, empty, or
    /// fails to deserialize — corruption is logged and treated as missing
    /// so the namespace can be transparently recreated by the next write.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Write namespace metadata as JSON, overwriting any existing file
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        Ok(())
    }

    /// Get current timestamp (seconds since the Unix epoch; a clock before
    /// the epoch degrades to 0 rather than panicking)
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
333
334#[async_trait]
335impl VectorStorage for ObjectStorage {
336    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
337        if vectors.is_empty() {
338            return Ok(0);
339        }
340
341        // Get or create namespace metadata
342        let mut meta = self
343            .read_namespace_meta(namespace)
344            .await?
345            .unwrap_or_else(|| NamespaceMetadata {
346                dimension: None,
347                vector_count: 0,
348                created_at: Self::now(),
349                updated_at: Self::now(),
350            });
351
352        // Validate dimensions
353        let first_dim = vectors[0].values.len();
354        if let Some(dim) = meta.dimension {
355            for v in &vectors {
356                if v.values.len() != dim {
357                    return Err(DakeraError::DimensionMismatch {
358                        expected: dim,
359                        actual: v.values.len(),
360                    });
361                }
362            }
363        } else {
364            meta.dimension = Some(first_dim);
365        }
366
367        let mut upserted = 0;
368        for vector in vectors {
369            let path = Self::vector_path(namespace, &vector.id);
370            let stored: StoredVector = vector.into();
371            let data =
372                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
373
374            // Check if this is an insert or update
375            let exists = self
376                .operator
377                .exists(&path)
378                .await
379                .map_err(|e| DakeraError::Storage(e.to_string()))?;
380
381            self.operator
382                .write(&path, data)
383                .await
384                .map_err(|e| DakeraError::Storage(e.to_string()))?;
385
386            if !exists {
387                meta.vector_count += 1;
388            }
389            upserted += 1;
390        }
391
392        meta.updated_at = Self::now();
393        self.write_namespace_meta(namespace, &meta).await?;
394
395        tracing::debug!(
396            namespace = namespace,
397            upserted = upserted,
398            "Upserted vectors to object storage"
399        );
400
401        Ok(upserted)
402    }
403
404    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
405        if ids.is_empty() {
406            return Ok(Vec::new());
407        }
408
409        let now = std::time::SystemTime::now()
410            .duration_since(std::time::UNIX_EPOCH)
411            .unwrap_or_default()
412            .as_secs();
413
414        let read_tasks: Vec<_> = ids
415            .iter()
416            .map(|id| {
417                let operator = self.operator.clone();
418                let path = Self::vector_path(namespace, id);
419                let id = id.clone();
420                async move {
421                    match operator.read(&path).await {
422                        Ok(data) => {
423                            let bytes = data.to_vec();
424                            if bytes.is_empty() {
425                                tracing::warn!(
426                                    vector_id = %id,
427                                    "Empty vector file detected, skipping"
428                                );
429                                return Ok(None);
430                            }
431                            match serde_json::from_slice::<StoredVector>(&bytes) {
432                                Ok(stored) => {
433                                    let vector: Vector = stored.into();
434                                    if !vector.is_expired_at(now) {
435                                        Ok(Some(vector))
436                                    } else {
437                                        Ok(None)
438                                    }
439                                }
440                                Err(e) => {
441                                    tracing::warn!(
442                                        vector_id = %id,
443                                        error = %e,
444                                        bytes_len = bytes.len(),
445                                        "Corrupted vector file detected, skipping"
446                                    );
447                                    Ok(None)
448                                }
449                            }
450                        }
451                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
452                        Err(e) => Err(DakeraError::Storage(e.to_string())),
453                    }
454                }
455            })
456            .collect();
457
458        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
459            .buffer_unordered(s3_concurrent_ops())
460            .collect()
461            .await;
462
463        let mut vectors = Vec::with_capacity(ids.len());
464        for result in results {
465            if let Some(v) = result? {
466                vectors.push(v);
467            }
468        }
469        Ok(vectors)
470    }
471
472    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
473        let prefix = Self::namespace_vectors_prefix(namespace);
474
475        let entries = self
476            .operator
477            .list(&prefix)
478            .await
479            .map_err(|e| DakeraError::Storage(e.to_string()))?;
480
481        let json_paths: Vec<String> = entries
482            .into_iter()
483            .filter(|e| e.path().ends_with(".json"))
484            .map(|e| e.path().to_string())
485            .collect();
486
487        if json_paths.is_empty() {
488            return Ok(Vec::new());
489        }
490
491        let now = std::time::SystemTime::now()
492            .duration_since(std::time::UNIX_EPOCH)
493            .unwrap_or_default()
494            .as_secs();
495
496        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
497            let operator = self.operator.clone();
498            async move {
499                match operator.read(&path).await {
500                    Ok(data) => {
501                        let bytes = data.to_vec();
502                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
503                            let vector: Vector = stored.into();
504                            if !vector.is_expired_at(now) {
505                                return Some(vector);
506                            }
507                        }
508                        None
509                    }
510                    Err(e) => {
511                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
512                        None
513                    }
514                }
515            }
516        }))
517        .buffer_unordered(s3_concurrent_ops())
518        .collect()
519        .await;
520
521        Ok(results.into_iter().flatten().collect())
522    }
523
524    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
525        if ids.is_empty() {
526            return Ok(0);
527        }
528
529        let delete_tasks: Vec<_> = ids
530            .iter()
531            .map(|id| {
532                let operator = self.operator.clone();
533                let path = Self::vector_path(namespace, id);
534                async move {
535                    let exists = operator
536                        .exists(&path)
537                        .await
538                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
539                    if exists {
540                        match operator.delete(&path).await {
541                            Ok(_) => Ok(true),
542                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
543                            Err(e) => Err(DakeraError::Storage(e.to_string())),
544                        }
545                    } else {
546                        Ok(false)
547                    }
548                }
549            })
550            .collect();
551
552        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
553            .buffer_unordered(s3_concurrent_ops())
554            .collect()
555            .await;
556
557        let mut deleted = 0;
558        for result in results {
559            if result? {
560                deleted += 1;
561            }
562        }
563
564        // Update metadata
565        if deleted > 0 {
566            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
567                meta.vector_count = meta.vector_count.saturating_sub(deleted);
568                meta.updated_at = Self::now();
569                self.write_namespace_meta(namespace, &meta).await?;
570            }
571        }
572
573        Ok(deleted)
574    }
575
576    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
577        Ok(self.read_namespace_meta(namespace).await?.is_some())
578    }
579
580    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
581        if self.read_namespace_meta(namespace).await?.is_none() {
582            let meta = NamespaceMetadata {
583                dimension: None,
584                vector_count: 0,
585                created_at: Self::now(),
586                updated_at: Self::now(),
587            };
588            self.write_namespace_meta(namespace, &meta).await?;
589        }
590        Ok(())
591    }
592
593    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
594        Ok(self
595            .read_namespace_meta(namespace)
596            .await?
597            .and_then(|m| m.dimension))
598    }
599
600    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
601        Ok(self
602            .read_namespace_meta(namespace)
603            .await?
604            .map(|m| m.vector_count)
605            .unwrap_or(0))
606    }
607
608    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
609        let entries = self
610            .operator
611            .list("namespaces/")
612            .await
613            .map_err(|e| DakeraError::Storage(e.to_string()))?;
614
615        let mut namespaces = Vec::new();
616        for entry in entries {
617            let path = entry.path();
618            // Extract namespace name from path like "namespaces/myns/"
619            if let Some(ns) = path.strip_prefix("namespaces/") {
620                let ns = ns.trim_end_matches('/');
621                if !ns.is_empty() && !ns.contains('/') {
622                    // Only include namespaces that actually have metadata (meta.json).
623                    // This filters out ghost directory entries left behind after
624                    // namespace deletion on backends where empty directories persist
625                    // (e.g., local filesystem, some S3-compatible stores).
626                    if self.read_namespace_meta(ns).await?.is_some() {
627                        namespaces.push(ns.to_string());
628                    }
629                }
630            }
631        }
632
633        Ok(namespaces)
634    }
635
636    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
637        // Check if namespace exists
638        if !self.namespace_exists(namespace).await? {
639            return Ok(false);
640        }
641
642        // Recursively remove everything under the namespace prefix.
643        // This deletes vectors, metadata, indexes, AND directory entries,
644        // preventing ghost namespaces from appearing in list_namespaces().
645        let prefix = format!("namespaces/{}/", namespace);
646        self.operator
647            .remove_all(&prefix)
648            .await
649            .map_err(|e| DakeraError::Storage(e.to_string()))?;
650
651        tracing::debug!(
652            namespace = namespace,
653            "Deleted namespace from object storage"
654        );
655
656        Ok(true)
657    }
658
659    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
660        // Object storage doesn't track TTL internally - cleanup handled by application layer
661        // StoredVector doesn't persist TTL fields, so nothing to clean up here
662        Ok(0)
663    }
664
665    async fn cleanup_all_expired(&self) -> Result<usize> {
666        // Object storage doesn't track TTL internally - cleanup handled by application layer
667        Ok(0)
668    }
669}
670
671#[async_trait]
672impl IndexStorage for ObjectStorage {
673    async fn save_index(
674        &self,
675        namespace: &NamespaceId,
676        index_type: IndexType,
677        data: Vec<u8>,
678    ) -> Result<()> {
679        let path = format!(
680            "namespaces/{}/indexes/{}.bin",
681            namespace,
682            index_type.as_str()
683        );
684        self.operator
685            .write(&path, data)
686            .await
687            .map_err(|e| DakeraError::Storage(e.to_string()))?;
688
689        tracing::debug!(
690            namespace = namespace,
691            index_type = index_type.as_str(),
692            "Saved index to object storage"
693        );
694        Ok(())
695    }
696
697    async fn load_index(
698        &self,
699        namespace: &NamespaceId,
700        index_type: IndexType,
701    ) -> Result<Option<Vec<u8>>> {
702        let path = format!(
703            "namespaces/{}/indexes/{}.bin",
704            namespace,
705            index_type.as_str()
706        );
707        match self.operator.read(&path).await {
708            Ok(data) => {
709                tracing::debug!(
710                    namespace = namespace,
711                    index_type = index_type.as_str(),
712                    size = data.len(),
713                    "Loaded index from object storage"
714                );
715                Ok(Some(data.to_vec()))
716            }
717            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
718            Err(e) => Err(DakeraError::Storage(e.to_string())),
719        }
720    }
721
722    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
723        let path = format!(
724            "namespaces/{}/indexes/{}.bin",
725            namespace,
726            index_type.as_str()
727        );
728        let exists = self
729            .operator
730            .exists(&path)
731            .await
732            .map_err(|e| DakeraError::Storage(e.to_string()))?;
733
734        if exists {
735            self.operator
736                .delete(&path)
737                .await
738                .map_err(|e| DakeraError::Storage(e.to_string()))?;
739            tracing::debug!(
740                namespace = namespace,
741                index_type = index_type.as_str(),
742                "Deleted index from object storage"
743            );
744        }
745        Ok(exists)
746    }
747
748    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
749        let path = format!(
750            "namespaces/{}/indexes/{}.bin",
751            namespace,
752            index_type.as_str()
753        );
754        self.operator
755            .exists(&path)
756            .await
757            .map_err(|e| DakeraError::Storage(e.to_string()))
758    }
759
760    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
761        let prefix = format!("namespaces/{}/indexes/", namespace);
762        let entries = self
763            .operator
764            .list(&prefix)
765            .await
766            .map_err(|e| DakeraError::Storage(e.to_string()))?;
767
768        let mut indexes = Vec::new();
769        for entry in entries {
770            let path = entry.path();
771            if path.ends_with(".bin") {
772                // Extract index type from filename
773                if let Some(filename) = path.strip_prefix(&prefix) {
774                    let name = filename.trim_end_matches(".bin");
775                    match name {
776                        "hnsw" => indexes.push(IndexType::Hnsw),
777                        "pq" => indexes.push(IndexType::Pq),
778                        "ivf" => indexes.push(IndexType::Ivf),
779                        "spfresh" => indexes.push(IndexType::SpFresh),
780                        "fulltext" => indexes.push(IndexType::FullText),
781                        _ => {} // Ignore unknown index types
782                    }
783                }
784            }
785        }
786
787        Ok(indexes)
788    }
789}
790
/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
///
/// # Errors
/// Returns `DakeraError::Storage` if the underlying OpenDAL builder rejects
/// the configuration (see `ObjectStorage::build_operator`).
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
796
#[cfg(test)]
mod tests {
    //! Unit tests exercising the in-memory OpenDAL backend
    //! (no external services required).
    use super::*;

    // End-to-end CRUD cycle: ensure namespace, upsert, get, get_all,
    // count, delete, and count again.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Ensure namespace
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Insert vectors
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Get single vector
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Get all vectors
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        // Count
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Delete
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // Upserting a vector whose dimension differs from the namespace's
    // established dimension must fail.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert first vector
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Try to insert vector with different dimension
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], // Wrong dimension
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    // Re-upserting an existing id updates values in place without
    // inflating vector_count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Upsert (update)
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        // Verify update
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // Count should still be 1
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // Full IndexStorage lifecycle: save, exists, load, list, delete,
    // plus the non-existent-index edge cases.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Initially no indexes
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save an index
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Check it exists
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Load it back
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        // Save another index type
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // List indexes
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // Delete index
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Delete non-existent
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Load non-existent
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}