infino 0.1.0

A fast retrieval engine that stores data on object storage and runs SQL, full-text search, and vector search over it from a single system — search-on-Parquet.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Infino Authors

//! LocalFS-backed [`StorageProvider`].
//!
//! Wraps `object_store::local::LocalFileSystem` so the same
//! supertable code paths exercise both LocalFS (dev / tests /
//! single-node) and S3 (production / multi-node) without
//! backend-specific branching above the storage trait.
//!
//! The path scoping is: every URI handed to a method is
//! relative to the `root` passed at construction. So
//! `provider.get("data/seg-abc.sf.parquet")` reads
//! `<root>/data/seg-abc.sf.parquet`. No upward traversal — paths with
//! `..` get rejected by `object_store::path::Path`.

use std::{ops::Range, path::PathBuf, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use fs4::tokio::AsyncFileExt;
use futures::TryStreamExt;
use object_store::{
    Error as ObjError, MultipartUpload, ObjectStore, ObjectStoreExt, PutMode, PutOptions,
    PutPayload, local::LocalFileSystem, path::Path as ObjPath,
};

use super::{ObjectMeta, StorageError, StorageProvider};

/// [`StorageProvider`] implementation backed by a directory on the local
/// filesystem.
///
/// Built via [`LocalFsStorageProvider::new`]. Object keys are resolved by
/// the inner `LocalFileSystem`, which is constructed with `root` as its
/// prefix.
#[derive(Debug)]
pub struct LocalFsStorageProvider {
    /// Directory passed to `new`. Also the base under which
    /// `put_if_match` places its `_supertable/.lock` advisory lock file.
    root: PathBuf,
    /// `object_store` local-filesystem store created with `root` as its
    /// prefix; shared via `Arc` so `object_store_handle` can hand it out.
    store: Arc<LocalFileSystem>,
}

impl LocalFsStorageProvider {
    /// Build a provider whose object keys resolve beneath `root`.
    ///
    /// `root` is first created with `create_dir_all` (which succeeds when
    /// the directory already exists) and is then handed to
    /// `LocalFileSystem::new_with_prefix`.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::Permanent`], tagged with the displayed
    /// `root` path, when either the directory creation or the store
    /// construction fails.
    pub fn new(root: impl Into<PathBuf>) -> Result<Self, StorageError> {
        let root = root.into();

        // Step 1: make sure the directory exists.
        if let Err(io_err) = std::fs::create_dir_all(&root) {
            return Err(StorageError::Permanent {
                uri: root.display().to_string(),
                source: Box::new(io_err),
            });
        }

        // Step 2: scope an object_store LocalFileSystem to it.
        let store = match LocalFileSystem::new_with_prefix(&root) {
            Ok(fs) => Arc::new(fs),
            Err(store_err) => {
                return Err(StorageError::Permanent {
                    uri: root.display().to_string(),
                    source: Box::new(store_err),
                });
            }
        };

        Ok(Self { root, store })
    }

    /// Filesystem root this provider is scoped to, returned exactly as it
    /// was stored by [`LocalFsStorageProvider::new`]. Useful for tests
    /// that need to inspect on-disk state directly.
    // NOTE(review): `&Path` would be the more idiomatic return type than
    // `&PathBuf`, but switching changes the public signature — confirm
    // with callers before changing it.
    pub fn root(&self) -> &PathBuf {
        &self.root
    }

    /// Parse `uri` into an `object_store` path.
    ///
    /// Any parse failure is reported as [`StorageError::Permanent`]
    /// carrying the offending `uri`; no I/O is performed.
    fn path(uri: &str) -> Result<ObjPath, StorageError> {
        match ObjPath::parse(uri) {
            Ok(parsed) => Ok(parsed),
            Err(parse_err) => Err(StorageError::Permanent {
                uri: uri.to_owned(),
                source: Box::new(parse_err),
            }),
        }
    }
}

/// Translate an `object_store::Error` into our `StorageError`, tagging
/// the result with `uri`.
///
/// The mapping:
/// - `NotFound` → `NotFound`
/// - `AlreadyExists` / `Precondition` → `PreconditionFailed`
/// - `Generic` → `TransientExhausted`, forwarding the inner `source`
///   unchanged (the intent, per the accompanying tests, is that
///   object_store has already retried transient failures internally per
///   its `RetryConfig`, so one surfacing here is post-retry)
/// - every other variant → `Permanent`, boxing the original error as the
///   `source`
fn translate(uri: &str, e: ObjError) -> StorageError {
    let uri = String::from(uri);
    match e {
        ObjError::NotFound { .. } => StorageError::NotFound { uri },
        ObjError::AlreadyExists { .. } | ObjError::Precondition { .. } => {
            StorageError::PreconditionFailed { uri }
        }
        ObjError::Generic { source, .. } => StorageError::TransientExhausted { uri, source },
        unmapped => StorageError::Permanent {
            uri,
            source: Box::new(unmapped),
        },
    }
}

#[async_trait]
impl StorageProvider for LocalFsStorageProvider {
    /// Fetch metadata (size, etag, last-modified) for the object at `uri`
    /// without reading its body.
    ///
    /// # Errors
    ///
    /// - [`StorageError::Permanent`] if `uri` is not a valid object path.
    /// - Whatever `translate` produces for the underlying store error
    ///   (e.g. [`StorageError::NotFound`] for a missing object).
    async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
        let path = Self::path(uri)?;
        let meta = self
            .store
            .head(&path)
            .await
            .map_err(|e| translate(uri, e))?;
        Ok(ObjectMeta {
            // No cast: the store's size already has the field's type, as
            // `list_with_prefix_metadata` relies on.
            size: meta.size,
            etag: meta.e_tag,
            last_modified: meta.last_modified.into(),
        })
    }

    /// Read the whole object at `uri`, returning its bytes together with
    /// the metadata reported by the same GET.
    ///
    /// # Errors
    ///
    /// - [`StorageError::Permanent`] if `uri` is not a valid object path.
    /// - Whatever `translate` produces for a failure of either the GET or
    ///   the subsequent body read (e.g. [`StorageError::NotFound`]).
    async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
        let path = Self::path(uri)?;
        let result = self.store.get(&path).await.map_err(|e| translate(uri, e))?;
        // `GetResult.meta` matches the version we're about to
        // read — no separate HEAD needed to capture the etag.
        // Captured before `bytes()` because that call consumes `result`.
        let meta = ObjectMeta {
            // No cast: the store's size already has the field's type, as
            // `list_with_prefix_metadata` relies on.
            size: result.meta.size,
            etag: result.meta.e_tag.clone(),
            last_modified: result.meta.last_modified.into(),
        };
        let bytes = result.bytes().await.map_err(|e| translate(uri, e))?;
        Ok((bytes, meta))
    }

    /// Read only the byte `range` of the object at `uri`.
    ///
    /// Path-parse failures surface as [`StorageError::Permanent`]; store
    /// failures are mapped through `translate`.
    async fn get_range(&self, uri: &str, range: Range<u64>) -> Result<Bytes, StorageError> {
        let location = Self::path(uri)?;
        match self.store.get_range(&location, range).await {
            Ok(slice) => Ok(slice),
            Err(err) => Err(translate(uri, err)),
        }
    }

    /// Write `bytes` to `uri` only if no object exists there yet
    /// (`PutMode::Create`), returning the etag reported for the new
    /// object, if any.
    ///
    /// An already-existing target is reported by the store as
    /// `AlreadyExists`, which `translate` maps to
    /// [`StorageError::PreconditionFailed`].
    async fn put_atomic(&self, uri: &str, bytes: Bytes) -> Result<Option<String>, StorageError> {
        let location = Self::path(uri)?;
        let create_only = PutOptions {
            mode: PutMode::Create,
            ..Default::default()
        };
        let payload = PutPayload::from_bytes(bytes);
        match self.store.put_opts(&location, payload, create_only).await {
            Ok(put) => Ok(put.e_tag),
            Err(err) => Err(translate(uri, err)),
        }
    }

    async fn put_if_match(
        &self,
        uri: &str,
        bytes: Bytes,
        expected_etag: Option<&str>,
    ) -> Result<Option<String>, StorageError> {
        let path = Self::path(uri)?;
        match expected_etag {
            // None == create-only-if-absent. Same as put_atomic.
            None => {
                let opts = PutOptions {
                    mode: PutMode::Create,
                    ..Default::default()
                };
                self.store
                    .put_opts(&path, PutPayload::from_bytes(bytes), opts)
                    .await
                    .map(|r| r.e_tag)
                    .map_err(|e| translate(uri, e))
            }
            // Some(tag) == update-if-etag-matches.
            //
            // `object_store::LocalFileSystem` doesn't implement
            // `PutMode::Update` directly (it surfaces `NotImplemented`).
            // We implement etag-conditional update as
            // read-then-overwrite, bracketed by an advisory
            // `flock` on `<root>/_supertable/.lock` so two
            // processes can't both observe the same prior etag
            // and race the overwrite. POSIX `flock` releases on
            // fd close, so the lock file drops at the end of
            // this branch and the next contender proceeds.
            // S3 / GCS providers use native conditional PUT and
            // don't need this scaffolding — see
            // `S3StorageProvider::put_if_match`.
            Some(expected) => {
                let lock_path = self.root.join("_supertable").join(".lock");
                // The pointer commit path already creates
                // `_supertable/` on the first write; doing it
                // here too is idempotent + makes the lock
                // robust against any other call site that
                // routes through put_if_match before the
                // pointer commits.
                if let Some(parent) = lock_path.parent() {
                    let _ = tokio::fs::create_dir_all(parent).await;
                }
                let lock_file = tokio::fs::OpenOptions::new()
                    .create(true)
                    .read(true)
                    .write(true)
                    .truncate(false)
                    .open(&lock_path)
                    .await
                    .map_err(|e| StorageError::Permanent {
                        uri: uri.into(),
                        source: Box::new(e),
                    })?;
                lock_file
                    .lock_exclusive()
                    .map_err(|e| StorageError::Permanent {
                        uri: uri.into(),
                        source: Box::new(e),
                    })?;
                // Lock held below until `lock_file` drops at
                // end of branch (or early-return). Holding it
                // across `.await` points blocks the
                // tokio worker; head + put on LocalFS are
                // microseconds, so the worst-case stall is
                // bounded.

                let result: Result<Option<String>, StorageError> = async {
                    let current = self
                        .store
                        .head(&path)
                        .await
                        .map_err(|e| translate(uri, e))?;
                    let current_etag = current.e_tag.as_deref().unwrap_or("");
                    if current_etag != expected {
                        return Err(StorageError::PreconditionFailed { uri: uri.into() });
                    }
                    let opts = PutOptions {
                        mode: PutMode::Overwrite,
                        ..Default::default()
                    };
                    self.store
                        .put_opts(&path, PutPayload::from_bytes(bytes), opts)
                        .await
                        .map(|r| r.e_tag)
                        .map_err(|e| translate(uri, e))
                }
                .await;
                // `lock_file` drops here → POSIX flock
                // releases when the fd closes. Best-effort
                // explicit unlock too, ignoring failures (the
                // kernel cleans up regardless).
                let _ = lock_file.unlock_async().await;
                result
            }
        }
    }

    /// Start a multipart upload targeting `uri` and return the store's
    /// upload handle.
    ///
    /// Path-parse failures surface as [`StorageError::Permanent`]; store
    /// failures are mapped through `translate`.
    async fn put_multipart(&self, uri: &str) -> Result<Box<dyn MultipartUpload>, StorageError> {
        let location = Self::path(uri)?;
        match self.store.put_multipart(&location).await {
            Ok(upload) => Ok(upload),
            Err(err) => Err(translate(uri, err)),
        }
    }

    /// Delete the object at `uri`.
    ///
    /// Idempotent: a `NotFound` from the store is treated as success.
    /// Every other store error is mapped through `translate`.
    async fn delete(&self, uri: &str) -> Result<(), StorageError> {
        let location = Self::path(uri)?;
        let outcome = self.store.delete(&location).await;
        // Already gone counts as deleted.
        if matches!(outcome, Err(ObjError::NotFound { .. })) {
            return Ok(());
        }
        outcome.map_err(|err| translate(uri, err))
    }

    /// List every object under `prefix`, pairing each key (the store
    /// location rendered as a string) with its size, etag and
    /// last-modified time.
    ///
    /// The first error yielded by the listing stream aborts the call and
    /// is mapped through `translate`, tagged with `prefix`.
    // NOTE(review): unlike the other methods this builds the path with
    // `ObjPath::from` rather than the fallible `Self::path` — confirm
    // that is intentional.
    async fn list_with_prefix_metadata(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, ObjectMeta)>, StorageError> {
        let location = ObjPath::from(prefix);
        self.store
            .list(Some(&location))
            .map_ok(|entry| {
                let key = entry.location.to_string();
                let meta = ObjectMeta {
                    size: entry.size,
                    etag: entry.e_tag,
                    last_modified: entry.last_modified.into(),
                };
                (key, meta)
            })
            .map_err(|err| translate(prefix, err))
            .try_collect()
            .await
    }

    /// Expose the underlying store together with the object key for
    /// `uri`, or `None` when `uri` does not parse as an object path.
    fn object_store_handle(&self, uri: &str) -> Option<(Arc<dyn ObjectStore>, ObjPath)> {
        // The prefix (root) is baked into the LocalFileSystem store, so
        // the object key is the bare uri.
        Self::path(uri)
            .ok()
            .map(|key| (Arc::clone(&self.store) as Arc<dyn ObjectStore>, key))
    }
}

