Skip to main content

storage/
object.rs

//! Object storage backend using OpenDAL.
//!
//! Provides a single storage abstraction over several backends:
//! - S3-compatible (AWS S3, MinIO, DigitalOcean Spaces, etc.)
//! - Azure Blob Storage
//! - Google Cloud Storage (GCS)
//! - Local filesystem
//! - In-memory (for testing)
9
10use async_trait::async_trait;
11use common::{DakeraError, NamespaceId, Result, Vector, VectorId};
12use futures_util::stream::{self, StreamExt};
13use opendal::{layers::RetryLayer, services, Operator};
14use serde::{Deserialize, Serialize};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::Duration;
17
/// Process-wide monotonically increasing sequence number appended to temporary file
/// names, so concurrent writers in this process never pick the same tmp path.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20use crate::traits::{IndexStorage, IndexType, VectorStorage};
21
/// Concurrency limit for the per-object read/write streams in this module.
///
/// Read from `DAKERA_S3_CONCURRENT_OPS` on every call. A missing, unparsable, negative or
/// zero value falls back to the default of 16. Zero is rejected because it is not a usable
/// concurrency bound for the `buffer_unordered`/`buffered` streams that consume this value.
fn s3_concurrent_ops() -> usize {
    const DEFAULT_CONCURRENT_OPS: usize = 16;
    std::env::var("DAKERA_S3_CONCURRENT_OPS")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .filter(|&n| n > 0)
        .unwrap_or(DEFAULT_CONCURRENT_OPS)
}
28
29/// Serializable namespace metadata
30#[derive(Debug, Clone, Serialize, Deserialize)]
31struct NamespaceMetadata {
32    dimension: Option<usize>,
33    vector_count: usize,
34    created_at: u64,
35    updated_at: u64,
36}
37
38/// Serializable vector data for storage
39#[derive(Debug, Clone, Serialize, Deserialize)]
40struct StoredVector {
41    id: String,
42    values: Vec<f32>,
43    metadata: Option<serde_json::Value>,
44}
45
46impl From<Vector> for StoredVector {
47    fn from(v: Vector) -> Self {
48        Self {
49            id: v.id,
50            values: v.values,
51            metadata: v.metadata,
52        }
53    }
54}
55
56impl From<StoredVector> for Vector {
57    fn from(v: StoredVector) -> Self {
58        Self {
59            id: v.id,
60            values: v.values,
61            metadata: v.metadata,
62            ttl_seconds: None,
63            expires_at: None,
64        }
65    }
66}
67
/// Object storage backend configuration.
///
/// `Debug` is implemented by hand so that credentials (`secret_access_key`, `account_key`,
/// `sas_token`) are printed as `<redacted>` instead of leaking into logs.
#[derive(Clone, Default)]
pub enum ObjectStorageConfig {
    /// In-memory storage (for testing)
    #[default]
    Memory,
    /// Local filesystem storage rooted at `root`
    Filesystem { root: String },
    /// S3-compatible storage (S3, MinIO, etc.)
    S3 {
        bucket: String,
        region: Option<String>,
        endpoint: Option<String>,
        access_key_id: Option<String>,
        /// Secret credential; redacted in `Debug` output.
        secret_access_key: Option<String>,
    },
    /// Azure Blob Storage
    Azure {
        container: String,
        account_name: String,
        /// Secret credential; redacted in `Debug` output.
        account_key: Option<String>,
        /// Secret credential; redacted in `Debug` output.
        sas_token: Option<String>,
        endpoint: Option<String>,
    },
    /// Google Cloud Storage
    Gcs {
        bucket: String,
        credential_path: Option<String>,
        endpoint: Option<String>,
    },
}

