// storage/object.rs
//! Object Storage Backend using OpenDAL
//!
//! Provides storage abstraction for multiple cloud storage backends:
//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
//! - Azure Blob Storage
//! - Google Cloud Storage (GCS)
//! - Local filesystem
//! - In-memory (for testing)
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16
17use crate::traits::{IndexStorage, IndexType, VectorStorage};
18
/// Maximum number of concurrent object-store operations used by the batched
/// read/delete paths (`buffer_unordered`).
///
/// Reads `DAKERA_S3_CONCURRENT_OPS`; falls back to 16 when unset or
/// unparseable. A value of 0 is rejected and falls back to the default:
/// `buffer_unordered(0)` would admit no futures and the stream could never
/// make progress, so the result is always >= 1.
fn s3_concurrent_ops() -> usize {
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|&n| n > 0)
        .unwrap_or(16)
}
25
/// Serializable namespace metadata, persisted at `namespaces/<ns>/meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    /// Vector dimensionality; `None` until the first upsert fixes it.
    dimension: Option<usize>,
    /// Number of vectors currently stored. Best-effort counter maintained by
    /// read-modify-write of this file in upsert/delete.
    vector_count: usize,
    /// Unix timestamp (seconds) when the namespace was created.
    created_at: u64,
    /// Unix timestamp (seconds) of the last metadata update.
    updated_at: u64,
}
34
/// Serializable vector data for storage, persisted as one JSON object per
/// vector at `namespaces/<ns>/vectors/<id>.json`.
///
/// NOTE: TTL fields (`ttl_seconds`, `expires_at`) are NOT persisted here —
/// expiry filtering happens at read time and full TTL cleanup is handled by
/// the application layer (see `cleanup_expired`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}
42
43impl From<Vector> for StoredVector {
44    fn from(v: Vector) -> Self {
45        Self {
46            id: v.id,
47            values: v.values,
48            metadata: v.metadata,
49        }
50    }
51}
52
impl From<StoredVector> for Vector {
    /// Rehydrate a stored vector into a runtime `Vector`.
    ///
    /// TTL fields are not persisted by `StoredVector`, so they are reset to
    /// `None` here; TTL bookkeeping lives in the application layer.
    fn from(v: StoredVector) -> Self {
        Self {
            id: v.id,
            values: v.values,
            metadata: v.metadata,
            ttl_seconds: None,
            expires_at: None,
        }
    }
}
64
/// Object storage backend configuration.
///
/// Selects which OpenDAL service backs the store. All remote variants get a
/// retry layer; `Memory` does not (see `ObjectStorage::build_operator`).
/// Optional fields that are `None` are simply not applied to the service
/// builder, leaving resolution to OpenDAL's defaults.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage rooted at `root`
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage; authenticate via `account_key` or `sas_token`
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage; `credential_path` points at a credentials file
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}
96
/// Object storage backend using OpenDAL.
///
/// Wraps a configured [`Operator`]; all vector/index persistence goes through
/// it. Derives `Clone` so the handle can be shared across tasks (cloning
/// semantics of `Operator` itself are OpenDAL's).
#[derive(Clone)]
pub struct ObjectStorage {
    /// The OpenDAL operator bound to the configured backend.
    operator: Operator,
}
102
103impl ObjectStorage {
104    /// Create a new object storage with the given configuration
105    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
106        let operator = Self::build_operator(&config)?;
107        Ok(Self { operator })
108    }
109
110    /// Create in-memory storage (for testing)
111    pub fn memory() -> Result<Self> {
112        Self::new(ObjectStorageConfig::Memory)
113    }
114
115    /// Create filesystem storage
116    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
117        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
118    }
119
120    /// Create S3 storage
121    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
122        Self::new(ObjectStorageConfig::S3 {
123            bucket: bucket.into(),
124            region: None,
125            endpoint: None,
126            access_key_id: None,
127            secret_access_key: None,
128        })
129    }
130
131    /// Create Azure Blob storage
132    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
133        Self::new(ObjectStorageConfig::Azure {
134            container: container.into(),
135            account_name: account_name.into(),
136            account_key: None,
137            sas_token: None,
138            endpoint: None,
139        })
140    }
141
142    /// Create Google Cloud Storage
143    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
144        Self::new(ObjectStorageConfig::Gcs {
145            bucket: bucket.into(),
146            credential_path: None,
147            endpoint: None,
148        })
149    }
150
    /// Build the OpenDAL [`Operator`] for the given configuration.
    ///
    /// Every remote backend is wrapped in the shared retry layer
    /// (`Self::retry_layer`); the in-memory backend is not. `None` options
    /// are simply not applied to the service builder.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                // No retry layer: in-memory operations have no transient
                // failure mode worth retrying.
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                // Apply only the options that were provided; unset fields
                // fall back to OpenDAL's own resolution.
                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                // Either account_key or sas_token may be used; both are optional.
                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }
