Skip to main content

snapdir_stores/
s3_store.rs

1//! `S3Store`: the `s3://` storage backend, backed by the native AWS SDK.
2//!
3//! An [`S3Store`] targets an `s3://bucket/prefix` location and holds the frozen
4//! content-addressable `.objects`/`.manifests` sharded layout, so a
5//! bucket/prefix is interchangeable across conforming implementations:
6//!
7//! ```text
8//! s3://<bucket>/<prefix>/.objects/<sharded checksum>     raw object bytes
9//! s3://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
10//! ```
11//!
12//! Sharding and the relative keys come straight from [`snapdir_core::store`]
13//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
14//!
15//! # Credentials
16//!
17//! Authentication is delegated entirely to the standard AWS credential chain
18//! via [`aws_config`] (environment variables, shared config/credentials
19//! profiles, SSO, container/instance metadata, …). No bespoke snapdir
20//! credential variables are introduced. An S3-compatible endpoint (`MinIO`,
21//! `SeaweedFS`, …) can be selected with `SNAPDIR_S3_TEST_ENDPOINT` for the
22//! gated live test, or by constructing the store with an explicit endpoint.
23//!
24//! # TLS provider (project-load-bearing)
25//!
26//! The shipped binary must statically link on musl, so the workspace
27//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
28//! AWS SDK defaults to an aws-lc-rs-backed HTTP connector, so this module builds
29//! the SDK's modern hyper-1.x HTTP client ([`aws_smithy_http_client`]) with its
30//! `rustls`/**`ring`** TLS provider and hands it to the SDK as a custom
31//! [`HttpClient`](aws_smithy_runtime_api::client::http::HttpClient). Native root
32//! trust anchors stay on (the builder's default `TrustStore`).
33//!
34//! # Sync trait, async SDK
35//!
36//! The SDK is async. [`S3Store`] owns a private multi-thread `tokio` runtime and
37//! bridges each [`Store`] method with `runtime.block_on(...)`, so no `async`
38//! leaks into `snapdir-core` or the orchestrator (see [`snapdir_core::store`]).
39
40use std::path::Path;
41use std::sync::Arc;
42
43use aws_config::BehaviorVersion;
44use aws_sdk_s3::config::Region;
45use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
46use aws_sdk_s3::Client;
47use aws_smithy_http_client::tls::rustls_provider::CryptoMode;
48use aws_smithy_http_client::tls::Provider as TlsProvider;
49use aws_smithy_http_client::Builder as HttpClientBuilder;
50use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
51use snapdir_core::manifest::Manifest;
52use snapdir_core::merkle::{Blake3Hasher, Hasher};
53use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
54use snapdir_core::Meter;
55
56use crate::fetch::fetch_files_concurrent;
57use crate::push::{push_objects_concurrent, upload_object};
58use crate::retry::{parse_retry_after, retry_network, Attempt, DefaultJitter, TokioSleeper};
59use crate::stream::StreamStore;
60use crate::transfer::{classify_error, RateLimiter, TransferConfig};
61
62use std::error::Error as StdError;
63use tokio::runtime::Runtime;
64
/// Number of times a fetch is retried when the downloaded bytes fail their
/// checksum, mirroring the oracle's `_SNAPDIR_S3_STORE_RETRIES` default of 5.
///
/// `S3Store::fetch_verified` starts with this budget, spends one unit every
/// time a download hashes to the wrong value, and reports
/// [`StoreError::Integrity`] once the budget reaches zero.
const MAX_FETCH_RETRIES: u32 = 5;
68
69/// The parsed location an [`S3Store`] targets: an S3 bucket plus a key prefix.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct S3Location {
72    /// The bucket name (first path segment of the `s3://` URL).
73    pub bucket: String,
74    /// The key prefix (remaining segments), with no leading or trailing slash.
75    /// Empty when the store points at the bucket root.
76    pub prefix: String,
77}
78
79impl S3Location {
80    /// Parses an `s3://bucket/prefix` URL into its bucket and prefix.
81    ///
82    /// Matches the frozen URL derivation
83    /// (`_snapdir_export_store_vars`): splitting the store URL on `/`, the
84    /// bucket is `cut -f3` (the segment after `s3://`) and the base dir is
85    /// `cut -f4-` (everything after). The prefix has any trailing slash
86    /// stripped, matching `_snapdir_s3_store_get_remote_prefix`.
87    ///
88    /// The `s3://` scheme is optional; a bare `bucket/prefix` is accepted too.
89    #[must_use]
90    pub fn parse(store_url: &str) -> Self {
91        // Drop the scheme (`s3://`, or any `<proto>://`) if present. The oracle
92        // splits the full URL on `/` and takes field 3 as the bucket, which for
93        // `s3://bucket/...` is exactly the segment after the `//`.
94        let without_scheme = match store_url.find("://") {
95            Some(idx) => &store_url[idx + 3..],
96            None => store_url,
97        };
98        let mut parts = without_scheme.splitn(2, '/');
99        let bucket = parts.next().unwrap_or("").to_owned();
100        let prefix = parts.next().unwrap_or("");
101        // Strip a trailing slash (and any leading slash from an empty first
102        // segment edge case); the prefix is joined back with a single `/`.
103        let prefix = prefix
104            .trim_end_matches('/')
105            .trim_start_matches('/')
106            .to_owned();
107        Self { bucket, prefix }
108    }
109
110    /// Returns the full S3 object key for a content object given its checksum,
111    /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
112    #[must_use]
113    pub fn object_key(&self, checksum: &str) -> String {
114        self.key_for(&object_path(checksum))
115    }
116
117    /// Returns the full S3 object key for a manifest given its snapshot id,
118    /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
119    #[must_use]
120    pub fn manifest_key(&self, id: &str) -> String {
121        self.key_for(&manifest_path(id))
122    }
123
124    /// Joins the store prefix with a store-relative path (`.objects/...` or
125    /// `.manifests/...`), producing a leading-slash-free S3 key. Mirrors the
126    /// oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
127    /// leading slash trimmed.
128    fn key_for(&self, rel: &str) -> String {
129        let rel = rel.trim_start_matches('/');
130        if self.prefix.is_empty() {
131            rel.to_owned()
132        } else {
133            format!("{}/{rel}", self.prefix)
134        }
135    }
136}
137
/// A content-addressable store backed by an S3 (or S3-compatible) bucket.
///
/// Construct one with [`S3Store::connect`] (resolves the standard AWS
/// credential chain) or [`S3Store::from_client`] (an already-built SDK client,
/// e.g. for tests against an emulator).
pub struct S3Store {
    /// SDK client used for every HEAD / GET / PUT issued by this store.
    client: Client,
    /// Bucket and key prefix all object and manifest keys are derived from.
    location: S3Location,
    /// Tokio runtime the synchronous trait methods drive with `block_on`.
    runtime: Arc<Runtime>,
    /// Transfer settings: supplies the retry policy and the request / byte
    /// rate limits read elsewhere in this module.
    config: TransferConfig,
    /// Per-call request-rate limiter (one token per SDK call), built from
    /// [`TransferConfig::max_requests_per_sec`]. Unlimited (a no-op) by default.
    /// Shared (clone) so every call paces against one aggregate request budget.
    req_limiter: RateLimiter,
    /// Optional progress meter; recorded into during transfers. `None` (the
    /// default from every constructor) means zero recording and byte-identical
    /// behavior. Set by the CLI via [`S3Store::with_meter`].
    meter: Option<Arc<Meter>>,
}
157
158impl S3Store {
159    /// Connects to the `s3://bucket/prefix` store, resolving credentials and
160    /// region via the standard AWS chain ([`aws_config::load_defaults`]).
161    ///
162    /// The HTTP client is pinned to the `ring` rustls provider (see the module
163    /// docs). An optional `endpoint_url` selects an S3-compatible backend
164    /// (path-style addressing is enabled when an endpoint is given, as
165    /// emulators rarely support virtual-host addressing).
166    ///
167    /// # Errors
168    ///
169    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the
170    /// AWS configuration cannot be loaded.
171    pub fn connect(store_url: &str, endpoint_url: Option<&str>) -> Result<Self, StoreError> {
172        Self::connect_with(store_url, endpoint_url, TransferConfig::default())
173    }
174
175    /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
176    /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
177    /// delegates here with [`TransferConfig::default`].
178    ///
179    /// # Errors
180    ///
181    /// [`StoreError::Backend`] if the tokio runtime cannot be created or the
182    /// AWS configuration cannot be loaded.
183    pub fn connect_with(
184        store_url: &str,
185        endpoint_url: Option<&str>,
186        config: TransferConfig,
187    ) -> Result<Self, StoreError> {
188        let location = S3Location::parse(store_url);
189        let runtime = build_runtime()?;
190
191        let http_client = ring_https_client();
192        let endpoint = endpoint_url.map(ToOwned::to_owned);
193        let client = runtime.block_on(async move {
194            let mut loader = aws_config::defaults(BehaviorVersion::latest())
195                .http_client(http_client.clone())
196                // Disable the SDK's own retry loop so snapdir's RetryPolicy is
197                // the single backoff authority (no SDK-3x × ours-5x compounding).
198                .retry_config(aws_config::retry::RetryConfig::disabled());
199            if let Some(ep) = endpoint.as_deref() {
200                loader = loader.endpoint_url(ep);
201            }
202            let shared = loader.load().await;
203            let mut builder = aws_sdk_s3::config::Builder::from(&shared);
204            if endpoint.is_some() {
205                // S3-compatible emulators generally require path-style keys.
206                builder = builder.force_path_style(true);
207            }
208            // Some emulators / configs leave the region unset; S3 still
209            // requires a value to sign requests, so default it.
210            if shared.region().is_none() {
211                builder = builder.region(Region::new("us-east-1"));
212            }
213            Client::from_conf(builder.build())
214        });
215
216        let req_limiter = RateLimiter::new(config.max_requests_per_sec);
217        Ok(Self {
218            client,
219            location,
220            runtime: Arc::new(runtime),
221            config,
222            req_limiter,
223            meter: None,
224        })
225    }
226
227    /// Builds a store from an already-configured SDK [`Client`] and a parsed
228    /// location, owning a fresh tokio runtime for the sync bridge. Intended for
229    /// tests (e.g. wiring a client at an emulator endpoint).
230    ///
231    /// # Errors
232    ///
233    /// [`StoreError::Backend`] if the tokio runtime cannot be created.
234    pub fn from_client(client: Client, location: S3Location) -> Result<Self, StoreError> {
235        let config = TransferConfig::default();
236        let req_limiter = RateLimiter::new(config.max_requests_per_sec);
237        Ok(Self {
238            client,
239            location,
240            runtime: Arc::new(build_runtime()?),
241            config,
242            req_limiter,
243            meter: None,
244        })
245    }
246
247    /// Attaches (or clears) an optional progress [`Meter`], rides alongside
248    /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
249    /// bytes-out + per-object progress into it; `None` (the constructor default)
250    /// means zero recording and byte-identical behavior. The CLI sets this after
251    /// construction.
252    #[must_use]
253    pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
254        self.meter = meter;
255        self
256    }
257
    /// The parsed bucket/prefix this store targets.
    ///
    /// Returns a borrow of the [`S3Location`] held by the store; callers that
    /// need an owned copy can clone it (`S3Location` derives `Clone`).
    #[must_use]
    pub fn location(&self) -> &S3Location {
        &self.location
    }
