// storage/object.rs
1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16
17use crate::traits::{IndexStorage, IndexType, VectorStorage};
18
/// Maximum number of concurrent object-store operations.
///
/// Configurable via the `DAKERA_S3_CONCURRENT_OPS` environment variable;
/// falls back to 16 when the variable is unset or not a valid integer.
fn s3_concurrent_ops() -> usize {
    match std::env::var("DAKERA_S3_CONCURRENT_OPS") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(16),
        Err(_) => 16,
    }
}
25
/// Serializable namespace metadata.
///
/// Persisted as JSON at `namespaces/{ns}/meta.json` and used to track the
/// vector dimension and the stored vector count for a namespace.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimension; `None` until the first vector is upserted.
    dimension: Option<usize>,
    /// Number of vectors currently stored in the namespace.
    vector_count: usize,
    /// Creation time, in seconds since the Unix epoch.
    created_at: u64,
    /// Last modification time, in seconds since the Unix epoch.
    updated_at: u64,
}
34
/// Serializable vector data for storage.
///
/// The on-disk JSON form of a [`Vector`]. TTL bookkeeping fields
/// (`ttl_seconds` / `expires_at`) are intentionally not persisted; they are
/// dropped on write and come back as `None` on read (see the `From` impls
/// below).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    /// Vector identifier (used as the object file name).
    id: String,
    /// Dense embedding values.
    values: Vec<f32>,
    /// Optional caller-supplied JSON metadata.
    metadata: Option<serde_json::Value>,
}
42
43impl From<Vector> for StoredVector {
44    fn from(v: Vector) -> Self {
45        Self {
46            id: v.id,
47            values: v.values,
48            metadata: v.metadata,
49        }
50    }
51}
52
53impl From<StoredVector> for Vector {
54    fn from(v: StoredVector) -> Self {
55        Self {
56            id: v.id,
57            values: v.values,
58            metadata: v.metadata,
59            ttl_seconds: None,
60            expires_at: None,
61        }
62    }
63}
64
/// Object storage backend configuration.
///
/// Selects which OpenDAL service backs an [`ObjectStorage`]. Credential
/// fields left as `None` fall back to whatever the underlying OpenDAL
/// builder resolves (e.g. ambient environment credentials).
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage
    Filesystem {
        /// Root directory under which all objects are stored.
        root: String,
    },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        /// Bucket name.
        bucket: String,
        /// AWS region; optional for S3-compatible endpoints.
        region: Option<String>,
        /// Custom endpoint URL (e.g. for MinIO / Spaces).
        endpoint: Option<String>,
        /// Static access key; `None` uses ambient credentials.
        access_key_id: Option<String>,
        /// Static secret key; `None` uses ambient credentials.
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    Azure {
        /// Blob container name.
        container: String,
        /// Storage account name.
        account_name: String,
        /// Shared account key (one of key / SAS token may be provided).
        account_key: Option<String>,
        /// SAS token alternative to the account key.
        sas_token: Option<String>,
        /// Custom endpoint URL.
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        /// Bucket name.
        bucket: String,
        /// Path to a service-account credential file.
        credential_path: Option<String>,
        /// Custom endpoint URL (e.g. for emulators).
        endpoint: Option<String>,
    },
}
96
/// Object storage backend using OpenDAL.
///
/// Thin wrapper around an OpenDAL [`Operator`]; all vector and index
/// persistence goes through this handle.
#[derive(Clone)]
pub struct ObjectStorage {
    // The configured OpenDAL operator used for every read/write/list/delete.
    operator: Operator,
}
102
impl ObjectStorage {
    /// Create a new object storage with the given configuration.
    ///
    /// # Errors
    /// Returns [`DakeraError::Storage`] if the backend builder rejects the
    /// configuration.
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Create in-memory storage (for testing).
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Create filesystem storage rooted at `root`.
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Create S3 storage for `bucket`.
    ///
    /// Region, endpoint, and credentials are left unset and resolved by the
    /// OpenDAL S3 builder (e.g. from the environment).
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Create Azure Blob storage for `container` under `account_name`.
    ///
    /// No key or SAS token is set; credentials are resolved by the builder.
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Create Google Cloud Storage for `bucket` with default credentials.
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Build a raw OpenDAL [`Operator`] for `config`.
    ///
    /// All remote/filesystem backends are wrapped in the retry layer from
    /// [`Self::retry_layer`]; the in-memory backend is not (nothing to
    /// retry). Optional builder fields are only applied when `Some`, so
    /// the backend's own defaults/ambient resolution apply otherwise.
    ///
    /// # Errors
    /// Returns [`DakeraError::Storage`] when OpenDAL rejects the builder
    /// configuration.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    /// Build the retry layer applied to all non-memory backends.
    ///
    /// Exponential backoff (factor 2.0) with jitter, tunable via:
    /// - `DAKERA_S3_MAX_RETRIES` (default 10)
    /// - `DAKERA_S3_RETRY_MIN_DELAY_MS` (default 500)
    /// - `DAKERA_S3_RETRY_MAX_DELAY_SECS` (default 60)
    ///
    /// Unparseable values silently fall back to the defaults.
    fn retry_layer() -> RetryLayer {
        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(10);
        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(500);
        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(60);

        tracing::info!(
            max_times,
            min_delay_ms,
            max_delay_secs,
            "S3 retry layer configured"
        );

        RetryLayer::new()
            .with_max_times(max_times)
            .with_min_delay(Duration::from_millis(min_delay_ms))
            .with_max_delay(Duration::from_secs(max_delay_secs))
            .with_jitter()
            .with_factor(2.0)
    }