impl std::fmt::Debug for ObjectStorageConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Show whether a secret is set without revealing its value.
        fn redact(secret: &Option<String>) -> Option<&'static str> {
            secret.as_ref().map(|_| "<redacted>")
        }

        match self {
            Self::Memory => f.write_str("Memory"),
            Self::Filesystem { root } => f.debug_struct("Filesystem").field("root", root).finish(),
            Self::S3 {
                bucket,
                region,
                endpoint,
                access_key_id,
                secret_access_key,
            } => f
                .debug_struct("S3")
                .field("bucket", bucket)
                .field("region", region)
                .field("endpoint", endpoint)
                .field("access_key_id", access_key_id)
                .field("secret_access_key", &redact(secret_access_key))
                .finish(),
            Self::Azure {
                container,
                account_name,
                account_key,
                sas_token,
                endpoint,
            } => f
                .debug_struct("Azure")
                .field("container", container)
                .field("account_name", account_name)
                .field("account_key", &redact(account_key))
                .field("sas_token", &redact(sas_token))
                .field("endpoint", endpoint)
                .finish(),
            Self::Gcs {
                bucket,
                credential_path,
                endpoint,
            } => f
                .debug_struct("Gcs")
                .field("bucket", bucket)
                .field("credential_path", credential_path)
                .field("endpoint", endpoint)
                .finish(),
        }
    }
}
99
100/// Object storage backend using OpenDAL
101#[derive(Clone)]
102pub struct ObjectStorage {
103    operator: Operator,
104    /// DAK-6287: local filesystem root, retained ONLY when the backend is a real
105    /// local filesystem. Lets `upsert` take a raw-`std::fs` fast write path that
106    /// bypasses OpenDAL's per-op overhead (~9× faster on the same disk; profiled).
107    /// `None` for memory/S3/Azure/GCS — those keep the OpenDAL path.
108    fs_root: Option<std::path::PathBuf>,
109    /// DAK-6289: whether write-path operations must use the tmp-file + rename atomic
110    /// publish dance. TRUE only for the local `Filesystem` backend, where a direct
111    /// overwrite is non-atomic and a concurrent reader can observe a truncated file
112    /// during the write window (O_TRUNC race, DAK-4545). FALSE for `Memory` and every
113    /// object store (`S3`/`Azure`/`Gcs`): their `PUT` is atomic and read-after-write
114    /// consistent — a reader sees either the prior object or the fully-written new one,
115    /// never a torn read. Object stores have no POSIX rename: OpenDAL's S3 service returns
116    /// `Unsupported` for `rename` (verified against live MinIO, DAK-6289), so the old
117    /// tmp+rename path ALWAYS fell through to its fallback (delete(tmp) + direct write) =
118    /// a wasted PUT(tmp) + DELETE(tmp) per object for zero durability benefit. We write
119    /// direct to the final key instead.
120    needs_tmp_rename: bool,
121}
122
123impl ObjectStorage {
124    /// Create a new object storage with the given configuration
125    pub fn new(config: ObjectStorageConfig) -> Result<Self> {
126        let operator = Self::build_operator(&config)?;
127        let fs_root = match &config {
128            ObjectStorageConfig::Filesystem { root } => Some(std::path::PathBuf::from(root)),
129            _ => None,
130        };
131        let needs_tmp_rename = Self::needs_tmp_rename_for(&config);
132        Ok(Self {
133            operator,
134            fs_root,
135            needs_tmp_rename,
136        })
137    }
138
139    /// DAK-6289: whether `config`'s backend needs the tmp-file + rename atomic publish.
140    /// TRUE only for the local `Filesystem` (a direct overwrite there is non-atomic and
141    /// a concurrent reader can observe a truncated file — the O_TRUNC race, DAK-4545).
142    /// FALSE for `Memory` and every object store (`S3`/`Azure`/`Gcs`), whose `PUT` is
143    /// atomic and read-after-write consistent, so we write direct to the final key and
144    /// avoid OpenDAL's COPY+DELETE rename on S3.
145    fn needs_tmp_rename_for(config: &ObjectStorageConfig) -> bool {
146        matches!(config, ObjectStorageConfig::Filesystem { .. })
147    }
148
149    /// DAK-6287: true when `s` is safe as a single filesystem path segment (non-empty,
150    /// no separators, no parent refs, no NUL). Gates the raw-fs fast write path so a
151    /// crafted namespace or vector id cannot escape the storage root via path traversal.
152    fn is_fs_safe_segment(s: &str) -> bool {
153        !s.is_empty()
154            && s != "."
155            && s != ".."
156            && !s.contains('/')
157            && !s.contains('\\')
158            && !s.contains('\0')
159    }
160
161    /// DAK-6287: raw-`std::fs` batch write for the local Filesystem backend. Each vector is
162    /// written to `<root>/namespaces/<ns>/vectors/<id>.json` via tmp+fsync+rename, matching
163    /// the OpenDAL on-disk layout byte-for-byte so the read/recall path is unchanged. The
164    /// directory is created once per batch; each file is fsync'd before its atomic rename, so
165    /// crash-consistency is >= the OpenDAL path. Bypasses OpenDAL's per-op overhead, which the
166    /// storage_upsert profile showed dominates (~9× faster on the same disk). Callers MUST
167    /// ensure `namespace` and every vector id pass `is_fs_safe_segment`.
168    ///
169    /// Returns the number of NEW inserts (ids not already present on disk before the batch),
170    /// so the caller can maintain `meta.vector_count` without a separate O(namespace) listing
171    /// (DAK-6299). Classification is a single cheap raw-`std::fs` stat per file (O(batch),
172    /// ~1% of upsert cost per the storage_upsert profile), done inside the per-file write
173    /// task so it adds no extra round-trip pass over the namespace.
174    async fn write_vectors_fs(
175        &self,
176        root: &std::path::Path,
177        namespace: &NamespaceId,
178        vectors: Vec<Vector>,
179    ) -> Result<usize> {
180        let vectors_dir = root.join(format!("namespaces/{}/vectors", namespace));
181        tokio::fs::create_dir_all(&vectors_dir)
182            .await
183            .map_err(|e| DakeraError::Storage(e.to_string()))?;
184
185        let results: Vec<Result<bool>> = stream::iter(vectors)
186            .map(|vector| {
187                let dir = vectors_dir.clone();
188                async move {
189                    let id = vector.id.clone();
190                    let stored: StoredVector = vector.into();
191                    let data = serde_json::to_vec(&stored)
192                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
193                    let final_path = dir.join(format!("{}.json", id));
194                    let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
195                    // Leading dot keeps the temp file out of the `*.json` id listing.
196                    let tmp_path = dir.join(format!(".{}.tmp.{}", id, seq));
197                    tokio::task::spawn_blocking(move || -> Result<bool> {
198                        use std::io::Write;
199                        // Cheap per-file stat classifies insert-vs-update (matches the prior
200                        // !exists semantics) without scanning the whole namespace.
201                        let is_new = !final_path.exists();
202                        let mut f = std::fs::File::create(&tmp_path)
203                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
204                        f.write_all(&data)
205                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
206                        // Flush contents before the atomic publish so a crash can never
207                        // expose a half-written vector file (>= OpenDAL durability).
208                        f.sync_all()
209                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
210                        drop(f);
211                        if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
212                            let _ = std::fs::remove_file(&tmp_path);
213                            return Err(DakeraError::Storage(e.to_string()));
214                        }
215                        Ok(is_new)
216                    })
217                    .await
218                    .map_err(|e| DakeraError::Storage(e.to_string()))?
219                }
220            })
221            .buffer_unordered(s3_concurrent_ops())
222            .collect()
223            .await;
224
225        let mut new_inserts = 0usize;
226        for r in results {
227            if r? {
228                new_inserts += 1;
229            }
230        }
231        Ok(new_inserts)
232    }
233
234    /// Create in-memory storage (for testing)
235    pub fn memory() -> Result<Self> {
236        Self::new(ObjectStorageConfig::Memory)
237    }
238
239    /// Create filesystem storage
240    pub fn filesystem(root: impl Into<String>) -> Result<Self> {
241        Self::new(ObjectStorageConfig::Filesystem { root: root.into() })
242    }
243
244    /// Create S3 storage
245    pub fn s3(bucket: impl Into<String>) -> Result<Self> {
246        Self::new(ObjectStorageConfig::S3 {
247            bucket: bucket.into(),
248            region: None,
249            endpoint: None,
250            access_key_id: None,
251            secret_access_key: None,
252        })
253    }
254
255    /// Create Azure Blob storage
256    pub fn azure(container: impl Into<String>, account_name: impl Into<String>) -> Result<Self> {
257        Self::new(ObjectStorageConfig::Azure {
258            container: container.into(),
259            account_name: account_name.into(),
260            account_key: None,
261            sas_token: None,
262            endpoint: None,
263        })
264    }
265
266    /// Create Google Cloud Storage
267    pub fn gcs(bucket: impl Into<String>) -> Result<Self> {
268        Self::new(ObjectStorageConfig::Gcs {
269            bucket: bucket.into(),
270            credential_path: None,
271            endpoint: None,
272        })
273    }
274
275    pub fn build_operator(config: &ObjectStorageConfig) -> Result<Operator> {
276        match config {
277            ObjectStorageConfig::Memory => {
278                let builder = services::Memory::default();
279                Operator::new(builder)
280                    .map(|op| op.finish())
281                    .map_err(|e| DakeraError::Storage(e.to_string()))
282            }
283            ObjectStorageConfig::Filesystem { root } => {
284                let builder = services::Fs::default().root(root);
285                Operator::new(builder)
286                    .map(|op| op.layer(Self::retry_layer()).finish())
287                    .map_err(|e| DakeraError::Storage(e.to_string()))
288            }
289            ObjectStorageConfig::S3 {
290                bucket,
291                region,
292                endpoint,
293                access_key_id,
294                secret_access_key,
295            } => {
296                let mut builder = services::S3::default().bucket(bucket);
297
298                if let Some(region) = region {
299                    builder = builder.region(region);
300                }
301                if let Some(endpoint) = endpoint {
302                    builder = builder.endpoint(endpoint);
303                }
304                if let Some(key) = access_key_id {
305                    builder = builder.access_key_id(key);
306                }
307                if let Some(secret) = secret_access_key {
308                    builder = builder.secret_access_key(secret);
309                }
310
311                Operator::new(builder)
312                    .map(|op| op.layer(Self::retry_layer()).finish())
313                    .map_err(|e| DakeraError::Storage(e.to_string()))
314            }
315            ObjectStorageConfig::Azure {
316                container,
317                account_name,
318                account_key,
319                sas_token,
320                endpoint,
321            } => {
322                let mut builder = services::Azblob::default()
323                    .container(container)
324                    .account_name(account_name);
325
326                if let Some(key) = account_key {
327                    builder = builder.account_key(key);
328                }
329                if let Some(token) = sas_token {
330                    builder = builder.sas_token(token);
331                }
332                if let Some(endpoint) = endpoint {
333                    builder = builder.endpoint(endpoint);
334                }
335
336                Operator::new(builder)
337                    .map(|op| op.layer(Self::retry_layer()).finish())
338                    .map_err(|e| DakeraError::Storage(e.to_string()))
339            }
340            ObjectStorageConfig::Gcs {
341                bucket,
342                credential_path,
343                endpoint,
344            } => {
345                let mut builder = services::Gcs::default().bucket(bucket);
346
347                if let Some(cred_path) = credential_path {
348                    builder = builder.credential_path(cred_path);
349                }
350                if let Some(endpoint) = endpoint {
351                    builder = builder.endpoint(endpoint);
352                }
353
354                Operator::new(builder)
355                    .map(|op| op.layer(Self::retry_layer()).finish())
356                    .map_err(|e| DakeraError::Storage(e.to_string()))
357            }
358        }
359    }
360
361    fn retry_layer() -> RetryLayer {
362        // DAK-3430: default capped from 10 → 3 so a MinIO 429 storm fails fast
363        // (≤3×60s ≈ 3min) rather than blocking production for 10min. Override
364        // with DAKERA_S3_MAX_RETRIES=10 for high-resilience deployments.
365        let max_times: usize = std::env::var("DAKERA_S3_MAX_RETRIES")
366            .ok()
367            .and_then(|v| v.parse().ok())
368            .unwrap_or(3);
369        let min_delay_ms: u64 = std::env::var("DAKERA_S3_RETRY_MIN_DELAY_MS")
370            .ok()
371            .and_then(|v| v.parse().ok())
372            .unwrap_or(500);
373        let max_delay_secs: u64 = std::env::var("DAKERA_S3_RETRY_MAX_DELAY_SECS")
374            .ok()
375            .and_then(|v| v.parse().ok())
376            .unwrap_or(60);
377
378        tracing::info!(
379            max_times,
380            min_delay_ms,
381            max_delay_secs,
382            "S3 retry layer configured"
383        );
384
385        RetryLayer::new()
386            .with_max_times(max_times)
387            .with_min_delay(Duration::from_millis(min_delay_ms))
388            .with_max_delay(Duration::from_secs(max_delay_secs))
389            .with_jitter()
390            .with_factor(2.0)
391    }
392
393    /// Get the path for a vector
394    fn vector_path(namespace: &str, vector_id: &str) -> String {
395        format!("namespaces/{}/vectors/{}.json", namespace, vector_id)
396    }
397
398    /// Get the path for namespace metadata
399    fn namespace_meta_path(namespace: &str) -> String {
400        format!("namespaces/{}/meta.json", namespace)
401    }
402
403    /// Get the path prefix for all vectors in a namespace
404    fn namespace_vectors_prefix(namespace: &str) -> String {
405        format!("namespaces/{}/vectors/", namespace)
406    }
407
408    /// Read namespace metadata
409    async fn read_namespace_meta(&self, namespace: &str) -> Result<Option<NamespaceMetadata>> {
410        let path = Self::namespace_meta_path(namespace);
411        match self.operator.read(&path).await {
412            Ok(data) => {
413                let bytes = data.to_vec();
414                if bytes.is_empty() {
415                    tracing::warn!(
416                        namespace = %namespace,
417                        path = %path,
418                        "Empty namespace metadata file detected, treating as missing"
419                    );
420                    return Ok(None);
421                }
422                match serde_json::from_slice(&bytes) {
423                    Ok(meta) => Ok(Some(meta)),
424                    Err(e) => {
425                        tracing::warn!(
426                            namespace = %namespace,
427                            path = %path,
428                            error = %e,
429                            bytes_len = bytes.len(),
430                            "Corrupted namespace metadata, treating as missing and will be recreated"
431                        );
432                        Ok(None)
433                    }
434                }
435            }
436            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
437            Err(e) => Err(DakeraError::Storage(e.to_string())),
438        }
439    }
440
441    /// Write namespace metadata atomically.
442    ///
443    /// Writes to a `.tmp` file first, then renames to the final path. On POSIX filesystems
444    async fn write_namespace_meta(&self, namespace: &str, meta: &NamespaceMetadata) -> Result<()> {
445        let path = Self::namespace_meta_path(namespace);
446        let data = serde_json::to_vec(meta).map_err(|e| DakeraError::Storage(e.to_string()))?;
447        self.write_atomic(&path, data).await
448    }
449
450    /// Write data to `path` atomically.
451    ///
452    /// On the local `Filesystem` backend (`needs_tmp_rename == true`) this writes to a
453    /// `.tmp` file first, then renames to the final path. On POSIX filesystems rename(2)
454    /// is atomic within the same device, preventing concurrent readers from seeing a
455    /// truncated/empty file during the write window (O_TRUNC race, DAK-4545).
456    ///
457    /// On object stores and memory (`needs_tmp_rename == false`) the backend's `PUT` is
458    /// already atomic and read-after-write consistent, so we write direct to the final
459    /// key. This is the DAK-6289 fix: OpenDAL's S3 service has no `rename` (returns
460    /// `Unsupported`), so the old tmp+rename always fell through to its fallback —
461    /// delete(tmp) + direct write — turning each metadata write into PUT(tmp) +
462    /// DELETE(tmp) + PUT(final) for zero durability gain. A direct `PUT` still leaves
463    /// readers on the previous version until it completes — no torn-read window exists.
464    async fn write_atomic(&self, path: &str, data: Vec<u8>) -> Result<()> {
465        if !self.needs_tmp_rename {
466            // Atomic-PUT backend (S3/Azure/GCS/Memory): write straight to the final key.
467            return self
468                .operator
469                .write(path, data)
470                .await
471                .map(|_| ())
472                .map_err(|e| DakeraError::Storage(e.to_string()));
473        }
474        let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
475        let tmp_path = format!("{}.tmp.{}.{}", path, Self::now(), seq);
476        let rename_result = async {
477            self.operator
478                .write(&tmp_path, data.clone())
479                .await
480                .map_err(|e| DakeraError::Storage(e.to_string()))?;
481            self.operator
482                .rename(&tmp_path, path)
483                .await
484                .map_err(|e| DakeraError::Storage(e.to_string()))?;
485            Ok::<(), DakeraError>(())
486        }
487        .await;
488        if let Err(e) = rename_result {
489            // Backend doesn't support rename: fall back to direct write.
490            tracing::debug!(path = %path, error = %e, "atomic rename failed, falling back to direct write");
491            let _ = self.operator.delete(&tmp_path).await;
492            self.operator
493                .write(path, data)
494                .await
495                .map_err(|e| DakeraError::Storage(e.to_string()))?;
496        }
497        Ok(())
498    }
499
500    /// Get current timestamp
501    fn now() -> u64 {
502        std::time::SystemTime::now()
503            .duration_since(std::time::UNIX_EPOCH)
504            .unwrap_or_default()
505            .as_secs()
506    }
507}
508
509#[async_trait]
510impl VectorStorage for ObjectStorage {
511    async fn upsert(&self, namespace: &NamespaceId, vectors: Vec<Vector>) -> Result<usize> {
512        if vectors.is_empty() {
513            return Ok(0);
514        }
515
516        // Get or create namespace metadata
517        let mut meta = self
518            .read_namespace_meta(namespace)
519            .await?
520            .unwrap_or_else(|| NamespaceMetadata {
521                dimension: None,
522                vector_count: 0,
523                created_at: Self::now(),
524                updated_at: Self::now(),
525            });
526
527        // Validate dimensions
528        let first_dim = vectors[0].values.len();
529        if let Some(dim) = meta.dimension {
530            for v in &vectors {
531                if v.values.len() != dim {
532                    return Err(DakeraError::DimensionMismatch {
533                        expected: dim,
534                        actual: v.values.len(),
535                    });
536                }
537            }
538        } else {
539            meta.dimension = Some(first_dim);
540        }
541
542        // DAK-5553: parallelize per-vector write to eliminate sequential I/O bottleneck.
543        // Buffer up to s3_concurrent_ops() (default 16) concurrent writes at a time.
544        //
545        // DAK-6299: classify insert-vs-update (to maintain meta.vector_count) with O(batch)
546        // per-vector existence checks, NOT a per-batch full-namespace list(). PR#612 replaced
547        // N per-vector exists() stats with ONE operator.list(vectors_prefix), but that list
548        // returns the ENTIRE namespace = O(namespace_size) per batch => O(N^2/batch) over a
549        // full ingest, and it ran for ALL backends (incl S3, where it is a paginated LIST of
550        // every key). Full-scale measurement (DAK-6297, fs, 2675 mem, one namespace) showed
551        // the list inverted the fast path: 1.43x at 220 mem -> 0.55x at 2675 mem. The
552        // profile that motivated PR#612 (DAK-6287) showed the per-vector stat was only ~1%
553        // of upsert cost, so removing it bought ~nothing while the list it introduced is the
554        // regression. Here: on the fs fast path a cheap raw std::fs stat per file (O(batch))
555        // classifies new-vs-existing; on the OpenDAL path a per-vector exists() (O(batch)
556        // HEAD/stat, the pre-PR#612 DAK-5553 semantics) does the same. No full-namespace
557        // scan. Write-path only, recall-neutral: vector file format, the read/recall path,
558        // and the tmp+rename atomic-publish durability guarantee are all unchanged.
559        let total = vectors.len();
560
561        // DAK-6287: on the local Filesystem backend, write via raw std::fs instead of
562        // OpenDAL. Profiled on an x64 runner (storage_upsert decomposition): the per-vector
563        // cost is dominated by OpenDAL's fs-writer overhead, NOT fsync/stat/rename — raw
564        // write+fsync reaches ~5800 mem/s vs OpenDAL ~650 mem/s on the same disk. The fast
565        // path keeps the EXACT on-disk layout (namespaces/<ns>/vectors/<id>.json), atomic
566        // tmp+rename publish, and adds an explicit per-file fsync, so the read/recall path is
567        // byte-identical and durability is >= the OpenDAL path. Guarded: only taken when the
568        // namespace and every id are filesystem-safe segments (no `/`, `\`, `..`, NUL) to
569        // prevent path traversal; otherwise it falls back to the OpenDAL path below.
570        let fs_root = if Self::is_fs_safe_segment(namespace)
571            && vectors.iter().all(|v| Self::is_fs_safe_segment(&v.id))
572        {
573            self.fs_root.clone()
574        } else {
575            None
576        };
577
578        let new_inserts = if let Some(root) = fs_root {
579            // Fast path classifies inserts via a per-file stat inside the write loop.
580            self.write_vectors_fs(&root, namespace, vectors).await?
581        } else {
582            let op = self.operator.clone();
583            let ns = namespace.clone();
584            // DAK-6289: capture the write strategy once (Copy bool) so each per-vector task
585            // chooses direct-PUT vs tmp+rename without touching `self`.
586            let needs_tmp_rename = self.needs_tmp_rename;
587            let results: Vec<Result<bool>> = stream::iter(vectors)
588                .map(|vector| {
589                    let op = op.clone();
590                    let ns = ns.clone();
591                    async move {
592                        let path = ObjectStorage::vector_path(&ns, &vector.id);
593                        let stored: StoredVector = vector.into();
594                        let data = serde_json::to_vec(&stored)
595                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
596
597                        // O(1) per vector: a single exists() (HEAD on S3) classifies
598                        // insert-vs-update without scanning the namespace.
599                        let exists = op
600                            .exists(&path)
601                            .await
602                            .map_err(|e| DakeraError::Storage(e.to_string()))?;
603
604                        if needs_tmp_rename {
605                            // Local filesystem fallback (only reached here for traversal-unsafe
606                            // ids; the safe-id common case takes write_vectors_fs above):
607                            // tmp+rename avoids the O_TRUNC torn-read race (DAK-4545).
608                            let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
609                            let now_secs = std::time::SystemTime::now()
610                                .duration_since(std::time::UNIX_EPOCH)
611                                .unwrap_or_default()
612                                .as_secs();
613                            let tmp_path = format!("{}.tmp.{}.{}", path, now_secs, seq);
614                            let rename_result = async {
615                                op.write(&tmp_path, data.clone())
616                                    .await
617                                    .map_err(|e| DakeraError::Storage(e.to_string()))?;
618                                op.rename(&tmp_path, &path)
619                                    .await
620                                    .map_err(|e| DakeraError::Storage(e.to_string()))?;
621                                Ok::<(), DakeraError>(())
622                            }
623                            .await;
624                            if let Err(e) = rename_result {
625                                tracing::debug!(
626                                    path = %path,
627                                    error = %e,
628                                    "atomic rename failed, falling back to direct write"
629                                );
630                                let _ = op.delete(&tmp_path).await;
631                                op.write(&path, data)
632                                    .await
633                                    .map_err(|e| DakeraError::Storage(e.to_string()))?;
634                            }
635                        } else {
636                            // DAK-6289 hot path: S3/Azure/GCS/Memory `PUT` is atomic and
637                            // read-after-write consistent. Write direct to the final key.
638                            // The old tmp+rename always degraded to PUT(tmp)+DELETE(tmp)+
639                            // PUT(final) on S3 (OpenDAL rename => Unsupported → fallback),
640                            // i.e. the payload was PUT twice. Direct write is one PUT with
641                            // identical atomicity. Recall-neutral: same key, same bytes.
642                            op.write(&path, data)
643                                .await
644                                .map_err(|e| DakeraError::Storage(e.to_string()))?;
645                        }
646
647                        Ok::<bool, DakeraError>(!exists)
648                    }
649                })
650                .buffer_unordered(s3_concurrent_ops())
651                .collect()
652                .await;
653
654            let mut count = 0usize;
655            for r in results {
656                if r? {
657                    count += 1;
658                }
659            }
660            count
661        };
662
663        meta.vector_count += new_inserts;
664        meta.updated_at = Self::now();
665        self.write_namespace_meta(namespace, &meta).await?;
666
667        tracing::debug!(
668            namespace = namespace,
669            upserted = total,
670            "Upserted vectors to object storage"
671        );
672
673        Ok(total)
674    }
675
676    async fn get(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<Vec<Vector>> {
677        if ids.is_empty() {
678            return Ok(Vec::new());
679        }
680
681        let now = std::time::SystemTime::now()
682            .duration_since(std::time::UNIX_EPOCH)
683            .unwrap_or_default()
684            .as_secs();
685
686        let read_tasks: Vec<_> = ids
687            .iter()
688            .map(|id| {
689                let operator = self.operator.clone();
690                let path = Self::vector_path(namespace, id);
691                let id = id.clone();
692                async move {
693                    match operator.read(&path).await {
694                        Ok(data) => {
695                            let bytes = data.to_vec();
696                            if bytes.is_empty() {
697                                tracing::warn!(
698                                    vector_id = %id,
699                                    "Empty vector file detected, skipping"
700                                );
701                                return Ok(None);
702                            }
703                            match serde_json::from_slice::<StoredVector>(&bytes) {
704                                Ok(stored) => {
705                                    let vector: Vector = stored.into();
706                                    if !vector.is_expired_at(now) {
707                                        Ok(Some(vector))
708                                    } else {
709                                        Ok(None)
710                                    }
711                                }
712                                Err(e) => {
713                                    tracing::warn!(
714                                        vector_id = %id,
715                                        error = %e,
716                                        bytes_len = bytes.len(),
717                                        "Corrupted vector file detected, skipping"
718                                    );
719                                    Ok(None)
720                                }
721                            }
722                        }
723                        Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
724                        Err(e) => Err(DakeraError::Storage(e.to_string())),
725                    }
726                }
727            })
728            .collect();
729
730        let results: Vec<Result<Option<Vector>>> = stream::iter(read_tasks)
731            .buffer_unordered(s3_concurrent_ops())
732            .collect()
733            .await;
734
735        let mut vectors = Vec::with_capacity(ids.len());
736        for result in results {
737            if let Some(v) = result? {
738                vectors.push(v);
739            }
740        }
741        Ok(vectors)
742    }
743
744    async fn get_all(&self, namespace: &NamespaceId) -> Result<Vec<Vector>> {
745        let prefix = Self::namespace_vectors_prefix(namespace);
746
747        let entries = self
748            .operator
749            .list(&prefix)
750            .await
751            .map_err(|e| DakeraError::Storage(e.to_string()))?;
752
753        let json_paths: Vec<String> = entries
754            .into_iter()
755            .filter(|e| e.path().ends_with(".json"))
756            .map(|e| e.path().to_string())
757            .collect();
758
759        if json_paths.is_empty() {
760            return Ok(Vec::new());
761        }
762
763        let now = std::time::SystemTime::now()
764            .duration_since(std::time::UNIX_EPOCH)
765            .unwrap_or_default()
766            .as_secs();
767
768        let results: Vec<Option<Vector>> = stream::iter(json_paths.into_iter().map(|path| {
769            let operator = self.operator.clone();
770            async move {
771                match operator.read(&path).await {
772                    Ok(data) => {
773                        let bytes = data.to_vec();
774                        if let Ok(stored) = serde_json::from_slice::<StoredVector>(&bytes) {
775                            let vector: Vector = stored.into();
776                            if !vector.is_expired_at(now) {
777                                return Some(vector);
778                            }
779                        }
780                        None
781                    }
782                    Err(e) => {
783                        tracing::warn!(path = %path, error = %e, "Failed to read vector");
784                        None
785                    }
786                }
787            }
788        }))
789        .buffer_unordered(s3_concurrent_ops())
790        .collect()
791        .await;
792
793        Ok(results.into_iter().flatten().collect())
794    }
795
796    async fn delete(&self, namespace: &NamespaceId, ids: &[VectorId]) -> Result<usize> {
797        if ids.is_empty() {
798            return Ok(0);
799        }
800
801        let delete_tasks: Vec<_> = ids
802            .iter()
803            .map(|id| {
804                let operator = self.operator.clone();
805                let path = Self::vector_path(namespace, id);
806                async move {
807                    let exists = operator
808                        .exists(&path)
809                        .await
810                        .map_err(|e| DakeraError::Storage(e.to_string()))?;
811                    if exists {
812                        match operator.delete(&path).await {
813                            Ok(_) => Ok(true),
814                            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(false),
815                            Err(e) => Err(DakeraError::Storage(e.to_string())),
816                        }
817                    } else {
818                        Ok(false)
819                    }
820                }
821            })
822            .collect();
823
824        let results: Vec<Result<bool>> = stream::iter(delete_tasks)
825            .buffer_unordered(s3_concurrent_ops())
826            .collect()
827            .await;
828
829        let mut deleted = 0;
830        for result in results {
831            if result? {
832                deleted += 1;
833            }
834        }
835
836        // Update metadata
837        if deleted > 0 {
838            if let Some(mut meta) = self.read_namespace_meta(namespace).await? {
839                meta.vector_count = meta.vector_count.saturating_sub(deleted);
840                meta.updated_at = Self::now();
841                self.write_namespace_meta(namespace, &meta).await?;
842            }
843        }
844
845        Ok(deleted)
846    }
847
848    async fn namespace_exists(&self, namespace: &NamespaceId) -> Result<bool> {
849        Ok(self.read_namespace_meta(namespace).await?.is_some())
850    }
851
852    async fn ensure_namespace(&self, namespace: &NamespaceId) -> Result<()> {
853        if self.read_namespace_meta(namespace).await?.is_none() {
854            let meta = NamespaceMetadata {
855                dimension: None,
856                vector_count: 0,
857                created_at: Self::now(),
858                updated_at: Self::now(),
859            };
860            self.write_namespace_meta(namespace, &meta).await?;
861        }
862        Ok(())
863    }
864
865    async fn dimension(&self, namespace: &NamespaceId) -> Result<Option<usize>> {
866        Ok(self
867            .read_namespace_meta(namespace)
868            .await?
869            .and_then(|m| m.dimension))
870    }
871
872    async fn count(&self, namespace: &NamespaceId) -> Result<usize> {
873        Ok(self
874            .read_namespace_meta(namespace)
875            .await?
876            .map(|m| m.vector_count)
877            .unwrap_or(0))
878    }
879
880    async fn list_namespaces(&self) -> Result<Vec<NamespaceId>> {
881        let entries = self
882            .operator
883            .list("namespaces/")
884            .await
885            .map_err(|e| DakeraError::Storage(e.to_string()))?;
886
887        let mut namespaces = Vec::new();
888        for entry in entries {
889            let path = entry.path();
890            // Extract namespace name from path like "namespaces/myns/"
891            if let Some(ns) = path.strip_prefix("namespaces/") {
892                let ns = ns.trim_end_matches('/');
893                if !ns.is_empty() && !ns.contains('/') {
894                    // Only include namespaces that actually have metadata (meta.json).
895                    // This filters out ghost directory entries left behind after
896                    // namespace deletion on backends where empty directories persist
897                    // (e.g., local filesystem, some S3-compatible stores).
898                    if self.read_namespace_meta(ns).await?.is_some() {
899                        namespaces.push(ns.to_string());
900                    }
901                }
902            }
903        }
904
905        Ok(namespaces)
906    }
907
908    async fn delete_namespace(&self, namespace: &NamespaceId) -> Result<bool> {
909        // Check if namespace exists
910        if !self.namespace_exists(namespace).await? {
911            return Ok(false);
912        }
913
914        // Recursively remove everything under the namespace prefix.
915        // This deletes vectors, metadata, indexes, AND directory entries,
916        // preventing ghost namespaces from appearing in list_namespaces().
917        let prefix = format!("namespaces/{}/", namespace);
918        self.operator
919            .delete_with(&prefix)
920            .recursive(true)
921            .await
922            .map_err(|e| DakeraError::Storage(e.to_string()))?;
923
924        tracing::debug!(
925            namespace = namespace,
926            "Deleted namespace from object storage"
927        );
928
929        Ok(true)
930    }
931
932    async fn cleanup_expired(&self, _namespace: &NamespaceId) -> Result<usize> {
933        // Object storage doesn't track TTL internally - cleanup handled by application layer
934        // StoredVector doesn't persist TTL fields, so nothing to clean up here
935        Ok(0)
936    }
937
938    async fn cleanup_all_expired(&self) -> Result<usize> {
939        // Object storage doesn't track TTL internally - cleanup handled by application layer
940        Ok(0)
941    }
942}
943
944#[async_trait]
945impl IndexStorage for ObjectStorage {
946    async fn save_index(
947        &self,
948        namespace: &NamespaceId,
949        index_type: IndexType,
950        data: Vec<u8>,
951    ) -> Result<()> {
952        let path = format!(
953            "namespaces/{}/indexes/{}.bin",
954            namespace,
955            index_type.as_str()
956        );
957        self.operator
958            .write(&path, data)
959            .await
960            .map_err(|e| DakeraError::Storage(e.to_string()))?;
961
962        tracing::debug!(
963            namespace = namespace,
964            index_type = index_type.as_str(),
965            "Saved index to object storage"
966        );
967        Ok(())
968    }
969
970    async fn load_index(
971        &self,
972        namespace: &NamespaceId,
973        index_type: IndexType,
974    ) -> Result<Option<Vec<u8>>> {
975        let path = format!(
976            "namespaces/{}/indexes/{}.bin",
977            namespace,
978            index_type.as_str()
979        );
980        match self.operator.read(&path).await {
981            Ok(data) => {
982                tracing::debug!(
983                    namespace = namespace,
984                    index_type = index_type.as_str(),
985                    size = data.len(),
986                    "Loaded index from object storage"
987                );
988                Ok(Some(data.to_vec()))
989            }
990            Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
991            Err(e) => Err(DakeraError::Storage(e.to_string())),
992        }
993    }
994
995    async fn delete_index(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
996        let path = format!(
997            "namespaces/{}/indexes/{}.bin",
998            namespace,
999            index_type.as_str()
1000        );
1001        let exists = self
1002            .operator
1003            .exists(&path)
1004            .await
1005            .map_err(|e| DakeraError::Storage(e.to_string()))?;
1006
1007        if exists {
1008            self.operator
1009                .delete(&path)
1010                .await
1011                .map_err(|e| DakeraError::Storage(e.to_string()))?;
1012            tracing::debug!(
1013                namespace = namespace,
1014                index_type = index_type.as_str(),
1015                "Deleted index from object storage"
1016            );
1017        }
1018        Ok(exists)
1019    }
1020
1021    async fn index_exists(&self, namespace: &NamespaceId, index_type: IndexType) -> Result<bool> {
1022        let path = format!(
1023            "namespaces/{}/indexes/{}.bin",
1024            namespace,
1025            index_type.as_str()
1026        );
1027        self.operator
1028            .exists(&path)
1029            .await
1030            .map_err(|e| DakeraError::Storage(e.to_string()))
1031    }
1032
1033    async fn list_indexes(&self, namespace: &NamespaceId) -> Result<Vec<IndexType>> {
1034        let prefix = format!("namespaces/{}/indexes/", namespace);
1035        let entries = self
1036            .operator
1037            .list(&prefix)
1038            .await
1039            .map_err(|e| DakeraError::Storage(e.to_string()))?;
1040
1041        let mut indexes = Vec::new();
1042        for entry in entries {
1043            let path = entry.path();
1044            if path.ends_with(".bin") {
1045                // Extract index type from filename
1046                if let Some(filename) = path.strip_prefix(&prefix) {
1047                    let name = filename.trim_end_matches(".bin");
1048                    match name {
1049                        "hnsw" => indexes.push(IndexType::Hnsw),
1050                        "pq" => indexes.push(IndexType::Pq),
1051                        "ivf" => indexes.push(IndexType::Ivf),
1052                        "spfresh" => indexes.push(IndexType::SpFresh),
1053                        "fulltext" => indexes.push(IndexType::FullText),
1054                        _ => {} // Ignore unknown index types
1055                    }
1056                }
1057            }
1058        }
1059
1060        Ok(indexes)
1061    }
1062}
1063
1064/// Create an OpenDAL operator from configuration without constructing a full ObjectStorage.
1065/// Useful for lightweight S3 access (e.g., BackupManager metadata persistence).
1066pub fn create_operator(config: &ObjectStorageConfig) -> Result<Operator> {
1067    ObjectStorage::build_operator(config)
1068}
1069
1070#[cfg(test)]
1071mod tests {
1072    use super::*;
1073
1074    #[tokio::test]
1075    async fn test_object_storage_memory() {
1076        let storage = ObjectStorage::memory().unwrap();
1077        let namespace = "test".to_string();
1078
1079        // Ensure namespace
1080        storage.ensure_namespace(&namespace).await.unwrap();
1081        assert!(storage.namespace_exists(&namespace).await.unwrap());
1082
1083        // Insert vectors
1084        let vectors = vec![
1085            Vector {
1086                id: "v1".to_string(),
1087                values: vec![1.0, 2.0, 3.0],
1088                metadata: None,
1089                ttl_seconds: None,
1090                expires_at: None,
1091            },
1092            Vector {
1093                id: "v2".to_string(),
1094                values: vec![4.0, 5.0, 6.0],
1095                metadata: Some(serde_json::json!({"key": "value"})),
1096                ttl_seconds: None,
1097                expires_at: None,
1098            },
1099        ];
1100
1101        let count = storage.upsert(&namespace, vectors).await.unwrap();
1102        assert_eq!(count, 2);
1103
1104        // Get single vector
1105        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
1106        assert_eq!(results.len(), 1);
1107        assert_eq!(results[0].id, "v1");
1108        assert_eq!(results[0].values, vec![1.0, 2.0, 3.0]);
1109
1110        // Get all vectors
1111        let all = storage.get_all(&namespace).await.unwrap();
1112        assert_eq!(all.len(), 2);
1113
1114        // Count
1115        assert_eq!(storage.count(&namespace).await.unwrap(), 2);
1116
1117        // Delete
1118        let deleted = storage
1119            .delete(&namespace, &["v1".to_string()])
1120            .await
1121            .unwrap();
1122        assert_eq!(deleted, 1);
1123        assert!(storage
1124            .get(&namespace, &["v1".to_string()])
1125            .await
1126            .unwrap()
1127            .is_empty());
1128        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
1129    }
1130
1131    #[tokio::test]
1132    async fn test_object_storage_dimension_mismatch() {
1133        let storage = ObjectStorage::memory().unwrap();
1134        let namespace = "test".to_string();
1135        storage.ensure_namespace(&namespace).await.unwrap();
1136
1137        // Insert first vector
1138        let v1 = vec![Vector {
1139            id: "v1".to_string(),
1140            values: vec![1.0, 2.0, 3.0],
1141            metadata: None,
1142            ttl_seconds: None,
1143            expires_at: None,
1144        }];
1145        storage.upsert(&namespace, v1).await.unwrap();
1146
1147        // Try to insert vector with different dimension
1148        let v2 = vec![Vector {
1149            id: "v2".to_string(),
1150            values: vec![1.0, 2.0], // Wrong dimension
1151            metadata: None,
1152            ttl_seconds: None,
1153            expires_at: None,
1154        }];
1155        let result = storage.upsert(&namespace, v2).await;
1156        assert!(result.is_err());
1157    }
1158
1159    #[tokio::test]
1160    async fn test_object_storage_upsert() {
1161        let storage = ObjectStorage::memory().unwrap();
1162        let namespace = "test".to_string();
1163        storage.ensure_namespace(&namespace).await.unwrap();
1164
1165        // Insert
1166        let v1 = vec![Vector {
1167            id: "v1".to_string(),
1168            values: vec![1.0, 2.0],
1169            metadata: None,
1170            ttl_seconds: None,
1171            expires_at: None,
1172        }];
1173        storage.upsert(&namespace, v1).await.unwrap();
1174
1175        // Upsert (update)
1176        let v1_updated = vec![Vector {
1177            id: "v1".to_string(),
1178            values: vec![3.0, 4.0],
1179            metadata: None,
1180            ttl_seconds: None,
1181            expires_at: None,
1182        }];
1183        storage.upsert(&namespace, v1_updated).await.unwrap();
1184
1185        // Verify update
1186        let results = storage.get(&namespace, &["v1".to_string()]).await.unwrap();
1187        assert_eq!(results.len(), 1);
1188        assert_eq!(results[0].values, vec![3.0, 4.0]);
1189
1190        // Count should still be 1
1191        assert_eq!(storage.count(&namespace).await.unwrap(), 1);
1192    }
1193
1194    #[tokio::test]
1195    async fn test_index_storage() {
1196        let storage = ObjectStorage::memory().unwrap();
1197        let namespace = "test_index".to_string();
1198
1199        // Initially no indexes
1200        assert!(!storage
1201            .index_exists(&namespace, IndexType::Hnsw)
1202            .await
1203            .unwrap());
1204
1205        // Save an index
1206        let index_data = b"fake hnsw index data for testing".to_vec();
1207        storage
1208            .save_index(&namespace, IndexType::Hnsw, index_data.clone())
1209            .await
1210            .unwrap();
1211
1212        // Check it exists
1213        assert!(storage
1214            .index_exists(&namespace, IndexType::Hnsw)
1215            .await
1216            .unwrap());
1217        assert!(!storage
1218            .index_exists(&namespace, IndexType::Pq)
1219            .await
1220            .unwrap());
1221
1222        // Load it back
1223        let loaded = storage
1224            .load_index(&namespace, IndexType::Hnsw)
1225            .await
1226            .unwrap();
1227        assert!(loaded.is_some());
1228        assert_eq!(loaded.unwrap(), index_data);
1229
1230        // Save another index type
1231        let pq_data = b"fake pq index data".to_vec();
1232        storage
1233            .save_index(&namespace, IndexType::Pq, pq_data)
1234            .await
1235            .unwrap();
1236
1237        // List indexes
1238        let indexes = storage.list_indexes(&namespace).await.unwrap();
1239        assert_eq!(indexes.len(), 2);
1240        assert!(indexes.contains(&IndexType::Hnsw));
1241        assert!(indexes.contains(&IndexType::Pq));
1242
1243        // Delete index
1244        let deleted = storage
1245            .delete_index(&namespace, IndexType::Hnsw)
1246            .await
1247            .unwrap();
1248        assert!(deleted);
1249        assert!(!storage
1250            .index_exists(&namespace, IndexType::Hnsw)
1251            .await
1252            .unwrap());
1253
1254        // Delete non-existent
1255        let deleted = storage
1256            .delete_index(&namespace, IndexType::Hnsw)
1257            .await
1258            .unwrap();
1259        assert!(!deleted);
1260
1261        // Load non-existent
1262        let loaded = storage
1263            .load_index(&namespace, IndexType::Hnsw)
1264            .await
1265            .unwrap();
1266        assert!(loaded.is_none());
1267    }
1268
1269    // ── DAK-5553: parallel upsert correctness ─────────────────────────────────
1270
1271    fn make_vector(id: &str, dim: usize) -> Vector {
1272        Vector {
1273            id: id.to_string(),
1274            values: vec![0.1; dim],
1275            metadata: None,
1276            ttl_seconds: None,
1277            expires_at: None,
1278        }
1279    }
1280
1281    #[tokio::test]
1282    async fn test_upsert_batch_parallel_all_new() {
1283        let storage = ObjectStorage::memory().unwrap();
1284        let ns = "batch_all_new".to_string();
1285        storage.ensure_namespace(&ns).await.unwrap();
1286
1287        // Insert 50 unique vectors in one batch call
1288        let vectors: Vec<Vector> = (0..50).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1289        let count = storage.upsert(&ns, vectors).await.unwrap();
1290
1291        assert_eq!(count, 50);
1292        assert_eq!(storage.count(&ns).await.unwrap(), 50);
1293    }
1294
1295    #[tokio::test]
1296    async fn test_upsert_batch_parallel_idempotent_count() {
1297        let storage = ObjectStorage::memory().unwrap();
1298        let ns = "batch_idempotent".to_string();
1299        storage.ensure_namespace(&ns).await.unwrap();
1300
1301        let vectors: Vec<Vector> = (0..10).map(|i| make_vector(&format!("v{i}"), 4)).collect();
1302        storage.upsert(&ns, vectors.clone()).await.unwrap();
1303
1304        // Upsert the same IDs again — vector_count must not double
1305        storage.upsert(&ns, vectors).await.unwrap();
1306        assert_eq!(storage.count(&ns).await.unwrap(), 10);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_upsert_batch_parallel_empty() {
1311        let storage = ObjectStorage::memory().unwrap();
1312        let ns = "batch_empty".to_string();
1313        storage.ensure_namespace(&ns).await.unwrap();
1314
1315        let count = storage.upsert(&ns, vec![]).await.unwrap();
1316        assert_eq!(count, 0);
1317        assert_eq!(storage.count(&ns).await.unwrap(), 0);
1318    }
1319
1320    #[tokio::test]
1321    async fn test_upsert_batch_parallel_large_batch() {
1322        // Verify that a batch exceeding the default concurrency window (16) processes all items
1323        let storage = ObjectStorage::memory().unwrap();
1324        let ns = "batch_large".to_string();
1325        storage.ensure_namespace(&ns).await.unwrap();
1326
1327        let vectors: Vec<Vector> = (0..200).map(|i| make_vector(&format!("v{i}"), 8)).collect();
1328        let count = storage.upsert(&ns, vectors).await.unwrap();
1329
1330        assert_eq!(count, 200);
1331        assert_eq!(storage.count(&ns).await.unwrap(), 200);
1332    }
1333
1334    // DAK-6287: the per-vector exists() stat was replaced by a single directory list()
1335    // to classify insert-vs-update for vector_count. Prove a batch mixing updates of
1336    // existing ids with new inserts produces the exact count (only new ids increment).
1337    #[tokio::test]
1338    async fn test_upsert_count_mixed_insert_update_batch() {
1339        let storage = ObjectStorage::memory().unwrap();
1340        let ns = "mixed".to_string();
1341        storage.ensure_namespace(&ns).await.unwrap();
1342
1343        // Seed two vectors.
1344        storage
1345            .upsert(&ns, vec![make_vector("v0", 4), make_vector("v1", 4)])
1346            .await
1347            .unwrap();
1348        assert_eq!(storage.count(&ns).await.unwrap(), 2);
1349
1350        // Batch updates v1 (existing) and inserts v2, v3 (new) => count must be 4, not 5.
1351        storage
1352            .upsert(
1353                &ns,
1354                vec![
1355                    make_vector("v1", 4),
1356                    make_vector("v2", 4),
1357                    make_vector("v3", 4),
1358                ],
1359            )
1360            .await
1361            .unwrap();
1362        assert_eq!(storage.count(&ns).await.unwrap(), 4);
1363
1364        // Re-upserting only existing ids must not change the count.
1365        storage
1366            .upsert(&ns, vec![make_vector("v0", 4), make_vector("v3", 4)])
1367            .await
1368            .unwrap();
1369        assert_eq!(storage.count(&ns).await.unwrap(), 4);
1370    }
1371
1372    // DAK-6287: exercise the raw-fs fast write path (real Filesystem backend) end to end —
1373    // it must produce the same on-disk layout the OpenDAL read path expects, keep exact
1374    // count semantics, and safely fall back for traversal-unsafe ids without escaping root.
1375    #[tokio::test]
1376    async fn test_upsert_fs_fast_path_roundtrip() {
1377        let tmp = tempfile::tempdir().unwrap();
1378        let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1379        let ns = "fsfast".to_string();
1380        storage.ensure_namespace(&ns).await.unwrap();
1381
1382        // Insert via the fast path.
1383        storage
1384            .upsert(
1385                &ns,
1386                vec![
1387                    make_vector("a", 4),
1388                    make_vector("b", 4),
1389                    make_vector("c", 4),
1390                ],
1391            )
1392            .await
1393            .unwrap();
1394        assert_eq!(storage.count(&ns).await.unwrap(), 3);
1395
1396        // Read back through OpenDAL: proves the fast-path files are where the read path looks.
1397        let got = storage
1398            .get(&ns, &["a".to_string(), "b".to_string()])
1399            .await
1400            .unwrap();
1401        assert_eq!(got.len(), 2);
1402
1403        // Update b (existing) + insert d (new) in one batch => count 4, not 5.
1404        storage
1405            .upsert(&ns, vec![make_vector("b", 4), make_vector("d", 4)])
1406            .await
1407            .unwrap();
1408        assert_eq!(storage.count(&ns).await.unwrap(), 4);
1409
1410        // Guard: a batch containing a traversal-unsafe id must not panic or escape the
1411        // storage root; it falls back to the OpenDAL path and the safe sibling persists.
1412        storage
1413            .upsert(
1414                &ns,
1415                vec![make_vector("safe1", 4), make_vector("../escape", 4)],
1416            )
1417            .await
1418            .unwrap();
1419        let got = storage.get(&ns, &["safe1".to_string()]).await.unwrap();
1420        assert_eq!(got.len(), 1);
1421        // Nothing was written above the storage root.
1422        assert!(!tmp.path().parent().unwrap().join("escape.json").exists());
1423    }
1424
1425    // DAK-6299: ingest pattern — many small batches into ONE growing namespace. This is the
1426    // O(N^2) scenario the removed per-batch full-namespace list() created. vector_count must
1427    // stay exact when classified incrementally per batch (fs fast path: per-file stat).
1428    // Each batch overlaps the previous one by one id (an update), so only the genuinely-new
1429    // id may increment the count.
1430    #[tokio::test]
1431    async fn test_upsert_incremental_count_growing_namespace_fs() {
1432        let tmp = tempfile::tempdir().unwrap();
1433        let storage = ObjectStorage::filesystem(tmp.path().to_str().unwrap()).unwrap();
1434        let ns = "grow".to_string();
1435        storage.ensure_namespace(&ns).await.unwrap();
1436
1437        const BATCHES: usize = 40;
1438        for b in 0..BATCHES {
1439            // Batch b writes ids {b, b+1}: id `b` is an update of the prior batch's new id,
1440            // id `b+1` is the only genuinely new vector => count grows by exactly 1 per batch.
1441            let new_id = format!("v{}", b + 1);
1442            let upd_id = format!("v{}", b);
1443            storage
1444                .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1445                .await
1446                .unwrap();
1447            // After batch b we have ids v0..=v(b+1) => b+2 distinct vectors.
1448            assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1449        }
1450        assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1451    }
1452
1453    // DAK-6299: same incremental-count contract on the OpenDAL path (Memory backend), which
1454    // classifies inserts via a per-vector exists() rather than the fs stat.
1455    #[tokio::test]
1456    async fn test_upsert_incremental_count_growing_namespace_opendal() {
1457        let storage = ObjectStorage::memory().unwrap();
1458        let ns = "grow-od".to_string();
1459        storage.ensure_namespace(&ns).await.unwrap();
1460
1461        const BATCHES: usize = 30;
1462        for b in 0..BATCHES {
1463            let new_id = format!("v{}", b + 1);
1464            let upd_id = format!("v{}", b);
1465            storage
1466                .upsert(&ns, vec![make_vector(&upd_id, 4), make_vector(&new_id, 4)])
1467                .await
1468                .unwrap();
1469            assert_eq!(storage.count(&ns).await.unwrap(), b + 2);
1470        }
1471        assert_eq!(storage.count(&ns).await.unwrap(), BATCHES + 1);
1472    }
1473
1474    // ── DAK-6289: S3/object-store direct-write path ───────────────────────────
1475
1476    // The write strategy is selected once at construction. Only the local filesystem
1477    // needs the tmp+rename O_TRUNC guard; memory and every object store get the
1478    // direct-PUT fast path (no COPY+DELETE on S3).
1479    #[test]
1480    fn test_write_strategy_selection() {
1481        // Object stores + memory → direct write (atomic PUT, no COPY+DELETE).
1482        assert!(!ObjectStorage::needs_tmp_rename_for(
1483            &ObjectStorageConfig::Memory
1484        ));
1485        assert!(!ObjectStorage::needs_tmp_rename_for(
1486            &ObjectStorageConfig::S3 {
1487                bucket: "dakera".to_string(),
1488                region: None,
1489                endpoint: None,
1490                access_key_id: None,
1491                secret_access_key: None,
1492            }
1493        ));
1494        assert!(!ObjectStorage::needs_tmp_rename_for(
1495            &ObjectStorageConfig::Azure {
1496                container: "c".to_string(),
1497                account_name: "acct".to_string(),
1498                account_key: None,
1499                sas_token: None,
1500                endpoint: None,
1501            }
1502        ));
1503        assert!(!ObjectStorage::needs_tmp_rename_for(
1504            &ObjectStorageConfig::Gcs {
1505                bucket: "dakera".to_string(),
1506                credential_path: None,
1507                endpoint: None,
1508            }
1509        ));
1510        // Local filesystem → tmp+rename (O_TRUNC torn-read guard).
1511        assert!(ObjectStorage::needs_tmp_rename_for(
1512            &ObjectStorageConfig::Filesystem {
1513                root: "/tmp/x".to_string(),
1514            }
1515        ));
1516        // Constructed instances agree with the classifier.
1517        assert!(!ObjectStorage::memory().unwrap().needs_tmp_rename);
1518        let tmp = tempfile::tempdir().unwrap();
1519        assert!(
1520            ObjectStorage::filesystem(tmp.path().to_str().unwrap())
1521                .unwrap()
1522                .needs_tmp_rename
1523        );
1524    }
1525
1526    // End-to-end exercise of the direct-write branch (needs_tmp_rename == false) on a
1527    // non-filesystem backend: insert, overwrite-in-place, and a mixed insert/update
1528    // batch must all round-trip with exact count semantics — proving the removal of
1529    // tmp+rename keeps the read/recall path and vector_count classification intact.
1530    #[tokio::test]
1531    async fn test_upsert_direct_write_path_roundtrip() {
1532        let storage = ObjectStorage::memory().unwrap();
1533        assert!(!storage.needs_tmp_rename, "memory must take direct path");
1534        let ns = "direct".to_string();
1535        storage.ensure_namespace(&ns).await.unwrap();
1536
1537        // Insert two new vectors via the direct path.
1538        storage
1539            .upsert(&ns, vec![make_vector("a", 4), make_vector("b", 4)])
1540            .await
1541            .unwrap();
1542        assert_eq!(storage.count(&ns).await.unwrap(), 2);
1543
1544        // Overwrite `a` in place with new values (direct PUT to the final key, no rename).
1545        let a_updated = Vector {
1546            id: "a".to_string(),
1547            values: vec![9.0, 9.0, 9.0, 9.0],
1548            metadata: None,
1549            ttl_seconds: None,
1550            expires_at: None,
1551        };
1552        storage.upsert(&ns, vec![a_updated]).await.unwrap();
1553        let got = storage.get(&ns, &["a".to_string()]).await.unwrap();
1554        assert_eq!(got.len(), 1);
1555        assert_eq!(got[0].values, vec![9.0, 9.0, 9.0, 9.0]);
1556        // Overwrite must NOT increment the count.
1557        assert_eq!(storage.count(&ns).await.unwrap(), 2);
1558
1559        // Mixed batch: update `b`, insert `c` → count 3, not 4.
1560        storage
1561            .upsert(&ns, vec![make_vector("b", 4), make_vector("c", 4)])
1562            .await
1563            .unwrap();
1564        assert_eq!(storage.count(&ns).await.unwrap(), 3);
1565    }
1566
1567    // DAK-6287: per-sub-stage profile of ObjectStorage::upsert on the fs backend.
1568    //
1569    // Post-tier (DAKERA_TIERED=1, embedding deferred) the per-stage ingest profile
1570    // (PR#609) names storage_upsert as the dominant stage (83.6% of the tiered
1571    // pipeline). This test decomposes that stage into its three per-vector fs ops
1572    // (exists stat / write-tmp / rename) plus candidate fixes, all at the production
1573    // concurrency window, so the dominant SUB-stage is provable rather than assumed.
1574    //
1575    // Ignored by default (writes thousands of files); run explicitly:
1576    //   cargo test -p dakera-storage --release profile_upsert_substages -- --ignored --nocapture
1577    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1578    #[ignore]
1579    async fn profile_upsert_substages() {
1580        use std::time::Instant;
1581        const N: usize = 200;
1582        const DIM: usize = 1024; // bge-large-en-v1.5
1583        let conc = s3_concurrent_ops();
1584
1585        // Realistic payload: 1024-d vector + LoCoMo-shaped metadata.
1586        let make = |i: usize| -> Vector {
1587            Vector {
1588                id: format!("prof-{i}"),
1589                values: vec![0.0123_f32; DIM],
1590                metadata: Some(serde_json::json!({
1591                    "_embedding_kind": "static",
1592                    "tags": ["dia_1", "speaker_a", "session_3", "2026-06-03"],
1593                    "importance": 0.7,
1594                    "session_id": format!("sess-{}", i % 8),
1595                })),
1596                ttl_seconds: None,
1597                expires_at: None,
1598            }
1599        };
1600        let vectors: Vec<Vector> = (0..N).map(make).collect();
1601        let payload_bytes = serde_json::to_vec(&StoredVector::from(vectors[0].clone()))
1602            .unwrap()
1603            .len();
1604
1605        // --- V0: authoritative current upsert() (real code path) ---
1606        let tmp0 = tempfile::tempdir().unwrap();
1607        let s0 = ObjectStorage::filesystem(tmp0.path().to_str().unwrap()).unwrap();
1608        let ns = "prof".to_string();
1609        s0.ensure_namespace(&ns).await.unwrap();
1610        let t = Instant::now();
1611        let n = s0.upsert(&ns, vectors.clone()).await.unwrap();
1612        let v0_ms = t.elapsed().as_secs_f64() * 1000.0;
1613        assert_eq!(n, N);
1614
1615        // Helper: run a per-vector closure over a fresh fs operator at `conc` and time it.
1616        async fn timed<F, Fut>(label: &str, n: usize, conc: usize, f: F) -> f64
1617        where
1618            F: Fn(usize, Operator) -> Fut,
1619            Fut: std::future::Future<Output = ()>,
1620        {
1621            let tmp = tempfile::tempdir().unwrap();
1622            let op = ObjectStorage::filesystem(tmp.path().to_str().unwrap())
1623                .unwrap()
1624                .operator;
1625            let t = Instant::now();
1626            stream::iter(0..n)
1627                .map(|i| {
1628                    let op = op.clone();
1629                    f(i, op)
1630                })
1631                .buffer_unordered(conc)
1632                .collect::<Vec<()>>()
1633                .await;
1634            let ms = t.elapsed().as_secs_f64() * 1000.0;
1635            // keep tmp alive until after timing
1636            drop(tmp);
1637            let _ = label;
1638            ms
1639        }
1640
1641        let data: Vec<Vec<u8>> = vectors
1642            .iter()
1643            .map(|v| serde_json::to_vec(&StoredVector::from(v.clone())).unwrap())
1644            .collect();
1645        let path_of = |i: usize| ObjectStorage::vector_path(&ns, &format!("prof-{i}"));
1646
1647        // P_exists: 200 stats only (the exists() round-trip upsert does per vector).
1648        let d = data.clone();
1649        let p_exists = timed("exists", N, conc, move |i, op| {
1650            let path = path_of(i);
1651            let _ = &d;
1652            async move {
1653                let _ = op.exists(&path).await;
1654            }
1655        })
1656        .await;
1657
1658        // P_write_direct: 200 direct writes (no tmp, no rename, no exists).
1659        let d = data.clone();
1660        let p_write_direct = timed("write_direct", N, conc, move |i, op| {
1661            let path = path_of(i);
1662            let bytes = d[i].clone();
1663            async move {
1664                op.write(&path, bytes).await.unwrap();
1665            }
1666        })
1667        .await;
1668
1669        // P_write_tmp_rename: 200 × (write tmp + rename) — the atomic write, no exists.
1670        let d = data.clone();
1671        let p_tmp_rename = timed("write_tmp_rename", N, conc, move |i, op| {
1672            let path = path_of(i);
1673            let bytes = d[i].clone();
1674            async move {
1675                let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1676                let tmp_path = format!("{path}.tmp.{seq}");
1677                op.write(&tmp_path, bytes).await.unwrap();
1678                op.rename(&tmp_path, &path).await.unwrap();
1679            }
1680        })
1681        .await;
1682
1683        // P_full: exists + write tmp + rename (mirror of current upsert per-vector body).
1684        let d = data.clone();
1685        let p_full = timed("exists+tmp+rename", N, conc, move |i, op| {
1686            let path = path_of(i);
1687            let bytes = d[i].clone();
1688            async move {
1689                let _ = op.exists(&path).await;
1690                let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1691                let tmp_path = format!("{path}.tmp.{seq}");
1692                op.write(&tmp_path, bytes).await.unwrap();
1693                op.rename(&tmp_path, &path).await.unwrap();
1694            }
1695        })
1696        .await;
1697
1698        // --- Concurrency sweep on the real write pattern: does throughput scale with
1699        // parallelism (I/O-latency-bound) or plateau (CPU/serialized)? ---
1700        let mut sweep = Vec::new();
1701        for &c in &[8usize, 16, 32, 64, 128] {
1702            let d = data.clone();
1703            let ms = timed("sweep", N, c, move |i, op| {
1704                let path = path_of(i);
1705                let bytes = d[i].clone();
1706                async move {
1707                    let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
1708                    let tmp_path = format!("{path}.tmp.{seq}");
1709                    op.write(&tmp_path, bytes).await.unwrap();
1710                    op.rename(&tmp_path, &path).await.unwrap();
1711                }
1712            })
1713            .await;
1714            sweep.push((c, ms));
1715        }
1716
1717        // --- fsync isolation: raw std::fs write vs write+sync_all at conc 16, to test
1718        // whether the per-file durability flush is the dominant write cost (the issue's
1719        // "disable per-write sync" lever). ---
1720        async fn raw_fs(n: usize, conc: usize, bytes: usize, do_sync: bool) -> f64 {
1721            let dir = tempfile::tempdir().unwrap();
1722            let root = dir.path().to_path_buf();
1723            let payload = vec![0u8; bytes];
1724            let t = Instant::now();
1725            stream::iter(0..n)
1726                .map(|i| {
1727                    let root = root.clone();
1728                    let payload = payload.clone();
1729                    async move {
1730                        tokio::task::spawn_blocking(move || {
1731                            use std::io::Write;
1732                            let p = root.join(format!("f{i}.bin"));
1733                            let mut f = std::fs::File::create(&p).unwrap();
1734                            f.write_all(&payload).unwrap();
1735                            if do_sync {
1736                                f.sync_all().unwrap();
1737                            }
1738                        })
1739                        .await
1740                        .unwrap();
1741                    }
1742                })
1743                .buffer_unordered(conc)
1744                .collect::<Vec<()>>()
1745                .await;
1746            let ms = t.elapsed().as_secs_f64() * 1000.0;
1747            drop(dir);
1748            ms
1749        }
1750        let raw_nosync = raw_fs(N, 16, payload_bytes, false).await;
1751        let raw_sync = raw_fs(N, 16, payload_bytes, true).await;
1752
1753        let rate = |ms: f64| N as f64 / (ms / 1000.0);
1754        eprintln!("\n=== DAK-6287 storage_upsert sub-stage profile (fs backend) ===");
1755        eprintln!("N={N} dim={DIM} payload={payload_bytes}B concurrency={conc}");
1756        eprintln!(
1757            "V0 upsert() REAL path      : {v0_ms:8.2} ms  {:8.1} mem/s",
1758            rate(v0_ms)
1759        );
1760        eprintln!(
1761            "P_full exists+tmp+rename   : {p_full:8.2} ms  {:8.1} mem/s",
1762            rate(p_full)
1763        );
1764        eprintln!(
1765            "P_exists (stat only)       : {p_exists:8.2} ms  {:8.1} mem/s",
1766            rate(p_exists)
1767        );
1768        eprintln!(
1769            "P_write_tmp_rename         : {p_tmp_rename:8.2} ms  {:8.1} mem/s",
1770            rate(p_tmp_rename)
1771        );
1772        eprintln!(
1773            "P_write_direct             : {p_write_direct:8.2} ms  {:8.1} mem/s",
1774            rate(p_write_direct)
1775        );
1776        eprintln!("--- attribution (of P_full) ---");
1777        eprintln!("exists share   : {:5.1}%", 100.0 * p_exists / p_full);
1778        eprintln!("rename overhead: {:5.1}%  (tmp+rename {p_tmp_rename:.1} vs direct {p_write_direct:.1})",
1779            100.0 * (p_tmp_rename - p_write_direct).max(0.0) / p_full);
1780        eprintln!("--- concurrency sweep (write tmp+rename) ---");
1781        for (c, ms) in &sweep {
1782            eprintln!("conc={c:4} : {ms:8.2} ms  {:8.1} mem/s", rate(*ms));
1783        }
1784        eprintln!("--- fsync isolation (raw std::fs, conc=16, {payload_bytes}B) ---");
1785        eprintln!(
1786            "write no-sync : {raw_nosync:8.2} ms  {:8.1} mem/s",
1787            rate(raw_nosync)
1788        );
1789        eprintln!(
1790            "write+sync_all: {raw_sync:8.2} ms  {:8.1} mem/s",
1791            rate(raw_sync)
1792        );
1793        eprintln!(
1794            "=> fsync cost factor: {:.1}x",
1795            raw_sync / raw_nosync.max(0.001)
1796        );
1797    }
1798
1799    // DAK-6289: per-sub-stage profile of ObjectStorage::upsert against a REAL S3/MinIO
1800    // endpoint. Prod runs DAKERA_STORAGE=s3 (dakera-deploy docker-compose + k8s), whose
1801    // write path is network-bound and a different shape from the fs 83.6% profiled above.
1802    //
1803    // MEASURED (this harness, live MinIO): OpenDAL's S3 service has NO rename capability —
1804    // `rename` returns Unsupported (a fast CLIENT-side error, not a network COPY). So the OLD
1805    // upsert always hit its fallback: write(tmp) → rename fails → delete(tmp) + write(final).
1806    // Net OLD per vector = exists(HEAD) + PUT(tmp) + DELETE(tmp) + PUT(final) — payload PUT
1807    // TWICE. The fix writes direct = exists(HEAD) + PUT(final). This decomposes the path so
1808    // the wasted PUT(tmp)+DELETE(tmp) is PROVABLE, and reports BEFORE vs AFTER on one bucket:
1809    //   - V0            : the REAL upsert() (now the direct-PUT path) end to end
1810    //   - P_full_old    : exists + write(tmp) + rename-with-fallback (the real BEFORE path)
1811    //   - P_direct_new  : exists + write(final key)                                  ← AFTER
1812    //   - P_exists      : HEAD only
1813    //   - P_put         : PUT only
1814    //   - P_waste       : write(tmp) + delete(tmp) — the wasted ops the fix removes
1815    //   - concurrency sweep on the direct-PUT pattern
1816    //
1817    // Ignored by default (needs a live endpoint + writes objects). Stand up MinIO and run:
1818    //   docker run -d -p 9000:9000 -e MINIO_ROOT_USER=minioadmin \
1819    //     -e MINIO_ROOT_PASSWORD=minioadmin minio/minio server /data
1820    //   # create bucket `dakera-prof` (mc mb), then:
1821    //   DAKERA_PROFILE_S3_ENDPOINT=http://127.0.0.1:9000 \
1822    //   DAKERA_PROFILE_S3_BUCKET=dakera-prof \
1823    //   AWS_ACCESS_KEY_ID=minioadmin AWS_SECRET_ACCESS_KEY=minioadmin \
1824    //   DAKERA_PROFILE_S3_REGION=us-east-1 \
1825    //   cargo test -p dakera-storage --release profile_upsert_substages_s3 -- --ignored --nocapture
1826    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1827    #[ignore]
1828    async fn profile_upsert_substages_s3() {
1829        use std::time::Instant;
1830
1831        let endpoint = match std::env::var("DAKERA_PROFILE_S3_ENDPOINT") {
1832            Ok(e) => e,
1833            Err(_) => {
1834                eprintln!(
1835                    "SKIP profile_upsert_substages_s3: set DAKERA_PROFILE_S3_ENDPOINT + \
1836                     DAKERA_PROFILE_S3_BUCKET (+ AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY) \
1837                     to a live MinIO/S3 endpoint to run this profile."
1838                );
1839                return;
1840            }
1841        };
1842        let bucket = std::env::var("DAKERA_PROFILE_S3_BUCKET")
1843            .expect("DAKERA_PROFILE_S3_BUCKET required for S3 profile");
1844        let region = std::env::var("DAKERA_PROFILE_S3_REGION").ok();
1845        let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
1846        let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
1847        let n: usize = std::env::var("DAKERA_PROFILE_N")
1848            .ok()
1849            .and_then(|v| v.parse().ok())
1850            .unwrap_or(100);
1851        const DIM: usize = 1024; // bge-large-en-v1.5
1852        let conc = s3_concurrent_ops();
1853
1854        let s3_cfg = ObjectStorageConfig::S3 {
1855            bucket: bucket.clone(),
1856            region: region.clone(),
1857            endpoint: Some(endpoint.clone()),
1858            access_key_id: access_key_id.clone(),
1859            secret_access_key: secret_access_key.clone(),
1860        };
1861
1862        // Realistic payload: 1024-d vector + LoCoMo-shaped metadata.
1863        let make = |i: usize| -> Vector {
1864            Vector {
1865                id: format!("prof-{i}"),
1866                values: vec![0.0123_f32; DIM],
1867                metadata: Some(serde_json::json!({
1868                    "_embedding_kind": "static",
1869                    "tags": ["dia_1", "speaker_a", "session_3", "2026-06-03"],
1870                    "importance": 0.7,
1871                    "session_id": format!("sess-{}", i % 8),
1872                })),
1873                ttl_seconds: None,
1874                expires_at: None,
1875            }
1876        };
1877        let vectors: Vec<Vector> = (0..n).map(make).collect();
1878        let data: Vec<Vec<u8>> = vectors
1879            .iter()
1880            .map(|v| serde_json::to_vec(&StoredVector::from(v.clone())).unwrap())
1881            .collect();
1882        let payload_bytes = data[0].len();
1883
1884        // --- V0: authoritative current upsert() (the shipped direct-PUT path) ---
1885        let s0 = ObjectStorage::new(s3_cfg.clone()).unwrap();
1886        assert!(!s0.needs_tmp_rename, "S3 must take the direct-PUT path");
1887        let ns = "locomo-bench-prof-v0".to_string();
1888        let _ = s0.delete_namespace(&ns).await; // clean slate
1889        s0.ensure_namespace(&ns).await.unwrap();
1890        let t = Instant::now();
1891        let written = s0.upsert(&ns, vectors.clone()).await.unwrap();
1892        let v0_ms = t.elapsed().as_secs_f64() * 1000.0;
1893        assert_eq!(written, n);
1894
1895        // Helper: run a per-vector closure over the S3 operator at `conc`, time it.
1896        let op = ObjectStorage::build_operator(&s3_cfg).unwrap();
1897        async fn timed<F, Fut>(op: &Operator, n: usize, conc: usize, f: F) -> f64
1898        where
1899            F: Fn(usize, Operator) -> Fut,
1900            Fut: std::future::Future<Output = ()>,
1901        {
1902            let t = Instant::now();
1903            stream::iter(0..n)
1904                .map(|i| f(i, op.clone()))
1905                .buffer_unordered(conc)
1906                .collect::<Vec<()>>()
1907                .await;
1908            t.elapsed().as_secs_f64() * 1000.0
1909        }
1910        // Distinct key prefix per phase so phases don't interfere. A nested `fn` (not a
1911        // closure) so the `move` timing closures below can call it without capturing it.
1912        fn key(phase: &str, i: usize) -> String {
1913            format!("namespaces/locomo-bench-prof-{phase}/vectors/prof-{i}.json")
1914        }
1915
1916        // P_exists: n HEAD requests.
1917        let p_exists = timed(&op, n, conc, |i, op| {
1918            let path = key("exists", i);
1919            async move {
1920                let _ = op.exists(&path).await;
1921            }
1922        })
1923        .await;
1924
1925        // P_put: n direct PUTs (no exists, no tmp, no rename).
1926        let d = data.clone();
1927        let p_put = timed(&op, n, conc, move |i, op| {
1928            let path = key("put", i);
1929            let bytes = d[i].clone();
1930            async move {
1931                op.write(&path, bytes).await.unwrap();
1932            }
1933        })
1934        .await;
1935
1936        // P_waste: the wasted ops the fix removes — write(tmp) + delete(tmp). On S3,
1937        // OpenDAL has NO rename capability (`rename` => Unsupported, a fast CLIENT-side
1938        // error, NO network COPY), so the OLD upsert always fell through to its fallback:
1939        // delete(tmp) + write(final). Net OLD per vector = PUT(tmp) + DELETE(tmp) + PUT(final)
1940        // — the payload is PUT twice. This phase isolates the PUT(tmp)+DELETE(tmp) the fix drops.
1941        let d = data.clone();
1942        let p_waste = timed(&op, n, conc, move |i, op| {
1943            let tmp = format!("{}.tmp", key("waste", i));
1944            let bytes = d[i].clone();
1945            async move {
1946                op.write(&tmp, bytes).await.unwrap();
1947                op.delete(&tmp).await.unwrap();
1948            }
1949        })
1950        .await;
1951
1952        // P_full_old: the EXACT old per-vector body — exists + write(tmp) + rename-with-fallback.
1953        // On S3 the rename errors (Unsupported), so this exercises the real fallback path
1954        // (delete(tmp) + write(final)) that prod actually ran. This is the true BEFORE cost.
1955        let d = data.clone();
1956        let p_full_old = timed(&op, n, conc, move |i, op| {
1957            let path = key("old", i);
1958            let tmp = format!("{path}.tmp");
1959            let bytes = d[i].clone();
1960            async move {
1961                let _ = op.exists(&path).await;
1962                let rename_ok = async {
1963                    op.write(&tmp, bytes.clone()).await?;
1964                    op.rename(&tmp, &path).await?;
1965                    Ok::<(), opendal::Error>(())
1966                }
1967                .await
1968                .is_ok();
1969                if !rename_ok {
1970                    let _ = op.delete(&tmp).await;
1971                    op.write(&path, bytes).await.unwrap();
1972                }
1973            }
1974        })
1975        .await;
1976
1977        // P_direct_new: exists + write(final key)  (the AFTER per-vector body).
1978        let d = data.clone();
1979        let p_direct_new = timed(&op, n, conc, move |i, op| {
1980            let path = key("new", i);
1981            let bytes = d[i].clone();
1982            async move {
1983                let _ = op.exists(&path).await;
1984                op.write(&path, bytes).await.unwrap();
1985            }
1986        })
1987        .await;
1988
1989        // Concurrency sweep on the shipped direct-PUT pattern.
1990        let mut sweep = Vec::new();
1991        for &c in &[8usize, 16, 32, 64] {
1992            let d = data.clone();
1993            let ms = timed(&op, n, c, move |i, op| {
1994                let path = format!("namespaces/locomo-bench-prof-sweep{c}/vectors/prof-{i}.json");
1995                let bytes = d[i].clone();
1996                async move {
1997                    op.write(&path, bytes).await.unwrap();
1998                }
1999            })
2000            .await;
2001            sweep.push((c, ms));
2002        }
2003
2004        let rate = |ms: f64| n as f64 / (ms / 1000.0);
2005        eprintln!("\n=== DAK-6289 storage_upsert sub-stage profile (S3/MinIO backend) ===");
2006        eprintln!("endpoint={endpoint} bucket={bucket} N={n} dim={DIM} payload={payload_bytes}B concurrency={conc}");
2007        eprintln!(
2008            "V0 upsert() REAL (direct PUT): {v0_ms:8.2} ms  {:8.1} mem/s",
2009            rate(v0_ms)
2010        );
2011        eprintln!(
2012            "P_full_old exists+PUT+rename : {p_full_old:8.2} ms  {:8.1} mem/s   <- BEFORE",
2013            rate(p_full_old)
2014        );
2015        eprintln!(
2016            "P_direct_new exists+PUT      : {p_direct_new:8.2} ms  {:8.1} mem/s   <- AFTER",
2017            rate(p_direct_new)
2018        );
2019        eprintln!(
2020            "P_exists (HEAD only)         : {p_exists:8.2} ms  {:8.1} mem/s",
2021            rate(p_exists)
2022        );
2023        eprintln!(
2024            "P_put (direct PUT only)      : {p_put:8.2} ms  {:8.1} mem/s",
2025            rate(p_put)
2026        );
2027        eprintln!(
2028            "P_waste (PUT tmp + DELETE)   : {p_waste:8.2} ms  {:8.1} mem/s  (the ops the fix drops)",
2029            rate(p_waste)
2030        );
2031        eprintln!("--- attribution (S3 rename is Unsupported → old path = PUT(tmp)+DELETE(tmp)+PUT(final)) ---");
2032        eprintln!(
2033            "wasted-op share of old body  : {:5.1}%  ({p_waste:.1}ms of {p_full_old:.1}ms)",
2034            100.0 * p_waste / p_full_old.max(0.001)
2035        );
2036        eprintln!(
2037            "=> direct-PUT speedup (old/new): {:.2}x  (end-to-end V0 vs P_full_old: {:.2}x)",
2038            p_full_old / p_direct_new.max(0.001),
2039            p_full_old / v0_ms.max(0.001)
2040        );
2041        eprintln!("--- concurrency sweep (direct PUT) ---");
2042        for (c, ms) in &sweep {
2043            eprintln!("conc={c:4} : {ms:8.2} ms  {:8.1} mem/s", rate(*ms));
2044        }
2045
2046        // Best-effort cleanup of every profile object written this run (DAK-2407 bench
2047        // hygiene). The decomposition phases write raw vector objects with no meta.json,
2048        // so delete_namespace() would skip them — recursively delete the prefixes directly.
2049        let mut prefixes: Vec<String> = ["v0", "exists", "put", "waste", "old", "new"]
2050            .iter()
2051            .map(|p| format!("namespaces/locomo-bench-prof-{p}/"))
2052            .collect();
2053        for &c in &[8usize, 16, 32, 64] {
2054            prefixes.push(format!("namespaces/locomo-bench-prof-sweep{c}/"));
2055        }
2056        for prefix in prefixes {
2057            let _ = op.delete_with(&prefix).recursive(true).await;
2058        }
2059    }
2060}