#[cfg(test)]
mod tests {
    //! `StorageProvider` trait contract against
    //! `LocalFsStorageProvider`.
    //!
    //! Covers: round-trip put + get; head returns accurate
    //! size + etag presence; range-fetch over a known
    //! object; `put_atomic` rejects an already-existing
    //! target; `put_if_match` honors ETag preconditions
    //! (success + failure paths) — the OCC primitive the
    //! manifest-pointer commit rides on; `delete` is
    //! idempotent on a missing target; `get` / `head` /
    //! `get_range` return `NotFound` on missing; advisory
    //! flock file is created on `put_if_match` (the TOCTOU-
    //! closing path); `put_multipart` returns a handle.
    use std::{
        error::Error,
        time::{Duration, SystemTime},
    };

    use bytes::Bytes;
    use tempfile::TempDir;

    use super::*;

    /// Fresh provider rooted in a new temporary directory. The `TempDir`
    /// guard is returned alongside it so each test keeps the directory
    /// handle in scope while it runs.
    fn provider() -> (TempDir, LocalFsStorageProvider) {
        let tmp = TempDir::new().expect("tempdir");
        let storage = LocalFsStorageProvider::new(tmp.path()).expect("provider");
        (tmp, storage)
    }

    #[tokio::test]
    async fn put_then_get_roundtrip() {
        // Write once, read back, and compare the bytes verbatim.
        let (_tmp, storage) = provider();
        let key = "data/seg-abc.sf.parquet";
        let written = Bytes::from_static(b"hello supertable storage");

        storage
            .put_atomic(key, written.clone())
            .await
            .expect("put");

        let (read_back, _meta) = storage.get(key).await.expect("get");
        assert_eq!(read_back, written);
    }

