snapdir_stores/s3_store.rs
1//! `S3Store`: the `s3://` storage backend, backed by the native AWS SDK.
2//!
3//! An [`S3Store`] targets an `s3://bucket/prefix` location and holds the frozen
4//! content-addressable `.objects`/`.manifests` sharded layout, so a
5//! bucket/prefix is interchangeable across conforming implementations:
6//!
7//! ```text
8//! s3://<bucket>/<prefix>/.objects/<sharded checksum> raw object bytes
9//! s3://<bucket>/<prefix>/.manifests/<sharded snapshot id> manifest text
10//! ```
11//!
12//! Sharding and the relative keys come straight from [`snapdir_core::store`]
13//! ([`object_path`] / [`manifest_path`]); this module never reimplements them.
14//!
15//! # Credentials
16//!
17//! Authentication is delegated entirely to the standard AWS credential chain
18//! via [`aws_config`] (environment variables, shared config/credentials
19//! profiles, SSO, container/instance metadata, …). No bespoke snapdir
20//! credential variables are introduced. An S3-compatible endpoint (`MinIO`,
21//! `SeaweedFS`, …) can be selected with `SNAPDIR_S3_TEST_ENDPOINT` for the
22//! gated live test, or by constructing the store with an explicit endpoint.
23//!
24//! # TLS provider (project-load-bearing)
25//!
26//! The shipped binary must statically link on musl, so the workspace
27//! standardizes on the **`ring`** rustls provider; `aws-lc-rs` is banned. The
28//! AWS SDK defaults to an aws-lc-rs-backed HTTP connector, so this module builds
29//! the SDK's modern hyper-1.x HTTP client ([`aws_smithy_http_client`]) with its
30//! `rustls`/**`ring`** TLS provider and hands it to the SDK as a custom
31//! [`HttpClient`](aws_smithy_runtime_api::client::http::HttpClient). Native root
32//! trust anchors stay on (the builder's default `TrustStore`).
33//!
34//! # Sync trait, async SDK
35//!
36//! The SDK is async. [`S3Store`] owns a private multi-thread `tokio` runtime and
37//! bridges each [`Store`] method with `runtime.block_on(...)`, so no `async`
38//! leaks into `snapdir-core` or the orchestrator (see [`snapdir_core::store`]).
39
40use std::path::Path;
41use std::sync::Arc;
42
43use aws_config::BehaviorVersion;
44use aws_sdk_s3::config::Region;
45use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
46use aws_sdk_s3::Client;
47use aws_smithy_http_client::tls::rustls_provider::CryptoMode;
48use aws_smithy_http_client::tls::Provider as TlsProvider;
49use aws_smithy_http_client::Builder as HttpClientBuilder;
50use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
51use snapdir_core::manifest::Manifest;
52use snapdir_core::merkle::{Blake3Hasher, Hasher};
53use snapdir_core::store::{manifest_path, object_path, Store, StoreError};
54use snapdir_core::Meter;
55
56use crate::fetch::fetch_files_concurrent;
57use crate::push::{push_objects_concurrent, upload_object};
58use crate::retry::{parse_retry_after, retry_network, Attempt, DefaultJitter, TokioSleeper};
59use crate::stream::StreamStore;
60use crate::transfer::{classify_error, RateLimiter, TransferConfig};
61
62use std::error::Error as StdError;
63use tokio::runtime::Runtime;
64
/// Download-attempt budget for a checksum-verified fetch, mirroring the
/// oracle's `_SNAPDIR_S3_STORE_RETRIES` default of 5.
///
/// `S3Store::fetch_verified` starts its counter at this value and decrements it
/// on every checksum mismatch, failing once it reaches zero — so this is the
/// total number of downloads attempted (first try included), not the number of
/// retries after the first one.
const MAX_FETCH_RETRIES: u32 = 5;
68
69/// The parsed location an [`S3Store`] targets: an S3 bucket plus a key prefix.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct S3Location {
72 /// The bucket name (first path segment of the `s3://` URL).
73 pub bucket: String,
74 /// The key prefix (remaining segments), with no leading or trailing slash.
75 /// Empty when the store points at the bucket root.
76 pub prefix: String,
77}
78
79impl S3Location {
80 /// Parses an `s3://bucket/prefix` URL into its bucket and prefix.
81 ///
82 /// Matches the frozen URL derivation
83 /// (`_snapdir_export_store_vars`): splitting the store URL on `/`, the
84 /// bucket is `cut -f3` (the segment after `s3://`) and the base dir is
85 /// `cut -f4-` (everything after). The prefix has any trailing slash
86 /// stripped, matching `_snapdir_s3_store_get_remote_prefix`.
87 ///
88 /// The `s3://` scheme is optional; a bare `bucket/prefix` is accepted too.
89 #[must_use]
90 pub fn parse(store_url: &str) -> Self {
91 // Drop the scheme (`s3://`, or any `<proto>://`) if present. The oracle
92 // splits the full URL on `/` and takes field 3 as the bucket, which for
93 // `s3://bucket/...` is exactly the segment after the `//`.
94 let without_scheme = match store_url.find("://") {
95 Some(idx) => &store_url[idx + 3..],
96 None => store_url,
97 };
98 let mut parts = without_scheme.splitn(2, '/');
99 let bucket = parts.next().unwrap_or("").to_owned();
100 let prefix = parts.next().unwrap_or("");
101 // Strip a trailing slash (and any leading slash from an empty first
102 // segment edge case); the prefix is joined back with a single `/`.
103 let prefix = prefix
104 .trim_end_matches('/')
105 .trim_start_matches('/')
106 .to_owned();
107 Self { bucket, prefix }
108 }
109
110 /// Returns the full S3 object key for a content object given its checksum,
111 /// i.e. `<prefix>/.objects/<sharded>` (no leading slash).
112 #[must_use]
113 pub fn object_key(&self, checksum: &str) -> String {
114 self.key_for(&object_path(checksum))
115 }
116
117 /// Returns the full S3 object key for a manifest given its snapshot id,
118 /// i.e. `<prefix>/.manifests/<sharded>` (no leading slash).
119 #[must_use]
120 pub fn manifest_key(&self, id: &str) -> String {
121 self.key_for(&manifest_path(id))
122 }
123
124 /// Joins the store prefix with a store-relative path (`.objects/...` or
125 /// `.manifests/...`), producing a leading-slash-free S3 key. Mirrors the
126 /// oracle's `${_SNAPDIR_STORE_BASE_DIR%/}/${source_path#/}` with the
127 /// leading slash trimmed.
128 fn key_for(&self, rel: &str) -> String {
129 let rel = rel.trim_start_matches('/');
130 if self.prefix.is_empty() {
131 rel.to_owned()
132 } else {
133 format!("{}/{rel}", self.prefix)
134 }
135 }
136}
137
/// A content-addressable store backed by an S3 (or S3-compatible) bucket.
///
/// Construct one with [`S3Store::connect`] (resolves the standard AWS
/// credential chain) or [`S3Store::from_client`] (an already-built SDK client,
/// e.g. for tests against an emulator).
pub struct S3Store {
    /// The AWS SDK S3 client every HEAD/GET/PUT request is issued through.
    client: Client,
    /// The parsed bucket + key prefix all object/manifest keys are derived from.
    location: S3Location,
    /// The tokio runtime the synchronous trait methods drive the async SDK
    /// calls on via `block_on`.
    runtime: Arc<Runtime>,
    /// Transfer settings; this file reads its `retry`, `max_bytes_per_sec` and
    /// `max_requests_per_sec` fields.
    config: TransferConfig,
    /// Per-call request-rate limiter (one token per SDK call), built from
    /// [`TransferConfig::max_requests_per_sec`]. Unlimited (a no-op) by default.
    /// Shared (clone) so every call paces against one aggregate request budget.
    req_limiter: RateLimiter,
    /// Optional progress meter; recorded into during transfers. `None` (the
    /// default from every constructor) means zero recording and byte-identical
    /// behavior. Set by the CLI via [`S3Store::with_meter`].
    meter: Option<Arc<Meter>>,
}
157
158impl S3Store {
159 /// Connects to the `s3://bucket/prefix` store, resolving credentials and
160 /// region via the standard AWS chain ([`aws_config::load_defaults`]).
161 ///
162 /// The HTTP client is pinned to the `ring` rustls provider (see the module
163 /// docs). An optional `endpoint_url` selects an S3-compatible backend
164 /// (path-style addressing is enabled when an endpoint is given, as
165 /// emulators rarely support virtual-host addressing).
166 ///
167 /// # Errors
168 ///
169 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the
170 /// AWS configuration cannot be loaded.
171 pub fn connect(store_url: &str, endpoint_url: Option<&str>) -> Result<Self, StoreError> {
172 Self::connect_with(store_url, endpoint_url, TransferConfig::default())
173 }
174
175 /// Like [`connect`](Self::connect), but carries a [`TransferConfig`] for
176 /// concurrency / bandwidth control. The existing [`connect`](Self::connect)
177 /// delegates here with [`TransferConfig::default`].
178 ///
179 /// # Errors
180 ///
181 /// [`StoreError::Backend`] if the tokio runtime cannot be created or the
182 /// AWS configuration cannot be loaded.
183 pub fn connect_with(
184 store_url: &str,
185 endpoint_url: Option<&str>,
186 config: TransferConfig,
187 ) -> Result<Self, StoreError> {
188 let location = S3Location::parse(store_url);
189 let runtime = build_runtime()?;
190
191 let http_client = ring_https_client();
192 let endpoint = endpoint_url.map(ToOwned::to_owned);
193 let client = runtime.block_on(async move {
194 let mut loader = aws_config::defaults(BehaviorVersion::latest())
195 .http_client(http_client.clone())
196 // Disable the SDK's own retry loop so snapdir's RetryPolicy is
197 // the single backoff authority (no SDK-3x × ours-5x compounding).
198 .retry_config(aws_config::retry::RetryConfig::disabled());
199 if let Some(ep) = endpoint.as_deref() {
200 loader = loader.endpoint_url(ep);
201 }
202 let shared = loader.load().await;
203 let mut builder = aws_sdk_s3::config::Builder::from(&shared);
204 if endpoint.is_some() {
205 // S3-compatible emulators generally require path-style keys.
206 builder = builder.force_path_style(true);
207 }
208 // Some emulators / configs leave the region unset; S3 still
209 // requires a value to sign requests, so default it.
210 if shared.region().is_none() {
211 builder = builder.region(Region::new("us-east-1"));
212 }
213 Client::from_conf(builder.build())
214 });
215
216 let req_limiter = RateLimiter::new(config.max_requests_per_sec);
217 Ok(Self {
218 client,
219 location,
220 runtime: Arc::new(runtime),
221 config,
222 req_limiter,
223 meter: None,
224 })
225 }
226
227 /// Builds a store from an already-configured SDK [`Client`] and a parsed
228 /// location, owning a fresh tokio runtime for the sync bridge. Intended for
229 /// tests (e.g. wiring a client at an emulator endpoint).
230 ///
231 /// # Errors
232 ///
233 /// [`StoreError::Backend`] if the tokio runtime cannot be created.
234 pub fn from_client(client: Client, location: S3Location) -> Result<Self, StoreError> {
235 let config = TransferConfig::default();
236 let req_limiter = RateLimiter::new(config.max_requests_per_sec);
237 Ok(Self {
238 client,
239 location,
240 runtime: Arc::new(build_runtime()?),
241 config,
242 req_limiter,
243 meter: None,
244 })
245 }
246
247 /// Attaches (or clears) an optional progress [`Meter`], rides alongside
248 /// [`config`](Self::transfer_config). The transfer paths record bytes-in /
249 /// bytes-out + per-object progress into it; `None` (the constructor default)
250 /// means zero recording and byte-identical behavior. The CLI sets this after
251 /// construction.
252 #[must_use]
253 pub fn with_meter(mut self, meter: Option<Arc<Meter>>) -> Self {
254 self.meter = meter;
255 self
256 }
257
258 /// The parsed bucket/prefix this store targets.
259 #[must_use]
260 pub fn location(&self) -> &S3Location {
261 &self.location
262 }
263
264 /// The [`TransferConfig`] (concurrency / bandwidth) this store was built
265 /// with. Consumed by the transfer loops in later gates.
266 #[must_use]
267 pub fn transfer_config(&self) -> &TransferConfig {
268 &self.config
269 }
270
271 /// HEAD an object key; `Ok(true)` if it exists, `Ok(false)` if absent.
272 ///
273 /// The single SDK call is wrapped in [`retry_network`]: it acquires one
274 /// request-rate token, then retries a TRANSIENT failure under the store's
275 /// [`RetryPolicy`]. A `404`/not-found is a normal `Ok(false)` outcome (not a
276 /// retry); any other error is classified via [`s3_attempt_from_err`].
277 async fn key_exists(&self, key: &str) -> Result<bool, StoreError> {
278 retry_network(
279 &self.config.retry,
280 &self.req_limiter,
281 &TokioSleeper,
282 &DefaultJitter::new(),
283 || async {
284 match self
285 .client
286 .head_object()
287 .bucket(&self.location.bucket)
288 .key(key)
289 .send()
290 .await
291 {
292 Ok(_) => Ok(true),
293 Err(err) => {
294 // A genuine "absent" is a successful outcome of the op,
295 // not a retry: peek the concrete service error first.
296 if err.as_service_error().is_some_and(
297 aws_sdk_s3::operation::head_object::HeadObjectError::is_not_found,
298 ) {
299 return Ok(false);
300 }
301 Err(s3_attempt_from_err("S3 HEAD object failed", err))
302 }
303 }
304 },
305 )
306 .await
307 }
308
309 /// GET an object key's full body, or `None` if it is absent.
310 ///
311 /// The SDK call is wrapped in [`retry_network`] exactly like
312 /// [`key_exists`](Self::key_exists). A `NoSuchKey` is a normal `Ok(None)`
313 /// outcome; transient failures retry under the store's [`RetryPolicy`](crate::retry::RetryPolicy).
314 async fn get_bytes(&self, key: &str) -> Result<Option<Vec<u8>>, StoreError> {
315 retry_network(
316 &self.config.retry,
317 &self.req_limiter,
318 &TokioSleeper,
319 &DefaultJitter::new(),
320 || async {
321 match self
322 .client
323 .get_object()
324 .bucket(&self.location.bucket)
325 .key(key)
326 .send()
327 .await
328 {
329 Ok(resp) => {
330 // Draining the streamed body can itself fail transiently
331 // (connection reset mid-download); classify it too.
332 let data = resp.body.collect().await.map_err(|e| {
333 let err = backend("reading S3 object body", e);
334 let transient =
335 matches!(classify_error(&err), crate::adaptive::OpResult::Throttle);
336 Attempt {
337 transient,
338 retry_after: None,
339 err,
340 }
341 })?;
342 Ok(Some(data.into_bytes().to_vec()))
343 }
344 Err(err) => {
345 if err.as_service_error().is_some_and(
346 aws_sdk_s3::operation::get_object::GetObjectError::is_no_such_key,
347 ) {
348 return Ok(None);
349 }
350 Err(s3_attempt_from_err("S3 GET object failed", err))
351 }
352 }
353 },
354 )
355 .await
356 }
357
358 /// PUT `bytes` at `key`. S3 PUT is atomic, so no temp-key dance is needed
359 /// (the oracle relies on the same atomicity for manifests/objects).
360 ///
361 /// Wrapped in [`retry_network`]: each (re)try re-sends the full body, so the
362 /// content-addressed bytes that land are unchanged — only transient failures
363 /// retry under the store's [`RetryPolicy`](crate::retry::RetryPolicy).
364 async fn put_bytes(&self, key: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
365 retry_network(
366 &self.config.retry,
367 &self.req_limiter,
368 &TokioSleeper,
369 &DefaultJitter::new(),
370 || {
371 let bytes = bytes.clone();
372 async move {
373 self.client
374 .put_object()
375 .bucket(&self.location.bucket)
376 .key(key)
377 .body(bytes.into())
378 .send()
379 .await
380 .map(|_| ())
381 .map_err(|err| s3_attempt_from_err("S3 PUT object failed", err))
382 }
383 },
384 )
385 .await
386 }
387
388 /// Downloads `key`, verifying its BLAKE3 against `expected`, retrying up to
389 /// [`MAX_FETCH_RETRIES`] times. Mirrors `_snapdir_s3_fetch_to_cache`.
390 async fn fetch_verified(&self, key: &str, expected: &str) -> Result<Vec<u8>, StoreError> {
391 let hasher = Blake3Hasher::new();
392 let mut attempts_left = MAX_FETCH_RETRIES;
393 loop {
394 match self.get_bytes(key).await? {
395 Some(bytes) => {
396 let actual = hasher.hash_hex(&bytes);
397 if actual == expected {
398 return Ok(bytes);
399 }
400 // Mismatched checksum after fetching: retry (the oracle
401 // decrements its retry budget on the same condition).
402 attempts_left = attempts_left.saturating_sub(1);
403 if attempts_left == 0 {
404 return Err(StoreError::Integrity {
405 address: format!("s3://{}/{key}", self.location.bucket),
406 expected: expected.to_owned(),
407 actual,
408 });
409 }
410 }
411 None => {
412 // Treat a missing key as not-found rather than spinning.
413 return Err(StoreError::ObjectNotFound {
414 checksum: expected.to_owned(),
415 });
416 }
417 }
418 }
419 }
420}
421
422impl Store for S3Store {
423 fn get_manifest(&self, id: &str) -> Result<Manifest, StoreError> {
424 let key = self.location.manifest_key(id);
425 let bytes = self.runtime.block_on(async {
426 match self.get_bytes(&key).await? {
427 Some(b) => Ok(b),
428 None => Err(StoreError::ManifestNotFound { id: id.to_owned() }),
429 }
430 })?;
431
432 let text = String::from_utf8(bytes).map_err(|err| StoreError::Backend {
433 message: format!("manifest {id} is not valid UTF-8"),
434 source: Some(Box::new(err)),
435 })?;
436 let manifest = Manifest::parse(&text)?;
437
438 // Verify the stored manifest hashes back to its snapshot id before
439 // trusting it (oracle: the id check on fetch).
440 let actual = snapdir_core::merkle::snapshot_id(&manifest, &Blake3Hasher::new());
441 if actual != id {
442 return Err(StoreError::Integrity {
443 address: self.location.manifest_key(id),
444 expected: id.to_owned(),
445 actual,
446 });
447 }
448 Ok(manifest)
449 }
450
/// Fetches the objects for `manifest` into `dest` by delegating to the shared
/// [`fetch_files_concurrent`] orchestrator on the store's runtime.
///
/// This method only supplies the per-entry download closure: it derives the
/// object key from the entry's checksum and downloads it through
/// [`fetch_verified`](Self::fetch_verified). The store's [`TransferConfig`],
/// a byte-rate limiter built from `max_bytes_per_sec`, and the optional meter
/// are passed through to the orchestrator.
fn fetch_files(&self, manifest: &Manifest, dest: &Path) -> Result<(), StoreError> {
    // Concurrent download via the shared orchestrator: it owns the
    // skip-if-present-and-verified short-circuit, directory creation, the
    // bounded-concurrency pass, the per-object rate limit, and the atomic
    // write. S3 only injects the per-object download, preserving the
    // BLAKE3-verify + retry discipline of `fetch_verified`.
    let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
    // The meter is handed over twice: once borrowed, once as a cloned `Arc`.
    let meter = self.meter.as_deref();
    let meter_arc = self.meter.clone();
    self.runtime.block_on(async {
        fetch_files_concurrent(
            manifest,
            dest,
            &self.config,
            &limiter,
            meter,
            meter_arc,
            |entry| async {
                let key = self.location.object_key(&entry.checksum);
                self.fetch_verified(&key, &entry.checksum).await
            },
        )
        .await
    })
}
476
/// Uploads the objects of `manifest` (read from `source`) and then the
/// manifest itself, by delegating to the shared [`push_objects_concurrent`]
/// orchestrator on the store's runtime.
///
/// Returns `Ok(())` immediately, uploading nothing, when the manifest key for
/// this snapshot id already exists. Otherwise this method supplies two
/// closures: a per-entry upload (via [`upload_object`], wired to
/// [`key_exists`](Self::key_exists) and [`put_bytes`](Self::put_bytes)) and a
/// manifest writer that refuses to PUT text whose BLAKE3 digest differs from
/// the snapshot id ([`StoreError::Integrity`]).
fn push(&self, manifest: &Manifest, source: &Path) -> Result<(), StoreError> {
    let hasher = Blake3Hasher::new();
    let id = snapdir_core::merkle::snapshot_id(manifest, &hasher);

    // Concurrent upload via the shared orchestrator: it owns the bounded
    // per-object pass and the manifest-last / all-or-nothing ordering. S3
    // injects the per-object skip-present + upload (via `upload_object`,
    // which also owns the shared read+verify) and the manifest-write
    // closure. A failed push writes NO manifest.
    let limiter = RateLimiter::new(self.config.max_bytes_per_sec);
    // The meter is handed over twice: once borrowed, once as a cloned `Arc`.
    let meter = self.meter.as_deref();
    let meter_arc = self.meter.clone();
    self.runtime.block_on(async {
        // Skip-if-manifest-present pre-check: a present manifest implies all
        // its objects are present (we always write the manifest last).
        let manifest_key = self.location.manifest_key(&id);
        if self.key_exists(&manifest_key).await? {
            return Ok(());
        }

        push_objects_concurrent(
            manifest,
            &self.config,
            &limiter,
            meter,
            meter_arc,
            // Per-entry upload: existence check and PUT both go through the
            // store's retry-wrapped helpers.
            |entry| {
                let object_key = self.location.object_key(&entry.checksum);
                upload_object(
                    entry,
                    object_key,
                    source,
                    &limiter,
                    meter,
                    |key| async move { self.key_exists(&key).await },
                    |key, bytes| async move { self.put_bytes(&key, bytes).await },
                )
            },
            || async {
                // Write the manifest last (verified to hash back to its id),
                // exactly as the oracle stores `echo "${manifest}"` text.
                let mut text = manifest.to_string();
                // `echo` appends a newline, so the stored text ends with one.
                text.push('\n');
                let manifest_actual = hasher.hash_hex(text.as_bytes());
                if manifest_actual != id {
                    return Err(StoreError::Integrity {
                        address: manifest_key.clone(),
                        expected: id.clone(),
                        actual: manifest_actual,
                    });
                }
                self.put_bytes(&manifest_key, text.into_bytes()).await
            },
        )
        .await
    })
}
534}
535
536impl StreamStore for S3Store {
537 fn has_object(&self, checksum: &str) -> Result<bool, StoreError> {
538 let key = self.location.object_key(checksum);
539 self.runtime.block_on(async { self.key_exists(&key).await })
540 }
541
542 fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError> {
543 let key = self.location.object_key(checksum);
544 let bytes = self.runtime.block_on(async {
545 self.get_bytes(&key)
546 .await?
547 .ok_or_else(|| StoreError::ObjectNotFound {
548 checksum: checksum.to_owned(),
549 })
550 })?;
551
552 // Verify the downloaded blob hashes back to its content-address before
553 // returning it (corruption surfaces as `Integrity`, never bad bytes).
554 let actual = Blake3Hasher::new().hash_hex(&bytes);
555 if actual != checksum {
556 return Err(StoreError::Integrity {
557 address: format!("s3://{}/{key}", self.location.bucket),
558 expected: checksum.to_owned(),
559 actual,
560 });
561 }
562 Ok(bytes)
563 }
564
565 fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError> {
566 // Verify BEFORE uploading: a blob whose bytes do not hash to `checksum`
567 // must never land at that content-address (nothing is stored).
568 let actual = Blake3Hasher::new().hash_hex(&bytes);
569 if actual != checksum {
570 return Err(StoreError::Integrity {
571 address: self.location.object_key(checksum),
572 expected: checksum.to_owned(),
573 actual,
574 });
575 }
576 let key = self.location.object_key(checksum);
577 self.runtime
578 .block_on(async { self.put_bytes(&key, bytes).await })
579 }
580
581 fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError> {
582 let key = self.location.manifest_key(id);
583 // Mirror the manifest-write tail of `push`: render the oracle's
584 // `echo "${manifest}"` bytes, verify they hash back to `id`, then PUT.
585 let mut text = manifest.to_string();
586 text.push('\n');
587 let actual = Blake3Hasher::new().hash_hex(text.as_bytes());
588 if actual != id {
589 return Err(StoreError::Integrity {
590 address: key,
591 expected: id.to_owned(),
592 actual,
593 });
594 }
595 self.runtime
596 .block_on(async { self.put_bytes(&key, text.into_bytes()).await })
597 }
598}
599
600/// Builds the multi-thread tokio runtime that backs the sync bridge.
601fn build_runtime() -> Result<Runtime, StoreError> {
602 tokio::runtime::Builder::new_multi_thread()
603 .enable_all()
604 .build()
605 .map_err(|e| backend("creating tokio runtime for S3Store", e))
606}
607
608/// Builds the AWS-SDK hyper-1.x HTTP client backed by `rustls` using the
609/// **`ring`** crypto provider, with native-root trust anchors (the builder's
610/// default `TrustStore` enables them). This is the load-bearing piece that keeps
611/// `aws-lc-rs` (and the legacy hyper-0.14 TLS island) out of the dependency
612/// graph.
613fn ring_https_client() -> aws_smithy_runtime_api::client::http::SharedHttpClient {
614 HttpClientBuilder::new()
615 .tls_provider(TlsProvider::Rustls(CryptoMode::Ring))
616 .build_https()
617}
618
619/// Wraps any backend error into [`StoreError::Backend`] with a message.
620fn backend<E>(message: &str, source: E) -> StoreError
621where
622 E: std::error::Error + Send + Sync + 'static,
623{
624 StoreError::Backend {
625 message: message.to_owned(),
626 source: Some(Box::new(source)),
627 }
628}
629
630/// Builds a retry [`Attempt`] from a concrete aws-sdk-s3 [`SdkError`], at the
631/// boundary where the SDK error still carries its raw HTTP response.
632///
633/// - `transient`: true for the retryable signal set — an HTTP `429`/`503`
634/// status on the raw response, the throttle error codes
635/// (`SlowDown`/`Throttling`/`RequestTimeout`/`ServiceUnavailable`/…), and the
636/// transport/timeout classes — folded through the shared
637/// [`classify_error`](crate::classify_error) on the mapped [`StoreError`].
638/// Conservative: an unknown error is NOT transient.
639/// - `retry_after`: extracted from the raw response's `Retry-After` header (the
640/// delta-seconds form) when present, else `None`.
641/// - `err`: the mapped [`StoreError::Backend`] (the same value the non-retry
642/// path used to surface).
643fn s3_attempt_from_err<E>(message: &str, err: SdkError<E, HttpResponse>) -> Attempt
644where
645 E: ProvideErrorMetadata + StdError + Send + Sync + 'static,
646{
647 // Extract status code + Retry-After off the raw HTTP response (present on
648 // ServiceError / ResponseError variants) BEFORE we consume the error.
649 let (http_status, retry_after) = match err.raw_response() {
650 Some(resp) => {
651 let status = resp.status().as_u16();
652 let hint = resp
653 .headers()
654 .get("retry-after")
655 .and_then(parse_retry_after);
656 (Some(status), hint)
657 }
658 None => (None, None),
659 };
660
661 // The SDK error code (e.g. "SlowDown", "ServiceUnavailable") rides in the
662 // error metadata; fold it (plus the status) into the mapped StoreError's
663 // text so the shared classifier sees the transient signals.
664 let code = err.code().unwrap_or_default().to_owned();
665 let store_err = backend(message, err);
666
667 let transient = http_status.is_some_and(|s| s == 429 || s == 503)
668 || matches!(
669 classify_error(&store_err),
670 crate::adaptive::OpResult::Throttle
671 )
672 || {
673 let c = code.to_ascii_lowercase();
674 c.contains("slowdown")
675 || c.contains("throttl")
676 || c.contains("requesttimeout")
677 || c.contains("serviceunavailable")
678 || c.contains("internalerror")
679 };
680
681 Attempt {
682 transient,
683 retry_after,
684 err: store_err,
685 }
686}
687
// Unit tests for URL parsing, key derivation and retry classification, plus
// one live round-trip that only runs when an endpoint is configured.
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_s3::error::ErrorMetadata;
    use aws_sdk_s3::operation::head_object::HeadObjectError;
    use aws_sdk_s3::primitives::SdkBody;
    use aws_smithy_runtime_api::http::StatusCode;
    use snapdir_core::manifest::PathType;
    use std::time::Duration;

    /// Builds a raw S3 [`HttpResponse`] with the given status and optional
    /// `Retry-After` header, for exercising [`s3_attempt_from_err`] without a
    /// live SDK call.
    fn raw_response(status: u16, retry_after: Option<&str>) -> HttpResponse {
        let mut resp = HttpResponse::new(
            StatusCode::try_from(status).expect("valid status"),
            SdkBody::empty(),
        );
        if let Some(v) = retry_after {
            resp.headers_mut().insert("retry-after", v.to_owned());
        }
        resp
    }

    /// Wraps an S3 service error (carrying `code`) plus a raw response into the
    /// concrete `SdkError` the store methods see, so the extractor runs on a
    /// real SDK error shape.
    fn s3_service_error(code: &str, status: u16, retry_after: Option<&str>) -> Attempt {
        let meta = ErrorMetadata::builder().code(code).build();
        let svc = HeadObjectError::generic(meta);
        let err = SdkError::service_error(svc, raw_response(status, retry_after));
        s3_attempt_from_err("S3 op failed", err)
    }

    #[test]
    fn backoff_wire_s3_extract_503_retry_after_is_transient_with_hint() {
        // A 503 SlowDown carrying `Retry-After: 12` => transient, hint = 12s.
        let attempt = s3_service_error("SlowDown", 503, Some("12"));
        assert!(attempt.transient, "503/SlowDown must be transient");
        assert_eq!(
            attempt.retry_after,
            Some(Duration::from_secs(12)),
            "the Retry-After delta-seconds header must be extracted"
        );
    }

    #[test]
    fn backoff_wire_s3_extract_429_without_header_is_transient_no_hint() {
        // A 429 with no Retry-After header => still transient, but no hint.
        let attempt = s3_service_error("Throttling", 429, None);
        assert!(attempt.transient, "429 must be transient");
        assert_eq!(
            attempt.retry_after, None,
            "absent Retry-After header => None (backoff handles the delay)"
        );
    }

    #[test]
    fn backoff_wire_s3_extract_404_is_not_transient() {
        // A 404 NoSuchKey-style hard error => NOT transient (conservative).
        let attempt = s3_service_error("NoSuchKey", 404, None);
        assert!(
            !attempt.transient,
            "a 404/not-found must never be classified transient"
        );
        assert_eq!(attempt.retry_after, None);
    }

    /// Strips a leading `./` and a trailing `/` from a manifest path. Kept as a
    /// test-only assertion of the path normalization the orchestrator
    /// (`crate::push`) performs.
    fn strip_leading_dot_slash(path: &str) -> &str {
        let trimmed = path.strip_prefix("./").unwrap_or(path);
        trimmed.strip_suffix('/').unwrap_or(trimmed)
    }

    // The canonical content-addressable fixtures from the s3 store test suite.
    // Each `*_SHARDED` constant is the matching id with `/` inserted after the
    // 3rd, 6th and 9th characters.
    const FOO_CHECKSUM: &str = "49dc870df1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const FOO_SHARDED: &str = "49d/c87/0df/1de7fd60794cebce449f5ccdae575affaa67a24b62acb03e039db92";
    const MANIFEST_ID: &str = "aa91e498f401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";
    const MANIFEST_SHARDED: &str =
        "aa9/1e4/98f/401ea9e6ddbaa1138a0dbeb030fab8defc1252d80c77ebefafbc70d";

    #[test]
    fn s3_store_parses_bucket_and_prefix() {
        let loc = S3Location::parse("s3://my-bucket/long/term/storage");
        assert_eq!(loc.bucket, "my-bucket");
        assert_eq!(loc.prefix, "long/term/storage");
    }

    #[test]
    fn s3_store_parse_matches_oracle_cut_fields() {
        // Oracle: bucket = `cut -d'/' -f3`, base_dir = `cut -d'/' -f4-`.
        // For "s3://bucket/a/b/c": fields are [s3:,"",bucket,a,b,c].
        let loc = S3Location::parse("s3://bucket/a/b/c");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "a/b/c");
    }

    #[test]
    fn s3_store_parse_strips_trailing_slash() {
        // `_snapdir_s3_store_get_remote_prefix` strips the trailing slash.
        let loc = S3Location::parse("s3://bucket/prefix/");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "prefix");
    }

    #[test]
    fn s3_store_parse_bucket_root_has_empty_prefix() {
        // Both "no path" and "just a slash" mean the bucket root.
        let loc = S3Location::parse("s3://bucket");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "");

        let loc_slash = S3Location::parse("s3://bucket/");
        assert_eq!(loc_slash.bucket, "bucket");
        assert_eq!(loc_slash.prefix, "");
    }

    #[test]
    fn s3_store_parse_accepts_bare_bucket_prefix_without_scheme() {
        let loc = S3Location::parse("bucket/some/prefix");
        assert_eq!(loc.bucket, "bucket");
        assert_eq!(loc.prefix, "some/prefix");
    }

    #[test]
    fn s3_store_object_key_matches_sharded_scheme() {
        let loc = S3Location::parse("s3://b/long/term/storage");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!("long/term/storage/.objects/{FOO_SHARDED}")
        );
    }

    #[test]
    fn s3_store_manifest_key_matches_sharded_scheme() {
        let loc = S3Location::parse("s3://b/long/term/storage");
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!("long/term/storage/.manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn s3_store_keys_have_no_leading_slash_at_bucket_root() {
        // With an empty prefix the keys are just `.objects/...` / `.manifests/...`.
        let loc = S3Location::parse("s3://bucket");
        assert_eq!(
            loc.object_key(FOO_CHECKSUM),
            format!(".objects/{FOO_SHARDED}")
        );
        assert_eq!(
            loc.manifest_key(MANIFEST_ID),
            format!(".manifests/{MANIFEST_SHARDED}")
        );
    }

    #[test]
    fn s3_store_object_key_uses_core_object_path() {
        // Cross-check that we delegate to the frozen core sharding helper rather
        // than reimplementing it: the key tail must equal `object_path` output.
        let loc = S3Location::parse("s3://b");
        assert_eq!(loc.object_key(FOO_CHECKSUM), object_path(FOO_CHECKSUM));
    }

    #[test]
    fn s3_store_strip_leading_dot_slash() {
        assert_eq!(strip_leading_dot_slash("./foo"), "foo");
        assert_eq!(strip_leading_dot_slash("./a/b/c"), "a/b/c");
        assert_eq!(strip_leading_dot_slash("./a/"), "a");
        assert_eq!(strip_leading_dot_slash("./"), "");
    }

    // --- Live round-trip, skipped by default --------------------------------
    //
    // Requires an S3-compatible endpoint (e.g. MinIO/SeaweedFS) plus AWS
    // credentials in the environment. Gated behind `SNAPDIR_S3_TEST_ENDPOINT`
    // and `SNAPDIR_S3_TEST_STORE` (an `s3://bucket/prefix` URL) so it is skipped
    // unless explicitly configured. Real emulator round-trips are exercised by
    // the later `remote-interop` gate.
    #[test]
    fn s3_store_live_round_trip_when_configured() {
        use snapdir_core::manifest::ManifestEntry;

        // Both variables must be set; otherwise the test returns early (and
        // therefore passes) after printing why it was skipped.
        let (Ok(endpoint), Ok(store)) = (
            std::env::var("SNAPDIR_S3_TEST_ENDPOINT"),
            std::env::var("SNAPDIR_S3_TEST_STORE"),
        ) else {
            eprintln!(
                "skipping s3_store live round-trip: set SNAPDIR_S3_TEST_ENDPOINT \
                 and SNAPDIR_S3_TEST_STORE (s3://bucket/prefix) to run it"
            );
            return;
        };

        let hasher = Blake3Hasher::new();

        // Build a tiny source tree + matching manifest.
        let src = std::env::temp_dir().join(format!("snapdir-s3-live-{}", std::process::id()));
        std::fs::create_dir_all(&src).unwrap();
        std::fs::write(src.join("foo"), b"foo\n").unwrap();
        let foo_sum = hasher.hash_hex(b"foo\n");
        let root_sum = snapdir_core::merkle::directory_checksum([foo_sum.as_str()], &hasher);
        let mut manifest = Manifest::new();
        manifest.push(ManifestEntry::new(
            PathType::Directory,
            "700",
            root_sum,
            4,
            "./",
        ));
        manifest.push(ManifestEntry::new(
            PathType::File,
            "600",
            foo_sum,
            4,
            "./foo",
        ));
        let manifest = Manifest::from_entries(manifest.entries().to_vec());
        let id = snapdir_core::merkle::snapshot_id(&manifest, &hasher);

        // push -> get_manifest -> fetch_files must round-trip the tree.
        let s3 = S3Store::connect(&store, Some(&endpoint)).expect("connect");
        s3.push(&manifest, &src).expect("push");
        let read_back = s3.get_manifest(&id).expect("get_manifest");
        assert_eq!(read_back, manifest);

        let dest = std::env::temp_dir().join(format!("snapdir-s3-dest-{}", std::process::id()));
        std::fs::create_dir_all(&dest).unwrap();
        s3.fetch_files(&read_back, &dest).expect("fetch_files");
        assert_eq!(std::fs::read(dest.join("foo")).unwrap(), b"foo\n");

        // Best-effort cleanup; only reached when every assertion above passed.
        let _ = std::fs::remove_dir_all(&src);
        let _ = std::fs::remove_dir_all(&dest);
    }
}