236
237    fn retry_layer() -> RetryLayer {
238        // DAK-3430: default capped from 10 → 3 so a MinIO 429 storm fails fast
239        // (≤3×60s ≈ 3min) rather than blocking production for 10min. Override
240        // with DAKERA_S3_MAX_RETRIES=10 for high-resilience deployments.
241        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
242            .ok()
243            .and_then(|v| v.parse().ok())
244            .unwrap_or(3);
245        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
246            .ok()
247            .and_then(|v| v.parse().ok())
248            .unwrap_or(500);
249        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
250            .ok()
251            .and_then(|v| v.parse().ok())
252            .unwrap_or(60);
253
254        tracing::info!(
255            max_times,
256            min_delay_ms,
257            max_delay_secs,
258            "S3 retry layer configured"
259        );
260
261        RetryLayer::new()
262            .with_max_times(max_times)
263            .with_min_delay(Duration::from_millis(min_delay_ms))
264            .with_max_delay(Duration::from_secs(max_delay_secs))
265            .with_jitter()
266            .with_factor(2.0)
267    }
268
269    /// Get the path for a vector
270    fn vector_path(namespace: &str, vector_id: &str) -> String {
271        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
272    }
273
274    /// Get the path for namespace metadata
275    fn namespace_meta_path(namespace: &str) -> String {
276        format!("namespaces/{}/meta.json", namespace)
277    }
278
279    /// Get the path prefix for all vectors in a namespace
280    fn namespace_vectors_prefix(namespace: &str) -> String {
281        format!("namespaces/{}/vectors/", namespace)
282    }
283
    /// Read namespace metadata from `namespaces/<ns>/meta.json`.
    ///
    /// Returns `Ok(None)` when the file is missing, empty, or fails to
    /// deserialize — corruption is logged and treated as "no namespace" so
    /// callers can transparently recreate metadata. Only transport errors
    /// other than NotFound propagate as `Err`.
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    // Zero-byte file — presumably an interrupted write;
                    // self-heal by reporting the metadata as absent.
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        // Undeserializable JSON is also treated as missing
                        // rather than surfacing an error to every caller.
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing and will be recreated"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }
316
317    /// Write namespace metadata
318    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
319        let path = Self::namespace_meta_path(namespace);
320        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
321        self.operator
322            .write(&path, data)
323            .await
324            .map_err(|e| DakeraError::Storage(e.to_string()))?;
325        Ok(())
326    }
327
328    /// Get current timestamp
329    fn now() -> u64 {
330        std::time::SystemTime::now()
331            .duration_since(std::time::UNIX_EPOCH)
332            .unwrap_or_default()
333            .as_secs()
334    }
335}
336
337#[async_trait]
338impl VectorStorage for ObjectStorage {
339    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
340        if vectors.is_empty() {
341            return Ok(0);
342        }
343
344        // Get or create namespace metadata
345        let mut meta = self
346            .read_namespace_meta(namespace)
347            .await?
348            .unwrap_or_else(|| NamespaceMetadata {
349                dimension: None,
350                vector_count: 0,
351                created_at: Self::now(),
352                updated_at: Self::now(),
353            });
354
355        // Validate dimensions
356        let first_dim = vectors[0].values.len();
357        if let Some(dim) = meta.dimension {
358            for v in &vectors {
359                if v.values.len() != dim {
360                    return Err(DakeraError::DimensionMismatch {
361                        expected: dim,
362                        actual: v.values.len(),
363                    });
364                }
365            }
366        } else {
367            meta.dimension = Some(first_dim);
368        }
369
370        let mut upserted = 0;
371        for vector in vectors {
372            let path = Self::vector_path(namespace, &vector.id);
373            let stored: StoredVector = vector.into();
374            let data =
375                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;
376
377            // Check if this is an insert or update
378            let exists = self
379                .operator
380                .exists(&path)
381                .await
382                .map_err(|e| DakeraError::Storage(e.to_string()))?;
383
384            self.operator
385                .write(&path, data)
386                .await
387                .map_err(|e| DakeraError::Storage(e.to_string()))?;
388
389            if !exists {
390                meta.vector_count += 1;
391            }
392            upserted += 1;
393        }
394
395        meta.updated_at = Self::now();
396        self.write_namespace_meta(namespace, &meta).await?;
397
398        tracing::debug!(
399            namespace = namespace,
400            upserted = upserted,
401            "Upserted vectors to object storage"
402        );
403
404        Ok(upserted)
405    }
406
407    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
408        if ids.is_empty() {
409            return Ok(Vec::new());
410        }
411
412        let now = std::time::SystemTime::now()
413            .duration_since(std::time::UNIX_EPOCH)
414            .unwrap_or_default()
415            .as_secs();
416
417        let read_tasks: Vec<_> = ids
418            .iter()
419            .map(|id| {
420                let operator = self.operator.clone();
421                let path = Self::vector_path(namespace, id);
422                let id = id.clone();
423                async move {
424                    match operator.read(&path).await {
425                        Ok(data) => {
426                            let bytes = data.to_vec();
427                            if bytes.is_empty() {
428                                tracing::warn!(
429                                    vector_id = %id,
430                                    "Empty vector file detected, skipping"
431                                );
432                                return Ok(None);
433                            }
434                            match serde_json::from_slice::<StoredVector>(&bytes) {
435                                Ok(stored) => {
436                                    let vector: Vector = stored.into();
437                                    if !vector.is_expired_at(now) {
438                                        Ok(Some(vector))
439                                    } else {
440                                        Ok(None)
441                                    }
442                                }
443                                Err(e) => {
444                                    tracing::warn!(
445                                        vector_id = %id,
446                                        error = %e,
447                                        bytes_len = bytes.len(),
448                                        "Corrupted vector file detected, skipping"
449                                    );
450                                    Ok(None)
451                                }
452                            }
453                        }
454                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
455                        Err(e) => Err(DakeraError::Storage(e.to_string())),
456                    }
457                }
458            })
459            .collect();
460
461        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
462            .buffer_unordered(s3_concurrent_ops())
463            .collect()
464            .await;
465
466        let mut vectors = Vec::with_capacity(ids.len());
467        for result in results {
468            if let Some(v) = result? {
469                vectors.push(v);
470            }
471        }
472        Ok(vectors)
473    }
474
475    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
476        let prefix = Self::namespace_vectors_prefix(namespace);
477
478        let entries = self
479            .operator
480            .list(&prefix)
481            .await
482            .map_err(|e| DakeraError::Storage(e.to_string()))?;
483
484        let json_paths: Vec<String> = entries
485            .into_iter()
486            .filter(|e| e.path().ends_with(".json"))
487            .map(|e| e.path().to_string())
488            .collect();
489
490        if json_paths.is_empty() {
491            return Ok(Vec::new());
492        }
493
494        let now = std::time::SystemTime::now()
495            .duration_since(std::time::UNIX_EPOCH)
496            .unwrap_or_default()
497            .as_secs();
498
499        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
500            let operator = self.operator.clone();
501            async move {
502                match operator.read(&path).await {
503                    Ok(data) => {
504                        let bytes = data.to_vec();
505                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
506                            let vector: Vector = stored.into();
507                            if !vector.is_expired_at(now) {
508                                return Some(vector);
509                            }
510                        }
511                        None
512                    }
513                    Err(e) => {
514                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
515                        None
516                    }
517                }
518            }
519        }))
520        .buffer_unordered(s3_concurrent_ops())
521        .collect()
522        .await;
523
524        Ok(results.into_iter().flatten().collect())
525    }
526
    /// Delete the given vector ids concurrently; returns how many objects
    /// were actually removed.
    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
        if ids.is_empty() {
            return Ok(0);
        }

        let delete_tasks: Vec<_> = ids
            .iter()
            .map(|id| {
                let operator = self.operator.clone();
                let path = Self::vector_path(namespace, id);
                async move {
                    // Probe first so the per-id bool (and thus the returned
                    // count) reflects whether something was really removed —
                    // presumably the backend's delete alone would not report
                    // absence (NOTE(review): confirm against the OpenDAL
                    // version in use).
                    let exists = operator
                        .exists(&path)
                        .await
                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
                    if exists {
                        match operator.delete(&path).await {
                            Ok(_) => Ok(true),
                            // Raced with a concurrent delete between the
                            // probe and this call: count as not deleted here.
                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
                            Err(e) => Err(DakeraError::Storage(e.to_string())),
                        }
                    } else {
                        Ok(false)
                    }
                }
            })
            .collect();

        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
            .buffer_unordered(s3_concurrent_ops())
            .collect()
            .await;

        let mut deleted = 0;
        for result in results {
            if result? {
                deleted += 1;
            }
        }

        // Update metadata (read-modify-write; saturating_sub guards against
        // a stale counter going below zero).
        if deleted > 0 {
            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
                meta.vector_count = meta.vector_count.saturating_sub(deleted);
                meta.updated_at = Self::now();
                self.write_namespace_meta(namespace, &meta).await?;
            }
        }

        Ok(deleted)
    }