    #[tokio::test]
    async fn head_returns_accurate_size() {
        const KEY: &str = "data/seg-head.sf.parquet";
        let (_tmp, storage) = provider();
        storage
            .put_atomic(KEY, Bytes::from_static(&[0xABu8; 1024]))
            .await
            .expect("put");

        let head = storage.head(KEY).await.expect("head");
        assert_eq!(head.size, 1024);
        // The concrete etag value is backend-specific (and some backends
        // may not provide one), so only its presence is checked here.
        assert!(head.etag.is_some(), "LocalFS should surface an etag");
    }

    #[tokio::test]
    async fn get_range_reads_exact_slice() {
        let (_tmp, storage) = provider();
        let key = "data/seg-range.sf.parquet";
        // 256 bytes where each byte's value equals its offset, so any
        // mis-positioned slice is immediately visible.
        let body: Vec<u8> = (0u8..=255).collect();
        storage
            .put_atomic(key, Bytes::from(body.clone()))
            .await
            .expect("put");

        // Interior window.
        let middle = storage.get_range(key, 32..64).await.expect("range");
        assert_eq!(middle.as_ref(), &body[32..64]);

        // Final byte of the object.
        let last_byte = storage
            .get_range(key, 255..256)
            .await
            .expect("range tail");
        assert_eq!(last_byte.as_ref(), &body[255..256]);
    }

