Skip to main content

omnigraph/
storage.rs

1use std::env;
2use std::fmt::Debug;
3use std::path::{Component, Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::TryStreamExt;
8use object_store::aws::AmazonS3Builder;
9use object_store::local::LocalFileSystem;
10use object_store::memory::InMemory;
11use object_store::path::Path as ObjectPath;
12use object_store::{DynObjectStore, ObjectStore, ObjectStoreExt, PutMode, PutPayload};
13use url::Url;
14
15use crate::error::{OmniError, Result};
16
17const FILE_SCHEME_PREFIX: &str = "file://";
18const S3_SCHEME_PREFIX: &str = "s3://";
19
/// URI-addressed storage for text objects, implemented in this file by
/// [`ObjectStorageAdapter`].
///
/// Every method takes the full URI of the object, directory, or prefix it
/// acts on and reports failures through the crate-level [`Result`].
#[async_trait]
pub trait StorageAdapter: Debug + Send + Sync {
    /// Read the object at `uri` and return its contents as text.
    async fn read_text(&self, uri: &str) -> Result<String>;
    /// Write `contents` to `uri`. The contract tests in this file overwrite
    /// an existing object this way and read the new contents back.
    async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
    /// Write a text object only if no object exists at `uri`.
    ///
    /// Returns `Ok(true)` when this call created the object, `Ok(false)`
    /// when the object already existed, and propagates every other storage
    /// error. Callers use this to establish ownership before running
    /// best-effort cleanup on partial failure.
    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool>;
    /// Report whether anything exists at `uri`. The contract tests in this
    /// file expect `true` for an existing object and for a non-empty prefix,
    /// and `false` for a missing object.
    async fn exists(&self, uri: &str) -> Result<bool>;
    /// Move a file from `from_uri` to `to_uri`, replacing any existing file at
    /// `to_uri`. Atomic on local POSIX; on S3 implemented as copy + delete
    /// (NOT atomic — callers that depend on atomicity for crash recovery must
    /// tolerate "both source and destination exist after a crash").
    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
    /// Remove a file. Returns Ok(()) if the file does not exist.
    async fn delete(&self, uri: &str) -> Result<()>;
    /// List all files (non-recursively, files only) directly under `dir_uri`.
    /// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
    /// Returns Ok(empty) if the directory does not exist or is empty.
    /// Consumers must tolerate non-payload residue appearing in storage
    /// (backend staging files are filtered by the backend, but crash residue
    /// of any future producer may not be) — filter by suffix, never assume
    /// every entry is yours.
    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
    /// Read a text object together with its backend version token (stores
    /// with conditional-update support: the object's ETag; local: sha256 of
    /// the content). The token is opaque — valid only for
    /// `write_text_if_match` against the same adapter.
    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>;
    /// Replace the object at `uri` only if its current version still matches
    /// `expected_version` (obtained from a prior versioned read/write on this
    /// adapter). Returns `Ok(Some(new_version))` on success and `Ok(None)`
    /// when the precondition failed (a concurrent writer won — the CAS-lost
    /// case callers must surface, never swallow). Stores with conditional
    /// updates (S3, in-memory) use a true conditional put (If-Match); the
    /// local filesystem has no such primitive (`PutMode::Update` is
    /// unimplemented upstream), so local compares content then replaces via
    /// an atomic staged write — the same single-machine semantics the
    /// callers had before this trait, safe under the callers' own lock
    /// protocol but not a cross-process barrier by itself (see the Known
    /// Gaps entry in docs/dev/invariants.md).
    async fn write_text_if_match(
        &self,
        uri: &str,
        contents: &str,
        expected_version: &str,
    ) -> Result<Option<String>>;
    /// Recursively delete every object under `prefix_uri`. Returns Ok(())
    /// when nothing exists there (idempotent). Local: `remove_dir_all`
    /// (directories are a local-FS concept; list+delete would leave empty
    /// directory skeletons that local existence probes report as present);
    /// object stores: list + delete (NOT atomic — callers must tolerate
    /// partial prefixes on crash, which the cluster delete protocol does by
    /// retry).
    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>;
}
79
80/// Version token for local files: content identity. The local filesystem
81/// backend reports mtime-derived ETags too coarse for CAS (sub-granularity
82/// rewrites collide); sha256 is stable, cheap at these object sizes, and
83/// already the cluster ledger's CAS vocabulary.
84fn local_version_token(bytes: &[u8]) -> String {
85    use sha2::{Digest, Sha256};
86    let digest = Sha256::digest(bytes);
87    digest.iter().map(|byte| format!("{byte:02x}")).collect()
88}
89
/// Backend family selected for a URI by [`storage_kind_for_uri`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageKind {
    /// Any URI that does not start with `s3://` (plain paths, `file://` URIs).
    Local,
    /// URIs that start with `s3://`.
    S3,
}
95
/// The one storage implementation: every backend is an
/// [`object_store::ObjectStore`], so the semantics (atomic-visibility puts,
/// conditional creates, path-delimited listing) are upstream-maintained and
/// identical across backends by construction. The per-backend residue is
/// confined to [`UriCodec`] (URI ↔ object path mapping) and the
/// `supports_conditional_update` capability flag (false only for the local
/// filesystem, where upstream `PutMode::Update` is unimplemented).
#[derive(Debug)]
pub struct ObjectStorageAdapter {
    /// Backend object store that every operation is issued against.
    store: Arc<DynObjectStore>,
    /// Translates caller-supplied URIs into object paths for `store`.
    codec: UriCodec,
    /// Whether the backend implements `PutMode::Update` (ETag-conditioned
    /// put). Gates BOTH the version-token source in `read_text_versioned`
    /// and the `write_text_if_match` strategy — the two must agree or every
    /// CAS loses.
    supports_conditional_update: bool,
}
113
/// How a caller-supplied URI is translated into an object path. Each
/// [`ObjectStorageAdapter`] constructor picks the variant matching its
/// backend.
#[derive(Debug, Clone, PartialEq, Eq)]
enum UriCodec {
    /// Plain absolute/relative paths or `file://` URIs, mapped onto a
    /// root-anchored [`LocalFileSystem`].
    Local,
    /// `s3://{bucket}/{key}` URIs, mapped onto a bucket-scoped store.
    S3 { bucket: String },
    /// Opaque keys for the in-memory test/embedded backend; leading
    /// slashes are stripped.
    Memory,
}
125
/// Bucket and key components extracted from an `s3://` URI by
/// `parse_s3_uri`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct S3Location {
    /// Host component of the URI.
    bucket: String,
    /// Path component of the URI with leading slashes removed; may be empty.
    key: String,
}
131
132impl ObjectStorageAdapter {
133    /// Local-filesystem backend rooted at `/`. URIs are plain paths or
134    /// `file://` URIs; relative paths are lexically absolutized against the
135    /// current working directory.
136    pub fn local() -> Self {
137        Self {
138            store: Arc::new(LocalFileSystem::new()),
139            codec: UriCodec::Local,
140            supports_conditional_update: false,
141        }
142    }
143
144    /// S3 backend scoped to the bucket named in `root_uri`. Credentials and
145    /// endpoint come from the standard `AWS_*` environment variables (the
146    /// same ones Lance reads for its dataset stores).
147    pub fn s3_from_root_uri(root_uri: &str) -> Result<Self> {
148        let location = parse_s3_uri(root_uri)?;
149        let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
150
151        if let Some(endpoint) = env::var("AWS_ENDPOINT_URL_S3")
152            .ok()
153            .or_else(|| env::var("AWS_ENDPOINT_URL").ok())
154        {
155            builder = builder.with_endpoint(&endpoint);
156            if endpoint.starts_with("http://") || env_var_truthy("AWS_ALLOW_HTTP") {
157                builder = builder.with_allow_http(true);
158            }
159        }
160
161        if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
162            builder = builder.with_virtual_hosted_style_request(false);
163        }
164
165        let store = builder.build().map_err(|err| {
166            OmniError::manifest_internal(format!(
167                "failed to initialize s3 storage for '{}': {}",
168                root_uri, err
169            ))
170        })?;
171
172        Ok(Self {
173            store: Arc::new(store),
174            codec: UriCodec::S3 {
175                bucket: location.bucket,
176            },
177            supports_conditional_update: true,
178        })
179    }
180
181    /// In-memory backend for tests and embedded experiments. Implements the
182    /// FULL contract including true conditional updates (unlike the local
183    /// filesystem), so contract tests exercise the strong-CAS path without a
184    /// bucket. State lives only as long as the adapter.
185    pub fn in_memory() -> Self {
186        Self {
187            store: Arc::new(InMemory::new()),
188            codec: UriCodec::Memory,
189            supports_conditional_update: true,
190        }
191    }
192
193    fn object_path(&self, uri: &str) -> Result<ObjectPath> {
194        match &self.codec {
195            UriCodec::Local => {
196                let path = absolutize_lexically(local_path_from_uri(uri)?)?;
197                ObjectPath::from_absolute_path(&path).map_err(|err| {
198                    OmniError::manifest_internal(format!(
199                        "invalid local object path for '{}': {}",
200                        uri, err
201                    ))
202                })
203            }
204            UriCodec::S3 { bucket } => {
205                let location = parse_s3_uri(uri)?;
206                if &location.bucket != bucket {
207                    return Err(OmniError::manifest_internal(format!(
208                        "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
209                        uri, bucket, location.bucket
210                    )));
211                }
212                if location.key.is_empty() {
213                    return Err(OmniError::manifest_internal(format!(
214                        "s3 storage path is empty for '{}'",
215                        uri
216                    )));
217                }
218                ObjectPath::parse(&location.key).map_err(|err| {
219                    OmniError::manifest_internal(format!(
220                        "invalid s3 object path for '{}': {}",
221                        uri, err
222                    ))
223                })
224            }
225            UriCodec::Memory => {
226                ObjectPath::parse(uri.trim_start_matches('/')).map_err(|err| {
227                    OmniError::manifest_internal(format!(
228                        "invalid memory object path for '{}': {}",
229                        uri, err
230                    ))
231                })
232            }
233        }
234    }
235}
236
237#[async_trait]
238impl StorageAdapter for ObjectStorageAdapter {
239    async fn read_text(&self, uri: &str) -> Result<String> {
240        let location = self.object_path(uri)?;
241        let bytes = self
242            .store
243            .get(&location)
244            .await
245            .map_err(|err| storage_backend_error("read", uri, err))?
246            .bytes()
247            .await
248            .map_err(|err| storage_backend_error("read", uri, err))?;
249
250        String::from_utf8(bytes.to_vec()).map_err(|err| {
251            OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
252        })
253    }
254
255    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
256        // Atomic visibility is the backend's contract: object stores via
257        // PutObject; LocalFileSystem via an internal staged-temp + rename
258        // (a reader sees the old object or the new one, never a truncated
259        // in-progress write). Callers (sidecar protocol, cluster state)
260        // assume it.
261        let location = self.object_path(uri)?;
262        self.store
263            .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
264            .await
265            .map_err(|err| storage_backend_error("write", uri, err))?;
266        Ok(())
267    }
268
269    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
270        // PutMode::Create: atomic no-replace publish on every backend —
271        // exactly one of N concurrent claimants wins, and the winner's
272        // object is fully readable at the instant it becomes visible
273        // (LocalFileSystem stages the temp file completely, then
274        // hard_links it; pinned by
275        // `local_write_text_if_absent_is_read_visible_on_return`).
276        let location = self.object_path(uri)?;
277        match self
278            .store
279            .put_opts(
280                &location,
281                PutPayload::from(contents.as_bytes().to_vec()),
282                PutMode::Create.into(),
283            )
284            .await
285        {
286            Ok(_) => Ok(true),
287            Err(object_store::Error::AlreadyExists { .. })
288            | Err(object_store::Error::Precondition { .. }) => Ok(false),
289            Err(err) => Err(storage_backend_error("write_if_absent", uri, err)),
290        }
291    }
292
293    async fn exists(&self, uri: &str) -> Result<bool> {
294        // head() answers for objects; the list fallback answers for
295        // "directory-shaped" URIs (e.g. a Lance dataset root, whose
296        // `_versions/*.manifest` makes any committed dataset non-empty).
297        // Object-store semantics throughout: only objects exist —
298        // an EMPTY local directory does not (callers that probe local
299        // directories use std::fs directly).
300        let location = self.object_path(uri)?;
301        match self.store.head(&location).await {
302            Ok(_) => Ok(true),
303            Err(object_store::Error::NotFound { .. }) => {
304                let mut entries = self.store.list(Some(&location));
305                let has_prefix_entries = entries
306                    .try_next()
307                    .await
308                    .map_err(|err| storage_backend_error("exists", uri, err))?
309                    .is_some();
310                Ok(has_prefix_entries)
311            }
312            Err(err) => Err(storage_backend_error("exists", uri, err)),
313        }
314    }
315
316    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
317        // ObjectStore::rename: LocalFileSystem overrides it with an atomic
318        // fs::rename (creating missing destination parents); object stores
319        // use the default copy + delete — if the copy succeeds and the
320        // delete fails (or the process crashes between them), both source
321        // and destination exist with the same content. Recovery code must
322        // tolerate this case — see schema_state::recover_schema_state_files.
323        let from = self.object_path(from_uri)?;
324        let to = self.object_path(to_uri)?;
325        self.store
326            .rename(&from, &to)
327            .await
328            .map_err(|err| storage_backend_error("rename", from_uri, err))?;
329        Ok(())
330    }
331
332    async fn delete(&self, uri: &str) -> Result<()> {
333        let location = self.object_path(uri)?;
334        match self.store.delete(&location).await {
335            Ok(()) => Ok(()),
336            Err(object_store::Error::NotFound { .. }) => Ok(()),
337            Err(err) => Err(storage_backend_error("delete", uri, err)),
338        }
339    }
340
341    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
342        // list_with_delimiter is non-recursive and path-delimited on every
343        // backend (no sibling-prefix bleed: listing `__recovery` cannot
344        // match `__recovery_log/...`), and returns Ok(empty) for a missing
345        // directory. Output URIs are anchored on the INPUT `dir_uri` plus
346        // the entry filename, so the strings round-trip byte-identically
347        // into read_text/delete regardless of scheme (plain path, file://,
348        // s3://).
349        let anchor = dir_uri.trim_end_matches('/');
350        let prefix = self.object_path(anchor)?;
351        let listing = self
352            .store
353            .list_with_delimiter(Some(&prefix))
354            .await
355            .map_err(|err| storage_backend_error("list_dir", dir_uri, err))?;
356        let mut out = Vec::with_capacity(listing.objects.len());
357        for meta in listing.objects {
358            if let Some(name) = meta.location.filename() {
359                out.push(format!("{}/{}", anchor, name));
360            }
361        }
362        Ok(out)
363    }
364
365    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
366        let location = self.object_path(uri)?;
367        let result = self
368            .store
369            .get(&location)
370            .await
371            .map_err(|err| storage_backend_error("read", uri, err))?;
372        let etag = result.meta.e_tag.clone();
373        let bytes = result
374            .bytes()
375            .await
376            .map_err(|err| storage_backend_error("read", uri, err))?;
377        // The token SOURCE must agree with the write_text_if_match strategy
378        // below: conditional-update backends compare ETags server-side, so
379        // the token is the ETag; the local emulation compares content, so
380        // the token is the content hash. Mixing them makes every CAS lose.
381        let version = if self.supports_conditional_update {
382            // Every S3-compatible store we target returns ETags; fall back
383            // to a content token rather than failing if one ever omits it.
384            etag.unwrap_or_else(|| local_version_token(&bytes))
385        } else {
386            local_version_token(&bytes)
387        };
388        let text = String::from_utf8(bytes.to_vec()).map_err(|err| {
389            OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
390        })?;
391        Ok((text, version))
392    }
393
394    async fn write_text_if_match(
395        &self,
396        uri: &str,
397        contents: &str,
398        expected_version: &str,
399    ) -> Result<Option<String>> {
400        let location = self.object_path(uri)?;
401        if self.supports_conditional_update {
402            let mode = PutMode::Update(object_store::UpdateVersion {
403                e_tag: Some(expected_version.to_string()),
404                version: None,
405            });
406            return match self
407                .store
408                .put_opts(
409                    &location,
410                    PutPayload::from(contents.as_bytes().to_vec()),
411                    mode.into(),
412                )
413                .await
414            {
415                Ok(result) => Ok(Some(
416                    result
417                        .e_tag
418                        .unwrap_or_else(|| local_version_token(contents.as_bytes())),
419                )),
420                Err(object_store::Error::Precondition { .. })
421                | Err(object_store::Error::NotFound { .. }) => Ok(None),
422                Err(err) => Err(storage_backend_error("write_if_match", uri, err)),
423            };
424        }
425        // Local emulation: content-compare then atomic replace. NOT a
426        // cross-process CAS (check-then-act gap) — safe under the callers'
427        // lock protocol only; tracked in docs/dev/invariants.md Known Gaps.
428        let current = match self.store.get(&location).await {
429            Ok(result) => result
430                .bytes()
431                .await
432                .map_err(|err| storage_backend_error("read", uri, err))?,
433            Err(object_store::Error::NotFound { .. }) => return Ok(None),
434            Err(err) => return Err(storage_backend_error("read", uri, err)),
435        };
436        if local_version_token(&current) != expected_version {
437            return Ok(None);
438        }
439        self.store
440            .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
441            .await
442            .map_err(|err| storage_backend_error("write_if_match", uri, err))?;
443        Ok(Some(local_version_token(contents.as_bytes())))
444    }
445
446    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
447        // Directories are a local-FS concept: a list+delete loop would
448        // leave empty directory skeletons that local existence probes
449        // (cluster graph_root_exists uses std Path::exists) report as
450        // still-present. remove_dir_all reclaims them in one call.
451        if self.codec == UriCodec::Local {
452            let path = absolutize_lexically(local_path_from_uri(prefix_uri)?)?;
453            return match tokio::fs::remove_dir_all(&path).await {
454                Ok(()) => Ok(()),
455                Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
456                Err(err) => Err(err.into()),
457            };
458        }
459        let prefix = self.object_path(prefix_uri.trim_end_matches('/'))?;
460        let mut entries = self.store.list(Some(&prefix));
461        let mut locations = Vec::new();
462        while let Some(meta) = entries
463            .try_next()
464            .await
465            .map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))?
466        {
467            locations.push(meta.location);
468        }
469        for location in locations {
470            match self.store.delete(&location).await {
471                Ok(()) => {}
472                Err(object_store::Error::NotFound { .. }) => {}
473                Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)),
474            }
475        }
476        Ok(())
477    }
478}
479
480pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
481    if uri.starts_with(S3_SCHEME_PREFIX) {
482        StorageKind::S3
483    } else {
484        StorageKind::Local
485    }
486}
487
488pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
489    match storage_kind_for_uri(uri) {
490        StorageKind::Local => Ok(Arc::new(ObjectStorageAdapter::local())),
491        StorageKind::S3 => Ok(Arc::new(ObjectStorageAdapter::s3_from_root_uri(uri)?)),
492    }
493}
494
495pub fn normalize_root_uri(uri: &str) -> Result<String> {
496    match storage_kind_for_uri(uri) {
497        StorageKind::Local => {
498            let path = local_path_from_uri(uri)?;
499            Ok(normalize_local_path(&path))
500        }
501        StorageKind::S3 => Ok(trim_trailing_slashes(uri)),
502    }
503}
504
505pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
506    let relative_path = relative_path.trim_start_matches('/');
507    match storage_kind_for_uri(root_uri) {
508        StorageKind::S3 => {
509            let root = trim_trailing_slashes(root_uri);
510            if root.is_empty() {
511                relative_path.to_string()
512            } else {
513                format!("{}/{}", root, relative_path)
514            }
515        }
516        StorageKind::Local => {
517            let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
518                local_path_from_file_uri(root_uri)
519                    .map(|path| normalize_local_path(&path))
520                    .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
521            } else {
522                normalize_local_path(Path::new(root_uri))
523            };
524            let joined = Path::new(&root).join(relative_path);
525            normalize_local_path(&joined)
526        }
527    }
528}
529
530fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
531    if uri.starts_with(FILE_SCHEME_PREFIX) {
532        return local_path_from_file_uri(uri);
533    }
534    Ok(PathBuf::from(uri))
535}
536
537/// Lexically absolutize a local path: join relative paths onto the current
538/// working directory and fold `.` / `..` components, without touching the
539/// filesystem. Required because `object_store::path::Path` rejects
540/// relative and dot segments, while callers (the CLI in particular) pass
541/// paths like `./graph.omni` verbatim.
542fn absolutize_lexically(path: PathBuf) -> Result<PathBuf> {
543    let joined = if path.is_absolute() {
544        path
545    } else {
546        std::env::current_dir()
547            .map_err(|err| {
548                OmniError::manifest_internal(format!(
549                    "cannot resolve relative storage path '{}': {}",
550                    path.display(),
551                    err
552                ))
553            })?
554            .join(path)
555    };
556    let mut out = PathBuf::new();
557    for component in joined.components() {
558        match component {
559            Component::CurDir => {}
560            Component::ParentDir => {
561                out.pop();
562            }
563            other => out.push(other),
564        }
565    }
566    Ok(out)
567}
568
569fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
570    let url = Url::parse(uri).map_err(|err| {
571        OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err))
572    })?;
573    url.to_file_path()
574        .map_err(|_| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
575}
576
577fn parse_s3_uri(uri: &str) -> Result<S3Location> {
578    let url = Url::parse(uri).map_err(|err| {
579        OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
580    })?;
581    if url.scheme() != "s3" {
582        return Err(OmniError::manifest_internal(format!(
583            "unsupported s3 uri '{}'",
584            uri
585        )));
586    }
587    let bucket = url
588        .host_str()
589        .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
590    Ok(S3Location {
591        bucket: bucket.to_string(),
592        key: url.path().trim_start_matches('/').to_string(),
593    })
594}
595
596fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
597    OmniError::manifest_internal(format!("storage {} failed for '{}': {}", action, uri, err))
598}
599
600fn normalize_local_path(path: &Path) -> String {
601    let raw = path.as_os_str().to_string_lossy();
602    if raw == "/" {
603        return raw.to_string();
604    }
605    trim_trailing_slashes(&raw)
606}
607
/// Remove every trailing `/` from `value`, unless that would leave nothing —
/// in which case `value` is returned unchanged (so `/` stays `/`).
fn trim_trailing_slashes(value: &str) -> String {
    match value.trim_end_matches('/') {
        "" => value.to_owned(),
        stripped => stripped.to_owned(),
    }
}
616
/// Report whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `yes`, and `on`, compared without regard
/// to ASCII case (so `Yes`, `On`, and `tRUE` count too). An unset variable,
/// a non-Unicode value, or any other text is not truthy.
fn env_var_truthy(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    env::var(key)
        .map(|value| {
            TRUTHY
                .iter()
                .any(|accepted| value.eq_ignore_ascii_case(accepted))
        })
        .unwrap_or(false)
}
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627
628    /// The executable backend contract: every assertion here must hold for
629    /// EVERY backend (the divergence class this adapter closed was "two
630    /// implementations, one prose contract, no referee"). The S3 variant
631    /// runs bucket-gated in `tests/s3_storage.rs`
632    /// (`s3_adapter_conditional_writes_contract`).
633    async fn contract_suite(adapter: &dyn StorageAdapter, root: &str) {
634        // Write/read round-trip; replace is in-place and atomic.
635        let a = format!("{root}/contract/a.json");
636        adapter.write_text(&a, "v1").await.unwrap();
637        assert_eq!(adapter.read_text(&a).await.unwrap(), "v1");
638        adapter.write_text(&a, "v2").await.unwrap();
639        assert_eq!(adapter.read_text(&a).await.unwrap(), "v2");
640
641        // exists: object yes; missing no; non-empty prefix yes (the
642        // directory-shaped probe Lance dataset roots rely on).
643        assert!(adapter.exists(&a).await.unwrap());
644        assert!(
645            !adapter
646                .exists(&format!("{root}/contract/missing.json"))
647                .await
648                .unwrap()
649        );
650        assert!(adapter.exists(&format!("{root}/contract")).await.unwrap());
651
652        // if_absent: exactly one claim wins; the loser leaves the winner's
653        // object untouched.
654        let claim = format!("{root}/contract/claim.json");
655        assert!(adapter.write_text_if_absent(&claim, "first").await.unwrap());
656        assert!(!adapter.write_text_if_absent(&claim, "second").await.unwrap());
657        assert_eq!(adapter.read_text(&claim).await.unwrap(), "first");
658
659        // Versioned CAS: fresh token wins, stale token loses with Ok(None)
660        // (never a silent overwrite), missing object can't match.
661        let state = format!("{root}/contract/state.json");
662        adapter.write_text(&state, "s1").await.unwrap();
663        let (text, v1) = adapter.read_text_versioned(&state).await.unwrap();
664        assert_eq!(text, "s1");
665        let v2 = adapter
666            .write_text_if_match(&state, "s2", &v1)
667            .await
668            .unwrap()
669            .expect("fresh token must win");
670        assert_ne!(v2, v1);
671        assert!(
672            adapter
673                .write_text_if_match(&state, "s3", &v1)
674                .await
675                .unwrap()
676                .is_none()
677        );
678        assert_eq!(adapter.read_text(&state).await.unwrap(), "s2");
679        assert!(
680            adapter
681                .write_text_if_match(&format!("{root}/contract/absent.json"), "x", &v1)
682                .await
683                .unwrap()
684                .is_none()
685        );
686
687        // rename: destination is replaced; source is gone.
688        let src = format!("{root}/contract/src.json");
689        adapter.write_text(&src, "moved").await.unwrap();
690        adapter.rename_text(&src, &a).await.unwrap();
691        assert_eq!(adapter.read_text(&a).await.unwrap(), "moved");
692        assert!(!adapter.exists(&src).await.unwrap());
693
694        // list_dir: direct children only, no sibling-prefix bleed, output
695        // URIs round-trip verbatim into read_text, missing dir is empty.
696        let dir_uri = format!("{root}/contract/list");
697        adapter
698            .write_text(&format!("{dir_uri}/one.json"), "1")
699            .await
700            .unwrap();
701        adapter
702            .write_text(&format!("{dir_uri}/two.json"), "2")
703            .await
704            .unwrap();
705        adapter
706            .write_text(&format!("{dir_uri}/sub/three.json"), "3")
707            .await
708            .unwrap();
709        adapter
710            .write_text(&format!("{root}/contract/list_log/x.json"), "x")
711            .await
712            .unwrap();
713        let mut listed = adapter.list_dir(&dir_uri).await.unwrap();
714        listed.sort();
715        assert_eq!(
716            listed,
717            vec![
718                format!("{dir_uri}/one.json"),
719                format!("{dir_uri}/two.json")
720            ]
721        );
722        for uri in &listed {
723            adapter.read_text(uri).await.unwrap();
724        }
725        assert!(
726            adapter
727                .list_dir(&format!("{root}/contract/nope"))
728                .await
729                .unwrap()
730                .is_empty()
731        );
732
733        // delete: idempotent.
734        adapter.delete(&claim).await.unwrap();
735        adapter.delete(&claim).await.unwrap();
736        assert!(!adapter.exists(&claim).await.unwrap());
737
738        // delete_prefix: recursive + idempotent; nothing under the prefix
739        // (including local directory skeletons) survives.
740        adapter
741            .delete_prefix(&format!("{root}/contract"))
742            .await
743            .unwrap();
744        assert!(!adapter.exists(&a).await.unwrap());
745        assert!(!adapter.exists(&format!("{root}/contract")).await.unwrap());
746        adapter
747            .delete_prefix(&format!("{root}/contract"))
748            .await
749            .unwrap();
750    }
751
752    #[tokio::test]
753    async fn contract_suite_local() {
754        let dir = tempfile::tempdir().unwrap();
755        let adapter = ObjectStorageAdapter::local();
756        contract_suite(&adapter, dir.path().to_str().unwrap()).await;
757    }
758
759    #[tokio::test]
760    async fn contract_suite_in_memory() {
761        // InMemory implements true conditional updates, so this runs the
762        // strong-CAS path (ETag tokens + PutMode::Update) without a bucket.
763        let adapter = ObjectStorageAdapter::in_memory();
764        contract_suite(&adapter, "mem-root").await;
765    }
766
767    /// `write_text_if_absent` must make the contents visible to any
768    /// subsequent reader before it returns — callers acknowledge
769    /// success the moment it resolves (cluster state bootstrap reads
770    /// the file back; init ownership claims depend on it).
771    /// Regression: the previous hand-rolled local adapter wrote through a
772    /// buffered `tokio::fs::File` without flushing, so the bytes could
773    /// still be in flight on the blocking pool while a reader saw an empty
774    /// or partial file. Reads back through `std::fs` deliberately —
775    /// cross-API visibility is the point.
776    #[tokio::test]
777    async fn local_write_text_if_absent_is_read_visible_on_return() {
778        let dir = tempfile::tempdir().unwrap();
779        let adapter = ObjectStorageAdapter::local();
780        let payload = "x".repeat(8 * 1024);
781        for i in 0..1000 {
782            let path = dir.path().join(format!("obj-{i}.json"));
783            let uri = format!("{}", path.display());
784            assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap());
785            let read = std::fs::read_to_string(&path).unwrap();
786            assert_eq!(
787                read.len(),
788                payload.len(),
789                "iteration {i}: write_text_if_absent returned before its \
790                 contents reached the file"
791            );
792        }
793    }
794
795    /// Regression for the write_text_if_absent buffering bug, via the
796    /// `storage_for_uri` + `file://` construction path and a multi-thread
797    /// runtime (complements `local_write_text_if_absent_is_read_visible_-
798    /// on_return`, which uses the direct constructor and plain paths): a
799    /// reader immediately after Ok(true) must never see the created file
800    /// empty or short.
801    #[tokio::test(flavor = "multi_thread")]
802    async fn write_text_if_absent_is_read_consistent_immediately() {
803        let dir = tempfile::tempdir().unwrap();
804        let adapter = storage_for_uri(&format!("file://{}", dir.path().display())).unwrap();
805        let payload = "x".repeat(64 * 1024);
806        for i in 0..200 {
807            let uri = format!("file://{}/f{}.json", dir.path().display(), i);
808            assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap());
809            let read = std::fs::read_to_string(dir.path().join(format!("f{i}.json"))).unwrap();
810            assert_eq!(read.len(), payload.len(), "iteration {i}: short read");
811        }
812    }
813
814    /// Object-store semantics on the local filesystem: only objects exist.
815    /// An empty directory is not an object and not a non-empty prefix —
816    /// callers that genuinely probe local directories use std::fs.
817    #[tokio::test]
818    async fn local_exists_is_object_semantics_for_directories() {
819        let dir = tempfile::tempdir().unwrap();
820        let probe = dir.path().join("maybe-dataset");
821        let adapter = ObjectStorageAdapter::local();
822        std::fs::create_dir(&probe).unwrap();
823        assert!(
824            !adapter.exists(probe.to_str().unwrap()).await.unwrap(),
825            "an empty directory is not an object"
826        );
827        std::fs::write(probe.join("1.manifest"), "m").unwrap();
828        assert!(
829            adapter.exists(probe.to_str().unwrap()).await.unwrap(),
830            "a non-empty prefix exists (the Lance dataset-root probe shape)"
831        );
832    }
833
834    /// list_dir output is anchored on the INPUT dir_uri, so `file://`
835    /// anchors and paths with spaces round-trip byte-identically into
836    /// read_text — the cluster store passes file://-schemed roots.
837    #[tokio::test]
838    async fn local_list_round_trips_file_scheme_and_spaces() {
839        let dir = tempfile::tempdir().unwrap();
840        let root = dir.path().join("with space");
841        let adapter = ObjectStorageAdapter::local();
842        let plain = format!("{}/x.json", root.display());
843        adapter.write_text(&plain, "x").await.unwrap();
844
845        let listed = adapter.list_dir(root.to_str().unwrap()).await.unwrap();
846        assert_eq!(listed, vec![plain.clone()]);
847        assert_eq!(adapter.read_text(&listed[0]).await.unwrap(), "x");
848
849        let file_anchor = format!("file://{}", root.display());
850        let listed = adapter.list_dir(&file_anchor).await.unwrap();
851        assert_eq!(listed, vec![format!("{file_anchor}/x.json")]);
852        assert_eq!(adapter.read_text(&listed[0]).await.unwrap(), "x");
853    }
854
855    /// Relative and dot-segment paths are lexically absolutized before
856    /// hitting the object-path layer (which rejects them) — the CLI passes
857    /// `./graph.omni`-shaped URIs verbatim.
858    #[tokio::test]
859    async fn local_paths_with_dot_segments_are_absolutized() {
860        let dir = tempfile::tempdir().unwrap();
861        let adapter = ObjectStorageAdapter::local();
862        let uri = format!("{}/sub/../dotted.json", dir.path().display());
863        adapter.write_text(&uri, "x").await.unwrap();
864        assert_eq!(adapter.read_text(&uri).await.unwrap(), "x");
865        assert!(dir.path().join("dotted.json").exists());
866    }
867
868    /// Upstream local rename creates missing destination parents — more
869    /// lenient than the previous bare fs::rename; pinned so an upstream
870    /// regression is loud.
871    #[tokio::test]
872    async fn local_rename_creates_missing_destination_parents() {
873        let dir = tempfile::tempdir().unwrap();
874        let adapter = ObjectStorageAdapter::local();
875        let src = format!("{}/src.json", dir.path().display());
876        adapter.write_text(&src, "x").await.unwrap();
877        let dst = format!("{}/new-sub/dst.json", dir.path().display());
878        adapter.rename_text(&src, &dst).await.unwrap();
879        assert_eq!(adapter.read_text(&dst).await.unwrap(), "x");
880    }
881
882    #[test]
883    fn storage_backend_selection_is_scheme_aware() {
884        assert_eq!(storage_kind_for_uri("/tmp/graph"), StorageKind::Local);
885        assert_eq!(
886            storage_kind_for_uri("file:///tmp/graph"),
887            StorageKind::Local
888        );
889        assert_eq!(
890            storage_kind_for_uri("s3://omnigraph-preview/graph"),
891            StorageKind::S3
892        );
893    }
894
895    #[test]
896    fn normalize_root_uri_preserves_local_and_s3_shapes() {
897        assert_eq!(
898            normalize_root_uri("/tmp/omnigraph/").unwrap(),
899            "/tmp/omnigraph"
900        );
901        assert_eq!(
902            normalize_root_uri("file:///tmp/omnigraph/").unwrap(),
903            "/tmp/omnigraph"
904        );
905        assert_eq!(
906            normalize_root_uri("s3://bucket/prefix/").unwrap(),
907            "s3://bucket/prefix"
908        );
909    }
910
911    #[test]
912    fn join_uri_handles_local_file_and_s3_roots() {
913        assert_eq!(
914            join_uri("/tmp/omnigraph", "_schema.pg"),
915            "/tmp/omnigraph/_schema.pg"
916        );
917        assert_eq!(
918            join_uri("file:///tmp/omnigraph", "_schema.pg"),
919            "/tmp/omnigraph/_schema.pg"
920        );
921        assert_eq!(
922            join_uri("s3://bucket/prefix", "_schema.pg"),
923            "s3://bucket/prefix/_schema.pg"
924        );
925    }
926
927    #[test]
928    fn parse_s3_uri_splits_bucket_and_key() {
929        let location = parse_s3_uri("s3://bucket/graph/_schema.pg").unwrap();
930        assert_eq!(location.bucket, "bucket");
931        assert_eq!(location.key, "graph/_schema.pg");
932    }
933
934}