Skip to main content

storage/
object.rs

1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
18static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Maximum number of storage operations kept in flight at once.
///
/// Reads `DAKERA_S3_CONCURRENT_OPS` and falls back to 16 when the variable is unset,
/// is not a valid `usize`, or is `0`. Zero is rejected because the value is passed to
/// `buffer_unordered`, and a zero-width buffer never polls any task, so every batched
/// operation would wait forever.
fn s3_concurrent_ops() -> usize {
    const DEFAULT_CONCURRENT_OPS: usize = 16;

    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&limit| limit > 0)
        .unwrap_or(DEFAULT_CONCURRENT_OPS)
}
28
29/// Serializable namespace metadata
30#[derive(Debug, Clone, Serialize, Deserialize)]
31struct NamespaceMetadata {
32    dimension: Option<usize>,
33    vector_count: usize,
34    created_at: u64,
35    updated_at: u64,
36}
37
38/// Serializable vector data for storage
39#[derive(Debug, Clone, Serialize, Deserialize)]
40struct StoredVector {
41    id: String,
42    values: Vec<f32>,
43    metadata: Option<serde_json::Value>,
44}
45
46impl From<Vector> for StoredVector {
47    fn from(v: Vector) -> Self {
48        Self {
49            id: v.id,
50            values: v.values,
51            metadata: v.metadata,
52        }
53    }
54}
55
56impl From<StoredVector> for Vector {
57    fn from(v: StoredVector) -> Self {
58        Self {
59            id: v.id,
60            values: v.values,
61            metadata: v.metadata,
62            ttl_seconds: None,
63            expires_at: None,
64        }
65    }
66}
67
/// Object storage backend configuration.
///
/// `Debug` is implemented by hand so that credentials (`secret_access_key`,
/// `account_key`, `sas_token`) are never written to logs or panic messages; only
/// their presence is shown.
#[derive(Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    Azure {
        container: String,
        account_name: String,
        account_key: Option<String>,
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}