    #[tokio::test]
    async fn put_atomic_rejects_existing() {
        let (_tmp, storage) = provider();
        let key = "manifest-lists/list-1.json";
        let winner = Bytes::from_static(b"first writer wins");
        storage
            .put_atomic(key, winner.clone())
            .await
            .expect("first put");

        // A second create against the same key must be refused...
        let err = storage
            .put_atomic(key, Bytes::from_static(b"second"))
            .await
            .expect_err("second put must fail");
        assert!(
            matches!(err, StorageError::PreconditionFailed { .. }),
            "expected PreconditionFailed, got {err:?}"
        );

        // ...and must leave the first writer's content untouched.
        let (stored, _meta) = storage.get(key).await.expect("get after losing put");
        assert_eq!(stored, winner);
    }

    #[tokio::test]
    async fn put_if_match_with_correct_etag_succeeds() {
        let (_tmp, storage) = provider();
        let key = "ptr/current";
        storage
            .put_atomic(key, Bytes::from_static(b"v1"))
            .await
            .expect("initial");
        let fresh_etag = storage
            .head(key)
            .await
            .expect("head")
            .etag
            .expect("LocalFS etag");

        // Updating with the etag we just observed must go through.
        storage
            .put_if_match(key, Bytes::from_static(b"v2"), Some(&fresh_etag))
            .await
            .expect("conditional update with correct etag");

        let (stored, _meta) = storage.get(key).await.expect("get v2");
        assert_eq!(stored.as_ref(), b"v2");
    }

