storage/object.rs

//! Object Storage Backend using OpenDAL
//!
//! Provides storage abstraction for multiple cloud storage backends:
//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
//! - Azure Blob Storage
//! - Google Cloud Storage (GCS)
//! - Local filesystem
//! - In-memory (for testing)
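//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; import paths may differ in the
//! consuming crate), assuming the `VectorStorage` trait from `crate::traits`
//! is in scope:
//!
//! ```ignore
//! use common::Vector;
//!
//! async fn demo() -> common::Result<()> {
//!     let storage = ObjectStorage::memory()?;
//!     let namespace = "demo".to_string();
//!     storage.ensure_namespace(&namespace).await?;
//!     storage
//!         .upsert(
//!             &namespace,
//!             vec![Vector {
//!                 id: "v1".to_string(),
//!                 values: vec![0.1, 0.2, 0.3],
//!                 metadata: None,
//!                 ttl_seconds: None,
//!                 expires_at: None,
//!             }],
//!         )
//!         .await?;
//!     let hits = storage.get(&namespace, &["v1".to_string()]).await?;
//!     assert_eq!(hits.len(), 1);
//!     Ok(())
//! }
//! ```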

use async_trait::async_trait;
use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
use futures_util::stream::{self, StreamExt};
use opendal::{layers::RetryLayer, services, Operator};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use crate::traits::{IndexStorage, IndexType, VectorStorage};

static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);

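/// Fan-out used for concurrent per-object reads and deletes (via `buffer_unordered`).
/// Overridable at runtime, e.g. `DAKERA_S3_CONCURRENT_OPS=32` for large batch
/// workloads; defaults to 16.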
fn s3_concurrent_ops() -> usize {
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(16)
}

/// Serializable namespace metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    dimension: Option<usize>,
    vector_count: usize,
    created_at: u64,
    updated_at: u64,
}

/// Serializable vector data for storage
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredVector {
    id: String,
    values: Vec<f32>,
    metadata: Option<serde_json::Value>,
}

impl From<Vector> for StoredVector {
    fn from(v: Vector) -> Self {
        Self {
            id: v.id,
            values: v.values,
            metadata: v.metadata,
        }
    }
}

impl From<StoredVector> for Vector {
    fn from(v: StoredVector) -> Self {
        Self {
            id: v.id,
            values: v.values,
            metadata: v.metadata,
            ttl_seconds: None,
            expires_at: None,
        }
    }
}

/// Object storage backend configuration
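///
/// # Example
///
/// Illustrative sketch of an S3/MinIO configuration (endpoint and credentials
/// below are placeholder values, not defaults):
///
/// ```ignore
/// let config = ObjectStorageConfig::S3 {
///     bucket: "dakera-vectors".to_string(),
///     region: Some("us-east-1".to_string()),
///     endpoint: Some("http://localhost:9000".to_string()), // e.g. a local MinIO
///     access_key_id: Some("minioadmin".to_string()),
///     secret_access_key: Some("minioadmin".to_string()),
/// };
/// let storage = ObjectStorage::new(config)?;
/// ```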
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}

/// Object storage backend using OpenDAL
#[derive(Clone)]
pub struct ObjectStorage {
    operator: Operator,
}

impl ObjectStorage {
    /// Create a new object storage with the given configuration
    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
        let operator = Self::build_operator(&config)?;
        Ok(Self { operator })
    }

    /// Create in-memory storage (for testing)
    pub fn memory() -> Result<Self> {
        Self::new(ObjectStorageConfig::Memory)
    }

    /// Create filesystem storage
    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
    }

    /// Create S3 storage
    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::S3 {
            bucket: bucket.into(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        })
    }

    /// Create Azure Blob storage
    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Azure {
            container: container.into(),
            account_name: account_name.into(),
            account_key: None,
            sas_token: None,
            endpoint: None,
        })
    }

    /// Create Google Cloud Storage
    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
        Self::new(ObjectStorageConfig::Gcs {
            bucket: bucket.into(),
            credential_path: None,
            endpoint: None,
        })
    }

    /// Build an OpenDAL [`Operator`] for the given configuration.
    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
        match config {
            ObjectStorageConfig::Memory => {
                let builder = services::Memory::default();
                Operator::new(builder)
                    .map(|op| op.finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Filesystem { root } => {
                let builder = services::Fs::default().root(root);
                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => {
                let mut builder = services::S3::default().bucket(bucket);

                if let Some(region) = region {
                    builder = builder.region(region);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }
                if let Some(key) = access_key_id {
                    builder = builder.access_key_id(key);
                }
                if let Some(secret) = secret_access_key {
                    builder = builder.secret_access_key(secret);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => {
                let mut builder = services::Azblob::default()
                    .container(container)
                    .account_name(account_name);

                if let Some(key) = account_key {
                    builder = builder.account_key(key);
                }
                if let Some(token) = sas_token {
                    builder = builder.sas_token(token);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
            ObjectStorageConfig::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => {
                let mut builder = services::Gcs::default().bucket(bucket);

                if let Some(cred_path) = credential_path {
                    builder = builder.credential_path(cred_path);
                }
                if let Some(endpoint) = endpoint {
                    builder = builder.endpoint(endpoint);
                }

                Operator::new(builder)
                    .map(|op| op.layer(Self::retry_layer()).finish())
                    .map_err(|e| DakeraError::Storage(e.to_string()))
            }
        }
    }

    fn retry_layer() -> RetryLayer {
        // DAK-3430: default capped from 10 → 3 so a MinIO 429 storm fails fast
        // (≤3×60s ≈ 3min) rather than blocking production for 10min. Override
        // with DAKERA_S3_MAX_RETRIES=10 for high-resilience deployments.
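        // With the defaults below (3 attempts, 500 ms base delay, factor 2.0,
        // 60 s cap), the jittered backoff between attempts is roughly
        // 0.5 s → 1 s → 2 s; the 60 s cap only matters once the retry count is
        // raised well above the default.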
        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(3);
        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(500);
        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(60);

        tracing::info!(
            max_times,
            min_delay_ms,
            max_delay_secs,
            "S3 retry layer configured"
        );

        RetryLayer::new()
            .with_max_times(max_times)
            .with_min_delay(Duration::from_millis(min_delay_ms))
            .with_max_delay(Duration::from_secs(max_delay_secs))
            .with_jitter()
            .with_factor(2.0)
    }

    /// Get the path for a vector
    fn vector_path(namespace: &str, vector_id: &str) -> String {
        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
    }

    /// Get the path for namespace metadata
    fn namespace_meta_path(namespace: &str) -> String {
        format!("namespaces/{}/meta.json", namespace)
    }

    /// Get the path prefix for all vectors in a namespace
    fn namespace_vectors_prefix(namespace: &str) -> String {
        format!("namespaces/{}/vectors/", namespace)
    }
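
    // Illustrative object layout produced by the path helpers above (plus the
    // index paths used by the IndexStorage impl below):
    //
    //   namespaces/<namespace>/meta.json          - NamespaceMetadata (JSON)
    //   namespaces/<namespace>/vectors/<id>.json  - StoredVector (JSON)
    //   namespaces/<namespace>/indexes/<type>.bin - serialized index blob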

    /// Read namespace metadata
    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
        let path = Self::namespace_meta_path(namespace);
        match self.operator.read(&path).await {
            Ok(data) => {
                let bytes = data.to_vec();
                if bytes.is_empty() {
                    tracing::warn!(
                        namespace = %namespace,
                        path = %path,
                        "Empty namespace metadata file detected, treating as missing"
                    );
                    return Ok(None);
                }
                match serde_json::from_slice(&bytes) {
                    Ok(meta) => Ok(Some(meta)),
                    Err(e) => {
                        tracing::warn!(
                            namespace = %namespace,
                            path = %path,
                            error = %e,
                            bytes_len = bytes.len(),
                            "Corrupted namespace metadata, treating as missing; it will be recreated on the next write"
                        );
                        Ok(None)
                    }
                }
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    /// Write namespace metadata atomically.
    ///
    /// Serializes the metadata to JSON and delegates to [`Self::write_atomic`], which
    /// provides the tmp-write + rename semantics described on that method.
    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
        let path = Self::namespace_meta_path(namespace);
        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
        self.write_atomic(&path, data).await
    }

    /// Write data to `path` atomically via tmp+rename (DAK-4545).
    ///
    /// Writes to a `.tmp` file first, then renames to the final path. On POSIX filesystems
    /// rename(2) is atomic within the same device, preventing concurrent readers from seeing
    /// a truncated/empty file during the write window (O_TRUNC race). For object-store
    /// backends that don't support rename (e.g. S3 without copy-delete), falls back to a
    /// direct write — S3 PUT is itself atomic (readers get previous version until PUT completes).
    async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
        let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
        let rename_result = async {
            self.operator
                .write(&tmp_path, data.clone())
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            self.operator
                .rename(&tmp_path, path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            Ok::<(), DakeraError>(())
        }
        .await;
        if let Err(e) = rename_result {
            // Backend doesn't support rename: fall back to direct write.
            tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
            let _ = self.operator.delete(&tmp_path).await;
            self.operator
                .write(path, data)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
        }
        Ok(())
    }

    /// Get current timestamp
    fn now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}

#[async_trait]
impl VectorStorage for ObjectStorage {
    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
        if vectors.is_empty() {
            return Ok(0);
        }

        // Get or create namespace metadata
        let mut meta = self
            .read_namespace_meta(namespace)
            .await?
            .unwrap_or_else(|| NamespaceMetadata {
                dimension: None,
                vector_count: 0,
                created_at: Self::now(),
                updated_at: Self::now(),
            });

        // Validate dimensions
        let first_dim = vectors[0].values.len();
        if let Some(dim) = meta.dimension {
            for v in &vectors {
                if v.values.len() != dim {
                    return Err(DakeraError::DimensionMismatch {
                        expected: dim,
                        actual: v.values.len(),
                    });
                }
            }
        } else {
            meta.dimension = Some(first_dim);
        }

        let mut upserted = 0;
        for vector in vectors {
            let path = Self::vector_path(namespace, &vector.id);
            let stored: StoredVector = vector.into();
            let data =
                serde_json::to_vec(&stored).map_err(|e| DakeraError::Storage(e.to_string()))?;

            // Check if this is an insert or update
            let exists = self
                .operator
                .exists(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;

            self.write_atomic(&path, data).await?;

            if !exists {
                meta.vector_count += 1;
            }
            upserted += 1;
        }

        meta.updated_at = Self::now();
        self.write_namespace_meta(namespace, &meta).await?;

        tracing::debug!(
            namespace = namespace,
            upserted = upserted,
            "Upserted vectors to object storage"
        );

        Ok(upserted)
    }

    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let read_tasks: Vec<_> = ids
            .iter()
            .map(|id| {
                let operator = self.operator.clone();
                let path = Self::vector_path(namespace, id);
                let id = id.clone();
                async move {
                    match operator.read(&path).await {
                        Ok(data) => {
                            let bytes = data.to_vec();
                            if bytes.is_empty() {
                                tracing::warn!(
                                    vector_id = %id,
                                    "Empty vector file detected, skipping"
                                );
                                return Ok(None);
                            }
                            match serde_json::from_slice::<StoredVector>(&bytes) {
                                Ok(stored) => {
                                    let vector: Vector = stored.into();
                                    if !vector.is_expired_at(now) {
                                        Ok(Some(vector))
                                    } else {
                                        Ok(None)
                                    }
                                }
                                Err(e) => {
                                    tracing::warn!(
                                        vector_id = %id,
                                        error = %e,
                                        bytes_len = bytes.len(),
                                        "Corrupted vector file detected, skipping"
                                    );
                                    Ok(None)
                                }
                            }
                        }
                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
                        Err(e) => Err(DakeraError::Storage(e.to_string())),
                    }
                }
            })
            .collect();

        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
            .buffer_unordered(s3_concurrent_ops())
            .collect()
            .await;

        let mut vectors = Vec::with_capacity(ids.len());
        for result in results {
            if let Some(v) = result? {
                vectors.push(v);
            }
        }
        Ok(vectors)
    }

    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
        let prefix = Self::namespace_vectors_prefix(namespace);

        let entries = self
            .operator
            .list(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let json_paths: Vec<String> = entries
            .into_iter()
            .filter(|e| e.path().ends_with(".json"))
            .map(|e| e.path().to_string())
            .collect();

        if json_paths.is_empty() {
            return Ok(Vec::new());
        }

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
            let operator = self.operator.clone();
            async move {
                match operator.read(&path).await {
                    Ok(data) => {
                        let bytes = data.to_vec();
                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
                            let vector: Vector = stored.into();
                            if !vector.is_expired_at(now) {
                                return Some(vector);
                            }
                        }
                        None
                    }
                    Err(e) => {
                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
                        None
                    }
                }
            }
        }))
        .buffer_unordered(s3_concurrent_ops())
        .collect()
        .await;

        Ok(results.into_iter().flatten().collect())
    }

    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
        if ids.is_empty() {
            return Ok(0);
        }

        let delete_tasks: Vec<_> = ids
            .iter()
            .map(|id| {
                let operator = self.operator.clone();
                let path = Self::vector_path(namespace, id);
                async move {
                    let exists = operator
                        .exists(&path)
                        .await
                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
                    if exists {
                        match operator.delete(&path).await {
                            Ok(_) => Ok(true),
                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
                            Err(e) => Err(DakeraError::Storage(e.to_string())),
                        }
                    } else {
                        Ok(false)
                    }
                }
            })
            .collect();

        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
            .buffer_unordered(s3_concurrent_ops())
            .collect()
            .await;

        let mut deleted = 0;
        for result in results {
            if result? {
                deleted += 1;
            }
        }

        // Update metadata
        if deleted > 0 {
            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
                meta.vector_count = meta.vector_count.saturating_sub(deleted);
                meta.updated_at = Self::now();
                self.write_namespace_meta(namespace, &meta).await?;
            }
        }

        Ok(deleted)
    }

    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
        Ok(self.read_namespace_meta(namespace).await?.is_some())
    }

    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
        if self.read_namespace_meta(namespace).await?.is_none() {
            let meta = NamespaceMetadata {
                dimension: None,
                vector_count: 0,
                created_at: Self::now(),
                updated_at: Self::now(),
            };
            self.write_namespace_meta(namespace, &meta).await?;
        }
        Ok(())
    }

    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
        Ok(self
            .read_namespace_meta(namespace)
            .await?
            .and_then(|m| m.dimension))
    }

    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
        Ok(self
            .read_namespace_meta(namespace)
            .await?
            .map(|m| m.vector_count)
            .unwrap_or(0))
    }

    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
        let entries = self
            .operator
            .list("namespaces/")
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let mut namespaces = Vec::new();
        for entry in entries {
            let path = entry.path();
            // Extract namespace name from path like "namespaces/myns/"
            if let Some(ns) = path.strip_prefix("namespaces/") {
                let ns = ns.trim_end_matches('/');
                if !ns.is_empty() && !ns.contains('/') {
                    // Only include namespaces that actually have metadata (meta.json).
                    // This filters out ghost directory entries left behind after
                    // namespace deletion on backends where empty directories persist
                    // (e.g., local filesystem, some S3-compatible stores).
                    if self.read_namespace_meta(ns).await?.is_some() {
                        namespaces.push(ns.to_string());
                    }
                }
            }
        }

        Ok(namespaces)
    }

    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
        // Check if namespace exists
        if !self.namespace_exists(namespace).await? {
            return Ok(false);
        }

        // Recursively remove everything under the namespace prefix.
        // This deletes vectors, metadata, indexes, AND directory entries,
        // preventing ghost namespaces from appearing in list_namespaces().
        let prefix = format!("namespaces/{}/", namespace);
        self.operator
            .delete_with(&prefix)
            .recursive(true)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        tracing::debug!(
            namespace = namespace,
            "Deleted namespace from object storage"
        );

        Ok(true)
    }

    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
        // Object storage doesn't track TTL internally - cleanup handled by application layer
        // StoredVector doesn't persist TTL fields, so nothing to clean up here
        Ok(0)
    }

    async fn cleanup_all_expired(&self) -> Result<usize> {
        // Object storage doesn't track TTL internally - cleanup handled by application layer
        Ok(0)
    }
}

#[async_trait]
impl IndexStorage for ObjectStorage {
    async fn save_index(
        &self,
        namespace: &NamespaceId,
        index_type: IndexType,
        data: Vec<u8>,
    ) -> Result<()> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        self.operator
            .write(&path, data)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        tracing::debug!(
            namespace = namespace,
            index_type = index_type.as_str(),
            "Saved index to object storage"
        );
        Ok(())
    }

    async fn load_index(
        &self,
        namespace: &NamespaceId,
        index_type: IndexType,
    ) -> Result<Option<Vec<u8>>> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        match self.operator.read(&path).await {
            Ok(data) => {
                tracing::debug!(
                    namespace = namespace,
                    index_type = index_type.as_str(),
                    size = data.len(),
                    "Loaded index from object storage"
                );
                Ok(Some(data.to_vec()))
            }
            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(DakeraError::Storage(e.to_string())),
        }
    }

    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        let exists = self
            .operator
            .exists(&path)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        if exists {
            self.operator
                .delete(&path)
                .await
                .map_err(|e| DakeraError::Storage(e.to_string()))?;
            tracing::debug!(
                namespace = namespace,
                index_type = index_type.as_str(),
                "Deleted index from object storage"
            );
        }
        Ok(exists)
    }

    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
        let path = format!(
            "namespaces/{}/indexes/{}.bin",
            namespace,
            index_type.as_str()
        );
        self.operator
            .exists(&path)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))
    }

    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
        let prefix = format!("namespaces/{}/indexes/", namespace);
        let entries = self
            .operator
            .list(&prefix)
            .await
            .map_err(|e| DakeraError::Storage(e.to_string()))?;

        let mut indexes = Vec::new();
        for entry in entries {
            let path = entry.path();
            if path.ends_with(".bin") {
                // Extract index type from filename
                if let Some(filename) = path.strip_prefix(&prefix) {
                    let name = filename.trim_end_matches(".bin");
                    match name {
                        "hnsw" => indexes.push(IndexType::Hnsw),
                        "pq" => indexes.push(IndexType::Pq),
                        "ivf" => indexes.push(IndexType::Ivf),
                        "spfresh" => indexes.push(IndexType::SpFresh),
                        "fulltext" => indexes.push(IndexType::FullText),
                        _ => {} // Ignore unknown index types
                    }
                }
            }
        }

        Ok(indexes)
    }
}

/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
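///
/// # Example
///
/// Illustrative sketch (bucket, object path, and credentials are placeholders):
///
/// ```ignore
/// let op = create_operator(&ObjectStorageConfig::S3 {
///     bucket: "dakera-backups".to_string(),
///     region: Some("us-east-1".to_string()),
///     endpoint: None,
///     access_key_id: None,
///     secret_access_key: None,
/// })?;
/// op.write("backups/manifest.json", b"{}".to_vec()).await?;
/// ```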
pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
    ObjectStorage::build_operator(config)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_object_storage_memory() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();

        // Ensure namespace
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());

        // Insert vectors
        let vectors = vec![
            Vector {
                id: "v1".to_string(),
                values: vec![1.0, 2.0, 3.0],
                metadata: None,
                ttl_seconds: None,
                expires_at: None,
            },
            Vector {
                id: "v2".to_string(),
                values: vec![4.0, 5.0, 6.0],
                metadata: Some(serde_json::json!({"key": "value"})),
                ttl_seconds: None,
                expires_at: None,
            },
        ];

        let count = storage.upsert(&namespace, vectors).await.unwrap();
        assert_eq!(count, 2);

        // Get single vector
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "v1");
        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);

        // Get all vectors
        let all = storage.get_all(&namespace).await.unwrap();
        assert_eq!(all.len(), 2);

        // Count
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Delete
        let deleted = storage
            .delete(&namespace, &["v1".to_string()])
            .await
            .unwrap();
        assert_eq!(deleted, 1);
        assert!(storage
            .get(&namespace, &["v1".to_string()])
            .await
            .unwrap()
            .is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert first vector
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0, 3.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Try to insert vector with different dimension
        let v2 = vec![Vector {
            id: "v2".to_string(),
            values: vec![1.0, 2.0], // Wrong dimension
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        let result = storage.upsert(&namespace, v2).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_object_storage_upsert() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test".to_string();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Insert
        let v1 = vec![Vector {
            id: "v1".to_string(),
            values: vec![1.0, 2.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1).await.unwrap();

        // Upsert (update)
        let v1_updated = vec![Vector {
            id: "v1".to_string(),
            values: vec![3.0, 4.0],
            metadata: None,
            ttl_seconds: None,
            expires_at: None,
        }];
        storage.upsert(&namespace, v1_updated).await.unwrap();

        // Verify update
        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].values, vec![3.0, 4.0]);

        // Count should still be 1
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_index_storage() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "test_index".to_string();

        // Initially no indexes
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Save an index
        let index_data = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
            .await
            .unwrap();

        // Check it exists
        assert!(storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());
        assert!(!storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap());

        // Load it back
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), index_data);

        // Save another index type
        let pq_data = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_data)
            .await
            .unwrap();

        // List indexes
        let indexes = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(indexes.len(), 2);
        assert!(indexes.contains(&IndexType::Hnsw));
        assert!(indexes.contains(&IndexType::Pq));

        // Delete index
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(deleted);
        assert!(!storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap());

        // Delete non-existent
        let deleted = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!deleted);

        // Load non-existent
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_none());
    }
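
    // Illustrative recovery test: corrupted or empty metadata objects are treated
    // as missing rather than surfacing an error (see read_namespace_meta).
    #[tokio::test]
    async fn test_corrupted_namespace_metadata_is_treated_as_missing() {
        let storage = ObjectStorage::memory().unwrap();
        let namespace = "corrupt".to_string();

        // Write invalid JSON directly to the metadata path, bypassing the public API.
        let meta_path = ObjectStorage::namespace_meta_path(&namespace);
        storage
            .operator
            .write(&meta_path, b"not json".to_vec())
            .await
            .unwrap();

        // read_namespace_meta logs a warning and reports the namespace as missing.
        assert!(!storage.namespace_exists(&namespace).await.unwrap());

        // ensure_namespace then rewrites valid metadata over the corrupted object.
        storage.ensure_namespace(&namespace).await.unwrap();
        assert!(storage.namespace_exists(&namespace).await.unwrap());
    }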
}