impl std::fmt::Debug for ObjectStorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Report only whether a secret is set, never its value.
        fn redacted(secret: &Option<String>) -> Option<&'static str> {
            secret.as_ref().map(|_| "<redacted>")
        }

        match self {
            Self::Memory => f.write_str("Memory"),
            Self::Filesystem { root } => f.debug_struct("Filesystem").field("root", root).finish(),
            Self::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => f
                .debug_struct("S3")
                .field("bucket", bucket)
                .field("region", region)
                .field("endpoint", endpoint)
                .field("access_key_id", access_key_id)
                .field("secret_access_key", &redacted(secret_access_key))
                .finish(),
            Self::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => f
                .debug_struct("Azure")
                .field("container", container)
                .field("account_name", account_name)
                .field("account_key", &redacted(account_key))
                .field("sas_token", &redacted(sas_token))
                .field("endpoint", endpoint)
                .finish(),
            Self::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => f
                .debug_struct("Gcs")
                .field("bucket", bucket)
                .field("credential_path", credential_path)
                .field("endpoint", endpoint)
                .finish(),
        }
    }
}
99
100/// Object storage backend using OpenDAL
101#[derive(Clone)]
102pub struct ObjectStorage {
103    operator: Operator,
104}
105
106impl ObjectStorage {
107    /// Create a new object storage with the given configuration
108    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
109        let operator = Self::build_operator(&config)?;
110        Ok(Self { operator })
111    }
112
113    /// Create in-memory storage (for testing)
114    pub fn memory() -> Result<Self> {
115        Self::new(ObjectStorageConfig::Memory)
116    }
117
118    /// Create filesystem storage
119    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
120        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
121    }
122
123    /// Create S3 storage
124    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
125        Self::new(ObjectStorageConfig::S3 {
126            bucket: bucket.into(),
127            region: None,
128            endpoint: None,
129            access_key_id: None,
130            secret_access_key: None,
131        })
132    }
133
134    /// Create Azure Blob storage
135    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
136        Self::new(ObjectStorageConfig::Azure {
137            container: container.into(),
138            account_name: account_name.into(),
139            account_key: None,
140            sas_token: None,
141            endpoint: None,
142        })
143    }
144
145    /// Create Google Cloud Storage
146    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
147        Self::new(ObjectStorageConfig::Gcs {
148            bucket: bucket.into(),
149            credential_path: None,
150            endpoint: None,
151        })
152    }
153
154    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
155        match config {
156            ObjectStorageConfig::Memory => {
157                let builder = services::Memory::default();
158                Operator::new(builder)
159                    .map(|op| op.finish())
160                    .map_err(|e| DakeraError::Storage(e.to_string()))
161            }
162            ObjectStorageConfig::Filesystem { root } => {
163                let builder = services::Fs::default().root(root);
164                Operator::new(builder)
165                    .map(|op| op.layer(Self::retry_layer()).finish())
166                    .map_err(|e| DakeraError::Storage(e.to_string()))
167            }
168            ObjectStorageConfig::S3 {
169                bucket,
170                region,
171                endpoint,
172                access_key_id,
173                secret_access_key,
174            } => {
175                let mut builder = services::S3::default().bucket(bucket);
176
177                if let Some(region) = region {
178                    builder = builder.region(region);
179                }
180                if let Some(endpoint) = endpoint {
181                    builder = builder.endpoint(endpoint);
182                }
183                if let Some(key) = access_key_id {
184                    builder = builder.access_key_id(key);
185                }
186                if let Some(secret) = secret_access_key {
187                    builder = builder.secret_access_key(secret);
188                }
189
190                Operator::new(builder)
191                    .map(|op| op.layer(Self::retry_layer()).finish())
192                    .map_err(|e| DakeraError::Storage(e.to_string()))
193            }
194            ObjectStorageConfig::Azure {
195                container,
196                account_name,
197                account_key,
198                sas_token,
199                endpoint,
200            } => {
201                let mut builder = services::Azblob::default()
202                    .container(container)
203                    .account_name(account_name);
204
205                if let Some(key) = account_key {
206                    builder = builder.account_key(key);
207                }
208                if let Some(token) = sas_token {
209                    builder = builder.sas_token(token);
210                }
211                if let Some(endpoint) = endpoint {
212                    builder = builder.endpoint(endpoint);
213                }
214
215                Operator::new(builder)
216                    .map(|op| op.layer(Self::retry_layer()).finish())
217                    .map_err(|e| DakeraError::Storage(e.to_string()))
218            }
219            ObjectStorageConfig::Gcs {
220                bucket,
221                credential_path,
222                endpoint,
223            } => {
224                let mut builder = services::Gcs::default().bucket(bucket);
225
226                if let Some(cred_path) = credential_path {
227                    builder = builder.credential_path(cred_path);
228                }
229                if let Some(endpoint) = endpoint {
230                    builder = builder.endpoint(endpoint);
231                }
232
233                Operator::new(builder)
234                    .map(|op| op.layer(Self::retry_layer()).finish())
235                    .map_err(|e| DakeraError::Storage(e.to_string()))
236            }
237        }
238    }
239
240    fn retry_layer() -> RetryLayer {
241        // DAK-3430: default capped from 10 → 3 so a MinIO 429 storm fails fast
242        // (≤3×60s ≈ 3min) rather than blocking production for 10min. Override
243        // with DAKERA_S3_MAX_RETRIES=10 for high-resilience deployments.
244        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
245            .ok()
246            .and_then(|v| v.parse().ok())
247            .unwrap_or(3);
248        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
249            .ok()
250            .and_then(|v| v.parse().ok())
251            .unwrap_or(500);
252        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
253            .ok()
254            .and_then(|v| v.parse().ok())
255            .unwrap_or(60);
256
257        tracing::info!(
258            max_times,
259            min_delay_ms,
260            max_delay_secs,
261            "S3 retry layer configured"
262        );
263
264        RetryLayer::new()
265            .with_max_times(max_times)
266            .with_min_delay(Duration::from_millis(min_delay_ms))
267            .with_max_delay(Duration::from_secs(max_delay_secs))
268            .with_jitter()
269            .with_factor(2.0)
270    }
271
272    /// Get the path for a vector
273    fn vector_path(namespace: &str, vector_id: &str) -> String {
274        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
275    }
276
277    /// Get the path for namespace metadata
278    fn namespace_meta_path(namespace: &str) -> String {
279        format!("namespaces/{}/meta.json", namespace)
280    }
281
282    /// Get the path prefix for all vectors in a namespace
283    fn namespace_vectors_prefix(namespace: &str) -> String {
284        format!("namespaces/{}/vectors/", namespace)
285    }
286
287    /// Read namespace metadata
288    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
289        let path = Self::namespace_meta_path(namespace);
290        match self.operator.read(&path).await {
291            Ok(data) => {
292                let bytes = data.to_vec();
293                if bytes.is_empty() {
294                    tracing::warn!(
295                        namespace = %namespace,
296                        path = %path,
297                        "Empty namespace metadata file detected, treating as missing"
298                    );
299                    return Ok(None);
300                }
301                match serde_json::from_slice(&bytes) {
302                    Ok(meta) => Ok(Some(meta)),
303                    Err(e) => {
304                        tracing::warn!(
305                            namespace = %namespace,
306                            path = %path,
307                            error = %e,
308                            bytes_len = bytes.len(),
309                            "Corrupted namespace metadata, treating as missing and will be recreated"
310                        );
311                        Ok(None)
312                    }
313                }
314            }
315            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
316            Err(e) => Err(DakeraError::Storage(e.to_string())),
317        }
318    }
319
320    /// Write namespace metadata atomically.
321    ///
322    /// Writes to a `.tmp` file first, then renames to the final path. On POSIX filesystems
323    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
324        let path = Self::namespace_meta_path(namespace);
325        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
326        self.write_atomic(&path, data).await
327    }
328
329    /// Write data to `path` atomically via tmp+rename (DAK-4545).
330    ///
331    /// Writes to a `.tmp` file first, then renames to the final path. On POSIX filesystems
332    /// rename(2) is atomic within the same device, preventing concurrent readers from seeing
333    /// a truncated/empty file during the write window (O_TRUNC race). For object-store
334    /// backends that don't support rename (e.g. S3 without copy-delete), falls back to a
335    /// direct write — S3 PUT is itself atomic (readers get previous version until PUT completes).
336    async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
337        let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
338        let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
339        let rename_result = async {
340            self.operator
341                .write(&tmp_path, data.clone())
342                .await
343                .map_err(|e| DakeraError::Storage(e.to_string()))?;
344            self.operator
345                .rename(&tmp_path, path)
346                .await
347                .map_err(|e| DakeraError::Storage(e.to_string()))?;
348            Ok::<(), DakeraError>(())
349        }
350        .await;
351        if let Err(e) = rename_result {
352            // Backend doesn't support rename: fall back to direct write.
353            tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
354            let _ = self.operator.delete(&tmp_path).await;
355            self.operator
356                .write(path, data)
357                .await
358                .map_err(|e| DakeraError::Storage(e.to_string()))?;
359        }
360        Ok(())
361    }
362
363    /// Get current timestamp
364    fn now() -> u64 {
365        std::time::SystemTime::now()
366            .duration_since(std::time::UNIX_EPOCH)
367            .unwrap_or_default()
368            .as_secs()
369    }
370}
371
372#[async_trait]
373impl VectorStorage for ObjectStorage {
374    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
375        if vectors.is_empty() {
376            return Ok(0);
377        }
378
379        // Get or create namespace metadata
380        let mut meta = self
381            .read_namespace_meta(namespace)
382            .await?
383            .unwrap_or_else(|| NamespaceMetadata {
384                dimension: None,
385                vector_count: 0,
386                created_at: Self::now(),
387                updated_at: Self::now(),
388            });
389
390        // Validate dimensions
391        let first_dim = vectors[0].values.len();
392        if let Some(dim) = meta.dimension {
393            for v in &vectors {
394                if v.values.len() != dim {
395                    return Err(DakeraError::DimensionMismatch {
396                        expected: dim,
397                        actual: v.values.len(),
398                    });
399                }
400            }
401        } else {
402            meta.dimension = Some(first_dim);
403        }
404
405        // DAK-5553: parallelize per-vector exists+write to eliminate sequential I/O bottleneck.
406        // Buffer up to s3_concurrent_ops() (default 16) concurrent writes at a time.
407        let total = vectors.len();
408        let op = self.operator.clone();
409        let ns = namespace.clone();
410
411        let results: Vec<Result<bool>> = stream::iter(vectors)
412            .map(|vector| {
413                let op = op.clone();
414                let ns = ns.clone();
415                async move {
416                    let path = ObjectStorage::vector_path(&ns, &vector.id);
417                    let stored: StoredVector = vector.into();
418                    let data = serde_json::to_vec(&stored)
419                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
420
421                    let exists = op
422                        .exists(&path)
423                        .await
424                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
425
426                    // Atomic write: tmp+rename, fall back to direct write if rename unsupported.
427                    let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
428                    let now_secs = std::time::SystemTime::now()
429                        .duration_since(std::time::UNIX_EPOCH)
430                        .unwrap_or_default()
431                        .as_secs();
432                    let tmp_path = format!("{}.tmp.{}.{}", path, now_secs, seq);
433                    let rename_result = async {
434                        op.write(&tmp_path, data.clone())
435                            .await
436                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
437                        op.rename(&tmp_path, &path)
438                            .await
439                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
440                        Ok::<(), DakeraError>(())
441                    }
442                    .await;
443                    if let Err(e) = rename_result {
444                        tracing::debug!(
445                            path = %path,
446                            error = %e,
447                            "atomic rename failed, falling back to direct write"
448                        );
449                        let _ = op.delete(&tmp_path).await;
450                        op.write(&path, data)
451                            .await
452                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
453                    }
454
455                    Ok::<bool, DakeraError>(!exists)
456                }
457            })
458            .buffer_unordered(s3_concurrent_ops())
459            .collect()
460            .await;
461
462        let mut new_inserts = 0usize;
463        for r in results {
464            if r? {
465                new_inserts += 1;
466            }
467        }
468
469        meta.vector_count += new_inserts;
470        meta.updated_at = Self::now();
471        self.write_namespace_meta(namespace, &meta).await?;
472
473        tracing::debug!(
474            namespace = namespace,
475            upserted = total,
476            "Upserted vectors to object storage"
477        );
478
479        Ok(total)
480    }
481
482    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
483        if ids.is_empty() {
484            return Ok(Vec::new());
485        }
486
487        let now = std::time::SystemTime::now()
488            .duration_since(std::time::UNIX_EPOCH)
489            .unwrap_or_default()
490            .as_secs();
491
492        let read_tasks: Vec<_> = ids
493            .iter()
494            .map(|id| {
495                let operator = self.operator.clone();
496                let path = Self::vector_path(namespace, id);
497                let id = id.clone();
498                async move {
499                    match operator.read(&path).await {
500                        Ok(data) => {
501                            let bytes = data.to_vec();
502                            if bytes.is_empty() {
503                                tracing::warn!(
504                                    vector_id = %id,
505                                    "Empty vector file detected, skipping"
506                                );
507                                return Ok(None);
508                            }
509                            match serde_json::from_slice::<StoredVector>(&bytes) {
510                                Ok(stored) => {
511                                    let vector: Vector = stored.into();
512                                    if !vector.is_expired_at(now) {
513                                        Ok(Some(vector))
514                                    } else {
515                                        Ok(None)
516                                    }
517                                }
518                                Err(e) => {
519                                    tracing::warn!(
520                                        vector_id = %id,
521                                        error = %e,
522                                        bytes_len = bytes.len(),
523                                        "Corrupted vector file detected, skipping"
524                                    );
525                                    Ok(None)
526                                }
527                            }
528                        }
529                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
530                        Err(e) => Err(DakeraError::Storage(e.to_string())),
531                    }
532                }
533            })
534            .collect();
535
536        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
537            .buffer_unordered(s3_concurrent_ops())
538            .collect()
539            .await;
540
541        let mut vectors = Vec::with_capacity(ids.len());
542        for result in results {
543            if let Some(v) = result? {
544                vectors.push(v);
545            }
546        }
547        Ok(vectors)
548    }
549
550    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
551        let prefix = Self::namespace_vectors_prefix(namespace);
552
553        let entries = self
554            .operator
555            .list(&prefix)
556            .await
557            .map_err(|e| DakeraError::Storage(e.to_string()))?;
558
559        let json_paths: Vec<String> = entries
560            .into_iter()
561            .filter(|e| e.path().ends_with(".json"))
562            .map(|e| e.path().to_string())
563            .collect();
564
565        if json_paths.is_empty() {
566            return Ok(Vec::new());
567        }
568
569        let now = std::time::SystemTime::now()
570            .duration_since(std::time::UNIX_EPOCH)
571            .unwrap_or_default()
572            .as_secs();
573
574        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
575            let operator = self.operator.clone();
576            async move {
577                match operator.read(&path).await {
578                    Ok(data) => {
579                        let bytes = data.to_vec();
580                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
581                            let vector: Vector = stored.into();
582                            if !vector.is_expired_at(now) {
583                                return Some(vector);
584                            }
585                        }
586                        None
587                    }
588                    Err(e) => {
589                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
590                        None
591                    }
592                }
593            }
594        }))
595        .buffer_unordered(s3_concurrent_ops())
596        .collect()
597        .await;
598
599        Ok(results.into_iter().flatten().collect())
600    }
601
602    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
603        if ids.is_empty() {
604            return Ok(0);
605        }
606
607        let delete_tasks: Vec<_> = ids
608            .iter()
609            .map(|id| {
610                let operator = self.operator.clone();
611                let path = Self::vector_path(namespace, id);
612                async move {
613                    let exists = operator
614                        .exists(&path)
615                        .await
616                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
617                    if exists {
618                        match operator.delete(&path).await {
619                            Ok(_) => Ok(true),
620                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
621                            Err(e) => Err(DakeraError::Storage(e.to_string())),
622                        }
623                    } else {
624                        Ok(false)
625                    }
626                }
627            })
628            .collect();
629
630        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
631            .buffer_unordered(s3_concurrent_ops())
632            .collect()
633            .await;
634
635        let mut deleted = 0;
636        for result in results {
637            if result? {
638                deleted += 1;
639            }
640        }
641
642        // Update metadata
643        if deleted > 0 {
644            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
645                meta.vector_count = meta.vector_count.saturating_sub(deleted);
646                meta.updated_at = Self::now();
647                self.write_namespace_meta(namespace, &meta).await?;
648            }
649        }
650
651        Ok(deleted)
652    }
653
654    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
655        Ok(self.read_namespace_meta(namespace).await?.is_some())
656    }
657
658    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
659        if self.read_namespace_meta(namespace).await?.is_none() {
660            let meta = NamespaceMetadata {
661                dimension: None,
662                vector_count: 0,
663                created_at: Self::now(),
664                updated_at: Self::now(),
665            };
666            self.write_namespace_meta(namespace, &meta).await?;
667        }
668        Ok(())
669    }
670
671    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
672        Ok(self
673            .read_namespace_meta(namespace)
674            .await?
675            .and_then(|m| m.dimension))
676    }
677
678    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
679        Ok(self
680            .read_namespace_meta(namespace)
681            .await?
682            .map(|m| m.vector_count)
683            .unwrap_or(0))
684    }
685
686    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
687        let entries = self
688            .operator
689            .list("namespaces/")
690            .await
691            .map_err(|e| DakeraError::Storage(e.to_string()))?;
692
693        let mut namespaces = Vec::new();
694        for entry in entries {
695            let path = entry.path();
696            // Extract namespace name from path like "namespaces/myns/"
697            if let Some(ns) = path.strip_prefix("namespaces/") {
698                let ns = ns.trim_end_matches('/');
699                if !ns.is_empty() && !ns.contains('/') {
700                    // Only include namespaces that actually have metadata (meta.json).
701                    // This filters out ghost directory entries left behind after
702                    // namespace deletion on backends where empty directories persist
703                    // (e.g., local filesystem, some S3-compatible stores).
704                    if self.read_namespace_meta(ns).await?.is_some() {
705                        namespaces.push(ns.to_string());
706                    }
707                }
708            }
709        }
710
711        Ok(namespaces)
712    }
713
714    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
715        // Check if namespace exists
716        if !self.namespace_exists(namespace).await? {
717            return Ok(false);
718        }
719
720        // Recursively remove everything under the namespace prefix.
721        // This deletes vectors, metadata, indexes, AND directory entries,
722        // preventing ghost namespaces from appearing in list_namespaces().
723        let prefix = format!("namespaces/{}/", namespace);
724        self.operator
725            .delete_with(&prefix)
726            .recursive(true)
727            .await
728            .map_err(|e| DakeraError::Storage(e.to_string()))?;
729
730        tracing::debug!(
731            namespace = namespace,
732            "Deleted namespace from object storage"
733        );
734
735        Ok(true)
736    }
737
738    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
739        // Object storage doesn't track TTL internally - cleanup handled by application layer
740        // StoredVector doesn't persist TTL fields, so nothing to clean up here
741        Ok(0)
742    }
743
744    async fn cleanup_all_expired(&self) -> Result<usize> {
745        // Object storage doesn't track TTL internally - cleanup handled by application layer
746        Ok(0)
747    }
748}
749
750#[async_trait]
751impl IndexStorage for ObjectStorage {
752    async fn save_index(
753        &self,
754        namespace: &NamespaceId,
755        index_type: IndexType,
756        data: Vec<u8>,
757    ) -> Result<()> {
758        let path = format!(
759            "namespaces/{}/indexes/{}.bin",
760            namespace,
761            index_type.as_str()
762        );
763        self.operator
764            .write(&path, data)
765            .await
766            .map_err(|e| DakeraError::Storage(e.to_string()))?;
767
768        tracing::debug!(
769            namespace = namespace,
770            index_type = index_type.as_str(),
771            "Saved index to object storage"
772        );
773        Ok(())
774    }
775
776    async fn load_index(
777        &self,
778        namespace: &NamespaceId,
779        index_type: IndexType,
780    ) -> Result<Option<Vec<u8>>> {
781        let path = format!(
782            "namespaces/{}/indexes/{}.bin",
783            namespace,
784            index_type.as_str()
785        );
786        match self.operator.read(&path).await {
787            Ok(data) => {
788                tracing::debug!(
789                    namespace = namespace,
790                    index_type = index_type.as_str(),
791                    size = data.len(),
792                    "Loaded index from object storage"
793                );
794                Ok(Some(data.to_vec()))
795            }
796            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
797            Err(e) => Err(DakeraError::Storage(e.to_string())),
798        }
799    }
800
801    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
802        let path = format!(
803            "namespaces/{}/indexes/{}.bin",
804            namespace,
805            index_type.as_str()
806        );
807        let exists = self
808            .operator
809            .exists(&path)
810            .await
811            .map_err(|e| DakeraError::Storage(e.to_string()))?;
812
813        if exists {
814            self.operator
815                .delete(&path)
816                .await
817                .map_err(|e| DakeraError::Storage(e.to_string()))?;
818            tracing::debug!(
819                namespace = namespace,
820                index_type = index_type.as_str(),
821                "Deleted index from object storage"
822            );
823        }
824        Ok(exists)
825    }
826
827    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
828        let path = format!(
829            "namespaces/{}/indexes/{}.bin",
830            namespace,
831            index_type.as_str()
832        );
833        self.operator
834            .exists(&path)
835            .await
836            .map_err(|e| DakeraError::Storage(e.to_string()))
837    }
838
839    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
840        let prefix = format!("namespaces/{}/indexes/", namespace);
841        let entries = self
842            .operator
843            .list(&prefix)
844            .await
845            .map_err(|e| DakeraError::Storage(e.to_string()))?;
846
847        let mut indexes = Vec::new();
848        for entry in entries {
849            let path = entry.path();
850            if path.ends_with(".bin") {
851                // Extract index type from filename
852                if let Some(filename) = path.strip_prefix(&prefix) {
853                    let name = filename.trim_end_matches(".bin");
854                    match name {
855                        "hnsw" => indexes.push(IndexType::Hnsw),
856                        "pq" => indexes.push(IndexType::Pq),
857                        "ivf" => indexes.push(IndexType::Ivf),
858                        "spfresh" => indexes.push(IndexType::SpFresh),
859                        "fulltext" => indexes.push(IndexType::FullText),
860                        _ => {} // Ignore unknown index types
861                    }
862                }
863            }
864        }
865
866        Ok(indexes)
867    }
868}
869
870/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
871/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
872pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
873    ObjectStorage::build_operator(config)
874}
875
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// Builds a vector from explicit components, with no TTL or expiry set.
    fn vector_with(id: &str, values: Vec<f32>, metadata: Option<serde_json::Value>) -> Vector {
        Vector {
            id: id.to_string(),
            values,
            metadata,
            ttl_seconds: None,
            expires_at: None,
        }
    }

    /// Builds a metadata-free vector with `dim` components, each equal to 0.1.
    fn make_vector(id: &str, dim: usize) -> Vector {
        vector_with(id, vec![0.1; dim], None)
    }

    /// Produces `count` vectors named `v0`, `v1`, … with `dim` components each.
    fn numbered_vectors(count: usize, dim: usize) -> Vec<Vector> {
        (0..count)
            .map(|i| make_vector(&format!("v{i}"), dim))
            .collect()
    }

    // ── Vector storage ────────────────────────────────────────────────────────

    /// End-to-end pass over namespace creation, upsert, get, count and delete.
    #[tokio::test]
    async fn test_object_storage_memory() {
        let namespace = String::from("test");
        let storage = ObjectStorage::memory().unwrap();

        // A namespace must be visible right after it is created.
        storage.ensure_namespace(&namespace).await.unwrap();
        let ns_present = storage.namespace_exists(&namespace).await.unwrap();
        assert!(ns_present);

        // One bare vector and one carrying metadata.
        let batch = vec![
            vector_with("v1", vec![1.0, 2.0, 3.0], None),
            vector_with(
                "v2",
                vec![4.0, 5.0, 6.0],
                Some(serde_json::json!({"key": "value"})),
            ),
        ];
        let upserted = storage.upsert(&namespace, batch).await.unwrap();
        assert_eq!(upserted, 2);

        let v1_only = ["v1".to_string()];

        // Point lookup returns exactly the requested vector.
        let fetched = storage.get(&namespace, &v1_only).await.unwrap();
        assert_eq!(fetched.len(), 1);
        let first = &fetched[0];
        assert_eq!(first.id, "v1");
        assert_eq!(first.values, vec![1.0, 2.0, 3.0]);

        // Full scan and counter agree on two stored vectors.
        let everything = storage.get_all(&namespace).await.unwrap();
        assert_eq!(everything.len(), 2);
        assert_eq!(storage.count(&namespace).await.unwrap(), 2);

        // Deleting v1 removes it from lookups and decrements the count.
        let removed = storage.delete(&namespace, &v1_only).await.unwrap();
        assert_eq!(removed, 1);
        let after_delete = storage.get(&namespace, &v1_only).await.unwrap();
        assert!(after_delete.is_empty());
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    /// A vector whose dimension differs from the first stored one is rejected.
    #[tokio::test]
    async fn test_object_storage_dimension_mismatch() {
        let namespace = String::from("test");
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(&namespace).await.unwrap();

        // The first vector is three-dimensional.
        let three_dim = vec![vector_with("v1", vec![1.0, 2.0, 3.0], None)];
        storage.upsert(&namespace, three_dim).await.unwrap();

        // A two-dimensional vector must then fail to upsert.
        let two_dim = vec![vector_with("v2", vec![1.0, 2.0], None)];
        let outcome = storage.upsert(&namespace, two_dim).await;
        assert!(outcome.is_err());
    }

    /// Upserting an existing id overwrites the values without changing the count.
    #[tokio::test]
    async fn test_object_storage_upsert() {
        let namespace = String::from("test");
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(&namespace).await.unwrap();

        // Initial insert.
        let original = vec![vector_with("v1", vec![1.0, 2.0], None)];
        storage.upsert(&namespace, original).await.unwrap();

        // Same id, new values.
        let replacement = vec![vector_with("v1", vec![3.0, 4.0], None)];
        storage.upsert(&namespace, replacement).await.unwrap();

        // The stored vector reflects the replacement.
        let v1_only = ["v1".to_string()];
        let fetched = storage.get(&namespace, &v1_only).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].values, vec![3.0, 4.0]);

        // Still a single vector in the namespace.
        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
    }

    // ── Index storage ─────────────────────────────────────────────────────────

    /// Save / exists / load / list / delete round trip for index blobs.
    #[tokio::test]
    async fn test_index_storage() {
        let namespace = String::from("test_index");
        let storage = ObjectStorage::memory().unwrap();

        // Nothing is stored at the start.
        let hnsw_present = storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!hnsw_present);

        // Persist an HNSW blob.
        let hnsw_bytes = b"fake hnsw index data for testing".to_vec();
        storage
            .save_index(&namespace, IndexType::Hnsw, hnsw_bytes.clone())
            .await
            .unwrap();

        // Only the saved type is reported as present.
        let hnsw_present = storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(hnsw_present);
        let pq_present = storage
            .index_exists(&namespace, IndexType::Pq)
            .await
            .unwrap();
        assert!(!pq_present);

        // The blob round-trips unchanged.
        let loaded = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap(), hnsw_bytes);

        // Persist a second index type.
        let pq_bytes = b"fake pq index data".to_vec();
        storage
            .save_index(&namespace, IndexType::Pq, pq_bytes)
            .await
            .unwrap();

        // Both types show up in the listing.
        let listed = storage.list_indexes(&namespace).await.unwrap();
        assert_eq!(listed.len(), 2);
        for expected in [IndexType::Hnsw, IndexType::Pq] {
            assert!(listed.contains(&expected));
        }

        // Deleting an existing index reports true and removes it.
        let first_delete = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(first_delete);
        let hnsw_present = storage
            .index_exists(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!hnsw_present);

        // Deleting it again reports false.
        let second_delete = storage
            .delete_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(!second_delete);

        // Loading a missing index yields None rather than an error.
        let missing = storage
            .load_index(&namespace, IndexType::Hnsw)
            .await
            .unwrap();
        assert!(missing.is_none());
    }

    // ── DAK-5553: parallel upsert correctness ─────────────────────────────────

    /// A batch of 50 unique ids is fully written and counted.
    #[tokio::test]
    async fn test_upsert_batch_parallel_all_new() {
        let ns = String::from("batch_all_new");
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(&ns).await.unwrap();

        // All 50 vectors go through a single upsert call.
        let upserted = storage.upsert(&ns, numbered_vectors(50, 4)).await.unwrap();

        assert_eq!(upserted, 50);
        assert_eq!(storage.count(&ns).await.unwrap(), 50);
    }

    /// Re-upserting the same ids must not inflate the stored vector count.
    #[tokio::test]
    async fn test_upsert_batch_parallel_idempotent_count() {
        let ns = String::from("batch_idempotent");
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(&ns).await.unwrap();

        let vectors = numbered_vectors(10, 4);
        storage.upsert(&ns, vectors.clone()).await.unwrap();

        // Second pass over the same ids: vector_count must not double.
        storage.upsert(&ns, vectors).await.unwrap();
        assert_eq!(storage.count(&ns).await.unwrap(), 10);
    }

    /// An empty batch is a no-op that reports zero written vectors.
    #[tokio::test]
    async fn test_upsert_batch_parallel_empty() {
        let ns = String::from("batch_empty");
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(&ns).await.unwrap();

        let upserted = storage.upsert(&ns, vec![]).await.unwrap();
        assert_eq!(upserted, 0);
        assert_eq!(storage.count(&ns).await.unwrap(), 0);
    }

    /// A batch larger than the default concurrency window (16) is processed in full.
    #[tokio::test]
    async fn test_upsert_batch_parallel_large_batch() {
        let ns = String::from("batch_large");
        let storage = ObjectStorage::memory().unwrap();
        storage.ensure_namespace(&ns).await.unwrap();

        let upserted = storage.upsert(&ns, numbered_vectors(200, 8)).await.unwrap();

        assert_eq!(upserted, 200);
        assert_eq!(storage.count(&ns).await.unwrap(), 200);
    }
}