263
    /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
    /// with.
    ///
    /// The store's own `fetch_files` and `push` pass this same value to the
    /// shared concurrent transfer loops, and its `retry` policy governs every
    /// individual HEAD / GET / PUT.
    #[must_use]
    pub fn transfer_config(&self) -> &TransferConfig {
        &self.config
    }
270
271    /// HEAD an object key; `Ok(true)` if it exists, `Ok(false)` if absent.
272    ///
273    /// The single SDK call is wrapped in [`retry_network`]: it acquires one
274    /// request-rate token, then retries a TRANSIENT failure under the store's
275    /// [`RetryPolicy`]. A `404`/not-found is a normal `Ok(false)` outcome (not a
276    /// retry); any other error is classified via [`s3_attempt_from_err`].
277    async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
278        retry_network(
279            &self.config.retry,
280            &self.req_limiter,
281            &TokioSleeper,
282            &DefaultJitter::new(),
283            || async {
284                match self
285                    .client
286                    .head_object()
287                    .bucket(&self.location.bucket)
288                    .key(key)
289                    .send()
290                    .await
291                {
292                    Ok(_) => Ok(true),
293                    Err(err) => {
294                        // A genuine "absent" is a successful outcome of the op,
295                        // not a retry: peek the concrete service error first.
296                        if err.as_service_error().is_some_and(
297                            aws_sdk_s3::operation::head_object::HeadObjectError::is_not_found,
298                        ) {
299                            return Ok(false);
300                        }
301                        Err(s3_attempt_from_err("S3 HEAD object failed", err))
302                    }
303                }
304            },
305        )
306        .await
307    }
308
309    /// GET an object key's full body, or `None` if it is absent.
310    ///
311    /// The SDK call is wrapped in [`retry_network`] exactly like
312    /// [`key_exists`](Self::key_exists). A `NoSuchKey` is a normal `Ok(None)`
313    /// outcome; transient failures retry under the store's [`RetryPolicy`](crate::retry::RetryPolicy).
314    async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
315        retry_network(
316            &self.config.retry,
317            &self.req_limiter,
318            &TokioSleeper,
319            &DefaultJitter::new(),
320            || async {
321                match self
322                    .client
323                    .get_object()
324                    .bucket(&self.location.bucket)
325                    .key(key)
326                    .send()
327                    .await
328                {
329                    Ok(resp) => {
330                        // Draining the streamed body can itself fail transiently
331                        // (connection reset mid-download); classify it too.
332                        let data = resp.body.collect().await.map_err(|e| {
333                            let err = backend("reading S3 object body", e);
334                            let transient =
335                                matches!(classify_error(&err), crate::adaptive::OpResult::Throttle);
336                            Attempt {
337                                transient,
338                                retry_after: None,
339                                err,
340                            }
341                        })?;
342                        Ok(Some(data.into_bytes().to_vec()))
343                    }
344                    Err(err) => {
345                        if err.as_service_error().is_some_and(
346                            aws_sdk_s3::operation::get_object::GetObjectError::is_no_such_key,
347                        ) {
348                            return Ok(None);
349                        }
350                        Err(s3_attempt_from_err("S3 GET object failed", err))
351                    }
352                }
353            },
354        )
355        .await
356    }
357
358    /// PUT `bytes` at `key`. S3 PUT is atomic, so no temp-key dance is needed
359    /// (the oracle relies on the same atomicity for manifests/objects).
360    ///
361    /// Wrapped in [`retry_network`]: each (re)try re-sends the full body, so the
362    /// content-addressed bytes that land are unchanged — only transient failures
363    /// retry under the store's [`RetryPolicy`](crate::retry::RetryPolicy).
364    async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
365        retry_network(
366            &self.config.retry,
367            &self.req_limiter,
368            &TokioSleeper,
369            &DefaultJitter::new(),
370            || {
371                let bytes = bytes.clone();
372                async move {
373                    self.client
374                        .put_object()
375                        .bucket(&self.location.bucket)
376                        .key(key)
377                        .body(bytes.into())
378                        .send()
379                        .await
380                        .map(|_| ())
381                        .map_err(|err| s3_attempt_from_err("S3 PUT object failed", err))
382                }
383            },
384        )
385        .await
386    }
387
388    /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
389    /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_s3_fetch_to_cache`.
390    async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
391        let hasher = Blake3Hasher::new();
392        let mut attempts_left = MAX_FETCH_RETRIES;
393        loop {
394            match self.get_bytes(key).await? {
395                Some(bytes) => {
396                    let actual = hasher.hash_hex(&bytes);
397                    if actual == expected {
398                        return Ok(bytes);
399                    }
400                    // Mismatched checksum after fetching: retry (the oracle
401                    // decrements its retry budget on the same condition).
402                    attempts_left = attempts_left.saturating_sub(1);
403                    if attempts_left == 0 {
404                        return Err(StoreError::Integrity {
405                            address: format!("s3://{}/{key}", self.location.bucket),
406                            expected: expected.to_owned(),
407                            actual,
408                        });
409                    }
410                }
411                None => {
412                    // Treat a missing key as not-found rather than spinning.
413                    return Err(StoreError::ObjectNotFound {
414                        checksum: expected.to_owned(),
415                    });
416                }
417            }
418        }
419    }
420}
421
422impl Store for S3Store {
423    fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
424        let key = self.location.manifest_key(id);
425        let bytes = self.runtime.block_on(async {
426            match self.get_bytes(&key).await? {
427                Some(b) => Ok(b),
428                None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
429            }
430        })?;
431
432        let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
433            message: format!("manifest {id} is not valid UTF-8"),
434            source: Some(Box::new(err)),
435        })?;
436        let manifest = Manifest::parse(&text)?;
437
438        // Verify the stored manifest hashes back to its snapshot id before
439        // trusting it (oracle: the id check on fetch).
440        let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
441        if actual != id {
442            return Err(StoreError::Integrity {
443                address: self.location.manifest_key(id),
444                expected: id.to_owned(),
445                actual,
446            });
447        }
448        Ok(manifest)
449    }
450
    /// Downloads the files described by `manifest` into `dest`, blocking the
    /// calling thread until the whole transfer finishes.
    ///
    /// The work is delegated to `fetch_files_concurrent`; this method only
    /// supplies the store's transfer settings, a byte-rate limiter, the
    /// optional meter, and the per-entry download closure.
    fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
        // Concurrent download via the shared orchestrator: it owns the
        // skip-if-present-and-verified short-circuit, directory creation, the
        // bounded-concurrency pass, the per-object rate limit, and the atomic
        // write. S3 only injects the per-object download, preserving the
        // BLAKE3-verify + retry discipline of `fetch_verified`.
        // A fresh limiter per call, sized from the configured byte rate.
        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
        // The orchestrator takes the meter both as a borrow and as a shared
        // handle, so both forms are prepared here.
        let meter = self.meter.as_deref();
        let meter_arc = self.meter.clone();
        self.runtime.block_on(async {
            fetch_files_concurrent(
                manifest,
                dest,
                &self.config,
                &limiter,
                meter,
                meter_arc,
                // Per-entry download: map the checksum to its object key and
                // fetch it with checksum verification.
                |entry| async {
                    let key = self.location.object_key(&entry.checksum);
                    self.fetch_verified(&key, &entry.checksum).await
                },
            )
            .await
        })
    }
476
    /// Uploads the objects referenced by `manifest` (read from under `source`)
    /// and then the manifest itself, blocking until the push finishes.
    ///
    /// Returns early with `Ok(())` when a manifest for the computed snapshot
    /// id is already stored. Otherwise the object pass and the manifest write
    /// are handed to `push_objects_concurrent`; the manifest text is checked
    /// against the snapshot id before it is written, and a mismatch is
    /// reported as [`StoreError::Integrity`].
    fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
        let hasher = Blake3Hasher::new();
        // The snapshot id determines the manifest key and is the value the
        // rendered manifest text must hash back to.
        let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);

        // Concurrent upload via the shared orchestrator: it owns the bounded
        // per-object pass and the manifest-last / all-or-nothing ordering. S3
        // injects the per-object skip-present + upload (via `upload_object`,
        // which also owns the shared read+verify) and the manifest-write
        // closure. A failed push writes NO manifest.
        let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
        // The orchestrator takes the meter both as a borrow and as a shared
        // handle, so both forms are prepared here.
        let meter = self.meter.as_deref();
        let meter_arc = self.meter.clone();
        self.runtime.block_on(async {
            // Skip-if-manifest-present pre-check: a present manifest implies all
            // its objects are present (we always write the manifest last).
            let manifest_key = self.location.manifest_key(&id);
            if self.key_exists(&manifest_key).await? {
                return Ok(());
            }

            push_objects_concurrent(
                manifest,
                &self.config,
                &limiter,
                meter,
                meter_arc,
                // Per-entry upload: `upload_object` is given an existence
                // probe (HEAD) and an uploader (PUT) bound to this store.
                |entry| {
                    let object_key = self.location.object_key(&entry.checksum);
                    upload_object(
                        entry,
                        object_key,
                        source,
                        &limiter,
                        meter,
                        |key| async move { self.key_exists(&key).await },
                        |key, bytes| async move { self.put_bytes(&key, bytes).await },
                    )
                },
                // Manifest write, supplied as the final closure.
                || async {
                    // Write the manifest last (verified to hash back to its id),
                    // exactly as the oracle stores `echo "${manifest}"` text.
                    let mut text = manifest.to_string();
                    // `echo` appends a newline, so the stored bytes do too.
                    text.push('\n');
                    let manifest_actual = hasher.hash_hex(text.as_bytes());
                    if manifest_actual != id {
                        return Err(StoreError::Integrity {
                            address: manifest_key.clone(),
                            expected: id.clone(),
                            actual: manifest_actual,
                        });
                    }
                    self.put_bytes(&manifest_key, text.into_bytes()).await
                },
            )
            .await
        })
    }
534}
535
536impl StreamStore for S3Store {
537    fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
538        let key = self.location.object_key(checksum);
539        self.runtime.block_on(async { self.key_exists(&key).await })
540    }
541
542    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
543        let key = self.location.object_key(checksum);
544        let bytes = self.runtime.block_on(async {
545            self.get_bytes(&key)
546                .await?
547                .ok_or_else(|| StoreError::ObjectNotFound {
548                    checksum: checksum.to_owned(),
549                })
550        })?;
551
552        // Verify the downloaded blob hashes back to its content-address before
553        // returning it (corruption surfaces as `Integrity`, never bad bytes).
554        let actual = Blake3Hasher::new().hash_hex(&bytes);
555        if actual != checksum {
556            return Err(StoreError::Integrity {
557                address: format!("s3://{}/{key}", self.location.bucket),
558                expected: checksum.to_owned(),
559                actual,
560            });
561        }
562        Ok(bytes)
563    }
564
565    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
566        // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
567        // must never land at that content-address (nothing is stored).
568        let actual = Blake3Hasher::new().hash_hex(&bytes);
569        if actual != checksum {
570            return Err(StoreError::Integrity {
571                address: self.location.object_key(checksum),
572                expected: checksum.to_owned(),
573                actual,
574            });
575        }
576        let key = self.location.object_key(checksum);
577        self.runtime
578            .block_on(async { self.put_bytes(&key, bytes).await })
579    }
580
581    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
582        let key = self.location.manifest_key(id);
583        // Mirror the manifest-write tail of `push`: render the oracle's
584        // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
585        let mut text = manifest.to_string();
586        text.push('\n');
587        let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
588        if actual != id {
589            return Err(StoreError::Integrity {
590                address: key,
591                expected: id.to_owned(),
592                actual,
593            });
594        }
595        self.runtime
596            .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
597    }
598}
599
600/// Builds the multi-thread tokio runtime that backs the sync bridge.
601fn build_runtime() -> Result<Runtime, StoreError> {
602    tokio::runtime::Builder::new_multi_thread()
603        .enable_all()
604        .build()
605        .map_err(|e| backend("creating tokio runtime for S3Store", e))
606}
607
608/// Builds the AWS-SDK hyper-1.x HTTP client backed by `rustls` using the
609/// **`ring`** crypto provider, with native-root trust anchors (the builder's
610/// default `TrustStore` enables them). This is the load-bearing piece that keeps
611/// `aws-lc-rs` (and the legacy hyper-0.14 TLS island) out of the dependency
612/// graph.
613fn ring_https_client() -> aws_smithy_runtime_api::client::http::SharedHttpClient {
614    HttpClientBuilder::new()
615        .tls_provider(TlsProvider::Rustls(CryptoMode::Ring))
616        .build_https()
617}
618
619/// Wraps any backend error into [`StoreError::Backend`] with a message.
620fn backend<E>(message: &str, source: E) -> StoreError
621where
622    E: std::error::Error + Send + Sync + 'static,
623{
624    StoreError::Backend {
625        message: message.to_owned(),
626        source: Some(Box::new(source)),
627    }
628}
629
630/// Builds a retry [`Attempt`] from a concrete aws-sdk-s3 [`SdkError`], at the
631/// boundary where the SDK error still carries its raw HTTP response.
632///
633/// - `transient`: true for the retryable signal set — an HTTP `429`/`503`
634///   status on the raw response, the throttle error codes
635///   (`SlowDown`/`Throttling`/`RequestTimeout`/`ServiceUnavailable`/…), and the
636///   transport/timeout classes — folded through the shared
637///   [`classify_error`](crate::classify_error) on the mapped [`StoreError`].
638///   Conservative: an unknown error is NOT transient.
639/// - `retry_after`: extracted from the raw response's `Retry-After` header (the
640///   delta-seconds form) when present, else `None`.
641/// - `err`: the mapped [`StoreError::Backend`] (the same value the non-retry
642///   path used to surface).
643fn s3_attempt_from_err<E>(message: &str, err: SdkError<E, HttpResponse>) -> Attempt
644where
645    E: ProvideErrorMetadata + StdError + Send + Sync + 'static,
646{
647    // Extract status code + Retry-After off the raw HTTP response (present on
648    // ServiceError / ResponseError variants) BEFORE we consume the error.
649    let (http_status, retry_after) = match err.raw_response() {
650        Some(resp) => {
651            let status = resp.status().as_u16();
652            let hint = resp
653                .headers()
654                .get("retry-after")
655                .and_then(parse_retry_after);
656            (Some(status), hint)
657        }
658        None => (None, None),
659    };
660
661    // The SDK error code (e.g. "SlowDown", "ServiceUnavailable") rides in the
662    // error metadata; fold it (plus the status) into the mapped StoreError's
663    // text so the shared classifier sees the transient signals.
664    let code = err.code().unwrap_or_default().to_owned();
665    let store_err = backend(message, err);
666
667    let transient = http_status.is_some_and(|s| s == 429 || s == 503)
668        || matches!(
669            classify_error(&store_err),
670            crate::adaptive::OpResult::Throttle
671        )
672        || {
673            let c = code.to_ascii_lowercase();
674            c.contains("slowdown")
675                || c.contains("throttl")
676                || c.contains("requesttimeout")
677                || c.contains("serviceunavailable")
678                || c.contains("internalerror")
679        };
680
681    Attempt {
682        transient,
683        retry_after,
684        err: store_err,
685    }
686}
687
688#[cfg(test)]
689mod tests {
690    use super::*;
691    use aws_sdk_s3::error::ErrorMetadata;
692    use aws_sdk_s3::operation::head_object::HeadObjectError;
693    use aws_sdk_s3::primitives::SdkBody;
694    use aws_smithy_runtime_api::http::StatusCode;
695    use snapdir_core::manifest::PathType;
696    use std::time::Duration;
697
698    /// Builds a raw S3 [`HttpResponse`] with the given status and optional
699    /// `Retry-After` header, for exercising [`s3_attempt_from_err`] without a
700    /// live SDK call.
701    fn raw_response(status: u16, retry_after: Option<&str>) -> HttpResponse {
702        let mut resp = HttpResponse::new(
703            StatusCode::try_from(status).expect("valid status"),
704            SdkBody::empty(),
705        );
706        if let Some(v) = retry_after {
707            resp.headers_mut().insert("retry-after", v.to_owned());
708        }
709        resp
710    }
711
712    /// Wraps an S3 service error (carrying `code`) plus a raw response into the
713    /// concrete `SdkError` the store methods see, so the extractor runs on a
714    /// real SDK error shape.
715    fn s3_service_error(code: &str, status: u16, retry_after: Option<&str>) -> Attempt {
716        let meta = ErrorMetadata::builder().code(code).build();
717        let svc = HeadObjectError::generic(meta);
718        let err = SdkError::service_error(svc, raw_response(status, retry_after));
719        s3_attempt_from_err("S3 op failed", err)
720    }
721
722    #[test]
723    fn backoff_wire_s3_extract_503_retry_after_is_transient_with_hint() {
724        // A 503 SlowDown carrying `Retry-After: 12` => transient, hint = 12s.
725        let attempt = s3_service_error("SlowDown", 503, Some("12"));
726        assert!(attempt.transient, "503/SlowDown must be transient");
727        assert_eq!(
728            attempt.retry_after,
729            Some(Duration::from_secs(12)),
730            "the Retry-After delta-seconds header must be extracted"
731        );
732    }
733
734    #[test]
735    fn backoff_wire_s3_extract_429_without_header_is_transient_no_hint() {
736        // A 429 with no Retry-After header => still transient, but no hint.
737        let attempt = s3_service_error("Throttling", 429, None);
738        assert!(attempt.transient, "429 must be transient");
739        assert_eq!(
740            attempt.retry_after, None,
741            "absent Retry-After header => None (backoff handles the delay)"
742        );
743    }
744
745    #[test]
746    fn backoff_wire_s3_extract_404_is_not_transient() {
747        // A 404 NoSuchKey-style hard error => NOT transient (conservative).
748        let attempt = s3_service_error("NoSuchKey", 404, None);
749        assert!(
750            !attempt.transient,
751            "a 404/not-found must never be classified transient"
752        );
753        assert_eq!(attempt.retry_after, None);
754    }
755
756    /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
757    /// test-only assertion of the path normalization the orchestrator
758    /// (`crate::push`) performs.
759    fn strip_leading_dot_slash(path: &str) -> &str {
760        let trimmed = path.strip_prefix("./").unwrap_or(path);
761        trimmed.strip_suffix('/').unwrap_or(trimmed)
762    }
763
764    // The canonical content-addressable fixtures from the s3 store test suite.
765    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
766    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
767    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
768    const MANIFEST_SHARDED: &str =
769        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
770
771    #[test]
772    fn s3_store_parses_bucket_and_prefix() {
773        let loc = S3Location::parse("s3://my-bucket/long/term/storage");
774        assert_eq!(loc.bucket, "my-bucket");
775        assert_eq!(loc.prefix, "long/term/storage");
776    }
777
778    #[test]
779    fn s3_store_parse_matches_oracle_cut_fields() {
780        // Oracle: bucket = `cut -d'/' -f3`, base_dir = `cut -d'/' -f4-`.
781        // For "s3://bucket/a/b/c": fields are [s3:,"",bucket,a,b,c].
782        let loc = S3Location::parse("s3://bucket/a/b/c");
783        assert_eq!(loc.bucket, "bucket");
784        assert_eq!(loc.prefix, "a/b/c");
785    }
786
787    #[test]
788    fn s3_store_parse_strips_trailing_slash() {
789        // `_snapdir_s3_store_get_remote_prefix` strips the trailing slash.
790        let loc = S3Location::parse("s3://bucket/prefix/");
791        assert_eq!(loc.bucket, "bucket");
792        assert_eq!(loc.prefix, "prefix");
793    }
794
795    #[test]
796    fn s3_store_parse_bucket_root_has_empty_prefix() {
797        let loc = S3Location::parse("s3://bucket");
798        assert_eq!(loc.bucket, "bucket");
799        assert_eq!(loc.prefix, "");
800
801        let loc_slash = S3Location::parse("s3://bucket/");
802        assert_eq!(loc_slash.bucket, "bucket");
803        assert_eq!(loc_slash.prefix, "");
804    }
805
806    #[test]
807    fn s3_store_parse_accepts_bare_bucket_prefix_without_scheme() {
808        let loc = S3Location::parse("bucket/some/prefix");
809        assert_eq!(loc.bucket, "bucket");
810        assert_eq!(loc.prefix, "some/prefix");
811    }
812
813    #[test]
814    fn s3_store_object_key_matches_sharded_scheme() {
815        let loc = S3Location::parse("s3://b/long/term/storage");
816        assert_eq!(
817            loc.object_key(FOO_CHECKSUM),
818            format!("long/term/storage/.objects/{FOO_SHARDED}")
819        );
820    }
821
822    #[test]
823    fn s3_store_manifest_key_matches_sharded_scheme() {
824        let loc = S3Location::parse("s3://b/long/term/storage");
825        assert_eq!(
826            loc.manifest_key(MANIFEST_ID),
827            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
828        );
829    }
830
831    #[test]
832    fn s3_store_keys_have_no_leading_slash_at_bucket_root() {
833        // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
834        let loc = S3Location::parse("s3://bucket");
835        assert_eq!(
836            loc.object_key(FOO_CHECKSUM),
837            format!(".objects/{FOO_SHARDED}")
838        );
839        assert_eq!(
840            loc.manifest_key(MANIFEST_ID),
841            format!(".manifests/{MANIFEST_SHARDED}")
842        );
843    }
844
845    #[test]
846    fn s3_store_object_key_uses_core_object_path() {
847        // Cross-check that we delegate to the frozen core sharding helper rather
848        // than reimplementing it: the key tail must equal `object_path` output.
849        let loc = S3Location::parse("s3://b");
850        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
851    }
852
853    #[test]
854    fn s3_store_strip_leading_dot_slash() {
855        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
856        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
857        assert_eq!(strip_leading_dot_slash("./a/"), "a");
858        assert_eq!(strip_leading_dot_slash("./"), "");
859    }
860
861    // --- Live round-trip, skipped by default --------------------------------
862    //
863    // Requires an S3-compatible endpoint (e.g. MinIO/SeaweedFS) plus AWS
864    // credentials in the environment. Gated behind `SNAPDIR_S3_TEST_ENDPOINT`
865    // and `SNAPDIR_S3_TEST_STORE` (an `s3://bucket/prefix` URL) so it is skipped
866    // unless explicitly configured. Real emulator round-trips are exercised by
867    // the later `remote-interop` gate.
868    #[test]
869    fn s3_store_live_round_trip_when_configured() {
870        use snapdir_core::manifest::ManifestEntry;
871
872        let (Ok(endpoint), Ok(store)) = (
873            std::env::var("SNAPDIR_S3_TEST_ENDPOINT"),
874            std::env::var("SNAPDIR_S3_TEST_STORE"),
875        ) else {
876            eprintln!(
877                "skipping s3_store live round-trip: set SNAPDIR_S3_TEST_ENDPOINT \
878                 and SNAPDIR_S3_TEST_STORE (s3://bucket/prefix) to run it"
879            );
880            return;
881        };
882
883        let hasher = Blake3Hasher::new();
884
885        // Build a tiny source tree + matching manifest.
886        let src = std::env::temp_dir().join(format!("snapdir-s3-live-{}", std::process::id()));
887        std::fs::create_dir_all(&src).unwrap();
888        std::fs::write(src.join("foo"), b"foo\n").unwrap();
889        let foo_sum = hasher.hash_hex(b"foo\n");
890        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
891        let mut manifest = Manifest::new();
892        manifest.push(ManifestEntry::new(
893            PathType::Directory,
894            "700",
895            root_sum,
896            4,
897            "./",
898        ));
899        manifest.push(ManifestEntry::new(
900            PathType::File,
901            "600",
902            foo_sum,
903            4,
904            "./foo",
905        ));
906        let manifest = Manifest::from_entries(manifest.entries().to_vec());
907        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);
908
909        let s3 = S3Store::connect(&store, Some(&endpoint)).expect("connect");
910        s3.push(&manifest, &src).expect("push");
911        let read_back = s3.get_manifest(&id).expect("get_manifest");
912        assert_eq!(read_back, manifest);
913
914        let dest = std::env::temp_dir().join(format!("snapdir-s3-dest-{}", std::process::id()));
915        std::fs::create_dir_all(&dest).unwrap();
916        s3.fetch_files(&read_back, &dest).expect("fetch_files");
917        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");
918
919        let _ = std::fs::remove_dir_all(&src);
920        let _ = std::fs::remove_dir_all(&dest);
921    }
922}