578
579    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
580        Ok(self.read_namespace_meta(namespace).await?.is_some())
581    }
582
583    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
584        if self.read_namespace_meta(namespace).await?.is_none() {
585            let meta = NamespaceMetadata {
586                dimension: None,
587                vector_count: 0,
588                created_at: Self::now(),
589                updated_at: Self::now(),
590            };
591            self.write_namespace_meta(namespace, &meta).await?;
592        }
593        Ok(())
594    }
595
596    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
597        Ok(self
598            .read_namespace_meta(namespace)
599            .await?
600            .and_then(|m| m.dimension))
601    }
602
603    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
604        Ok(self
605            .read_namespace_meta(namespace)
606            .await?
607            .map(|m| m.vector_count)
608            .unwrap_or(0))
609    }
610
    /// List all namespaces that have a readable metadata file.
    ///
    /// NOTE: awaits one metadata read per directory entry, sequentially —
    /// cost grows linearly with the number of entries under `namespaces/`.
    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
        let entries = self
            .operator
            .list("namespaces/")
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let mut namespaces = Vec::new();
        for entry in entries {
            let path = entry.path();
            // Extract namespace name from path like "namespaces/myns/"
            if let Some(ns) = path.strip_prefix("namespaces/") {
                let ns = ns.trim_end_matches('/');
                if !ns.is_empty() && !ns.contains('/') {
                    // Only include namespaces that actually have metadata (meta.json).
                    // This filters out ghost directory entries left behind after
                    // namespace deletion on backends where empty directories persist
                    // (e.g., local filesystem, some S3-compatible stores).
                    if self.read_namespace_meta(ns).await?.is_some() {
                        namespaces.push(ns.to_string());
                    }
                }
            }
        }

        Ok(namespaces)
    }
