Skip to main content

storage/
object.rs

1//! Object Storage Backend using OpenDAL
2//!
3//! Provides storage abstraction for multiple cloud storage backends:
4//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
5//! - Azure Blob Storage
6//! - Google Cloud Storage (GCS)
7//! - Local filesystem
8//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
/// Process-wide sequence number mixed into temporary object/file names so that
/// concurrent writers never choose the same temp path. Only uniqueness of the
/// returned values matters, which is why callers bump it with `Ordering::Relaxed`.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Maximum number of storage operations kept in flight at once.
///
/// Read from the `DAKERA_S3_CONCURRENT_OPS` environment variable on every call.
/// Despite the name, the value is used as the `buffer_unordered` limit for every
/// backend in this file, not only S3.
///
/// Falls back to `16` when the variable is unset, is not a valid `usize`, or is `0`.
/// Zero is rejected because `buffer_unordered(0)` never polls its source stream, so
/// a batch driven with that limit would stall instead of making progress.
fn s3_concurrent_ops() -> usize {
    const DEFAULT_CONCURRENCY: usize = 16;

    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        // A limit of zero would stall every batched operation; treat it as invalid.
        .filter(|&limit| limit > 0)
        .unwrap_or(DEFAULT_CONCURRENCY)
}
28
29/// Serializable namespace metadata
30#[derive(Debug, Clone, Serialize, Deserialize)]
31struct NamespaceMetadata {
32    dimension: Option<usize>,
33    vector_count: usize,
34    created_at: u64,
35    updated_at: u64,
36}
37
38/// Serializable vector data for storage
39#[derive(Debug, Clone, Serialize, Deserialize)]
40struct StoredVector {
41    id: String,
42    values: Vec<f32>,
43    metadata: Option<serde_json::Value>,
44}
45
46impl From<Vector> for StoredVector {
47    fn from(v: Vector) -> Self {
48        Self {
49            id: v.id,
50            values: v.values,
51            metadata: v.metadata,
52        }
53    }
54}
55
56impl From<StoredVector> for Vector {
57    fn from(v: StoredVector) -> Self {
58        Self {
59            id: v.id,
60            values: v.values,
61            metadata: v.metadata,
62            ttl_seconds: None,
63            expires_at: None,
64        }
65    }
66}
67
/// Selects and parameterizes the backend an `ObjectStorage` talks to.
///
/// The default is [`ObjectStorageConfig::Memory`]. Optional fields are passed to
/// the backend builder only when they are `Some`.
#[derive(Debug, Clone, Default)]
pub enum ObjectStorageConfig {
    /// Volatile in-memory backend (for testing).
    #[default]
    Memory,
    /// Local filesystem backend.
    Filesystem {
        /// Directory under which all objects are stored.
        root: String,
    },
    /// S3-compatible backend (AWS S3, MinIO, ...).
    S3 {
        /// Bucket name.
        bucket: String,
        /// Region override.
        region: Option<String>,
        /// Custom endpoint URL.
        endpoint: Option<String>,
        /// Static access key id.
        access_key_id: Option<String>,
        /// Static secret access key.
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage backend.
    Azure {
        /// Blob container name.
        container: String,
        /// Storage account name.
        account_name: String,
        /// Shared account key.
        account_key: Option<String>,
        /// SAS token.
        sas_token: Option<String>,
        /// Custom endpoint URL.
        endpoint: Option<String>,
    },
    /// Google Cloud Storage backend.
    Gcs {
        /// Bucket name.
        bucket: String,
        /// Path to a credential file.
        credential_path: Option<String>,
        /// Custom endpoint URL.
        endpoint: Option<String>,
    },
}
99
100/// Object storage backend using OpenDAL
101#[derive(Clone)]
102pub struct ObjectStorage {
103    operator: Operator,
104    /// DAK-6287: local filesystem root, retained ONLY when the backend is a real
105    /// local filesystem. Lets `upsert` take a raw-`std::fs` fast write path that
106    /// bypasses OpenDAL's per-op overhead (~9× faster on the same disk; profiled).
107    /// `None` for memory/S3/Azure/GCS — those keep the OpenDAL path.
108    fs_root: Option<std::path::PathBuf>,
109}
110
111impl ObjectStorage {
112    /// Create a new object storage with the given configuration
113    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
114        let operator = Self::build_operator(&config)?;
115        let fs_root = match &config {
116            ObjectStorageConfig::Filesystem { root } => Some(std::path::PathBuf::from(root)),
117            _ => None,
118        };
119        Ok(Self { operator, fs_root })
120    }
121
122    /// DAK-6287: true when `s` is safe as a single filesystem path segment (non-empty,
123    /// no separators, no parent refs, no NUL). Gates the raw-fs fast write path so a
124    /// crafted namespace or vector id cannot escape the storage root via path traversal.
125    fn is_fs_safe_segment(s: &str) -> bool {
126        !s.is_empty()
127            && s != "."
128            && s != ".."
129            && !s.contains('/')
130            && !s.contains('\\')
131            && !s.contains('\0')
132    }
133
134    /// DAK-6287: raw-`std::fs` batch write for the local Filesystem backend. Each vector is
135    /// written to `<root>/namespaces/<ns>/vectors/<id>.json` via tmp+fsync+rename, matching
136    /// the OpenDAL on-disk layout byte-for-byte so the read/recall path is unchanged. The
137    /// directory is created once per batch; each file is fsync'd before its atomic rename, so
138    /// crash-consistency is >= the OpenDAL path. Bypasses OpenDAL's per-op overhead, which the
139    /// storage_upsert profile showed dominates (~9× faster on the same disk). Callers MUST
140    /// ensure `namespace` and every vector id pass `is_fs_safe_segment`.
141    ///
142    /// Returns the number of NEW inserts (ids not already present on disk before the batch),
143    /// so the caller can maintain `meta.vector_count` without a separate O(namespace) listing
144    /// (DAK-6299). Classification is a single cheap raw-`std::fs` stat per file (O(batch),
145    /// ~1% of upsert cost per the storage_upsert profile), done inside the per-file write
146    /// task so it adds no extra round-trip pass over the namespace.
147    async fn write_vectors_fs(
148        &self,
149        root: &std::path::Path,
150        namespace: &NamespaceId,
151        vectors: Vec<Vector>,
152    ) -> Result<usize> {
153        let vectors_dir = root.join(format!("namespaces/{}/vectors", namespace));
154        tokio::fs::create_dir_all(&vectors_dir)
155            .await
156            .map_err(|e| DakeraError::Storage(e.to_string()))?;
157
158        let results: Vec<Result<bool>> = stream::iter(vectors)
159            .map(|vector| {
160                let dir = vectors_dir.clone();
161                async move {
162                    let id = vector.id.clone();
163                    let stored: StoredVector = vector.into();
164                    let data = serde_json::to_vec(&stored)
165                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
166                    let final_path = dir.join(format!("{}.json", id));
167                    let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
168                    // Leading dot keeps the temp file out of the `*.json` id listing.
169                    let tmp_path = dir.join(format!(".{}.tmp.{}", id, seq));
170                    tokio::task::spawn_blocking(move || -> Result<bool> {
171                        use std::io::Write;
172                        // Cheap per-file stat classifies insert-vs-update (matches the prior
173                        // !exists semantics) without scanning the whole namespace.
174                        let is_new = !final_path.exists();
175                        let mut f = std::fs::File::create(&tmp_path)
176                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
177                        f.write_all(&data)
178                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
179                        // Flush contents before the atomic publish so a crash can never
180                        // expose a half-written vector file (>= OpenDAL durability).
181                        f.sync_all()
182                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
183                        drop(f);
184                        if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
185                            let _ = std::fs::remove_file(&tmp_path);
186                            return Err(DakeraError::Storage(e.to_string()));
187                        }
188                        Ok(is_new)
189                    })
190                    .await
191                    .map_err(|e| DakeraError::Storage(e.to_string()))?
192                }
193            })
194            .buffer_unordered(s3_concurrent_ops())
195            .collect()
196            .await;
197
198        let mut new_inserts = 0usize;
199        for r in results {
200            if r? {
201                new_inserts += 1;
202            }
203        }
204        Ok(new_inserts)
205    }
206
207    /// Create in-memory storage (for testing)
208    pub fn memory() -> Result<Self> {
209        Self::new(ObjectStorageConfig::Memory)
210    }
211
212    /// Create filesystem storage
213    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
214        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
215    }
216
217    /// Create S3 storage
218    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
219        Self::new(ObjectStorageConfig::S3 {
220            bucket: bucket.into(),
221            region: None,
222            endpoint: None,
223            access_key_id: None,
224            secret_access_key: None,
225        })
226    }
227
228    /// Create Azure Blob storage
229    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
230        Self::new(ObjectStorageConfig::Azure {
231            container: container.into(),
232            account_name: account_name.into(),
233            account_key: None,
234            sas_token: None,
235            endpoint: None,
236        })
237    }
238
239    /// Create Google Cloud Storage
240    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
241        Self::new(ObjectStorageConfig::Gcs {
242            bucket: bucket.into(),
243            credential_path: None,
244            endpoint: None,
245        })
246    }
247
248    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
249        match config {
250            ObjectStorageConfig::Memory => {
251                let builder = services::Memory::default();
252                Operator::new(builder)
253                    .map(|op| op.finish())
254                    .map_err(|e| DakeraError::Storage(e.to_string()))
255            }
256            ObjectStorageConfig::Filesystem { root } => {
257                let builder = services::Fs::default().root(root);
258                Operator::new(builder)
259                    .map(|op| op.layer(Self::retry_layer()).finish())
260                    .map_err(|e| DakeraError::Storage(e.to_string()))
261            }
262            ObjectStorageConfig::S3 {
263                bucket,
264                region,
265                endpoint,
266                access_key_id,
267                secret_access_key,
268            } => {
269                let mut builder = services::S3::default().bucket(bucket);
270
271                if let Some(region) = region {
272                    builder = builder.region(region);
273                }
274                if let Some(endpoint) = endpoint {
275                    builder = builder.endpoint(endpoint);
276                }
277                if let Some(key) = access_key_id {
278                    builder = builder.access_key_id(key);
279                }
280                if let Some(secret) = secret_access_key {
281                    builder = builder.secret_access_key(secret);
282                }
283
284                Operator::new(builder)
285                    .map(|op| op.layer(Self::retry_layer()).finish())
286                    .map_err(|e| DakeraError::Storage(e.to_string()))
287            }
288            ObjectStorageConfig::Azure {
289                container,
290                account_name,
291                account_key,
292                sas_token,
293                endpoint,
294            } => {
295                let mut builder = services::Azblob::default()
296                    .container(container)
297                    .account_name(account_name);
298
299                if let Some(key) = account_key {
300                    builder = builder.account_key(key);
301                }
302                if let Some(token) = sas_token {
303                    builder = builder.sas_token(token);
304                }
305                if let Some(endpoint) = endpoint {
306                    builder = builder.endpoint(endpoint);
307                }
308
309                Operator::new(builder)
310                    .map(|op| op.layer(Self::retry_layer()).finish())
311                    .map_err(|e| DakeraError::Storage(e.to_string()))
312            }
313            ObjectStorageConfig::Gcs {
314                bucket,
315                credential_path,
316                endpoint,
317            } => {
318                let mut builder = services::Gcs::default().bucket(bucket);
319
320                if let Some(cred_path) = credential_path {
321                    builder = builder.credential_path(cred_path);
322                }
323                if let Some(endpoint) = endpoint {
324                    builder = builder.endpoint(endpoint);
325                }
326
327                Operator::new(builder)
328                    .map(|op| op.layer(Self::retry_layer()).finish())
329                    .map_err(|e| DakeraError::Storage(e.to_string()))
330            }
331        }
332    }
333
334    fn retry_layer() -> RetryLayer {
335        // DAK-3430: default capped from 10 → 3 so a MinIO 429 storm fails fast
336        // (≤3×60s ≈ 3min) rather than blocking production for 10min. Override
337        // with DAKERA_S3_MAX_RETRIES=10 for high-resilience deployments.
338        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
339            .ok()
340            .and_then(|v| v.parse().ok())
341            .unwrap_or(3);
342        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
343            .ok()
344            .and_then(|v| v.parse().ok())
345            .unwrap_or(500);
346        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
347            .ok()
348            .and_then(|v| v.parse().ok())
349            .unwrap_or(60);
350
351        tracing::info!(
352            max_times,
353            min_delay_ms,
354            max_delay_secs,
355            "S3 retry layer configured"
356        );
357
358        RetryLayer::new()
359            .with_max_times(max_times)
360            .with_min_delay(Duration::from_millis(min_delay_ms))
361            .with_max_delay(Duration::from_secs(max_delay_secs))
362            .with_jitter()
363            .with_factor(2.0)
364    }
365
366    /// Get the path for a vector
367    fn vector_path(namespace: &str, vector_id: &str) -> String {
368        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
369    }
370
371    /// Get the path for namespace metadata
372    fn namespace_meta_path(namespace: &str) -> String {
373        format!("namespaces/{}/meta.json", namespace)
374    }
375
376    /// Get the path prefix for all vectors in a namespace
377    fn namespace_vectors_prefix(namespace: &str) -> String {
378        format!("namespaces/{}/vectors/", namespace)
379    }
380
381    /// Read namespace metadata
382    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
383        let path = Self::namespace_meta_path(namespace);
384        match self.operator.read(&path).await {
385            Ok(data) => {
386                let bytes = data.to_vec();
387                if bytes.is_empty() {
388                    tracing::warn!(
389                        namespace = %namespace,
390                        path = %path,
391                        "Empty namespace metadata file detected, treating as missing"
392                    );
393                    return Ok(None);
394                }
395                match serde_json::from_slice(&bytes) {
396                    Ok(meta) => Ok(Some(meta)),
397                    Err(e) => {
398                        tracing::warn!(
399                            namespace = %namespace,
400                            path = %path,
401                            error = %e,
402                            bytes_len = bytes.len(),
403                            "Corrupted namespace metadata, treating as missing and will be recreated"
404                        );
405                        Ok(None)
406                    }
407                }
408            }
409            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
410            Err(e) => Err(DakeraError::Storage(e.to_string())),
411        }
412    }
413
414    /// Write namespace metadata atomically.
415    ///
416    /// Writes to a `.tmp` file first, then renames to the final path. On POSIX filesystems
417    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
418        let path = Self::namespace_meta_path(namespace);
419        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
420        self.write_atomic(&path, data).await
421    }
422
423    /// Write data to `path` atomically via tmp+rename (DAK-4545).
424    ///
425    /// Writes to a `.tmp` file first, then renames to the final path. On POSIX filesystems
426    /// rename(2) is atomic within the same device, preventing concurrent readers from seeing
427    /// a truncated/empty file during the write window (O_TRUNC race). For object-store
428    /// backends that don't support rename (e.g. S3 without copy-delete), falls back to a
429    /// direct write — S3 PUT is itself atomic (readers get previous version until PUT completes).
430    async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
431        let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
432        let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
433        let rename_result = async {
434            self.operator
435                .write(&tmp_path, data.clone())
436                .await
437                .map_err(|e| DakeraError::Storage(e.to_string()))?;
438            self.operator
439                .rename(&tmp_path, path)
440                .await
441                .map_err(|e| DakeraError::Storage(e.to_string()))?;
442            Ok::<(), DakeraError>(())
443        }
444        .await;
445        if let Err(e) = rename_result {
446            // Backend doesn't support rename: fall back to direct write.
447            tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
448            let _ = self.operator.delete(&tmp_path).await;
449            self.operator
450                .write(path, data)
451                .await
452                .map_err(|e| DakeraError::Storage(e.to_string()))?;
453        }
454        Ok(())
455    }
456
457    /// Get current timestamp
458    fn now() -> u64 {
459        std::time::SystemTime::now()
460            .duration_since(std::time::UNIX_EPOCH)
461            .unwrap_or_default()
462            .as_secs()
463    }
464}
465
466#[async_trait]
467impl VectorStorage for ObjectStorage {
468    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
469        if vectors.is_empty() {
470            return Ok(0);
471        }
472
473        // Get or create namespace metadata
474        let mut meta = self
475            .read_namespace_meta(namespace)
476            .await?
477            .unwrap_or_else(|| NamespaceMetadata {
478                dimension: None,
479                vector_count: 0,
480                created_at: Self::now(),
481                updated_at: Self::now(),
482            });
483
484        // Validate dimensions
485        let first_dim = vectors[0].values.len();
486        if let Some(dim) = meta.dimension {
487            for v in &vectors {
488                if v.values.len() != dim {
489                    return Err(DakeraError::DimensionMismatch {
490                        expected: dim,
491                        actual: v.values.len(),
492                    });
493                }
494            }
495        } else {
496            meta.dimension = Some(first_dim);
497        }
498
499        // DAK-5553: parallelize per-vector write to eliminate sequential I/O bottleneck.
500        // Buffer up to s3_concurrent_ops() (default 16) concurrent writes at a time.
501        //
502        // DAK-6299: classify insert-vs-update (to maintain meta.vector_count) with O(batch)
503        // per-vector existence checks, NOT a per-batch full-namespace list(). PR#612 replaced
504        // N per-vector exists() stats with ONE operator.list(vectors_prefix), but that list
505        // returns the ENTIRE namespace = O(namespace_size) per batch => O(N^2/batch) over a
506        // full ingest, and it ran for ALL backends (incl S3, where it is a paginated LIST of
507        // every key). Full-scale measurement (DAK-6297, fs, 2675 mem, one namespace) showed
508        // the list inverted the fast path: 1.43x at 220 mem -> 0.55x at 2675 mem. The
509        // profile that motivated PR#612 (DAK-6287) showed the per-vector stat was only ~1%
510        // of upsert cost, so removing it bought ~nothing while the list it introduced is the
511        // regression. Here: on the fs fast path a cheap raw std::fs stat per file (O(batch))
512        // classifies new-vs-existing; on the OpenDAL path a per-vector exists() (O(batch)
513        // HEAD/stat, the pre-PR#612 DAK-5553 semantics) does the same. No full-namespace
514        // scan. Write-path only, recall-neutral: vector file format, the read/recall path,
515        // and the tmp+rename atomic-publish durability guarantee are all unchanged.
516        let total = vectors.len();
517
518        // DAK-6287: on the local Filesystem backend, write via raw std::fs instead of
519        // OpenDAL. Profiled on an x64 runner (storage_upsert decomposition): the per-vector
520        // cost is dominated by OpenDAL's fs-writer overhead, NOT fsync/stat/rename — raw
521        // write+fsync reaches ~5800 mem/s vs OpenDAL ~650 mem/s on the same disk. The fast
522        // path keeps the EXACT on-disk layout (namespaces/<ns>/vectors/<id>.json), atomic
523        // tmp+rename publish, and adds an explicit per-file fsync, so the read/recall path is
524        // byte-identical and durability is >= the OpenDAL path. Guarded: only taken when the
525        // namespace and every id are filesystem-safe segments (no `/`, `\`, `..`, NUL) to
526        // prevent path traversal; otherwise it falls back to the OpenDAL path below.
527        let fs_root = if Self::is_fs_safe_segment(namespace)
528            && vectors.iter().all(|v| Self::is_fs_safe_segment(&v.id))
529        {
530            self.fs_root.clone()
531        } else {
532            None
533        };
534
535        let new_inserts = if let Some(root) = fs_root {
536            // Fast path classifies inserts via a per-file stat inside the write loop.
537            self.write_vectors_fs(&root, namespace, vectors).await?
538        } else {
539            let op = self.operator.clone();
540            let ns = namespace.clone();
541            let results: Vec<Result<bool>> = stream::iter(vectors)
542                .map(|vector| {
543                    let op = op.clone();
544                    let ns = ns.clone();
545                    async move {
546                        let path = ObjectStorage::vector_path(&ns, &vector.id);
547                        let stored: StoredVector = vector.into();
548                        let data = serde_json::to_vec(&stored)
549                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
550
551                        // O(1) per vector: a single exists() (HEAD on S3) classifies
552                        // insert-vs-update without scanning the namespace.
553                        let exists = op
554                            .exists(&path)
555                            .await
556                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
557
558                        // Atomic write: tmp+rename, fall back to direct write if unsupported.
559                        let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
560                        let now_secs = std::time::SystemTime::now()
561                            .duration_since(std::time::UNIX_EPOCH)
562                            .unwrap_or_default()
563                            .as_secs();
564                        let tmp_path = format!("{}.tmp.{}.{}", path, now_secs, seq);
565                        let rename_result = async {
566                            op.write(&tmp_path, data.clone())
567                                .await
568                                .map_err(|e| DakeraError::Storage(e.to_string()))?;
569                            op.rename(&tmp_path, &path)
570                                .await
571                                .map_err(|e| DakeraError::Storage(e.to_string()))?;
572                            Ok::<(), DakeraError>(())
573                        }
574                        .await;
575                        if let Err(e) = rename_result {
576                            tracing::debug!(
577                                path = %path,
578                                error = %e,
579                                "atomic rename failed, falling back to direct write"
580                            );
581                            let _ = op.delete(&tmp_path).await;
582                            op.write(&path, data)
583                                .await
584                                .map_err(|e| DakeraError::Storage(e.to_string()))?;
585                        }
586
587                        Ok::<bool, DakeraError>(!exists)
588                    }
589                })
590                .buffer_unordered(s3_concurrent_ops())
591                .collect()
592                .await;
593
594            let mut count = 0usize;
595            for r in results {
596                if r? {
597                    count += 1;
598                }
599            }
600            count
601        };
602
603        meta.vector_count += new_inserts;
604        meta.updated_at = Self::now();
605        self.write_namespace_meta(namespace, &meta).await?;
606
607        tracing::debug!(
608            namespace = namespace,
609            upserted = total,
610            "Upserted vectors to object storage"
611        );
612
613        Ok(total)
614    }
615
616    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
617        if ids.is_empty() {
618            return Ok(Vec::new());
619        }
620
621        let now = std::time::SystemTime::now()
622            .duration_since(std::time::UNIX_EPOCH)
623            .unwrap_or_default()
624            .as_secs();
625
626        let read_tasks: Vec<_> = ids
627            .iter()
628            .map(|id| {
629                let operator = self.operator.clone();
630                let path = Self::vector_path(namespace, id);
631                let id = id.clone();
632                async move {
633                    match operator.read(&path).await {
634                        Ok(data) => {
635                            let bytes = data.to_vec();
636                            if bytes.is_empty() {
637                                tracing::warn!(
638                                    vector_id = %id,
639                                    "Empty vector file detected, skipping"
640                                );
641                                return Ok(None);
642                            }
643                            match serde_json::from_slice::<StoredVector>(&bytes) {
644                                Ok(stored) => {
645                                    let vector: Vector = stored.into();
646                                    if !vector.is_expired_at(now) {
647                                        Ok(Some(vector))
648                                    } else {
649                                        Ok(None)
650                                    }
651                                }
652                                Err(e) => {
653                                    tracing::warn!(
654                                        vector_id = %id,
655                                        error = %e,
656                                        bytes_len = bytes.len(),
657                                        "Corrupted vector file detected, skipping"
658                                    );
659                                    Ok(None)
660                                }
661                            }
662                        }
663                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
664                        Err(e) => Err(DakeraError::Storage(e.to_string())),
665                    }
666                }
667            })
668            .collect();
669
670        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
671            .buffer_unordered(s3_concurrent_ops())
672            .collect()
673            .await;
674
675        let mut vectors = Vec::with_capacity(ids.len());
676        for result in results {
677            if let Some(v) = result? {
678                vectors.push(v);
679            }
680        }
681        Ok(vectors)
682    }
683
684    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
685        let prefix = Self::namespace_vectors_prefix(namespace);
686
687        let entries = self
688            .operator
689            .list(&prefix)
690            .await
691            .map_err(|e| DakeraError::Storage(e.to_string()))?;
692
693        let json_paths: Vec<String> = entries
694            .into_iter()
695            .filter(|e| e.path().ends_with(".json"))
696            .map(|e| e.path().to_string())
697            .collect();
698
699        if json_paths.is_empty() {
700            return Ok(Vec::new());
701        }
702
703        let now = std::time::SystemTime::now()
704            .duration_since(std::time::UNIX_EPOCH)
705            .unwrap_or_default()
706            .as_secs();
707
708        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
709            let operator = self.operator.clone();
710            async move {
711                match operator.read(&path).await {
712                    Ok(data) => {
713                        let bytes = data.to_vec();
714                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
715                            let vector: Vector = stored.into();
716                            if !vector.is_expired_at(now) {
717                                return Some(vector);
718                            }
719                        }
720                        None
721                    }
722                    Err(e) => {
723                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
724                        None
725                    }
726                }
727            }
728        }))
729        .buffer_unordered(s3_concurrent_ops())
730        .collect()
731        .await;
732
733        Ok(results.into_iter().flatten().collect())
734    }
735
736    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
737        if ids.is_empty() {
738            return Ok(0);
739        }
740
741        let delete_tasks: Vec<_> = ids
742            .iter()
743            .map(|id| {
744                let operator = self.operator.clone();
745                let path = Self::vector_path(namespace, id);
746                async move {
747                    let exists = operator
748                        .exists(&path)
749                        .await
750                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
751                    if exists {
752                        match operator.delete(&path).await {
753                            Ok(_) => Ok(true),
754                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
755                            Err(e) => Err(DakeraError::Storage(e.to_string())),
756                        }
757                    } else {
758                        Ok(false)
759                    }
760                }
761            })
762            .collect();
763
764        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
765            .buffer_unordered(s3_concurrent_ops())
766            .collect()
767            .await;
768
769        let mut deleted = 0;
770        for result in results {
771            if result? {
772                deleted += 1;
773            }
774        }
775
776        // Update metadata
777        if deleted > 0 {
778            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
779                meta.vector_count = meta.vector_count.saturating_sub(deleted);
780                meta.updated_at = Self::now();
781                self.write_namespace_meta(namespace, &meta).await?;
782            }
783        }
784
785        Ok(deleted)
786    }
787
788    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
789        Ok(self.read_namespace_meta(namespace).await?.is_some())
790    }
791
792    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
793        if self.read_namespace_meta(namespace).await?.is_none() {
794            let meta = NamespaceMetadata {
795                dimension: None,
796                vector_count: 0,
797                created_at: Self::now(),
798                updated_at: Self::now(),
799            };
800            self.write_namespace_meta(namespace, &meta).await?;
801        }
802        Ok(())
803    }
804
805    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
806        Ok(self
807            .read_namespace_meta(namespace)
808            .await?
809            .and_then(|m| m.dimension))
810    }
811
812    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
813        Ok(self
814            .read_namespace_meta(namespace)
815            .await?
816            .map(|m| m.vector_count)
817            .unwrap_or(0))
818    }
819
820    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
821        let entries = self
822            .operator
823            .list("namespaces/")
824            .await
825            .map_err(|e| DakeraError::Storage(e.to_string()))?;
826
827        let mut namespaces = Vec::new();
828        for entry in entries {
829            let path = entry.path();
830            // Extract namespace name from path like "namespaces/myns/"
831            if let Some(ns) = path.strip_prefix("namespaces/") {
832                let ns = ns.trim_end_matches('/');
833                if !ns.is_empty() && !ns.contains('/') {
834                    // Only include namespaces that actually have metadata (meta.json).
835                    // This filters out ghost directory entries left behind after
836                    // namespace deletion on backends where empty directories persist
837                    // (e.g., local filesystem, some S3-compatible stores).
838                    if self.read_namespace_meta(ns).await?.is_some() {
839                        namespaces.push(ns.to_string());
840                    }
841                }
842            }
843        }
844
845        Ok(namespaces)
846    }
847
848    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
849        // Check if namespace exists
850        if !self.namespace_exists(namespace).await? {
851            return Ok(false);
852        }
853
854        // Recursively remove everything under the namespace prefix.
855        // This deletes vectors, metadata, indexes, AND directory entries,
856        // preventing ghost namespaces from appearing in list_namespaces().
857        let prefix = format!("namespaces/{}/", namespace);
858        self.operator
859            .delete_with(&prefix)
860            .recursive(true)
861            .await
862            .map_err(|e| DakeraError::Storage(e.to_string()))?;
863
864        tracing::debug!(
865            namespace = namespace,
866            "Deleted namespace from object storage"
867        );
868
869        Ok(true)
870    }
871
872    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
873        // Object storage doesn't track TTL internally - cleanup handled by application layer
874        // StoredVector doesn't persist TTL fields, so nothing to clean up here
875        Ok(0)
876    }
877
878    async fn cleanup_all_expired(&self) -> Result<usize> {
879        // Object storage doesn't track TTL internally - cleanup handled by application layer
880        Ok(0)
881    }
882}
883
884#[async_trait]
885impl IndexStorage for ObjectStorage {
886    async fn save_index(
887        &self,
888        namespace: &NamespaceId,
889        index_type: IndexType,
890        data: Vec<u8>,
891    ) -> Result<()> {
892        let path = format!(
893            "namespaces/{}/indexes/{}.bin",
894            namespace,
895            index_type.as_str()
896        );
897        self.operator
898            .write(&path, data)
899            .await
900            .map_err(|e| DakeraError::Storage(e.to_string()))?;
901
902        tracing::debug!(
903            namespace = namespace,
904            index_type = index_type.as_str(),
905            "Saved index to object storage"
906        );
907        Ok(())
908    }
909
910    async fn load_index(
911        &self,
912        namespace: &NamespaceId,
913        index_type: IndexType,
914    ) -> Result<Option<Vec<u8>>> {
915        let path = format!(
916            "namespaces/{}/indexes/{}.bin",
917            namespace,
918            index_type.as_str()
919        );
920        match self.operator.read(&path).await {
921            Ok(data) => {
922                tracing::debug!(
923                    namespace = namespace,
924                    index_type = index_type.as_str(),
925                    size = data.len(),
926                    "Loaded index from object storage"
927                );
928                Ok(Some(data.to_vec()))
929            }
930            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
931            Err(e) => Err(DakeraError::Storage(e.to_string())),
932        }
933    }
934
935    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
936        let path = format!(
937            "namespaces/{}/indexes/{}.bin",
938            namespace,
939            index_type.as_str()
940        );
941        let exists = self
942            .operator
943            .exists(&path)
944            .await
945            .map_err(|e| DakeraError::Storage(e.to_string()))?;
946
947        if exists {
948            self.operator
949                .delete(&path)
950                .await
951                .map_err(|e| DakeraError::Storage(e.to_string()))?;
952            tracing::debug!(
953                namespace = namespace,
954                index_type = index_type.as_str(),
955                "Deleted index from object storage"
956            );
957        }
958        Ok(exists)
959    }
960
961    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
962        let path = format!(
963            "namespaces/{}/indexes/{}.bin",
964            namespace,
965            index_type.as_str()
966        );
967        self.operator
968            .exists(&path)
969            .await
970            .map_err(|e| DakeraError::Storage(e.to_string()))
971    }
972
973    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
974        let prefix = format!("namespaces/{}/indexes/", namespace);
975        let entries = self
976            .operator
977            .list(&prefix)
978            .await
979            .map_err(|e| DakeraError::Storage(e.to_string()))?;
980
981        let mut indexes = Vec::new();
982        for entry in entries {
983            let path = entry.path();
984            if path.ends_with(".bin") {
985                // Extract index type from filename
986                if let Some(filename) = path.strip_prefix(&prefix) {
987                    let name = filename.trim_end_matches(".bin");
988                    match name {
989                        "hnsw" => indexes.push(IndexType::Hnsw),
990                        "pq" => indexes.push(IndexType::Pq),
991                        "ivf" => indexes.push(IndexType::Ivf),
992                        "spfresh" => indexes.push(IndexType::SpFresh),
993                        "fulltext" => indexes.push(IndexType::FullText),
994                        _ => {} // Ignore unknown index types
995                    }
996                }
997            }
998        }
999
1000        Ok(indexes)
1001    }
1002}
1003
1004/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
1005/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
1006pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
1007    ObjectStorage::build_operator(config)
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012    use super::*;
1013
1014    #[tokio::test]
1015    async fn test_object_storage_memory() {
1016        let storage = ObjectStorage::memory().unwrap();
1017        let namespace = "test".to_string();
1018
1019        // Ensure namespace
1020        storage.ensure_namespace(&namespace).await.unwrap();
1021        assert!(storage.namespace_exists(&namespace).await.unwrap());
1022
1023        // Insert vectors
1024        let vectors = vec![
1025            Vector {
1026                id: "v1".to_string(),
1027                values: vec![1.0, 2.0, 3.0],
1028                metadata: None,
1029                ttl_seconds: None,
1030                expires_at: None,
1031            },
1032            Vector {
1033                id: "v2".to_string(),
1034                values: vec![4.0, 5.0, 6.0],
1035                metadata: Some(serde_json::json!({"key": "value"})),
1036                ttl_seconds: None,
1037                expires_at: None,
1038            },
1039        ];
1040
1041        let count = storage.upsert(&namespace, vectors).await.unwrap();
1042        assert_eq!(count, 2);
1043
1044        // Get single vector
1045        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
1046        assert_eq!(results.len(), 1);
1047        assert_eq!(results[0].id, "v1");
1048        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);
1049
1050        // Get all vectors
1051        let all = storage.get_all(&namespace).await.unwrap();
1052        assert_eq!(all.len(), 2);
1053
1054        // Count
1055        assert_eq!(storage.count(&namespace).await.unwrap(), 2);
1056
1057        // Delete
1058        let deleted = storage
1059            .delete(&namespace, &["v1".to_string()])
1060            .await
1061            .unwrap();
1062        assert_eq!(deleted, 1);
1063        assert!(storage
1064            .get(&namespace, &["v1".to_string()])
1065            .await
1066            .unwrap()
1067            .is_empty());
1068        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
1069    }
1070
1071    #[tokio::test]
1072    async fn test_object_storage_dimension_mismatch() {
1073        let storage = ObjectStorage::memory().unwrap();
1074        let namespace = "test".to_string();
1075        storage.ensure_namespace(&namespace).await.unwrap();
1076
1077        // Insert first vector
1078        let v1 = vec![Vector {
1079            id: "v1".to_string(),
1080            values: vec![1.0, 2.0, 3.0],
1081            metadata: None,
1082            ttl_seconds: None,
1083            expires_at: None,
1084        }];
1085        storage.upsert(&namespace, v1).await.unwrap();
1086
1087        // Try to insert vector with different dimension
1088        let v2 = vec![Vector {
1089            id: "v2".to_string(),
1090            values: vec![1.0, 2.0], // Wrong dimension
1091            metadata: None,
1092            ttl_seconds: None,
1093            expires_at: None,
1094        }];
1095        let result = storage.upsert(&namespace, v2).await;
1096        assert!(result.is_err());
1097    }
1098
1099    #[tokio::test]
1100    async fn test_object_storage_upsert() {
1101        let storage = ObjectStorage::memory().unwrap();
1102        let namespace = "test".to_string();
1103        storage.ensure_namespace(&namespace).await.unwrap();
1104
1105        // Insert
1106        let v1 = vec![Vector {
1107            id: "v1".to_string(),
1108            values: vec![1.0, 2.0],
1109            metadata: None,
1110            ttl_seconds: None,
1111            expires_at: None,
1112        }];
1113        storage.upsert(&namespace, v1).await.unwrap();
1114
1115        // Upsert (update)
1116        let v1_updated = vec![Vector {
1117            id: "v1".to_string(),
1118            values: vec![3.0, 4.0],
1119            metadata: None,
1120            ttl_seconds: None,
1121            expires_at: None,
1122        }];
1123        storage.upsert(&namespace, v1_updated).await.unwrap();
1124
1125        // Verify update
1126        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
1127        assert_eq!(results.len(), 1);
1128        assert_eq!(results[0].values, vec![3.0, 4.0]);
1129
1130        // Count should still be 1
1131        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
1132    }
1133
1134    #[tokio::test]
1135    async fn test_index_storage() {
1136        let storage = ObjectStorage::memory().unwrap();
1137        let namespace = "test_index".to_string();
1138
1139        // Initially no indexes
1140        assert!(!storage
1141            .index_exists(&namespace, IndexType::Hnsw)
1142            .await
1143            .unwrap());
1144
1145        // Save an index
1146        let index_data = b"fake hnsw index data for testing".to_vec();
1147        storage
1148            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
1149            .await
1150            .unwrap();
1151
1152        // Check it exists
1153        assert!(storage
1154            .index_exists(&namespace, IndexType::Hnsw)
1155            .await
1156            .unwrap());
1157        assert!(!storage
1158            .index_exists(&namespace, IndexType::Pq)
1159            .await
1160            .unwrap());
1161
1162        // Load it back
1163        let loaded = storage
1164            .load_index(&namespace, IndexType::Hnsw)
1165            .await
1166            .unwrap();
1167        assert!(loaded.is_some());
1168        assert_eq!(loaded.unwrap(), index_data);
1169
1170        // Save another index type
1171        let pq_data = b"fake pq index data".to_vec();
1172        storage
1173            .save_index(&namespace, IndexType::Pq, pq_data)
1174            .await
1175            .unwrap();
1176
1177        // List indexes
1178        let indexes = storage.list_indexes(&namespace).await.unwrap();
1179        assert_eq!(indexes.len(), 2);
1180        assert!(indexes.contains(&IndexType::Hnsw));
1181        assert!(indexes.contains(&IndexType::Pq));
1182
1183        // Delete index
1184        let deleted = storage
1185            .delete_index(&namespace, IndexType::Hnsw)
1186            .await
1187            .unwrap();
1188        assert!(deleted);
1189        assert!(!storage
1190            .index_exists(&namespace, IndexType::Hnsw)
1191            .await
1192            .unwrap());
1193
1194        // Delete non-existent
1195        let deleted = storage
1196            .delete_index(&namespace, IndexType::Hnsw)
1197            .await
1198            .unwrap();
1199        assert!(!deleted);
1200
1201        // Load non-existent
1202        let loaded = storage
1203            .load_index(&namespace, IndexType::Hnsw)
1204            .await
1205            .unwrap();
1206        assert!(loaded.is_none());
1207    }
1208
1209    // ── DAK-5553: parallel upsert correctness ─────────────────────────────────
1210
1211    fn make_vector(id: &str, dim: usize) -> Vector {
1212        Vector {
1213            id: id.to_string(),
1214            values: vec![0.1; dim],
1215            metadata: None,
1216            ttl_seconds: None,
1217            expires_at: None,
1218        }
1219    }
1220
1221    #[tokio::test]
1222    async fn test_upsert_batch_parallel_all_new() {
1223        let storage = ObjectStorage::memory().unwrap();
1224        let ns = "batch_all_new".to_string();
1225        storage.ensure_namespace(&ns).await.unwrap();
1226
1227        // Insert 50 unique vectors in one batch call
1228        let vectors: Vec<Vector> = (0..50).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1229        let count = storage.upsert(&ns, vectors).await.unwrap();
1230
1231        assert_eq!(count, 50);
1232        assert_eq!(storage.count(&ns).await.unwrap(), 50);
1233    }
1234
1235    #[tokio::test]
1236    async fn test_upsert_batch_parallel_idempotent_count() {
1237        let storage = ObjectStorage::memory().unwrap();
1238        let ns = "batch_idempotent".to_string();
1239        storage.ensure_namespace(&ns).await.unwrap();
1240
1241        let vectors: Vec<Vector> = (0..10).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1242        storage.upsert(&ns, vectors.clone()).await.unwrap();
1243
1244        // Upsert the same IDs again — vector_count must not double
1245        storage.upsert(&ns, vectors).await.unwrap();
1246        assert_eq!(storage.count(&ns).await.unwrap(), 10);
1247    }
1248
1249    #[tokio::test]
1250    async fn test_upsert_batch_parallel_empty() {
1251        let storage = ObjectStorage::memory().unwrap();
1252        let ns = "batch_empty".to_string();
1253        storage.ensure_namespace(&ns).await.unwrap();
1254
1255        let count = storage.upsert(&ns, vec![]).await.unwrap();
1256        assert_eq!(count, 0);
1257        assert_eq!(storage.count(&ns).await.unwrap(), 0);
1258    }
1259
1260    #[tokio::test]
1261    async fn test_upsert_batch_parallel_large_batch() {
1262        // Verify that a batch exceeding the default concurrency window (16) processes all items
1263        let storage = ObjectStorage::memory().unwrap();
1264        let ns = "batch_large".to_string();
1265        storage.ensure_namespace(&ns).await.unwrap();
1266
1267        let vectors: Vec<Vector> = (0..200).map(|i| make_vector(&format!("v{i}"), 8)).collect();
1268        let count = storage.upsert(&ns, vectors).await.unwrap();
1269
1270        assert_eq!(count, 200);
1271        assert_eq!(storage.count(&ns).await.unwrap(), 200);
1272    }
1273
1274    // DAK-6287: the per-vector exists() stat was replaced by a single directory list()
1275    // to classify insert-vs-update for vector_count. Prove a batch mixing updates of
1276    // existing ids with new inserts produces the exact count (only new ids increment).
1277    #[tokio::test]
1278    async fn test_upsert_count_mixed_insert_update_batch() {
1279        let storage = ObjectStorage::memory().unwrap();
1280        let ns = "mixed".to_string();
1281        storage.ensure_namespace(&ns).await.unwrap();
1282
1283        // Seed two vectors.
1284        storage
1285            .upsert(&ns, vec![make_vector("v0", 4), make_vector("v1", 4)])
1286            .await
1287            .unwrap();
1288        assert_eq!(storage.count(&ns).await.unwrap(), 2);
1289
1290        // Batch updates v1 (existing) and inserts v2, v3 (new) => count must be 4, not 5.
1291        storage
1292            .upsert(
1293                &ns,
1294                vec![
1295                    make_vector("v1", 4),
1296                    make_vector("v2", 4),
1297                    make_vector("v3", 4),
1298                ],
1299            )
1300            .await
1301            .unwrap();
1302        assert_eq!(storage.count(&ns).await.unwrap(), 4);
1303
1304        // Re-upserting only existing ids must not change the count.
1305        storage
1306            .upsert(&ns, vec![make_vector("v0", 4), make_vector("v3", 4)])
1307            .await
1308            .unwrap();
1309        assert_eq!(storage.count(&ns).await.unwrap(), 4);
1310    }
1311
1312    // DAK-6287: exercise the raw-fs fast write path (real Filesystem backend) end to end —
1313    // it must produce the same on-disk layout the OpenDAL read path expects, keep exact
1314    // count semantics, and safely fall back for traversal-unsafe ids without escaping root.
1315    #[tokio::test]
1316    async fn test_upsert_fs_fast_path_roundtrip() {
1317        let tmp = tempfile::tempdir().unwrap();
1318        let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1319        let ns = "fsfast".to_string();
1320        storage.ensure_namespace(&ns).await.unwrap();
1321
1322        // Insert via the fast path.
1323        storage
1324            .upsert(
1325                &ns,
1326                vec![
1327                    make_vector("a", 4),
1328                    make_vector("b", 4),
1329                    make_vector("c", 4),
1330                ],
1331            )
1332            .await
1333            .unwrap();
1334        assert_eq!(storage.count(&ns).await.unwrap(), 3);
1335
1336        // Read back through OpenDAL: proves the fast-path files are where the read path looks.
1337        let got = storage
1338            .get(&ns, &["a".to_string(), "b".to_string()])
1339            .await
1340            .unwrap();
1341        assert_eq!(got.len(), 2);
1342
1343        // Update b (existing) + insert d (new) in one batch => count 4, not 5.
1344        storage
1345            .upsert(&ns, vec![make_vector("b", 4), make_vector("d", 4)])
1346            .await
1347            .unwrap();
1348        assert_eq!(storage.count(&ns).await.unwrap(), 4);
1349
1350        // Guard: a batch containing a traversal-unsafe id must not panic or escape the
1351        // storage root; it falls back to the OpenDAL path and the safe sibling persists.
1352        storage
1353            .upsert(
1354                &ns,
1355                vec![make_vector("safe1", 4), make_vector("../escape", 4)],
1356            )
1357            .await
1358            .unwrap();
1359        let got = storage.get(&ns, &["safe1".to_string()]).await.unwrap();
1360        assert_eq!(got.len(), 1);
1361        // Nothing was written above the storage root.
1362        assert!(!tmp.path().parent().unwrap().join("escape.json").exists());
1363    }
1364
1365    // DAK-6299: ingest pattern — many small batches into ONE growing namespace. This is the
1366    // O(N^2) scenario the removed per-batch full-namespace list() created. vector_count must
1367    // stay exact when classified incrementally per batch (fs fast path: per-file stat).
1368    // Each batch overlaps the previous one by one id (an update), so only the genuinely-new
1369    // id may increment the count.
1370    #[tokio::test]
1371    async fn test_upsert_incremental_count_growing_namespace_fs() {
1372        let tmp = tempfile::tempdir().unwrap();
1373        let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1374        let ns = "grow".to_string();
1375        storage.ensure_namespace(&ns).await.unwrap();
1376
1377        const BATCHES: usize = 40;
1378        for b in 0..BATCHES {
1379            // Batch b writes ids {b, b+1}: id `b` is an update of the prior batch's new id,
1380            // id `b+1` is the only genuinely new vector => count grows by exactly 1 per batch.
1381            let new_id = format!("v{}", b + 1);
1382            let upd_id = format!("v{}", b);
1383            storage
1384                .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1385                .await
1386                .unwrap();
1387            // After batch b we have ids v0..=v(b+1) => b+2 distinct vectors.
1388            assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1389        }
1390        assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1391    }
1392
1393    // DAK-6299: same incremental-count contract on the OpenDAL path (Memory backend), which
1394    // classifies inserts via a per-vector exists() rather than the fs stat.
1395    #[tokio::test]
1396    async fn test_upsert_incremental_count_growing_namespace_opendal() {
1397        let storage = ObjectStorage::memory().unwrap();
1398        let ns = "grow-od".to_string();
1399        storage.ensure_namespace(&ns).await.unwrap();
1400
1401        const BATCHES: usize = 30;
1402        for b in 0..BATCHES {
1403            let new_id = format!("v{}", b + 1);
1404            let upd_id = format!("v{}", b);
1405            storage
1406                .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1407                .await
1408                .unwrap();
1409            assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1410        }
1411        assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1412    }
1413
1414    // DAK-6287: per-sub-stage profile of ObjectStorage::upsert on the fs backend.
1415    //
1416    // Post-tier (DAKERA_TIERED=1, embedding deferred) the per-stage ingest profile
1417    // (PR#609) names storage_upsert as the dominant stage (83.6% of the tiered
1418    // pipeline). This test decomposes that stage into its three per-vector fs ops
1419    // (exists stat / write-tmp / rename) plus candidate fixes, all at the production
1420    // concurrency window, so the dominant SUB-stage is provable rather than assumed.
1421    //
1422    // Ignored by default (writes thousands of files); run explicitly:
1423    //   cargo test -p dakera-storage --release profile_upsert_substages -- --ignored --nocapture
1424    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1425    #[ignore]
1426    async fn profile_upsert_substages() {
1427        use std::time::Instant;
1428        const N: usize = 200;
1429        const DIM: usize = 1024; // bge-large-en-v1.5
1430        let conc = s3_concurrent_ops();
1431
1432        // Realistic payload: 1024-d vector + LoCoMo-shaped metadata.
1433        let make = |i: usize| -> Vector {
1434            Vector {
1435                id: format!("prof-{i}"),
1436                values: vec![0.0123_f32; DIM],
1437                metadata: Some(serde_json::json!({
1438                    "_embedding_kind": "static",
1439                    "tags": ["dia_1", "speaker_a", "session_3", "2026-06-03"],
1440                    "importance": 0.7,
1441                    "session_id": format!("sess-{}", i % 8),
1442                })),
1443                ttl_seconds: None,
1444                expires_at: None,
1445            }
1446        };
1447        let vectors: Vec<Vector> = (0..N).map(make).collect();
1448        let payload_bytes = serde_json::to_vec(&StoredVector::from(vectors[0].clone()))
1449            .unwrap()
1450            .len();
1451
1452        // --- V0: authoritative current upsert() (real code path) ---
1453        let tmp0 = tempfile::tempdir().unwrap();
1454        let s0 = ObjectStorage::filesystem(tmp0.path().to_str().unwrap()).unwrap();
1455        let ns = "prof".to_string();
1456        s0.ensure_namespace(&ns).await.unwrap();
1457        let t = Instant::now();
1458        let n = s0.upsert(&ns, vectors.clone()).await.unwrap();
1459        let v0_ms = t.elapsed().as_secs_f64() * 1000.0;
1460        assert_eq!(n, N);
1461
1462        // Helper: run a per-vector closure over a fresh fs operator at `conc` and time it.
1463        async fn timed<F, Fut>(label: &str, n: usize, conc: usize, f: F) -> f64
1464        where
1465            F: Fn(usize, Operator) -> Fut,
1466            Fut: std::future::Future<Output = ()>,
1467        {
1468            let tmp = tempfile::tempdir().unwrap();
1469            let op = ObjectStorage::filesystem(tmp.path().to_str().unwrap())
1470                .unwrap()
1471                .operator;
1472            let t = Instant::now();
1473            stream::iter(0..n)
1474                .map(|i| {
1475                    let op = op.clone();
1476                    f(i, op)
1477                })
1478                .buffer_unordered(conc)
1479                .collect::<Vec<()>>()
1480                .await;
1481            let ms = t.elapsed().as_secs_f64() * 1000.0;
1482            // keep tmp alive until after timing
1483            drop(tmp);
1484            let _ = label;
1485            ms
1486        }
1487
1488        let data: Vec<Vec<u8>> = vectors
1489            .iter()
1490            .map(|v| serde_json::to_vec(&StoredVector::from(v.clone())).unwrap())
1491            .collect();
1492        let path_of = |i: usize| ObjectStorage::vector_path(&ns, &format!("prof-{i}"));
1493
1494        // P_exists: 200 stats only (the exists() round-trip upsert does per vector).
1495        let d = data.clone();
1496        let p_exists = timed("exists", N, conc, move |i, op| {
1497            let path = path_of(i);
1498            let _ = &d;
1499            async move {
1500                let _ = op.exists(&path).await;
1501            }
1502        })
1503        .await;
1504
1505        // P_write_direct: 200 direct writes (no tmp, no rename, no exists).
1506        let d = data.clone();
1507        let p_write_direct = timed("write_direct", N, conc, move |i, op| {
1508            let path = path_of(i);
1509            let bytes = d[i].clone();
1510            async move {
1511                op.write(&path, bytes).await.unwrap();
1512            }
1513        })
1514        .await;
1515
1516        // P_write_tmp_rename: 200 × (write tmp + rename) — the atomic write, no exists.
1517        let d = data.clone();
1518        let p_tmp_rename = timed("write_tmp_rename", N, conc, move |i, op| {
1519            let path = path_of(i);
1520            let bytes = d[i].clone();
1521            async move {
1522                let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1523                let tmp_path = format!("{path}.tmp.{seq}");
1524                op.write(&tmp_path, bytes).await.unwrap();
1525                op.rename(&tmp_path, &path).await.unwrap();
1526            }
1527        })
1528        .await;
1529
1530        // P_full: exists + write tmp + rename (mirror of current upsert per-vector body).
1531        let d = data.clone();
1532        let p_full = timed("exists+tmp+rename", N, conc, move |i, op| {
1533            let path = path_of(i);
1534            let bytes = d[i].clone();
1535            async move {
1536                let _ = op.exists(&path).await;
1537                let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1538                let tmp_path = format!("{path}.tmp.{seq}");
1539                op.write(&tmp_path, bytes).await.unwrap();
1540                op.rename(&tmp_path, &path).await.unwrap();
1541            }
1542        })
1543        .await;
1544
1545        // --- Concurrency sweep on the real write pattern: does throughput scale with
1546        // parallelism (I/O-latency-bound) or plateau (CPU/serialized)? ---
1547        let mut sweep = Vec::new();
1548        for &c in &[8usize, 16, 32, 64, 128] {
1549            let d = data.clone();
1550            let ms = timed("sweep", N, c, move |i, op| {
1551                let path = path_of(i);
1552                let bytes = d[i].clone();
1553                async move {
1554                    let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1555                    let tmp_path = format!("{path}.tmp.{seq}");
1556                    op.write(&tmp_path, bytes).await.unwrap();
1557                    op.rename(&tmp_path, &path).await.unwrap();
1558                }
1559            })
1560            .await;
1561            sweep.push((c, ms));
1562        }
1563
1564        // --- fsync isolation: raw std::fs write vs write+sync_all at conc 16, to test
1565        // whether the per-file durability flush is the dominant write cost (the issue's
1566        // "disable per-write sync" lever). ---
1567        async fn raw_fs(n: usize, conc: usize, bytes: usize, do_sync: bool) -> f64 {
1568            let dir = tempfile::tempdir().unwrap();
1569            let root = dir.path().to_path_buf();
1570            let payload = vec![0u8; bytes];
1571            let t = Instant::now();
1572            stream::iter(0..n)
1573                .map(|i| {
1574                    let root = root.clone();
1575                    let payload = payload.clone();
1576                    async move {
1577                        tokio::task::spawn_blocking(move || {
1578                            use std::io::Write;
1579                            let p = root.join(format!("f{i}.bin"));
1580                            let mut f = std::fs::File::create(&p).unwrap();
1581                            f.write_all(&payload).unwrap();
1582                            if do_sync {
1583                                f.sync_all().unwrap();
1584                            }
1585                        })
1586                        .await
1587                        .unwrap();
1588                    }
1589                })
1590                .buffer_unordered(conc)
1591                .collect::<Vec<()>>()
1592                .await;
1593            let ms = t.elapsed().as_secs_f64() * 1000.0;
1594            drop(dir);
1595            ms
1596        }
1597        let raw_nosync = raw_fs(N, 16, payload_bytes, false).await;
1598        let raw_sync = raw_fs(N, 16, payload_bytes, true).await;
1599
1600        let rate = |ms: f64| N as f64 / (ms / 1000.0);
1601        eprintln!("\n=== DAK-6287 storage_upsert sub-stage profile (fs backend) ===");
1602        eprintln!("N={N} dim={DIM} payload={payload_bytes}B concurrency={conc}");
1603        eprintln!(
1604            "V0 upsert() REAL path      : {v0_ms:8.2} ms  {:8.1} mem/s",
1605            rate(v0_ms)
1606        );
1607        eprintln!(
1608            "P_full exists+tmp+rename   : {p_full:8.2} ms  {:8.1} mem/s",
1609            rate(p_full)
1610        );
1611        eprintln!(
1612            "P_exists (stat only)       : {p_exists:8.2} ms  {:8.1} mem/s",
1613            rate(p_exists)
1614        );
1615        eprintln!(
1616            "P_write_tmp_rename         : {p_tmp_rename:8.2} ms  {:8.1} mem/s",
1617            rate(p_tmp_rename)
1618        );
1619        eprintln!(
1620            "P_write_direct             : {p_write_direct:8.2} ms  {:8.1} mem/s",
1621            rate(p_write_direct)
1622        );
1623        eprintln!("--- attribution (of P_full) ---");
1624        eprintln!("exists share   : {:5.1}%", 100.0 * p_exists / p_full);
1625        eprintln!("rename overhead: {:5.1}%  (tmp+rename {p_tmp_rename:.1} vs direct {p_write_direct:.1})",
1626            100.0 * (p_tmp_rename - p_write_direct).max(0.0) / p_full);
1627        eprintln!("--- concurrency sweep (write tmp+rename) ---");
1628        for (c, ms) in &sweep {
1629            eprintln!("conc={c:4} : {ms:8.2} ms  {:8.1} mem/s", rate(*ms));
1630        }
1631        eprintln!("--- fsync isolation (raw std::fs, conc=16, {payload_bytes}B) ---");
1632        eprintln!(
1633            "write no-sync : {raw_nosync:8.2} ms  {:8.1} mem/s",
1634            rate(raw_nosync)
1635        );
1636        eprintln!(
1637            "write+sync_all: {raw_sync:8.2} ms  {:8.1} mem/s",
1638            rate(raw_sync)
1639        );
1640        eprintln!(
1641            "=> fsync cost factor: {:.1}x",
1642            raw_sync / raw_nosync.max(0.001)
1643        );
1644    }
1645}