    #[tokio::test]
    async fn put_if_match_with_stale_etag_fails() {
        let (_tmp, storage) = provider();
        let key = "ptr/current";
        storage
            .put_atomic(key, Bytes::from_static(b"v1"))
            .await
            .expect("initial");
        let v1_etag = storage
            .head(key)
            .await
            .expect("head v1")
            .etag
            .clone()
            .expect("etag v1");

        // First writer presents the v1 etag and wins the OCC race.
        let first_update = Bytes::from_static(b"v_intermediate");
        storage
            .put_if_match(key, first_update, Some(&v1_etag))
            .await
            .expect("legitimate update");

        // A second writer still holding the v1 etag must now be rejected.
        let late_update = Bytes::from_static(b"v_stale_writer");
        let err = storage
            .put_if_match(key, late_update, Some(&v1_etag))
            .await
            .expect_err("stale etag must fail");
        assert!(
            matches!(err, StorageError::PreconditionFailed { .. }),
            "expected PreconditionFailed, got {err:?}"
        );

        // The rejected write must not have clobbered the winner's data.
        let (stored, _meta) = storage.get(key).await.expect("get");
        assert_eq!(stored.as_ref(), b"v_intermediate");
    }

    #[tokio::test]
    async fn delete_is_idempotent() {
        let (_tmp, storage) = provider();
        let existing = "data/orphan.sf.parquet";
        storage
            .put_atomic(existing, Bytes::from_static(b"x"))
            .await
            .expect("put");

        // Deleting an existing object succeeds...
        storage.delete(existing).await.expect("first delete");
        // ...deleting it again is still Ok...
        storage
            .delete(existing)
            .await
            .expect("second delete (idempotent)");
        // ...and so is deleting a key that was never written.
        storage
            .delete("data/never-existed.sf.parquet")
            .await
            .expect("delete of never-existing");
    }

    #[tokio::test]
    async fn missing_object_returns_not_found() {
        // All three read paths must report `NotFound` for an absent key.
        const ABSENT: &str = "data/no-such.sf.parquet";
        let (_tmp, storage) = provider();

        let head_err = storage.head(ABSENT).await.expect_err("head missing");
        assert!(matches!(head_err, StorageError::NotFound { .. }));

        let get_err = storage.get(ABSENT).await.expect_err("get missing");
        assert!(matches!(get_err, StorageError::NotFound { .. }));

        let range_err = storage
            .get_range(ABSENT, 0..1)
            .await
            .expect_err("get_range missing");
        assert!(matches!(range_err, StorageError::NotFound { .. }));
    }

