Skip to main content

snapdir_stores/
s3_store.rs

1//! `S3Store`: the `s3://` storage backend, backed by the native AWS SDK.
2//!
3//! An [`S3Store`] targets an `s3://bucket/prefix` location and holds the frozen
4//! content-addressable `.objects`/`.manifests` sharded layout, so a
5//! bucket/prefix is interchangeable across conforming implementations:
6//!
7//! ```text
8//! s3://<bucket>/<prefix>/.objects/<sharded checksum>     raw object bytes
9//! s3://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
10//! ```
11//!
12//! Sharding and the relative keys come straight from [`snapdir_core::store`]
13//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
14//!
15//! # Credentials
16//!
17//! Authentication is delegated entirely to the standard AWS credential chain
18//! via [`aws_config`] (environment variables, shared config/credentials
19//! profiles, SSO, container/instance metadata, …). No bespoke snapdir
20//! credential variables are introduced. An S3-compatible endpoint (`MinIO`,
21//! `SeaweedFS`, …) can be selected with `SNAPDIR_S3_TEST_ENDPOINT` for the
22//! gated live test, or by constructing the store with an explicit endpoint.
23//!
24//! # TLS provider (project-load-bearing)
25//!
26//! The shipped binary must statically link on musl, so the workspace
27//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
28//! AWS SDK defaults to an aws-lc-rs-backed HTTP connector, so this module builds
29//! the SDK's modern hyper-1.x HTTP client ([`aws_smithy_http_client`]) with its
30//! `rustls`/**`ring`** TLS provider and hands it to the SDK as a custom
31//! [`HttpClient`](aws_smithy_runtime_api::client::http::HttpClient). Native root
32//! trust anchors stay on (the builder's default `TrustStore`).
33//!
34//! # Sync trait, async SDK
35//!
36//! The SDK is async. [`S3Store`] owns a private multi-thread `tokio` runtime and
37//! bridges each [`Store`] method with `runtime.block_on(...)`, so no `async`
38//! leaks into `snapdir-core` or the orchestrator (see [`snapdir_core::store`]).
39
40use std::path::Path;
41use std::sync::Arc;
42
43use aws_config::BehaviorVersion;
44use aws_sdk_s3::config::Region;
45use aws_sdk_s3::Client;
46use aws_smithy_http_client::tls::rustls_provider::CryptoMode;
47use aws_smithy_http_client::tls::Provider as TlsProvider;
48use aws_smithy_http_client::Builder as HttpClientBuilder;
49use snapdir_core::manifest::Manifest;
50use snapdir_core::merkle::{Blake3Hasher, Hasher};
51use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
52use snapdir_core::Meter;
53
54use crate::fetch::fetch_files_concurrent;
55use crate::push::{push_objects_concurrent, upload_object};
56use crate::stream::StreamStore;
57use crate::transfer::{RateLimiter, TransferConfig};
58
59use tokio::runtime::Runtime;
60
/// Maximum number of download attempts for one object whose bytes fail their
/// BLAKE3 checksum: the initial GET plus up to `MAX_FETCH_RETRIES - 1`
/// re-downloads (see `S3Store::fetch_verified`). The value mirrors the
/// oracle's `_SNAPDIR_S3_STORE_RETRIES` default of 5.
/// NOTE(review): confirm whether the oracle counts total attempts or retries.
const MAX_FETCH_RETRIES: u32 = 5;
64
/// Where an [`S3Store`] lives: the bucket to talk to and the key prefix under
/// which the `.objects` / `.manifests` trees are rooted.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct S3Location {
    /// Bucket name, i.e. the first path segment after `s3://`.
    pub bucket: String,
    /// Key prefix built from the remaining segments, carrying neither a
    /// leading nor a trailing `/`. An empty string means the bucket root.
    pub prefix: String,
}
74
75impl S3Location {
76    /// Parses an `s3://bucket/prefix` URL into its bucket and prefix.
77    ///
78    /// Matches the frozen URL derivation
79    /// (`_snapdir_export_store_vars`): splitting the store URL on `/`, the
80    /// bucket is `cut -f3` (the segment after `s3://`) and the base dir is
81    /// `cut -f4-` (everything after). The prefix has any trailing slash
82    /// stripped, matching `_snapdir_s3_store_get_remote_prefix`.
83    ///
84    /// The `s3://` scheme is optional; a bare `bucket/prefix` is accepted too.
85    #[must_use]
86    pub fn parse(store_url: &str) -> Self {
87        // Drop the scheme (`s3://`, or any `<proto>://`) if present. The oracle
88        // splits the full URL on `/` and takes field 3 as the bucket, which for
89        // `s3://bucket/...` is exactly the segment after the `//`.
90        let without_scheme = match store_url.find("://") {
91            Some(idx) => &store_url[idx + 3..],
92            None => store_url,
93        };
94        let mut parts = without_scheme.splitn(2, '/');
95        let bucket = parts.next().unwrap_or("").to_owned();
96        let prefix = parts.next().unwrap_or("");
97        // Strip a trailing slash (and any leading slash from an empty first
98        // segment edge case); the prefix is joined back with a single `/`.
99        let prefix = prefix
100            .trim_end_matches('/')
101            .trim_start_matches('/')
102            .to_owned();
103        Self { bucket, prefix }
104    }
105
106    /// Returns the full S3 object key for a content object given its checksum,
107    /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
108    #[must_use]
109    pub fn object_key(&self, checksum: &str) -> String {
110        self.key_for(&object_path(checksum))
111    }
112
113    /// Returns the full S3 object key for a manifest given its snapshot id,
114    /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
115    #[must_use]
116    pub fn manifest_key(&self, id: &str) -> String {
117        self.key_for(&manifest_path(id))
118    }
119
120    /// Joins the store prefix with a store-relative path (`.objects/...` or
121    /// `.manifests/...`), producing a leading-slash-free S3 key. Mirrors the
122    /// oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
123    /// leading slash trimmed.
124    fn key_for(&self, rel: &str) -> String {
125        let rel = rel.trim_start_matches('/');
126        if self.prefix.is_empty() {
127            rel.to_owned()
128        } else {
129            format!("{}/{rel}", self.prefix)
130        }
131    }
132}
133
134/// A content-addressable store backed by an S3 (or S3-compatible) bucket.
135///
136/// Construct one with [`S3Store::connect`] (resolves the standard AWS
137/// credential chain) or [`S3Store::from_client`] (an already-built SDK client,
138/// e.g. for tests against an emulator).
139pub struct S3Store {
140    client: Client,
141    location: S3Location,
142    runtime: Arc<Runtime>,
143    config: TransferConfig,
144    /// Optional progress meter; recorded into during transfers. `None` (the
145    /// default from every constructor) means zero recording and byte-identical
146    /// behavior. Set by the CLI via [`S3Store::with_meter`].
147    meter: Option<Arc<Meter>>,
148}
149
150impl S3Store {
151    /// Connects to the `s3://bucket/prefix` store, resolving credentials and
152    /// region via the standard AWS chain ([`aws_config::load_defaults`]).
153    ///
154    /// The HTTP client is pinned to the `ring` rustls provider (see the module
155    /// docs). An optional `endpoint_url` selects an S3-compatible backend
156    /// (path-style addressing is enabled when an endpoint is given, as
157    /// emulators rarely support virtual-host addressing).
158    ///
159    /// # Errors
160    ///
161    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the
162    /// AWS configuration cannot be loaded.
163    pub fn connect(store_url: &str, endpoint_url: Option<&str>) -> Result<Self, StoreError> {
164        Self::connect_with(store_url, endpoint_url, TransferConfig::default())
165    }
166
167    /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
168    /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
169    /// delegates here with [`TransferConfig::default`].
170    ///
171    /// # Errors
172    ///
173    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the
174    /// AWS configuration cannot be loaded.
175    pub fn connect_with(
176        store_url: &str,
177        endpoint_url: Option<&str>,
178        config: TransferConfig,
179    ) -> Result<Self, StoreError> {
180        let location = S3Location::parse(store_url);
181        let runtime = build_runtime()?;
182
183        let http_client = ring_https_client();
184        let endpoint = endpoint_url.map(ToOwned::to_owned);
185        let client = runtime.block_on(async move {
186            let mut loader =
187                aws_config::defaults(BehaviorVersion::latest()).http_client(http_client.clone());
188            if let Some(ep) = endpoint.as_deref() {
189                loader = loader.endpoint_url(ep);
190            }
191            let shared = loader.load().await;
192            let mut builder = aws_sdk_s3::config::Builder::from(&shared);
193            if endpoint.is_some() {
194                // S3-compatible emulators generally require path-style keys.
195                builder = builder.force_path_style(true);
196            }
197            // Some emulators / configs leave the region unset; S3 still
198            // requires a value to sign requests, so default it.
199            if shared.region().is_none() {
200                builder = builder.region(Region::new("us-east-1"));
201            }
202            Client::from_conf(builder.build())
203        });
204
205        Ok(Self {
206            client,
207            location,
208            runtime: Arc::new(runtime),
209            config,
210            meter: None,
211        })
212    }
213
214    /// Builds a store from an already-configured SDK [`Client`] and a parsed
215    /// location, owning a fresh tokio runtime for the sync bridge. Intended for
216    /// tests (e.g. wiring a client at an emulator endpoint).
217    ///
218    /// # Errors
219    ///
220    /// [`StoreError::Backend`] if the tokio runtime cannot be created.
221    pub fn from_client(client: Client, location: S3Location) -> Result<Self, StoreError> {
222        Ok(Self {
223            client,
224            location,
225            runtime: Arc::new(build_runtime()?),
226            config: TransferConfig::default(),
227            meter: None,
228        })
229    }
230
231    /// Attaches (or clears) an optional progress [`Meter`], rides alongside
232    /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
233    /// bytes-out + per-object progress into it; `None` (the constructor default)
234    /// means zero recording and byte-identical behavior. The CLI sets this after
235    /// construction.
236    #[must_use]
237    pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
238        self.meter = meter;
239        self
240    }
241
242    /// The parsed bucket/prefix this store targets.
243    #[must_use]
244    pub fn location(&self) -> &S3Location {
245        &self.location
246    }
247
248    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
249    /// with. Consumed by the transfer loops in later gates.
250    #[must_use]
251    pub fn transfer_config(&self) -> &TransferConfig {
252        &self.config
253    }
254
255    /// HEAD an object key; `Ok(true)` if it exists, `Ok(false)` if absent.
256    async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
257        match self
258            .client
259            .head_object()
260            .bucket(&self.location.bucket)
261            .key(key)
262            .send()
263            .await
264        {
265            Ok(_) => Ok(true),
266            Err(err) => {
267                let svc = err.into_service_error();
268                if svc.is_not_found() {
269                    Ok(false)
270                } else {
271                    Err(backend("S3 HEAD object failed", svc))
272                }
273            }
274        }
275    }
276
277    /// GET an object key's full body, or `None` if it is absent.
278    async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
279        match self
280            .client
281            .get_object()
282            .bucket(&self.location.bucket)
283            .key(key)
284            .send()
285            .await
286        {
287            Ok(resp) => {
288                let data = resp
289                    .body
290                    .collect()
291                    .await
292                    .map_err(|e| backend("reading S3 object body", e))?;
293                Ok(Some(data.into_bytes().to_vec()))
294            }
295            Err(err) => {
296                let svc = err.into_service_error();
297                if svc.is_no_such_key() {
298                    Ok(None)
299                } else {
300                    Err(backend("S3 GET object failed", svc))
301                }
302            }
303        }
304    }
305
306    /// PUT `bytes` at `key`. S3 PUT is atomic, so no temp-key dance is needed
307    /// (the oracle relies on the same atomicity for manifests/objects).
308    async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
309        self.client
310            .put_object()
311            .bucket(&self.location.bucket)
312            .key(key)
313            .body(bytes.into())
314            .send()
315            .await
316            .map_err(|e| backend("S3 PUT object failed", e))?;
317        Ok(())
318    }
319
320    /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
321    /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_s3_fetch_to_cache`.
322    async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
323        let hasher = Blake3Hasher::new();
324        let mut attempts_left = MAX_FETCH_RETRIES;
325        loop {
326            match self.get_bytes(key).await? {
327                Some(bytes) => {
328                    let actual = hasher.hash_hex(&bytes);
329                    if actual == expected {
330                        return Ok(bytes);
331                    }
332                    // Mismatched checksum after fetching: retry (the oracle
333                    // decrements its retry budget on the same condition).
334                    attempts_left = attempts_left.saturating_sub(1);
335                    if attempts_left == 0 {
336                        return Err(StoreError::Integrity {
337                            address: format!("s3://{}/{key}", self.location.bucket),
338                            expected: expected.to_owned(),
339                            actual,
340                        });
341                    }
342                }
343                None => {
344                    // Treat a missing key as not-found rather than spinning.
345                    return Err(StoreError::ObjectNotFound {
346                        checksum: expected.to_owned(),
347                    });
348                }
349            }
350        }
351    }
352}
353
354impl Store for S3Store {
355    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
356        let key = self.location.manifest_key(id);
357        let bytes = self.runtime.block_on(async {
358            match self.get_bytes(&key).await? {
359                Some(b) => Ok(b),
360                None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
361            }
362        })?;
363
364        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
365            message: format!("manifest {id} is not valid UTF-8"),
366            source: Some(Box::new(err)),
367        })?;
368        let manifest = Manifest::parse(&text)?;
369
370        // Verify the stored manifest hashes back to its snapshot id before
371        // trusting it (oracle: the id check on fetch).
372        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
373        if actual != id {
374            return Err(StoreError::Integrity {
375                address: self.location.manifest_key(id),
376                expected: id.to_owned(),
377                actual,
378            });
379        }
380        Ok(manifest)
381    }
382
383    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
384        // Concurrent download via the shared orchestrator: it owns the
385        // skip-if-present-and-verified short-circuit, directory creation, the
386        // bounded-concurrency pass, the per-object rate limit, and the atomic
387        // write. S3 only injects the per-object download, preserving the
388        // BLAKE3-verify + retry discipline of `fetch_verified`.
389        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
390        let meter = self.meter.as_deref();
391        self.runtime.block_on(async {
392            fetch_files_concurrent(
393                manifest,
394                dest,
395                &self.config,
396                &limiter,
397                meter,
398                |entry| async {
399                    let key = self.location.object_key(&entry.checksum);
400                    self.fetch_verified(&key, &entry.checksum).await
401                },
402            )
403            .await
404        })
405    }
406
407    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
408        let hasher = Blake3Hasher::new();
409        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);
410
411        // Concurrent upload via the shared orchestrator: it owns the bounded
412        // per-object pass and the manifest-last / all-or-nothing ordering. S3
413        // injects the per-object skip-present + upload (via `upload_object`,
414        // which also owns the shared read+verify) and the manifest-write
415        // closure. A failed push writes NO manifest.
416        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
417        let meter = self.meter.as_deref();
418        self.runtime.block_on(async {
419            // Skip-if-manifest-present pre-check: a present manifest implies all
420            // its objects are present (we always write the manifest last).
421            let manifest_key = self.location.manifest_key(&id);
422            if self.key_exists(&manifest_key).await? {
423                return Ok(());
424            }
425
426            push_objects_concurrent(
427                manifest,
428                &self.config,
429                meter,
430                |entry| {
431                    let object_key = self.location.object_key(&entry.checksum);
432                    upload_object(
433                        entry,
434                        object_key,
435                        source,
436                        &limiter,
437                        meter,
438                        |key| async move { self.key_exists(&key).await },
439                        |key, bytes| async move { self.put_bytes(&key, bytes).await },
440                    )
441                },
442                || async {
443                    // Write the manifest last (verified to hash back to its id),
444                    // exactly as the oracle stores `echo "${manifest}"` text.
445                    let mut text = manifest.to_string();
446                    text.push('\n');
447                    let manifest_actual = hasher.hash_hex(text.as_bytes());
448                    if manifest_actual != id {
449                        return Err(StoreError::Integrity {
450                            address: manifest_key.clone(),
451                            expected: id.clone(),
452                            actual: manifest_actual,
453                        });
454                    }
455                    self.put_bytes(&manifest_key, text.into_bytes()).await
456                },
457            )
458            .await
459        })
460    }
461}
462
463impl StreamStore for S3Store {
464    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
465        let key = self.location.object_key(checksum);
466        self.runtime.block_on(async { self.key_exists(&key).await })
467    }
468
469    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
470        let key = self.location.object_key(checksum);
471        let bytes = self.runtime.block_on(async {
472            self.get_bytes(&key)
473                .await?
474                .ok_or_else(|| StoreError::ObjectNotFound {
475                    checksum: checksum.to_owned(),
476                })
477        })?;
478
479        // Verify the downloaded blob hashes back to its content-address before
480        // returning it (corruption surfaces as `Integrity`, never bad bytes).
481        let actual = Blake3Hasher::new().hash_hex(&bytes);
482        if actual != checksum {
483            return Err(StoreError::Integrity {
484                address: format!("s3://{}/{key}", self.location.bucket),
485                expected: checksum.to_owned(),
486                actual,
487            });
488        }
489        Ok(bytes)
490    }
491
492    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
493        // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
494        // must never land at that content-address (nothing is stored).
495        let actual = Blake3Hasher::new().hash_hex(&bytes);
496        if actual != checksum {
497            return Err(StoreError::Integrity {
498                address: self.location.object_key(checksum),
499                expected: checksum.to_owned(),
500                actual,
501            });
502        }
503        let key = self.location.object_key(checksum);
504        self.runtime
505            .block_on(async { self.put_bytes(&key, bytes).await })
506    }
507
508    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
509        let key = self.location.manifest_key(id);
510        // Mirror the manifest-write tail of `push`: render the oracle's
511        // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
512        let mut text = manifest.to_string();
513        text.push('\n');
514        let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
515        if actual != id {
516            return Err(StoreError::Integrity {
517                address: key,
518                expected: id.to_owned(),
519                actual,
520            });
521        }
522        self.runtime
523            .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
524    }
525}
526
527/// Builds the multi-thread tokio runtime that backs the sync bridge.
528fn build_runtime() -> Result<Runtime, StoreError> {
529    tokio::runtime::Builder::new_multi_thread()
530        .enable_all()
531        .build()
532        .map_err(|e| backend("creating tokio runtime for S3Store", e))
533}
534
535/// Builds the AWS-SDK hyper-1.x HTTP client backed by `rustls` using the
536/// **`ring`** crypto provider, with native-root trust anchors (the builder's
537/// default `TrustStore` enables them). This is the load-bearing piece that keeps
538/// `aws-lc-rs` (and the legacy hyper-0.14 TLS island) out of the dependency
539/// graph.
540fn ring_https_client() -> aws_smithy_runtime_api::client::http::SharedHttpClient {
541    HttpClientBuilder::new()
542        .tls_provider(TlsProvider::Rustls(CryptoMode::Ring))
543        .build_https()
544}
545
546/// Wraps any backend error into [`StoreError::Backend`] with a message.
547fn backend<E>(message: &str, source: E) -> StoreError
548where
549    E: std::error::Error + Send + Sync + 'static,
550{
551    StoreError::Backend {
552        message: message.to_owned(),
553        source: Some(Box::new(source)),
554    }
555}
556
#[cfg(test)]
mod tests {
    use super::*;
    use snapdir_core::manifest::PathType;

    /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
    /// test-only assertion of the path normalization the orchestrator
    /// (`crate::push`) performs.
    /// NOTE(review): this is a local copy, so its test does not exercise
    /// `crate::push` itself.
    fn strip_leading_dot_slash(path: &str) -> &str {
        let trimmed = path.strip_prefix("./").unwrap_or(path);
        trimmed.strip_suffix('/').unwrap_or(trimmed)
    }

    /// Removes a scratch directory on drop. The live round-trip uses it so its
    /// temp trees are cleaned up even when an `expect`/`assert` panics midway.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup: a failure here must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // The canonical content-addressable fixtures from the s3 store test suite.
    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";

    #[test]
    fn s3_store_parses_bucket_and_prefix() {
        let loc = S3Location::parse("s3://my-bucket/long/term/storage");
        assert_eq!(loc.bucket, "my-bucket");
        assert_eq!(loc.prefix, "long/term/storage");
    }

    #[test]
    fn s3_store_parse_matches_oracle_cut_fields() {
        // Oracle: bucket = `cut -d'/' -f3`, base_dir = `cut -d'/' -f4-`.
        // For "s3://bucket/a/b/c": fields are [s3:,"",bucket,a,b,c].
        let loc = S3Location::parse("s3://bucket/a/b/c");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "a/b/c");
    }

    #[test]
    fn s3_store_parse_strips_trailing_slash() {
        // `_snapdir_s3_store_get_remote_prefix` strips the trailing slash.
        let loc = S3Location::parse("s3://bucket/prefix/");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "prefix");
    }

    #[test]
    fn s3_store_parse_bucket_root_has_empty_prefix() {
        let loc = S3Location::parse("s3://bucket");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "");

        let loc_slash = S3Location::parse("s3://bucket/");
        assert_eq!(loc_slash.bucket, "bucket");
        assert_eq!(loc_slash.prefix, "");
    }

    #[test]
    fn s3_store_parse_accepts_bare_bucket_prefix_without_scheme() {
        let loc = S3Location::parse("bucket/some/prefix");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "some/prefix");
    }

    #[test]
    fn s3_store_object_key_matches_sharded_scheme() {
        let loc = S3Location::parse("s3://b/long/term/storage");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!("long/term/storage/.objects/{FOO_SHARDED}")
        );
    }

    #[test]
    fn s3_store_manifest_key_matches_sharded_scheme() {
        let loc = S3Location::parse("s3://b/long/term/storage");
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn s3_store_keys_have_no_leading_slash_at_bucket_root() {
        // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
        let loc = S3Location::parse("s3://bucket");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!(".objects/{FOO_SHARDED}")
        );
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!(".manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn s3_store_object_key_uses_core_object_path() {
        // Cross-check that we delegate to the frozen core sharding helper rather
        // than reimplementing it: the key tail must equal `object_path` output.
        let loc = S3Location::parse("s3://b");
        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
    }

    #[test]
    fn s3_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
    }

    // --- Live round-trip, skipped by default --------------------------------
    //
    // Requires an S3-compatible endpoint (e.g. MinIO/SeaweedFS) plus AWS
    // credentials in the environment. It is gated behind
    // `SNAPDIR_S3_TEST_ENDPOINT` and `SNAPDIR_S3_TEST_STORE` (an
    // `s3://bucket/prefix` URL), so it is skipped unless explicitly
    // configured. Real emulator round-trips are exercised by the later
    // `remote-interop` gate.
    #[test]
    fn s3_store_live_round_trip_when_configured() {
        use snapdir_core::manifest::ManifestEntry;

        let (Ok(endpoint), Ok(store)) = (
            std::env::var("SNAPDIR_S3_TEST_ENDPOINT"),
            std::env::var("SNAPDIR_S3_TEST_STORE"),
        ) else {
            eprintln!(
                "skipping s3_store live round-trip: set SNAPDIR_S3_TEST_ENDPOINT \
                 and SNAPDIR_S3_TEST_STORE (s3://bucket/prefix) to run it"
            );
            return;
        };

        let hasher = Blake3Hasher::new();

        // Build a tiny source tree + matching manifest. The guard removes the
        // tree when the test ends, whether it passes or panics.
        let src = std::env::temp_dir().join(format!("snapdir-s3-live-{}", std::process::id()));
        let _src_cleanup = ScratchDir(src.clone());
        std::fs::create_dir_all(&src).unwrap();
        std::fs::write(src.join("foo"), b"foo\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            4,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

        let s3 = S3Store::connect(&store, Some(&endpoint)).expect("connect");
        s3.push(&manifest, &src).expect("push");
        let read_back = s3.get_manifest(&id).expect("get_manifest");
        assert_eq!(read_back, manifest);

        let dest = std::env::temp_dir().join(format!("snapdir-s3-dest-{}", std::process::id()));
        let _dest_cleanup = ScratchDir(dest.clone());
        std::fs::create_dir_all(&dest).unwrap();
        s3.fetch_files(&read_back, &dest).expect("fetch_files");
        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");
    }
}