638
    /// Delete a namespace and everything under its prefix (vectors, indexes,
    /// metadata). Returns `false` when the namespace did not exist.
    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
        // Check if namespace exists
        if !self.namespace_exists(namespace).await? {
            return Ok(false);
        }

        // Recursively remove everything under the namespace prefix.
        // This deletes vectors, metadata, indexes, AND directory entries,
        // preventing ghost namespaces from appearing in list_namespaces().
        let prefix = format!("namespaces/{}/", namespace);
        self.operator
            .delete_with(&prefix)
            .recursive(true)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        tracing::debug!(
            namespace = namespace,
            "Deleted namespace from object storage"
        );

        Ok(true)
    }
662
    /// TTL cleanup is a no-op for this backend.
    ///
    /// StoredVector does not persist TTL fields, so there is nothing on disk
    /// to expire here; expired vectors are filtered at read time and full
    /// cleanup is handled by the application layer.
    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
        Ok(0)
    }
668
    /// Global TTL cleanup is likewise a no-op; see `cleanup_expired`.
    async fn cleanup_all_expired(&self) -> Result<usize> {
        // Object storage doesn't track TTL internally - cleanup handled by application layer
        Ok(0)
    }
673}
674
675#[async_trait]
676impl IndexStorage for ObjectStorage {
677    async fn save_index(
678        &self,
679        namespace: &NamespaceId,
680        index_type: IndexType,
681        data: Vec<u8>,
682    ) -> Result<()> {
683        let path = format!(
684            "namespaces/{}/indexes/{}.bin",
685            namespace,
686            index_type.as_str()
687        );
688        self.operator
689            .write(&path, data)
690            .await
691            .map_err(|e| DakeraError::Storage(e.to_string()))?;
692
693        tracing::debug!(
694            namespace = namespace,
695            index_type = index_type.as_str(),
696            "Saved index to object storage"
697        );
698        Ok(())
699    }
700
701    async fn load_index(
702        &self,
703        namespace: &NamespaceId,
704        index_type: IndexType,
705    ) -> Result<Option<Vec<u8>>> {
706        let path = format!(
707            "namespaces/{}/indexes/{}.bin",
708            namespace,
709            index_type.as_str()
710        );
711        match self.operator.read(&path).await {
712            Ok(data) => {
713                tracing::debug!(
714                    namespace = namespace,
715                    index_type = index_type.as_str(),
716                    size = data.len(),
717                    "Loaded index from object storage"
718                );
719                Ok(Some(data.to_vec()))
720            }
721            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
722            Err(e) => Err(DakeraError::Storage(e.to_string())),
723        }
724    }
725
726    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
727        let path = format!(
728            "namespaces/{}/indexes/{}.bin",
729            namespace,
730            index_type.as_str()
731        );
732        let exists = self
733            .operator
734            .exists(&path)
735            .await
736            .map_err(|e| DakeraError::Storage(e.to_string()))?;
737
738        if exists {
739            self.operator
740                .delete(&path)
741                .await
742                .map_err(|e| DakeraError::Storage(e.to_string()))?;
743            tracing::debug!(
744                namespace = namespace,
745                index_type = index_type.as_str(),
746                "Deleted index from object storage"
747            );
748        }
749        Ok(exists)
750    }
751
752    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
753        let path = format!(
754            "namespaces/{}/indexes/{}.bin",
755            namespace,
756            index_type.as_str()
757        );
758        self.operator
759            .exists(&path)
760            .await
761            .map_err(|e| DakeraError::Storage(e.to_string()))
762    }
763
764    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
765        let prefix = format!("namespaces/{}/indexes/", namespace);
766        let entries = self
767            .operator
768            .list(&prefix)
769            .await
770            .map_err(|e| DakeraError::Storage(e.to_string()))?;
771
772        let mut indexes = Vec::new();
773        for entry in entries {
774            let path = entry.path();
775            if path.ends_with(".bin") {
776                // Extract index type from filename
777                if let Some(filename) = path.strip_prefix(&prefix) {
778                    let name = filename.trim_end_matches(".bin");
779                    match name {
780                        "hnsw" => indexes.push(IndexType::Hnsw),
781                        "pq" => indexes.push(IndexType::Pq),
782                        "ivf" => indexes.push(IndexType::Ivf),
783                        "spfresh" => indexes.push(IndexType::SpFresh),
784                        "fulltext" => indexes.push(IndexType::FullText),
785                        _ => {} // Ignore unknown index types
786                    }
787                }
788            }
789        }
790
791        Ok(indexes)
792    }
793}
794
/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
///
/// Thin wrapper over [`ObjectStorage::build_operator`]: returns the same
/// retry-layered operator (the in-memory backend gets no retry layer).
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}
800
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end CRUD over the in-memory backend: namespace lifecycle,
    // upsert, point get, get_all, count, and delete with count maintenance.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Ensure namespace
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Insert vectors
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Get single vector
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Get all vectors
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        // Count
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Delete one vector and verify both the lookup and the counter.
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // The first upsert fixes the namespace dimension; later vectors with a
    // different dimension must be rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert first vector
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Try to insert vector with different dimension
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], // Wrong dimension
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    // Upserting an existing id overwrites the values without inflating the
    // namespace vector_count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Upsert (update)
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        // Verify update
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // Count should still be 1
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // IndexStorage round-trip: save, exists, load, list, delete, and the
    // not-found paths for delete/load.
    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Initially no indexes
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save an index
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Check it exists
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Load it back
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        // Save another index type
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // List indexes
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // Delete index
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Delete non-existent
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Load non-existent
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
}
999}