    #[tokio::test]
    async fn put_at_nested_path_creates_dirs() {
        // Keys use object_store's forward-slash separators; writing to a
        // deep key must succeed without the test pre-creating any of the
        // intermediate directories.
        let (_tmp, storage) = provider();
        let deep_key = "a/b/c/d/leaf.bin";
        storage
            .put_atomic(deep_key, Bytes::from_static(b"deep"))
            .await
            .expect("nested put");

        let (stored, _meta) = storage.get(deep_key).await.expect("nested get");
        assert_eq!(stored.as_ref(), b"deep");
    }

    #[tokio::test]
    async fn put_if_match_creates_supertable_lock_file() {
        // `put_if_match`'s Some(etag) branch acquires an
        // advisory flock on `<root>/_supertable/.lock` to
        // close the read-then-overwrite TOCTOU window. The
        // lock file persists (best-effort cleanup is not
        // attempted), so its presence after a successful
        // conditional update is a direct signal the lock
        // path was exercised.
        let dir = TempDir::new().expect("tempdir");
        let p = LocalFsStorageProvider::new(dir.path()).expect("provider");
        p.put_atomic("ptr/current", Bytes::from_static(b"v1"))
            .await
            .expect("initial");
        let etag = p
            .head("ptr/current")
            .await
            .expect("head")
            .etag
            .expect("etag");
        p.put_if_match("ptr/current", Bytes::from_static(b"v2"), Some(&etag))
            .await
            .expect("conditional update");

        let lock_path = dir.path().join("_supertable").join(".lock");
        assert!(
            lock_path.exists(),
            "expected advisory lock file at {lock_path:?}"
        );
    }

    #[tokio::test]
    async fn put_multipart_returns_handle() {
        // Surface check only: obtain a handle and abort it. Driving real
        // part uploads is exercised at the supertable commit layer.
        let (_tmp, storage) = provider();
        let pending = storage
            .put_multipart("data/multipart-test.sf.parquet")
            .await;
        let mut handle = pending.expect("multipart handle");
        handle.abort().await.expect("abort");
    }

    #[tokio::test]
    async fn list_with_prefix_returns_matching_keys() {
        let (_tmp, storage) = provider();
        let seeded = ["seg/a.parquet", "seg/b.parquet", "other/c.parquet"];
        for key in seeded {
            storage
                .put_atomic(key, Bytes::from_static(b"x"))
                .await
                .expect("put");
        }

        // Only the two `seg/` keys match; sort because listing order is
        // not asserted.
        let mut seg_keys = storage.list_with_prefix("seg").await.expect("list");
        seg_keys.sort();
        assert_eq!(seg_keys, vec!["seg/a.parquet", "seg/b.parquet"]);

        // The empty prefix matches everything that was written.
        let everything = storage.list_with_prefix("").await.expect("list all");
        assert_eq!(everything.len(), 3);

        // An unknown prefix is an empty listing, not an error.
        let unmatched = storage
            .list_with_prefix("does-not-exist")
            .await
            .expect("list empty");
        assert!(unmatched.is_empty());
    }

    #[tokio::test]
    async fn list_with_prefix_metadata_returns_mtime_and_size() {
        let (_dir, p) = provider();
        // Bracket the write with a ±2 s window; presumably the slack is
        // there to tolerate coarse filesystem mtime granularity.
        let before = SystemTime::now()
            .checked_sub(Duration::from_secs(2))
            .expect("system time minus 2s must be representable");
        p.put_atomic("data/a.parquet", Bytes::from_static(b"hello"))
            .await
            .expect("put");
        let after = SystemTime::now()
            .checked_add(Duration::from_secs(2))
            .expect("system time plus 2s must be representable");

        let entries = p
            .list_with_prefix_metadata("data/")
            .await
            .expect("list metadata");
        // Exactly one entry is expected, so no ordering step is needed
        // before inspecting it.
        assert_eq!(entries.len(), 1);
        let (key, meta) = &entries[0];
        assert_eq!(key, "data/a.parquet");
        assert!(meta.last_modified >= before, "mtime too old");
        assert!(meta.last_modified <= after, "mtime in future");
        assert_eq!(meta.size, 5);
    }