    /// Get the path for a vector: `namespaces/{ns}/vectors/{id}.json`.
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Get the path for namespace metadata: `namespaces/{ns}/meta.json`.
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Get the path prefix for all vectors in a namespace.
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }

    /// Read namespace metadata.
    ///
    /// Returns `Ok(None)` when the metadata object does not exist, is
    /// empty, or fails to deserialize — empty/corrupt metadata is logged
    /// and treated as missing so the namespace can be recreated rather
    /// than wedging all operations on it.
    ///
    /// # Errors
    /// Propagates backend read failures other than `NotFound`.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Write namespace metadata as JSON, overwriting any existing object.
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;
        Ok(())
    }

    /// Current time in seconds since the Unix epoch (0 if the system clock
    /// reads earlier than the epoch).
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
333
334#[async_trait]
335impl VectorStorage for ObjectStorage {
336    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
337        if vectors.is_empty() {
338            return Ok(0);
339        }
340
341        // Get or create namespace metadata
342        let mut meta = self
343            .read_namespace_meta(namespace)
344            .await?
345            .unwrap_or_else(|| NamespaceMetadata {
346                dimension: None,
347                vector_count: 0,
348                created_at: Self::now(),
349                updated_at: Self::now(),
350            });
351
352        // Validate dimensions
353        let first_dim = vectors[0].values.len();
354        if let Some(dim) = meta.dimension {
355            for v in &vectors {
356                if v.values.len() != dim {
357                    return Err(DakeraError::DimensionMismatch {
358                        expected: dim,
359                        actual: v.values.len(),
360                    });
361                }
362            }
363        } else {
364            meta.dimension = Some(first_dim);
365        }
366
367        let mut upserted = 0;
368        for vector in vectors {
369            let path = Self::vector_path(namespace, &vector.id);
370            let stored: StoredVector = vector.into();
371            let data =
372                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
373
374            // Check if this is an insert or update
375            let exists = self
376                .operator
377                .exists(&path)
378                .await
379                .map_err(|e| DakeraError::Storage(e.to_string()))?;
380
381            self.operator
382                .write(&path, data)
383                .await
384                .map_err(|e| DakeraError::Storage(e.to_string()))?;
385
386            if !exists {
387                meta.vector_count += 1;
388            }
389            upserted += 1;
390        }
391
392        meta.updated_at = Self::now();
393        self.write_namespace_meta(namespace, &meta).await?;
394
395        tracing::debug!(
396            namespace = namespace,
397            upserted = upserted,
398            "Upserted vectors to object storage"
399        );
400
401        Ok(upserted)
402    }
403
404    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
405        if ids.is_empty() {
406            return Ok(Vec::new());
407        }
408
409        let now = std::time::SystemTime::now()
410            .duration_since(std::time::UNIX_EPOCH)
411            .unwrap_or_default()
412            .as_secs();
413
414        let read_tasks: Vec<_> = ids
415            .iter()
416            .map(|id| {
417                let operator = self.operator.clone();
418                let path = Self::vector_path(namespace, id);
419                let id = id.clone();
420                async move {
421                    match operator.read(&path).await {
422                        Ok(data) => {
423                            let bytes = data.to_vec();
424                            if bytes.is_empty() {
425                                tracing::warn!(
426                                    vector_id = %id,
427                                    "Empty vector file detected, skipping"
428                                );
429                                return Ok(None);
430                            }
431                            match serde_json::from_slice::<StoredVector>(&bytes) {
432                                Ok(stored) => {
433                                    let vector: Vector = stored.into();
434                                    if !vector.is_expired_at(now) {
435                                        Ok(Some(vector))
436                                    } else {
437                                        Ok(None)
438                                    }
439                                }
440                                Err(e) => {
441                                    tracing::warn!(
442                                        vector_id = %id,
443                                        error = %e,
444                                        bytes_len = bytes.len(),
445                                        "Corrupted vector file detected, skipping"
446                                    );
447                                    Ok(None)
448                                }
449                            }
450                        }
451                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
452                        Err(e) => Err(DakeraError::Storage(e.to_string())),
453                    }
454                }
455            })
456            .collect();
457
458        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
459            .buffer_unordered(s3_concurrent_ops())
460            .collect()
461            .await;
462
463        let mut vectors = Vec::with_capacity(ids.len());
464        for result in results {
465            if let Some(v) = result? {
466                vectors.push(v);
467            }
468        }
469        Ok(vectors)
470    }
471
472    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
473        let prefix = Self::namespace_vectors_prefix(namespace);
474
475        let entries = self
476            .operator
477            .list(&prefix)
478            .await
479            .map_err(|e| DakeraError::Storage(e.to_string()))?;
480
481        let json_paths: Vec<String> = entries
482            .into_iter()
483            .filter(|e| e.path().ends_with(".json"))
484            .map(|e| e.path().to_string())
485            .collect();
486
487        if json_paths.is_empty() {
488            return Ok(Vec::new());
489        }
490
491        let now = std::time::SystemTime::now()
492            .duration_since(std::time::UNIX_EPOCH)
493            .unwrap_or_default()
494            .as_secs();
495
496        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
497            let operator = self.operator.clone();
498            async move {
499                match operator.read(&path).await {
500                    Ok(data) => {
501                        let bytes = data.to_vec();
502                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
503                            let vector: Vector = stored.into();
504                            if !vector.is_expired_at(now) {
505                                return Some(vector);
506                            }
507                        }
508                        None
509                    }
510                    Err(e) => {
511                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
512                        None
513                    }
514                }
515            }
516        }))
517        .buffer_unordered(s3_concurrent_ops())
518        .collect()
519        .await;
520
521        Ok(results.into_iter().flatten().collect())
522    }
523
524    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
525        if ids.is_empty() {
526            return Ok(0);
527        }
528
529        let delete_tasks: Vec<_> = ids
530            .iter()
531            .map(|id| {
532                let operator = self.operator.clone();
533                let path = Self::vector_path(namespace, id);
534                async move {
535                    let exists = operator
536                        .exists(&path)
537                        .await
538                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
539                    if exists {
540                        match operator.delete(&path).await {
541                            Ok(_) => Ok(true),
542                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
543                            Err(e) => Err(DakeraError::Storage(e.to_string())),
544                        }
545                    } else {
546                        Ok(false)
547                    }
548                }
549            })
550            .collect();
551
552        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
553            .buffer_unordered(s3_concurrent_ops())
554            .collect()
555            .await;
556
557        let mut deleted = 0;
558        for result in results {
559            if result? {
560                deleted += 1;
561            }
562        }
563
564        // Update metadata
565        if deleted > 0 {
566            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
567                meta.vector_count = meta.vector_count.saturating_sub(deleted);
568                meta.updated_at = Self::now();
569                self.write_namespace_meta(namespace, &meta).await?;
570            }
571        }
572
573        Ok(deleted)
574    }
575
576    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
577        Ok(self.read_namespace_meta(namespace).await?.is_some())
578    }
579
580    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
581        if self.read_namespace_meta(namespace).await?.is_none() {
582            let meta = NamespaceMetadata {
583                dimension: None,
584                vector_count: 0,
585                created_at: Self::now(),
586                updated_at: Self::now(),
587            };
588            self.write_namespace_meta(namespace, &meta).await?;
589        }
590        Ok(())
591    }
592
593    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
594        Ok(self
595            .read_namespace_meta(namespace)
596            .await?
597            .and_then(|m| m.dimension))
598    }
599
600    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
601        Ok(self
602            .read_namespace_meta(namespace)
603            .await?
604            .map(|m| m.vector_count)
605            .unwrap_or(0))
606    }
607
608    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
609        let entries = self
610            .operator
611            .list("namespaces/")
612            .await
613            .map_err(|e| DakeraError::Storage(e.to_string()))?;
614
615        let mut namespaces = Vec::new();
616        for entry in entries {
617            let path = entry.path();
618            // Extract namespace name from path like "namespaces/myns/"
619            if let Some(ns) = path.strip_prefix("namespaces/") {
620                let ns = ns.trim_end_matches('/');
621                if !ns.is_empty() && !ns.contains('/') {
622                    // Only include namespaces that actually have metadata (meta.json).
623                    // This filters out ghost directory entries left behind after
624                    // namespace deletion on backends where empty directories persist
625                    // (e.g., local filesystem, some S3-compatible stores).
626                    if self.read_namespace_meta(ns).await?.is_some() {
627                        namespaces.push(ns.to_string());
628                    }
629                }
630            }
631        }
632
633        Ok(namespaces)
634    }
635
636    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
637        // Check if namespace exists
638        if !self.namespace_exists(namespace).await? {
639            return Ok(false);
640        }
641
642        // Recursively remove everything under the namespace prefix.
643        // This deletes vectors, metadata, indexes, AND directory entries,
644        // preventing ghost namespaces from appearing in list_namespaces().
645        let prefix = format!("namespaces/{}/", namespace);
646        self.operator
647            .delete_with(&prefix)
648            .recursive(true)
649            .await
650            .map_err(|e| DakeraError::Storage(e.to_string()))?;
651
652        tracing::debug!(
653            namespace = namespace,
654            "Deleted namespace from object storage"
655        );
656
657        Ok(true)
658    }
659
660    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
661        // Object storage doesn't track TTL internally - cleanup handled by application layer
662        // StoredVector doesn't persist TTL fields, so nothing to clean up here
663        Ok(0)
664    }
665
666    async fn cleanup_all_expired(&self) -> Result<usize> {
667        // Object storage doesn't track TTL internally - cleanup handled by application layer
668        Ok(0)
669    }
670}
671
672#[async_trait]
673impl IndexStorage for ObjectStorage {
674    async fn save_index(
675        &self,
676        namespace: &NamespaceId,
677        index_type: IndexType,
678        data: Vec<u8>,
679    ) -> Result<()> {
680        let path = format!(
681            "namespaces/{}/indexes/{}.bin",
682            namespace,
683            index_type.as_str()
684        );
685        self.operator
686            .write(&path, data)
687            .await
688            .map_err(|e| DakeraError::Storage(e.to_string()))?;
689
690        tracing::debug!(
691            namespace = namespace,
692            index_type = index_type.as_str(),
693            "Saved index to object storage"
694        );
695        Ok(())
696    }
697
698    async fn load_index(
699        &self,
700        namespace: &NamespaceId,
701        index_type: IndexType,
702    ) -> Result<Option<Vec<u8>>> {
703        let path = format!(
704            "namespaces/{}/indexes/{}.bin",
705            namespace,
706            index_type.as_str()
707        );
708        match self.operator.read(&path).await {
709            Ok(data) => {
710                tracing::debug!(
711                    namespace = namespace,
712                    index_type = index_type.as_str(),
713                    size = data.len(),
714                    "Loaded index from object storage"
715                );
716                Ok(Some(data.to_vec()))
717            }
718            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
719            Err(e) => Err(DakeraError::Storage(e.to_string())),
720        }
721    }
722
723    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
724        let path = format!(
725            "namespaces/{}/indexes/{}.bin",
726            namespace,
727            index_type.as_str()
728        );
729        let exists = self
730            .operator
731            .exists(&path)
732            .await
733            .map_err(|e| DakeraError::Storage(e.to_string()))?;
734
735        if exists {
736            self.operator
737                .delete(&path)
738                .await
739                .map_err(|e| DakeraError::Storage(e.to_string()))?;
740            tracing::debug!(
741                namespace = namespace,
742                index_type = index_type.as_str(),
743                "Deleted index from object storage"
744            );
745        }
746        Ok(exists)
747    }
748
749    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
750        let path = format!(
751            "namespaces/{}/indexes/{}.bin",
752            namespace,
753            index_type.as_str()
754        );
755        self.operator
756            .exists(&path)
757            .await
758            .map_err(|e| DakeraError::Storage(e.to_string()))
759    }
760
761    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
762        let prefix = format!("namespaces/{}/indexes/", namespace);
763        let entries = self
764            .operator
765            .list(&prefix)
766            .await
767            .map_err(|e| DakeraError::Storage(e.to_string()))?;
768
769        let mut indexes = Vec::new();
770        for entry in entries {
771            let path = entry.path();
772            if path.ends_with(".bin") {
773                // Extract index type from filename
774                if let Some(filename) = path.strip_prefix(&prefix) {
775                    let name = filename.trim_end_matches(".bin");
776                    match name {
777                        "hnsw" => indexes.push(IndexType::Hnsw),
778                        "pq" => indexes.push(IndexType::Pq),
779                        "ivf" => indexes.push(IndexType::Ivf),
780                        "spfresh" => indexes.push(IndexType::SpFresh),
781                        "fulltext" => indexes.push(IndexType::FullText),
782                        _ => {} // Ignore unknown index types
783                    }
784                }
785            }
786        }
787
788        Ok(indexes)
789    }
790}
791
/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
///
/// # Errors
/// Returns [`DakeraError::Storage`] if the backend builder rejects the
/// configuration (same behavior as [`ObjectStorage::build_operator`]).
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
797
#[cfg(test)]
mod tests {
    //! Tests run against the in-memory OpenDAL backend, so they exercise
    //! the full storage paths without touching disk or network.
    use super::*;

    // End-to-end CRUD against the in-memory backend: ensure/exists,
    // upsert, get, get_all, count, and delete with count maintenance.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Ensure namespace
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Insert vectors
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Get single vector
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Get all vectors
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        // Count
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Delete
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // The namespace dimension is fixed by the first upsert; later vectors
    // with a different length must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert first vector
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Try to insert vector with different dimension
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], // Wrong dimension
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    // Upserting an existing id overwrites its values and does not inflate
    // the namespace vector count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Upsert (update)
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        // Verify update
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // Count should still be 1
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // Full lifecycle of IndexStorage: save, exists, load round-trip, list,
    // delete, and the None/false paths for missing indexes.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Initially no indexes
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save an index
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Check it exists
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Load it back
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        // Save another index type
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // List indexes
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // Delete index
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Delete non-existent
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Load non-existent
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}
996}