    #[tokio::test]
    async fn object_store_handle_exposes_store_and_key() {
        // A valid uri yields a handle whose key is the uri itself.
        let (_tmp, storage) = provider();
        let handle = storage.object_store_handle("seg/x.parquet");
        let (_store, key) = handle.expect("handle for valid uri");
        assert_eq!(key.to_string(), "seg/x.parquet");
    }

    #[test]
    fn new_records_root_and_creates_it() {
        // Point the provider at a not-yet-existing nested directory.
        let tmp = TempDir::new().expect("tempdir");
        let nested_root = tmp.path().join("nested/created/here");

        let storage =
            LocalFsStorageProvider::new(&nested_root).expect("provider creates root");

        assert_eq!(storage.root(), &nested_root);
        assert!(nested_root.is_dir());
    }

    #[test]
    fn translate_maps_generic_to_transient_exhausted() {
        // `object_store` retries transient failures internally per its
        // RetryConfig; a `Generic` reaching `translate` is post-retry,
        // so it maps to `TransientExhausted`.
        let source: Box<dyn Error + Send + Sync> = "boom".into();
        let mapped = translate(
            "data/x.sf.parquet",
            ObjError::Generic {
                store: "test",
                source,
            },
        );
        assert!(
            matches!(mapped, StorageError::TransientExhausted { .. }),
            "expected TransientExhausted, got {mapped:?}"
        );
    }

    #[test]
    fn translate_maps_unhandled_variant_to_permanent() {
        // `NotImplemented` has no dedicated arm in `translate`, so it
        // must land in the catch-all `Permanent` with the uri preserved.
        let mapped = translate(
            "data/x.sf.parquet",
            ObjError::NotImplemented {
                operation: "put_opts(Update)".into(),
                implementer: "LocalFileSystem".into(),
            },
        );
        match mapped {
            StorageError::Permanent { uri, .. } => assert_eq!(uri, "data/x.sf.parquet"),
            other => panic!("expected Permanent, got {other:?}"),
        }
    }

    #[test]
    fn translate_maps_already_exists_and_precondition_to_precondition_failed() {
        // Both store-side conflict variants collapse into the same
        // `PreconditionFailed`.
        let conflicts = [
            ObjError::AlreadyExists {
                path: "p".into(),
                source: "exists".into(),
            },
            ObjError::Precondition {
                path: "p".into(),
                source: "stale".into(),
            },
        ];
        for conflict in conflicts {
            assert!(matches!(
                translate("uri", conflict),
                StorageError::PreconditionFailed { .. }
            ));
        }
    }

    #[test]
    fn translate_maps_not_found() {
        // Store-level `NotFound` stays `NotFound` after translation.
        let mapped = translate(
            "uri",
            ObjError::NotFound {
                path: "p".into(),
                source: "missing".into(),
            },
        );
        assert!(matches!(mapped, StorageError::NotFound { .. }));
    }

    #[tokio::test]
    async fn invalid_path_surfaces_permanent_error() {
        // A NUL byte is illegal in an `object_store::path::Path`, so
        // `Self::path` fails before any I/O and the caller sees
        // `Permanent` carrying the original uri.
        const BAD_URI: &str = "data/seg\0bad.sf.parquet";
        let (_tmp, storage) = provider();

        // Read path.
        match storage
            .head(BAD_URI)
            .await
            .expect_err("illegal path must fail")
        {
            StorageError::Permanent { uri, .. } => assert_eq!(uri, BAD_URI),
            other => panic!("expected Permanent, got {other:?}"),
        }

        // Write path rejects it the same way.
        let write_err = storage
            .put_atomic(BAD_URI, Bytes::from_static(b"x"))
            .await
            .expect_err("illegal path must fail on put");
        assert!(matches!(write_err, StorageError::Permanent { .. }));
    }

    #[tokio::test]
    async fn object_store_handle_returns_none_for_invalid_path() {
        // `object_store_handle` discards the path-parse error, so an
        // unparseable uri shows up as `None` rather than an error.
        let (_tmp, storage) = provider();
        let handle = storage.object_store_handle("data/bad\0path");
        assert!(handle.is_none());
    }
}