// git_remote_object_store/object_store/s3.rs
//! S3 backend for the [`ObjectStore`] trait.
//!
//! `S3Store` wraps `aws-sdk-s3`. The SDK owns `SigV4`, retries, connection
//! pooling, and timeout policy; this module owns the URL → SDK config
//! translation, the error-code classifier ([`classify`]), and the
//! hand-rolled multipart download orchestrator that the SDK does not
//! provide.
//!
//! ## Key composition
//!
//! `S3Store` does **not** auto-prepend the [`RemoteUrl`] `prefix`. Trait
//! keys are matched by byte prefix, per the contract on
//! [`ObjectStore::list`]; the URL `prefix` is a repository concern and is
//! composed by callers that build keys like `<prefix>/refs/.../<sha>.bundle`.
//!
//! ## Conditional writes
//!
//! [`put_if_absent`][super::ObjectStore::put_if_absent] uses
//! `If-None-Match: "*"`. S3 returns either 412 (`PreconditionFailed`)
//! when the key already exists or 409 (`ConditionalRequestConflict`)
//! when two PUTs race. Both collapse to `Ok(false)`.
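//!
//! A minimal sketch of that collapse, assuming the HTTP status of the
//! conditional PUT is already in hand. `put_if_absent_outcome` is a
//! hypothetical helper for illustration only; this module works on SDK
//! errors via [`classify`] rather than on raw status codes:
//!
//! ```rust
//! // Hypothetical: maps the status of a `PutObject` sent with
//! // `If-None-Match: "*"` onto the `put_if_absent` result shape.
//! fn put_if_absent_outcome(status: u16) -> Result<bool, String> {
//!     match status {
//!         // 2xx: the key was absent and this body is now stored.
//!         200..=299 => Ok(true),
//!         // 412: the key already exists. 409: a racing conditional PUT
//!         // won. Either way this writer did not create the object.
//!         412 | 409 => Ok(false),
//!         other => Err(format!("unexpected HTTP status {other}")),
//!     }
//! }
//! ```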
//!
//! ## Size limits
//!
//! AWS caps a single `PutObject` body at [`SINGLE_PUT_LIMIT_BYTES`]
//! (5 GiB) and a multipart upload at [`S3_MAX_PARTS`] (10 000) parts;
//! the per-object ceiling is 5 TiB. The helper auto-promotes uploads
//! above [`super::multipart::MULTIPART_PUT_THRESHOLD`] onto the
//! multipart path, so callers do not have to reason about the 5 GiB
//! single-PUT cutoff. The upload path is **not resumable** across
//! process death — see the README "Known limitations" section.
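//!
//! The part cap also ties part size to object size: a fixed part size
//! `P` caps an upload at `P * 10 000` bytes. A quick sketch of the
//! arithmetic, with a hypothetical `min_part_size` helper (the real
//! part planning lives in `super::multipart` and is not shown here):
//!
//! ```rust
//! // Hypothetical: the smallest part size (bytes) that fits `total`
//! // bytes into at most `max_parts` parts, i.e. a ceiling division.
//! fn min_part_size(total: u64, max_parts: u64) -> u64 {
//!     total.div_ceil(max_parts)
//! }
//! ```
//!
//! For the 5 TiB ceiling and 10 000 parts this gives 549 755 814 bytes,
//! roughly 524 MiB per part.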
//!
//! ## Atomic `get_to_file`
//!
//! Both the small-object and multipart download paths write to a sibling
//! [`tempfile::NamedTempFile`] and rename on success so a partial
//! failure cannot leave a corrupt destination for the unbundle step.
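//!
//! The write-then-rename pattern, sketched with `std::fs` only. The
//! module itself uses [`tempfile::NamedTempFile`], which also picks a
//! collision-free name and deletes the file on drop; the fixed
//! `.partial` suffix and the `write_atomically` name below are
//! assumptions for illustration:
//!
//! ```rust
//! use std::fs;
//! use std::io::{self, Write};
//! use std::path::Path;
//!
//! // Hypothetical helper: readers of `dest` see either the previous
//! // file or the complete new one, never a half-written file.
//! fn write_atomically(dest: &Path, data: &[u8]) -> io::Result<()> {
//!     // Sibling path in the same directory, so `rename` stays on one
//!     // filesystem (where POSIX makes it an atomic replace).
//!     let mut tmp_name = dest.file_name().unwrap_or_default().to_os_string();
//!     tmp_name.push(".partial");
//!     let tmp = dest.with_file_name(tmp_name);
//!
//!     let result = (|| -> io::Result<()> {
//!         let mut file = fs::File::create(&tmp)?;
//!         file.write_all(data)?;
//!         file.sync_all()?;
//!         fs::rename(&tmp, dest)
//!     })();
//!     if result.is_err() {
//!         // Best effort: do not leave the partial file behind.
//!         let _ = fs::remove_file(&tmp);
//!     }
//!     result
//! }
//! ```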
//!
//! Every GET that `get_to_file` issues carries `If-Match: <etag>`,
//! derived from the preceding `HeadObject` call. If the object is
//! overwritten between `head` and the body download, S3 returns 412 and
//! `get_to_file` retries once with a fresh `head`/`ETag`. After one
//! retry the 412 propagates as [`ObjectStoreError::PreconditionFailed`].
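//!
//! The retry-once policy as a self-contained sketch. `Error` and
//! `retry_once_on_precondition` are hypothetical stand-ins for
//! [`ObjectStoreError`] and the inline `match` in `get_to_file`:
//!
//! ```rust
//! #[derive(Debug, PartialEq)]
//! enum Error {
//!     PreconditionFailed(String),
//!     Other(String),
//! }
//!
//! // `attempt` stands for one fresh HEAD plus an `If-Match` GET. A 412
//! // earns exactly one more attempt, whose result is returned as-is,
//! // even if it is another 412. Any other outcome is not retried.
//! fn retry_once_on_precondition<T>(
//!     mut attempt: impl FnMut() -> Result<T, Error>,
//! ) -> Result<T, Error> {
//!     match attempt() {
//!         Err(Error::PreconditionFailed(_)) => attempt(),
//!         other => other,
//!     }
//! }
//! ```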
//!
//! ## HTTP transport tuning
//!
//! `aws-sdk-s3`'s default HTTP client keeps idle pooled connections
//! indefinitely, so a pooled connection to a rotated VIP would wedge
//! an in-flight request until the OS-level TCP retransmit timeout
//! fires (~15 minutes on Linux). [`S3Store::from_remote_url`] installs
//! a custom HTTP client built via [`aws_smithy_http_client::Builder`]
//! with [`POOL_IDLE_TIMEOUT`] bounded to 30 s, so a rotation costs at
//! most one short-circuited request rather than minutes of wedged
//! transfer. Tracking issue: #26.
//!
//! Pool-idle alone does not bound a *hot* pooled connection (one that
//! was used within the last 30 s but has since become stuck). The 412
//! retry in [`ObjectStore::get_to_file`] does not cover it either: that
//! retry reacts to a deliberate server response, so forcing a fresh
//! connection there would not help. Instead, the SDK's
//! [`aws_config::timeout::TimeoutConfig`] is given a [`READ_TIMEOUT`]
//! so a stuck request fails fast and the SDK's internal retry layer can
//! pick a fresh connection. `connect_timeout` is left at the SDK default
//! (3.1 s, already aggressive). Tracking issue: #26.
//!
//! Note: smithy's `read_timeout` resolves the HTTP connector future at
//! "response-headers received." That bounds:
//! - **Uploads** in full — the connector future cannot resolve until
//!   the request body is sent and the response status arrives, so a
//!   stuck upload trips at [`READ_TIMEOUT`]. `put_body` therefore
//!   overrides the timeout per-operation so large bundle uploads are
//!   not cut off at 30 s.
//! - **Downloads** only up to time-to-first-byte. Once response
//!   headers arrive the future resolves; subsequent body-chunk reads
//!   are not bounded by `read_timeout`. A peer that wedges mid-body
//!   on a GET (e.g. a stuck multipart range) is still subject to the
//!   pool-idle / TCP-keepalive layers, but not to `READ_TIMEOUT`.
//!   Lesson #2 in `docs/development/lessons_learned.md` covers this.
//!
//! TCP keepalive (the second knob suggested in #27) is **not** wired
//! on the S3 path: `aws-smithy-http-client` 1.1.12's public `Builder`
//! / `ConnectorBuilder` API exposes `pool_idle_timeout` but does not
//! expose `tcp_keepalive`. The dominant DNS-rotation failure in #26 is
//! pool reuse of a dead VIP, which `pool_idle_timeout` already fixes;
//! the gap relative to the Azure backend (which uses `reqwest` and
//! gets keepalive for free) is documented in `CHANGELOG.md`.
//!
//! ## Multipart-upload lifetime safety
//!
//! S3 retains incomplete multipart uploads indefinitely unless a bucket
//! lifecycle rule aborts them, so a future dropped between
//! `CreateMultipartUpload` and `CompleteMultipartUpload` orphans the
//! upload-id and bills the caller for the staged parts (issues #169,
//! #171). [`S3Store::start_multipart_upload`] therefore hands back a
//! [`MultipartUploadGuard`] that owns the upload-id and issues a
//! best-effort `AbortMultipartUpload` on `Drop`;
//! [`finish_multipart_upload`] is the only call site that may
//! [`disarm`] the guard.
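//!
//! The guard pattern in miniature. `AbortGuard` is a hypothetical
//! stand-in for [`MultipartUploadGuard`] (whose real fields and methods
//! may differ), and its `abort` closure stands in for the best-effort
//! `AbortMultipartUpload` request:
//!
//! ```rust
//! struct AbortGuard<F: FnMut(&str)> {
//!     upload_id: String,
//!     abort: F,
//!     armed: bool,
//! }
//!
//! impl<F: FnMut(&str)> AbortGuard<F> {
//!     fn new(upload_id: String, abort: F) -> Self {
//!         Self { upload_id, abort, armed: true }
//!     }
//!
//!     // Call only after the upload has been completed: returns the
//!     // upload-id and turns the `Drop` below into a no-op.
//!     fn disarm(mut self) -> String {
//!         self.armed = false;
//!         std::mem::take(&mut self.upload_id)
//!     }
//! }
//!
//! impl<F: FnMut(&str)> Drop for AbortGuard<F> {
//!     // Runs on an early `?` return, on panic unwinding, and when the
//!     // owning future is dropped mid-upload.
//!     fn drop(&mut self) {
//!         if self.armed {
//!             (self.abort)(&self.upload_id);
//!         }
//!     }
//! }
//! ```
//!
//! `Drop` is synchronous and cannot `.await`, so an async guard would
//! typically spawn its abort request rather than run it inline; the
//! sketch leaves that detail out.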
//!
//! Future contributors must **not** introduce an early `?`-return
//! between obtaining the upload-id and constructing the
//! [`MultipartUploadGuard`] inside `start_multipart_upload`, nor
//! between `start_multipart_upload` and the matching
//! `finish_multipart_upload`: a bare upload-id outside the guard
//! reintroduces the leak the guard exists to prevent. (The
//! `ok_or_else` for a missing upload-id field on the SDK response is
//! benign — there is no upload-id to abort.)
//!
//! Azure has no equivalent need — uncommitted blocks auto-expire after
//! seven days (`azure.rs`).
//!
//! [`finish_multipart_upload`]: S3Store::finish_multipart_upload
//! [`disarm`]: MultipartUploadGuard::disarm
//!
//! ## Stdout discipline
//!
//! Per `.claude/rules/protocol-stdout.md`, this module never writes to
//! stdout. Diagnostics go through `tracing` (which the helper binaries
//! configure to write to stderr).

use std::io::SeekFrom;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use aws_config::timeout::TimeoutConfig;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_s3::primitives::{ByteStream, Length};
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, MetadataDirective};
use aws_smithy_http_client::tls::{Provider as TlsProvider, rustls_provider::CryptoMode};
use aws_smithy_types_convert::date_time::DateTimeExt;
use bytes::Bytes;
use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
use tempfile::NamedTempFile;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use url::Url;

use crate::url::{
    AWS_S3_INFIXES, RemoteUrl, S3Addressing, s3_virtual_hosted_bucket, strip_aws_host_suffix,
};

use super::error::{network_boxed, other_boxed};
use super::multipart::{
    MULTIPART_PUT_MAX_CONCURRENCY, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS, UploadPart,
    plan_upload_parts, read_file_part, should_use_multipart, slice_bytes_part,
};
use super::{
    GetOpts, ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts, persist_temp,
};

/// Object-size cutoff above which `get_to_file` switches from a single
/// streaming GET to parallel ranged GETs.
pub(crate) const MULTIPART_THRESHOLD: u64 = 25 * 1024 * 1024;
/// Range size for each ranged GET in the multipart download path.
pub(crate) const MULTIPART_CHUNK_SIZE: u64 = 16 * 1024 * 1024;
/// Maximum simultaneous in-flight ranged GETs in the multipart download path.
pub(crate) const MULTIPART_MAX_CONCURRENCY: usize = 8;

/// S3's hard ceiling on a single `PutObject` body. Reported in
/// [`ObjectStoreError::PayloadTooLarge`] when the SDK surfaces
/// `EntityTooLarge` (HTTP 400) or HTTP 413 so the wire-line names the
/// number rather than dumping an opaque SDK chain.
pub(crate) const SINGLE_PUT_LIMIT_BYTES: u64 = 5 * (1 << 30);

/// Percent-encode set used for `x-amz-copy-source` keys: every
/// non-alphanumeric ASCII byte except the path-structural and unreserved
/// characters (`/`, `.`, `-`, `_`, `~`). Bytes outside ASCII are always
/// encoded by `utf8_percent_encode`, whatever the set says.
const COPY_SOURCE_ENCODE: &AsciiSet = &CONTROLS
    .add(b' ')
    .add(b'!')
    .add(b'"')
    .add(b'#')
    .add(b'$')
    .add(b'%')
    .add(b'&')
    .add(b'\'')
    .add(b'(')
    .add(b')')
    .add(b'*')
    .add(b'+')
    .add(b',')
    .add(b':')
    .add(b';')
    .add(b'<')
    .add(b'=')
    .add(b'>')
    .add(b'?')
    .add(b'@')
    .add(b'[')
    .add(b'\\')
    .add(b']')
    .add(b'^')
    .add(b'`')
    .add(b'{')
    .add(b'|')
    .add(b'}');

/// Bound on how long an idle pooled HTTPS connection lingers before
/// the smithy connection pool drops it. Short enough that DNS rotation
/// rarely hits a stale pooled connection; long enough that bursty
/// fetch / push batches still benefit from connection reuse. See the
/// module-level "HTTP transport tuning" docs and issue #26.
pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout applied to every S3 GET, HEAD, LIST, and lock-write request.
/// Catches a hot pooled connection that has gone silent (e.g. mid-LFS
/// session when the server VIP rotates). Sized to match
/// [`POOL_IDLE_TIMEOUT`] — both budgets are "give up and let the SDK
/// retry pick a fresh connection" budgets.
///
/// Note: smithy's `read_timeout` resolves the HTTP connector future at
/// "response-headers received." For uploads that includes the request
/// body (the connector cannot resolve until the response arrives), so
/// [`S3Store::put_body`] overrides the timeout per-operation to keep
/// large bundle uploads from being cut off at 30 s. For downloads it
/// is a time-to-first-byte bound only — body chunks streamed after the
/// headers are not subject to `READ_TIMEOUT`. See the module-level
/// transport docs and lesson #2 in `docs/development/lessons_learned.md`.
pub(crate) const READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Per-operation timeout config applied to every S3 PUT (object upload).
///
/// `disable_read_timeout()` is the entire point of this helper: a
/// regression that returns `TimeoutConfig::builder().build()` (all
/// fields `Unset`) re-introduces issue #26 (large uploads aborted at
/// 30 s). Pinned by `put_body_upload_override_disables_read_timeout`
/// so the fix cannot silently revert.
fn upload_timeout_config() -> TimeoutConfig {
    TimeoutConfig::builder().disable_read_timeout().build()
}

/// Production [`ObjectStore`] backed by `aws-sdk-s3`.
#[derive(Debug)]
pub struct S3Store {
    client: aws_sdk_s3::Client,
    bucket: String,
}

/// The decisions extracted from a [`RemoteUrl::S3`] before they are
/// fed into the `aws-sdk-s3` config builder. Factored out so unit
/// tests can assert each decision without going through the SDK
/// (whose getters vary across patch releases).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ResolvedS3Config {
    pub(crate) endpoint_url: Url,
    pub(crate) region: Option<String>,
    pub(crate) force_path_style: bool,
    pub(crate) profile: Option<String>,
}

impl ResolvedS3Config {
    pub(crate) fn from_url_parts(
        endpoint: &Url,
        addressing: S3Addressing,
        profile: Option<&str>,
        region_flag: Option<&str>,
    ) -> Result<Self, ObjectStoreError> {
        Ok(Self {
            endpoint_url: normalize_endpoint(endpoint, addressing)?,
            region: resolve_region(endpoint, region_flag),
            force_path_style: matches!(addressing, S3Addressing::PathStyle),
            profile: profile.map(str::to_owned),
        })
    }
}

impl S3Store {
    /// Build an `S3Store` from a parsed [`RemoteUrl`].
    ///
    /// The [`RemoteUrl::S3::prefix`] field is intentionally **not**
    /// consumed here; callers compose it into keys themselves per the
    /// module-level docs.
    ///
    /// # Errors
    ///
    /// Returns [`ObjectStoreError::Other`] if `url` is not the S3 variant
    /// or if the endpoint URL cannot be normalised for virtual-hosted
    /// addressing.
    pub async fn from_remote_url(url: &RemoteUrl) -> Result<Self, ObjectStoreError> {
        let RemoteUrl::S3 {
            endpoint,
            bucket,
            addressing,
            flags,
            ..
        } = url
        else {
            return Err(ObjectStoreError::Other(
                format!("S3Store::from_remote_url called with non-S3 URL: {url}").into(),
            ));
        };

        let resolved = ResolvedS3Config::from_url_parts(
            endpoint,
            *addressing,
            flags.profile.as_deref(),
            flags.region.as_deref(),
        )?;
        let sdk_config = build_s3_config(&resolved).await;
        let client = aws_sdk_s3::Client::from_conf(sdk_config);

        Ok(Self {
            client,
            bucket: bucket.clone(),
        })
    }

    /// Verify the bucket is reachable with the configured credentials by
    /// issuing a single `ListObjectsV2` with `max_keys=1`. Used by
    /// [`crate::protocol::backend::build`] to fold credential / missing-bucket /
    /// authorization failures into categorical
    /// [`crate::protocol::backend::BackendError`] variants before the
    /// helper REPL runs its first command.
    pub(crate) async fn probe(&self, prefix: &str) -> Result<(), ObjectStoreError> {
        self.client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(prefix)
            .max_keys(1)
            .send()
            .await
            .map_err(|e| classify(e, prefix))?;
        Ok(())
    }
}

/// Build the `aws-sdk-s3` config from a [`ResolvedS3Config`].
///
/// 1. Load the AWS SDK provider chain with `BehaviorVersion::latest()`.
/// 2. Install a custom HTTP client with [`POOL_IDLE_TIMEOUT`] so DNS
///    rotation does not wedge long-running sessions (#26).
/// 3. Apply [`READ_TIMEOUT`] so a stuck GET/HEAD/LIST/lock request fails
///    fast instead of waiting for the OS-level TCP retransmit timeout
///    (#26). `connect_timeout` is left at the SDK default (3.1 s).
///    `put_body` overrides this per-operation to allow large uploads.
/// 4. Apply `endpoint_url`, `profile`, `region` from the resolved decisions.
/// 5. Override `force_path_style` on the resulting `aws_sdk_s3::Config`.
pub(crate) async fn build_s3_config(resolved: &ResolvedS3Config) -> aws_sdk_s3::Config {
    let mut loader = aws_config::defaults(BehaviorVersion::latest())
        .http_client(
            aws_smithy_http_client::Builder::new()
                .tls_provider(TlsProvider::Rustls(CryptoMode::AwsLc))
                .pool_idle_timeout(POOL_IDLE_TIMEOUT)
                .build_https(),
        )
        .timeout_config(TimeoutConfig::builder().read_timeout(READ_TIMEOUT).build())
        .endpoint_url(resolved.endpoint_url.as_str());
    if let Some(p) = &resolved.profile {
        loader = loader.profile_name(p);
    }
    if let Some(r) = &resolved.region {
        loader = loader.region(Region::new(r.clone()));
    }
    let sdk_config = loader.load().await;

    aws_sdk_s3::config::Builder::from(&sdk_config)
        .force_path_style(resolved.force_path_style)
        .build()
}

/// Rewrite the parsed endpoint URL into the form `aws-sdk-s3` expects
/// as `endpoint_url`: a base of `scheme://host[:port]` with **no path,
/// query, or fragment**, and with any bucket label stripped from the
/// host for virtual-hosted addressing.
///
/// The SDK rejects an `endpoint_url` that carries a query component
/// (e.g. our `?addressing=...` flag) and adds the bucket itself —
/// either as a path segment (`force_path_style(true)`) or as a host
/// subdomain (`force_path_style(false)`) — so we must strip both
/// before handing the URL off.
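///
/// A `std`-only sketch of the virtual-hosted host rewrite.
/// `strip_bucket_label` is hypothetical: it approximates
/// `s3_virtual_hosted_bucket` by locating the rightmost `.s3.` or `.s3-`
/// infix, and otherwise falls back to dropping the leftmost label:
///
/// ```rust
/// fn strip_bucket_label(host: &str) -> Option<&str> {
///     // Rightmost service infix wins, so a dotted bucket such as
///     // `bucketname.com` is removed in full rather than up to its
///     // first dot.
///     let infix_start = [".s3.", ".s3-"]
///         .iter()
///         .filter_map(|infix| host.rfind(*infix))
///         .max();
///     match infix_start {
///         // `+ 1` skips the `.` between the bucket and the `s3` label.
///         Some(idx) => Some(&host[idx + 1..]),
///         // Non-AWS endpoint (e.g. MinIO): drop only the first label.
///         None => host.split_once('.').map(|(_, rest)| rest),
///     }
/// }
/// ```
///
/// With this rule, `bucketname.com.s3.us-west-2.amazonaws.com` becomes
/// `s3.us-west-2.amazonaws.com` and `bucket.minio.example.com` becomes
/// `minio.example.com`.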
pub(crate) fn normalize_endpoint(
    endpoint: &Url,
    addressing: S3Addressing,
) -> Result<Url, ObjectStoreError> {
    let mut rewritten = endpoint.clone();
    rewritten.set_path("");
    rewritten.set_query(None);
    rewritten.set_fragment(None);

    if matches!(addressing, S3Addressing::VirtualHosted) {
        let host = rewritten
            .host_str()
            .ok_or_else(|| ObjectStoreError::Other("endpoint URL has no host".into()))?;
        // Use `s3_virtual_hosted_bucket` (rightmost-infix rfind) to find the
        // bucket label length, then strip `bucket.` from the front. This
        // handles dotted bucket names like `bucketname.com.s3.region.amazonaws.com`
        // correctly; a plain `split_once('.')` would stop at the first dot
        // and leave `com.s3.…` as the endpoint instead of `s3.…`.
        // For non-AWS virtual-hosted endpoints without the `.s3.` infix
        // (e.g. MinIO with `bucket.minio.example.com`), fall back to
        // stripping just the leftmost label.
        let regional_host = s3_virtual_hosted_bucket(host)
            // `s3_virtual_hosted_bucket` returns the bucket label (a strict
            // prefix of `host` ending just before the `.s3.` or `.s3-`
            // infix). The byte at `host[bucket.len()]` is always `.` (ASCII
            // 0x2E), so slicing at `+ 1` is always a valid UTF-8 boundary.
            .map(|bucket| host[bucket.len() + 1..].to_owned())
            .or_else(|| host.split_once('.').map(|(_, rest)| rest.to_owned()))
            .ok_or_else(|| {
                ObjectStoreError::Other(
                    format!("virtual-hosted endpoint host `{host}` has no dot separator").into(),
                )
            })?;
        rewritten
            .set_host(Some(&regional_host))
            .map_err(other_boxed)?;
    }

    Ok(rewritten)
}

/// Resolve the `SigV4` signing region.
///
/// Order: `?region=` flag → AWS hostname pattern → `us-east-1` default
/// for non-AWS hosts → `None` for legacy AWS hosts that don't carry a
/// region segment (the SDK provider chain takes over).
pub(crate) fn resolve_region(endpoint: &Url, flag: Option<&str>) -> Option<String> {
    if let Some(r) = flag {
        return Some(r.to_owned());
    }
    let host = endpoint.host_str()?;
    // Bare `amazonaws.com` is an AWS host with no leading content (no
    // region segment), so it short-circuits to `None` like `s3.amazonaws.com`
    // does — the SDK provider chain picks the region. Everything else
    // that does not end in an AWS partition suffix is treated as a
    // third-party S3-compatible endpoint and gets the safe `us-east-1`
    // default.
    if host == "amazonaws.com" {
        return None;
    }
    let Some(trimmed) = strip_aws_host_suffix(host) else {
        return Some("us-east-1".to_owned());
    };
    extract_aws_region(trimmed)
}

/// Parse the AWS region out of an AWS hostname's leading portion (the
/// host with its [`AWS_HOST_SUFFIXES`] suffix already stripped).
fn extract_aws_region(trimmed: &str) -> Option<String> {
    // Patterns we accept (in priority order):
    //   s3                        → legacy us-east-1 (no region segment) → None
    //   s3.<region>               → path-style regional
    //   s3-<region>               → legacy hyphenated form
    //   <bucket>.s3.<region>      → simple virtual-hosted (single-label bucket)
    //   <dotted.bucket>.s3.<region>  → dotted-bucket virtual-hosted (4+ labels)
    let labels: Vec<&str> = trimmed.split('.').collect();
    // The explicit arms below match a fixed label count (1, 2, or 3),
    // which guarantees the captured `region` is a single dot-free label.
    // Only the fallback arm operates on the unbounded "4+ labels" shape,
    // where the captured region could in principle still contain a `.`
    // (a malformed host); the dot-filter on that arm rejects those.
    match labels.as_slice() {
        ["s3"] => None,
        ["s3", region] => Some((*region).to_owned()),
        [_bucket, "s3", region] => Some((*region).to_owned()),
        [head] if head.starts_with("s3-") => Some(head["s3-".len()..].to_owned()),
        // Dotted-bucket virtual-hosted: find the rightmost service infix
        // (.s3. or .s3-) and return the segment after it as the region.
        // e.g. "bucketname.com.s3.us-west-2" → "us-west-2"
        // Use max by byte position so we pick the rightmost infix when
        // both `.s3.` and `.s3-` appear in the host.
        _ => AWS_S3_INFIXES
            .iter()
            .filter_map(|infix| {
                trimmed
                    .rfind(infix)
                    .map(|idx| (idx, trimmed[idx + infix.len()..].to_owned()))
            })
            .max_by_key(|(idx, _)| *idx)
            .map(|(_, region)| region)
            .filter(|region| !region.is_empty() && !region.contains('.')),
    }
}

/// Plan inclusive RFC 7233 byte ranges for a parallel ranged-GET download.
///
/// `size = 0` → empty vec (caller writes a zero-length file directly).
/// `chunk_size = 0` also yields an empty vec, since `start + chunk_size - 1`
/// would underflow.
/// Otherwise: full chunks of `chunk_size` bytes, with the final range
/// covering whatever remainder is left (`(N*chunk, size-1)`).
pub(crate) fn plan_ranges(size: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    if size == 0 || chunk_size == 0 {
        return Vec::new();
    }
    let mut ranges = Vec::new();
    let mut start = 0u64;
    while start < size {
        let end = (start + chunk_size - 1).min(size - 1);
        ranges.push((start, end));
        start = end + 1;
    }
    ranges
}

/// Encode a `<bucket>/<key>` pair for the `x-amz-copy-source` header.
///
/// `aws-sdk-s3` 1.x forwards `copy_source` verbatim; we have to encode
/// reserved characters (notably `#` in `LOCK#.lock`) ourselves.
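///
/// Together with `CONTROLS`, and given that `utf8_percent_encode`
/// always encodes non-ASCII bytes, `COPY_SOURCE_ENCODE` reduces to one
/// rule: keep ASCII alphanumerics and `/ . - _ ~`, encode every other
/// byte. A `std`-only sketch of that rule, under the hypothetical name
/// `encode_copy_source_sketch`:
///
/// ```rust
/// fn encode_copy_source_sketch(bucket: &str, key: &str) -> String {
///     fn encode(s: &str) -> String {
///         let mut out = String::with_capacity(s.len());
///         for b in s.bytes() {
///             if b.is_ascii_alphanumeric() || b"/.-_~".contains(&b) {
///                 out.push(char::from(b));
///             } else {
///                 // Uppercase hex digits, as `percent_encoding` emits.
///                 out.push_str(&format!("%{b:02X}"));
///             }
///         }
///         out
///     }
///     format!("{}/{}", encode(bucket), encode(key))
/// }
/// ```
///
/// So `refs/LOCK#.lock` in bucket `my-bucket` is sent as
/// `my-bucket/refs/LOCK%23.lock`: the `/` keeps its path role and only
/// the `#` is escaped.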
pub(crate) fn encode_copy_source(bucket: &str, key: &str) -> String {
    let bucket_enc = utf8_percent_encode(bucket, COPY_SOURCE_ENCODE);
    let key_enc = utf8_percent_encode(key, COPY_SOURCE_ENCODE);
    format!("{bucket_enc}/{key_enc}")
}

/// Map a typed `aws-sdk-s3` error into the trait's [`ObjectStoreError`] enum.
///
/// `key` is the operation's key/prefix context — it appears in the
/// resulting [`ObjectStoreError::NotFound`] / [`ObjectStoreError::AccessDenied`] /
/// [`ObjectStoreError::PreconditionFailed`] / [`ObjectStoreError::Conflict`] payload.
///
/// Note that this also covers typed `NotFound` / `NoSuchKey` variants
/// the SDK constructs from 404 responses: those carry HTTP 404 on
/// `svc.raw().status()` and so route through the status-based branch
/// of [`classify_status_and_code`].
fn classify<E>(err: SdkError<E>, key: &str) -> ObjectStoreError
where
    E: std::error::Error + Send + Sync + 'static + ProvideErrorMetadata,
{
    if let SdkError::ServiceError(svc) = &err {
        let status = svc.raw().status().as_u16();
        let code = svc.err().code();
        if let Some(mapped) = classify_status_and_code(status, code, key) {
            return mapped;
        }
    }
    match &err {
        SdkError::DispatchFailure(_) | SdkError::TimeoutError(_) => network_boxed(err),
        _ => other_boxed(err),
    }
}

/// Convert a single [`aws_sdk_s3::types::Object`] from a
/// `ListObjectsV2` page into the trait's [`ObjectMeta`].
///
/// Extracted so unit tests can drive the missing-key and
/// missing-last-modified guard branches via `Object`'s builder
/// without synthesising a full `ListObjectsV2Output`.
pub(crate) fn object_to_meta(
    obj: &aws_sdk_s3::types::Object,
) -> Result<ObjectMeta, ObjectStoreError> {
    let key = obj
        .key()
        .ok_or_else(|| {
            ObjectStoreError::Other("list_objects_v2 returned an object without a key".into())
        })?
        .to_owned();
    let size = u64::try_from(obj.size().unwrap_or(0)).unwrap_or(0);
    let last_modified = obj
        .last_modified()
        .ok_or_else(|| {
            ObjectStoreError::Other(
                format!("list_objects_v2 returned object `{key}` without last_modified").into(),
            )
        })?
        .to_time()
        .map_err(other_boxed)?;
    Ok(ObjectMeta {
        key,
        size,
        last_modified,
        // ListObjectsV2 does return ETags, but they are not consumed
        // by any current caller; keep `None` to avoid inflating the
        // per-object metadata for list results.
        etag: None,
    })
}

/// Convert a [`HeadObject`] response's relevant fields into the trait's
/// [`ObjectMeta`].
///
/// Extracted so unit tests can drive the missing-content-length and
/// missing-last-modified guard branches without standing up a live S3
/// or constructing a full `HeadObjectOutput` (whose builder is not
/// trivially mockable).
///
/// A missing `Content-Length` is an error rather than silent zero: a
/// 0-byte size is semantically meaningful in this codebase (lock
/// files are intentionally empty) and downstream `get_to_file` takes
/// a fast path on `size == 0` that writes an empty destination file.
/// Treating "header absent" as 0 would silently produce empty bundles
/// instead of surfacing the malformed response. Every backend HEAD
/// must yield `Content-Length`.
pub(crate) fn head_output_to_meta(
    key: &str,
    content_length: Option<i64>,
    last_modified: Option<&aws_sdk_s3::primitives::DateTime>,
    etag: Option<&str>,
) -> Result<ObjectMeta, ObjectStoreError> {
    let raw_size = content_length.ok_or_else(|| {
        ObjectStoreError::Other(format!("head_object on `{key}` returned no content-length").into())
    })?;
    // `i64` is the SDK's wire type; clamp a (legally impossible) negative
    // value to 0 rather than wrap to a huge u64. Mirrors `object_to_meta`.
    let size = u64::try_from(raw_size).unwrap_or(0);
    let last_modified = last_modified
        .ok_or_else(|| {
            ObjectStoreError::Other(
                format!("head_object on `{key}` returned no last_modified").into(),
            )
        })?
        .to_time()
        .map_err(other_boxed)?;
    Ok(ObjectMeta {
        key: key.to_owned(),
        size,
        last_modified,
        etag: etag.map(str::to_owned),
    })
}

/// Pure classifier core (no `SdkError` involvement) so unit tests can
/// exercise every branch without synthesising SDK error types.
fn classify_status_and_code(
    status: u16,
    code: Option<&str>,
    key: &str,
) -> Option<ObjectStoreError> {
    match status {
        404 => return Some(ObjectStoreError::NotFound(key.to_owned())),
        403 => return Some(ObjectStoreError::AccessDenied(key.to_owned())),
        412 => return Some(ObjectStoreError::PreconditionFailed(key.to_owned())),
        409 => return Some(ObjectStoreError::Conflict(key.to_owned())),
        // S3 occasionally surfaces HTTP 413 directly (front-door / proxy
        // path); the canonical EntityTooLarge response is HTTP 400, but
        // the status mapping is the same regardless of the SDK code.
        413 => {
            return Some(ObjectStoreError::PayloadTooLarge {
                limit_bytes: SINGLE_PUT_LIMIT_BYTES,
            });
        }
        _ => {}
    }
    match code {
        Some("NoSuchKey" | "NoSuchBucket" | "NotFound") => {
            Some(ObjectStoreError::NotFound(key.to_owned()))
        }
        Some("AccessDenied") => Some(ObjectStoreError::AccessDenied(key.to_owned())),
        Some("PreconditionFailed") => Some(ObjectStoreError::PreconditionFailed(key.to_owned())),
        Some("ConditionalRequestConflict") => Some(ObjectStoreError::Conflict(key.to_owned())),
        // S3 returns HTTP 400 + `EntityTooLarge` when a single-PUT body
        // exceeds 5 GiB. The status alone is too broad to hang
        // `PayloadTooLarge` on, so route via the code.
        Some("EntityTooLarge") => Some(ObjectStoreError::PayloadTooLarge {
            limit_bytes: SINGLE_PUT_LIMIT_BYTES,
        }),
        _ => None,
    }
}

#[async_trait::async_trait]
impl ObjectStore for S3Store {
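    /// Pages through `ListObjectsV2` until a response is no longer
    /// truncated, carrying the continuation token forward. The loop shape,
    /// sketched against an in-memory page source; `Page` and `list_all`
    /// are hypothetical stand-ins for the SDK response and this method:
    ///
    /// ```rust
    /// struct Page {
    ///     keys: Vec<String>,
    ///     is_truncated: bool,
    ///     next_token: Option<String>,
    /// }
    ///
    /// // `fetch` receives the continuation token (`None` for page one).
    /// fn list_all(mut fetch: impl FnMut(Option<String>) -> Page) -> Vec<String> {
    ///     let mut out = Vec::new();
    ///     let mut token = None;
    ///     loop {
    ///         let page = fetch(token.take());
    ///         out.extend(page.keys);
    ///         if !page.is_truncated {
    ///             break;
    ///         }
    ///         // Truncated but tokenless: stop rather than re-request the
    ///         // first page forever.
    ///         match page.next_token {
    ///             Some(t) => token = Some(t),
    ///             None => break,
    ///         }
    ///     }
    ///     out
    /// }
    /// ```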
    async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
        let mut out = Vec::new();
        let mut token: Option<String> = None;
        loop {
            let resp = self
                .client
                .list_objects_v2()
                .bucket(&self.bucket)
                .prefix(prefix)
                .set_continuation_token(token.take())
                .send()
                .await
                .map_err(|e| classify(e, prefix))?;

            out.reserve(resp.contents().len());
            for obj in resp.contents() {
                out.push(object_to_meta(obj)?);
            }

            if !resp.is_truncated().unwrap_or(false) {
                break;
            }
            // Defensive: a server that signals truncation but omits the
            // continuation token would otherwise make this loop spin forever.
            match resp.next_continuation_token() {
                Some(t) => token = Some(t.to_owned()),
                None => break,
            }
        }
        Ok(out)
    }

    async fn get_to_file(
        &self,
        key: &str,
        dest: &Path,
        opts: GetOpts,
    ) -> Result<(), ObjectStoreError> {
        let parent = dest.parent().ok_or_else(|| {
            ObjectStoreError::Other(
                format!("destination `{}` has no parent directory", dest.display()).into(),
            )
        })?;

        // Mirror Azure: try once, retry once on 412 (the head→GET race).
        // After the second attempt any error — including a repeated 412 —
        // propagates. Encoding retry as an explicit second call keeps every
        // control-flow path returning a value, so no `unreachable!` is
        // required.
        let progress = opts.progress.as_ref();
        match self.head_then_download(key, dest, parent, progress).await {
            Err(ObjectStoreError::PreconditionFailed(_)) => {
                tracing::warn!(key, "object changed between head and GET; retrying");
                self.head_then_download(key, dest, parent, progress).await
            }
            other => other,
        }
    }

713    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
714        let resp = self
715            .client
716            .get_object()
717            .bucket(&self.bucket)
718            .key(key)
719            .send()
720            .await
721            .map_err(|e| classify(e, key))?;
722        let aggregated = resp.body.collect().await.map_err(network_boxed)?;
723        Ok(aggregated.into_bytes())
724    }
725
    /// Issue a `GetObject` with a `Range: bytes=<start>-<end-1>` header.
    /// HTTP 416 maps to [`ObjectStoreError::RangeNotSatisfiable`] carrying
    /// the originally requested range (so the wire-line names what the
    /// caller asked for, not the server's translation). All other
    /// failures route through [`classify`].
    ///
    /// S3 silently clips a ranged GET at EOF when the requested range
    /// overruns the object: for an object of `len` bytes with
    /// `start < len < end`, it returns the `start..len` bytes with
    /// HTTP 206 and no error. The post-flight length check via
    /// [`super::verify_range_response_length`] elevates that mismatch to
    /// [`ObjectStoreError::RangeNotSatisfiable`] so callers (notably the
    /// packchain reader) cannot mistake a truncated slice for the full
    /// requested range.
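    ///
    /// Both checks can be sketched with std only. `range_header` and
    /// `check_range_len` are hypothetical helpers for illustration, not
    /// functions in this module. The half-open `start..end` becomes the
    /// inclusive `bytes=<start>-<end-1>`, and a body shorter than the
    /// requested span is an error rather than a silent success.
    ///
    /// ```
    /// use std::ops::Range;
    ///
    /// // Half-open range -> inclusive HTTP `Range` header value.
    /// // Assumes a non-empty range (the real method returns early on empty ones).
    /// fn range_header(range: &Range<u64>) -> String {
    ///     assert!(range.start < range.end, "empty range");
    ///     format!("bytes={}-{}", range.start, range.end - 1)
    /// }
    ///
    /// // Reject a 206 body whose length differs from the requested span,
    /// // e.g. when S3 clipped an overrunning range at EOF.
    /// fn check_range_len(range: &Range<u64>, body: &[u8]) -> Result<(), String> {
    ///     let want = range.end - range.start;
    ///     if body.len() as u64 == want {
    ///         Ok(())
    ///     } else {
    ///         Err(format!("requested {want} bytes, got {}", body.len()))
    ///     }
    /// }
    /// ```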
    async fn get_bytes_range(
        &self,
        key: &str,
        range: std::ops::Range<u64>,
    ) -> Result<Bytes, ObjectStoreError> {
        if let Some(empty) = super::precheck_range(key, &range)? {
            return Ok(empty);
        }
        // Inclusive end byte for the HTTP `Range` header (RFC 7233).
        let inclusive_end = range.end - 1;
        let result = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .range(format!("bytes={}-{}", range.start, inclusive_end))
            .send()
            .await;
        let resp = match result {
            Ok(resp) => resp,
            Err(err) => {
                if let SdkError::ServiceError(svc) = &err
                    && svc.raw().status().as_u16() == 416
                {
                    return Err(ObjectStoreError::RangeNotSatisfiable {
                        key: key.to_owned(),
                        requested: range,
                    });
                }
                return Err(classify(err, key));
            }
        };
        let aggregated = resp.body.collect().await.map_err(network_boxed)?;
        super::verify_range_response_length(key, &range, aggregated.into_bytes())
    }

    async fn put_bytes(
        &self,
        key: &str,
        body: Bytes,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        // PutObject rejects bodies > 5 GiB; above [`MULTIPART_PUT_THRESHOLD`]
        // we hand off to the multipart path which lifts that ceiling and
        // emits per-part progress events. Below the threshold we keep the
        // single round trip (no `CreateMultipartUpload` cost). Issue #53.
        let size = body.len() as u64;
        if should_use_multipart(size) {
            return self.multipart_put_bytes(key, body, size, opts).await;
        }
        let progress = opts.progress.clone();
        self.put_body(key, ByteStream::from(body), opts).await?;
        if let Some(sink) = progress
            && size > 0
        {
            sink.report(size);
        }
        Ok(())
    }

    async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
        // Open the file once and read size from the open handle. This
        // closes the metadata/upload race that would let a concurrent
        // truncate or rename produce a body whose length disagrees
        // with the size we used for multipart planning. The size-based
        // multipart dispatch (issue #53) and the post-transfer
        // progress event both consume the same `body_len`.
        let file = tokio::fs::File::open(src).await.map_err(other_boxed)?;
        let body_len = file.metadata().await.map_err(other_boxed)?.len();
        if should_use_multipart(body_len) {
            return self.multipart_put_path(key, file, body_len, opts).await;
        }
        // Below the threshold: single PutObject. `FsBuilder::file`
        // consumes our already-open handle so the SDK does not
        // re-open by path (which would re-introduce the race).
        let stream = ByteStream::read_from()
            .file(file)
            .length(Length::Exact(body_len))
            .build()
            .await
            .map_err(other_boxed)?;
        let progress = opts.progress.clone();
        self.put_body(key, stream, opts).await?;
        if let Some(sink) = progress
            && body_len > 0
        {
            sink.report(body_len);
        }
        Ok(())
    }

    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
        let resp = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .if_none_match("*")
            .body(ByteStream::from(body))
            .send()
            .await;
        match resp.map_err(|e| classify(e, key)) {
            Ok(_) => Ok(true),
            Err(ObjectStoreError::PreconditionFailed(_) | ObjectStoreError::Conflict(_)) => {
                Ok(false)
            }
            Err(other) => Err(other),
        }
    }

    async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
        let resp = self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| classify(e, key))?;
        head_output_to_meta(
            key,
            resp.content_length(),
            resp.last_modified(),
            resp.e_tag(),
        )
    }

    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
        // `CopyObject` rejects bodies > 5 GiB; above the multipart
        // threshold we HEAD `src` and use `UploadPartCopy` per part
        // (issue #53). The HEAD adds one round trip for small copies,
        // but the only production caller (`Doctor::evict_losing_bundle`)
        // is a quarantine path on bundles that can be multi-GiB —
        // paying one HEAD to learn whether to multipart is worth it.
        //
        // The HEAD also yields the source `ETag`, which we pass as
        // `x-amz-copy-source-if-match` on every subsequent
        // `CopyObject` / `UploadPartCopy`. Without it, a source
        // mutation between HEAD and copy would silently produce a
        // destination whose bytes are a mix of the pre- and post-
        // mutation source. With it, S3 returns 412
        // (`PreconditionFailed`) and the caller can retry. Azure's
        // `copy()` already has this property because it routes
        // through `head_then_download`'s 412 retry.
        let meta = self.head(src).await?;
        if should_use_multipart(meta.size) {
            return self
                .multipart_copy(src, dst, meta.size, meta.etag.as_deref())
                .await;
        }
        let copy_source = encode_copy_source(&self.bucket, src);
        // `MetadataDirective::Replace` makes S3 consistent with the Azure
        // backend (which drops metadata on copy via download-then-reupload):
        // neither backend preserves user metadata, matching the trait
        // contract in `ObjectStore::copy`.
        // Pass `src` as the key context so a 404 surfaces as
        // `NotFound(src)` — that's what the trait promises.
        let mut req = self
            .client
            .copy_object()
            .bucket(&self.bucket)
            .key(dst)
            .copy_source(copy_source)
            .metadata_directive(MetadataDirective::Replace);
        if let Some(etag) = meta.etag.as_deref() {
            req = req.copy_source_if_match(etag);
        }
        req.send().await.map_err(|e| classify(e, src))?;
        Ok(())
    }

    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
        // S3 DeleteObject is idempotent (returns 204 even for missing
        // keys), but the trait contract demands `Err(NotFound)` on a
        // missing key — so HEAD first. Concurrent deletion between this
        // HEAD and the DELETE will return Ok rather than NotFound;
        // semantically acceptable since the key existed at some point
        // during the call.
        self.head(key).await?;
        self.client
            .delete_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| classify(e, key))?;
        Ok(())
    }

    /// Presign a `GetObject` request for `key` valid for `ttl`. Used
    /// by the `bundle-uri` capability (issue #76) to advertise
    /// time-limited download URLs against private buckets. The
    /// returned URL carries an `X-Amz-Signature` query parameter
    /// derived from the SDK's resolved `SigV4` credentials and an
    /// `X-Amz-Expires=<ttl-seconds>` parameter that the operator
    /// can use to verify the requested TTL was honoured.
    ///
    /// # Errors
    ///
    /// Returns [`ObjectStoreError::Other`] when the SDK rejects the
    /// TTL (AWS caps presigned URLs at 7 days) or when the
    /// presigning step fails (e.g. credential provider returned no
    /// credentials).
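    ///
    /// A std-only sketch of the TTL bound and the resulting query
    /// parameter, assuming the cap is exactly seven days inclusive.
    /// `x_amz_expires` is a hypothetical helper; the real validation
    /// happens inside `PresigningConfig::expires_in`.
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// // Seven days, the longest validity `SigV4` presigned URLs allow.
    /// const MAX_PRESIGN_TTL: Duration = Duration::from_secs(7 * 24 * 60 * 60);
    ///
    /// fn x_amz_expires(ttl: Duration) -> Result<String, String> {
    ///     if ttl > MAX_PRESIGN_TTL {
    ///         return Err(format!("TTL {ttl:?} exceeds seven days"));
    ///     }
    ///     Ok(format!("X-Amz-Expires={}", ttl.as_secs()))
    /// }
    /// ```
    ///
    /// For example, a one-hour TTL yields `X-Amz-Expires=3600`.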
    async fn presigned_get_url(
        &self,
        key: &str,
        ttl: std::time::Duration,
    ) -> Result<String, ObjectStoreError> {
        let config = aws_sdk_s3::presigning::PresigningConfig::expires_in(ttl).map_err(|e| {
            ObjectStoreError::Other(format!("PresigningConfig::expires_in({ttl:?}): {e}").into())
        })?;
        let presigned = self
            .client
            .get_object()
            .bucket(&self.bucket)
            .key(key)
            .presigned(config)
            .await
            .map_err(|e| classify(e, key))?;
        Ok(presigned.uri().to_owned())
    }
}

impl S3Store {
    /// One head→tempfile→download→persist round trip.
    ///
    /// Factored out so [`get_to_file`](ObjectStore::get_to_file) can invoke
    /// it twice: once normally, once more on a 412 retry. Mirrors
    /// `AzureStore::head_then_download` so both backends share the same
    /// retry shape.
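    ///
    /// The try-once, retry-once-on-412 shape that `get_to_file` builds
    /// on this helper can be sketched synchronously with std only.
    /// `FetchError` and `retry_once_on_precondition` are hypothetical
    /// names; the real code is async and matches on
    /// `ObjectStoreError::PreconditionFailed`.
    ///
    /// ```
    /// #[derive(Debug, PartialEq)]
    /// enum FetchError {
    ///     PreconditionFailed,
    ///     Other(String),
    /// }
    ///
    /// // Run `attempt`; on a precondition failure run it exactly once
    /// // more and return whatever the second call yields, so a repeated
    /// // 412 propagates instead of looping.
    /// fn retry_once_on_precondition<T>(
    ///     mut attempt: impl FnMut() -> Result<T, FetchError>,
    /// ) -> Result<T, FetchError> {
    ///     match attempt() {
    ///         Err(FetchError::PreconditionFailed) => attempt(),
    ///         other => other,
    ///     }
    /// }
    /// ```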
    async fn head_then_download(
        &self,
        key: &str,
        dest: &Path,
        parent: &Path,
        progress: Option<&ProgressSink>,
    ) -> Result<(), ObjectStoreError> {
        let meta = self.head(key).await?;
        let temp = NamedTempFile::new_in(parent).map_err(other_boxed)?;
        if meta.size == 0 {
            // Skip the GET entirely for zero-byte objects (lock files):
            // `download_single` would issue a plain GET for an empty body
            // and `download_multipart` would set_len(0) with no ranges —
            // both correct but a wasted round trip and a wasted file
            // open, respectively.
            return persist_temp(temp, dest);
        }

        if meta.size <= MULTIPART_THRESHOLD {
            self.download_single(key, temp.path(), meta.etag.as_deref(), progress)
                .await?;
        } else {
            self.download_multipart(key, temp.path(), meta.size, meta.etag.as_deref(), progress)
                .await?;
        }
        persist_temp(temp, dest)
    }

    /// Common upload helper: sends the given [`ByteStream`] to S3 with
    /// optional `Content-Disposition` and user metadata from [`PutOpts`].
    /// Shared by `put_bytes` (in-memory) and `put_path` (streamed from
    /// disk).
    async fn put_body(
        &self,
        key: &str,
        body: ByteStream,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        let mut req = self
            .client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(body);
        if let Some(cd) = &opts.content_disposition {
            req = req.content_disposition(cd);
        }
        for (k, v) in &opts.user_metadata {
            // S3 lowercases user-metadata keys on storage and limits the
            // combined header set to ~2 KB; ASCII only (RFC 2047 encode
            // non-ASCII upstream).
            req = req.metadata(k, v);
        }
        // Disable read_timeout for this operation: smithy's read_timeout
        // resolves the HTTP connector future at "response-headers received,"
        // which for uploads includes the entire request body upload. The
        // global READ_TIMEOUT (30 s) would otherwise abort any bundle
        // upload that takes longer than 30 s. GET/HEAD/LIST operations keep
        // the timeout via the client-level config; uploads opt out here.
        //
        // Caveat: smithy's `MergeTimeoutConfig::merge_iter` treats an
        // override whose `has_timeouts()` is false (no field in `Set` state
        // — only `Disabled` and `Unset` count as "no timeouts") as a no-op
        // and skips inheriting from the client-level config. So this
        // override does NOT inherit `connect_timeout` (or any future
        // client-level timeout) from the SDK's config. That is fine for the
        // current use case — the only timeout we configure at the client
        // level is `read_timeout`, which we explicitly want to disable —
        // but a future client-level `connect_timeout` would have to be
        // duplicated here to take effect on uploads.
        req.customize()
            .config_override(
                aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
            )
            .send()
            .await
            .map_err(|e| classify(e, key))?;
        Ok(())
    }

    /// Stream a small (<= [`MULTIPART_THRESHOLD`]) object directly to the
    /// temp-file path. Caller is responsible for `persist`-ing the file.
    ///
    /// When `etag` is `Some`, the request carries `If-Match` so S3
    /// returns 412 if the object was overwritten since the `head` call.
    /// When `progress` is `Some`, fires once per SDK body chunk read
    /// off the wire — chunk sizes follow the SDK's internal aggregation
    /// (typically 1 MiB-ish for HTTPS).
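    ///
    /// A std-only sketch of the per-chunk loop, assuming chunks arrive
    /// as an iterator of byte vectors instead of the SDK's `ByteStream`
    /// (`write_chunks_with_progress` is a hypothetical helper). Each
    /// chunk is written before it is reported, so the reported total
    /// never exceeds the bytes written.
    ///
    /// ```
    /// use std::io::Write;
    ///
    /// fn write_chunks_with_progress<W: Write>(
    ///     chunks: impl IntoIterator<Item = Vec<u8>>,
    ///     out: &mut W,
    ///     mut report: impl FnMut(u64),
    /// ) -> std::io::Result<()> {
    ///     for chunk in chunks {
    ///         out.write_all(&chunk)?;
    ///         // Mirror the `chunk_len > 0` guard: no zero-byte events.
    ///         if !chunk.is_empty() {
    ///             report(chunk.len() as u64);
    ///         }
    ///     }
    ///     out.flush()
    /// }
    /// ```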
    async fn download_single(
        &self,
        key: &str,
        temp_path: &Path,
        etag: Option<&str>,
        progress: Option<&ProgressSink>,
    ) -> Result<(), ObjectStoreError> {
        let mut req = self.client.get_object().bucket(&self.bucket).key(key);
        if let Some(etag) = etag {
            req = req.if_match(etag);
        }
        let mut resp = req.send().await.map_err(|e| classify(e, key))?;

        let mut file = tokio::fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(temp_path)
            .await
            .map_err(other_boxed)?;

        while let Some(chunk) = resp.body.next().await {
            let bytes = chunk.map_err(network_boxed)?;
            let chunk_len = bytes.len() as u64;
            file.write_all(&bytes).await.map_err(other_boxed)?;
            if let Some(sink) = progress
                && chunk_len > 0
            {
                sink.report(chunk_len);
            }
        }
        file.flush().await.map_err(other_boxed)?;
        Ok(())
    }

    /// Download a large object via parallel ranged GETs, writing each
    /// range at its absolute offset into the pre-allocated temp file.
    ///
    /// When `etag` is `Some`, every ranged GET carries `If-Match` so
    /// S3 returns 412 if the object is overwritten mid-download. When
    /// `progress` is `Some`, fires once per completed range with the
    /// range's byte count — events arrive out of order, matching the
    /// concurrent-GET schedule, but cumulative bytes equal `size` after
    /// the last event.
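    ///
    /// A std-only sketch of the offset-write scheme. `inclusive_ranges`
    /// is a hypothetical stand-in for this module's `plan_ranges` (whose
    /// exact behaviour is not shown here), and a `Cursor<Vec<u8>>`
    /// stands in for the pre-sized temp file. Ranges are written in
    /// reverse to mimic out-of-order completion, yet every byte lands
    /// at its absolute offset.
    ///
    /// ```
    /// use std::io::{Cursor, Seek, SeekFrom, Write};
    ///
    /// // Split `size` bytes into inclusive `(start, end)` ranges.
    /// fn inclusive_ranges(size: u64, chunk: u64) -> Vec<(u64, u64)> {
    ///     assert!(chunk > 0);
    ///     (0..size)
    ///         .step_by(chunk as usize)
    ///         .map(|start| (start, (start + chunk).min(size) - 1))
    ///         .collect()
    /// }
    ///
    /// // Copy `src` into a pre-sized buffer, last range first.
    /// fn assemble_reversed(src: &[u8], chunk: u64) -> std::io::Result<Vec<u8>> {
    ///     let mut file = Cursor::new(vec![0u8; src.len()]); // like `set_len(size)`
    ///     for (start, end) in inclusive_ranges(src.len() as u64, chunk).into_iter().rev() {
    ///         file.seek(SeekFrom::Start(start))?;
    ///         file.write_all(&src[start as usize..=end as usize])?;
    ///     }
    ///     Ok(file.into_inner())
    /// }
    /// ```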
    async fn download_multipart(
        &self,
        key: &str,
        temp_path: &Path,
        size: u64,
        etag: Option<&str>,
        progress: Option<&ProgressSink>,
    ) -> Result<(), ObjectStoreError> {
        let async_file = tokio::fs::OpenOptions::new()
            .write(true)
            .truncate(false)
            .open(temp_path)
            .await
            .map_err(other_boxed)?;
        async_file.set_len(size).await.map_err(other_boxed)?;

        let file = Arc::new(Mutex::new(async_file));
        let semaphore = Arc::new(Semaphore::new(MULTIPART_MAX_CONCURRENCY));
        let mut tasks: JoinSet<Result<(), ObjectStoreError>> = JoinSet::new();

        let etag_owned = etag.map(str::to_owned);
        let progress_owned = progress.cloned();
        for (start, end) in plan_ranges(size, MULTIPART_CHUNK_SIZE) {
            let client = self.client.clone();
            let bucket = self.bucket.clone();
            let key = key.to_owned();
            let etag = etag_owned.clone();
            let file = Arc::clone(&file);
            let semaphore = Arc::clone(&semaphore);
            let progress = progress_owned.clone();
            tasks.spawn(async move {
                let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
                let mut req = client
                    .get_object()
                    .bucket(&bucket)
                    .key(&key)
                    .range(format!("bytes={start}-{end}"));
                if let Some(etag) = &etag {
                    req = req.if_match(etag);
                }
                let resp = req.send().await.map_err(|e| classify(e, &key))?;
                let bytes = resp
                    .body
                    .collect()
                    .await
                    .map_err(network_boxed)?
                    .into_bytes();
                let expected = end - start + 1;
                if bytes.len() as u64 != expected {
                    return Err(ObjectStoreError::Other(
                        format!(
                            "range bytes={start}-{end} returned {} bytes, expected {expected}",
                            bytes.len()
                        )
                        .into(),
                    ));
                }
                let chunk_len = bytes.len() as u64;
                let mut f = file.lock().await;
                f.seek(SeekFrom::Start(start)).await.map_err(other_boxed)?;
                f.write_all(&bytes).await.map_err(other_boxed)?;
                drop(f);
                if let Some(sink) = &progress {
                    sink.report(chunk_len);
                }
                Ok(())
            });
        }

        while let Some(joined) = tasks.join_next().await {
            joined.map_err(other_boxed)??;
        }

        // All spawned tasks have been joined above, and each task's
        // captured `Arc` clone was dropped when its future completed,
        // so this should be the only outstanding reference. If a
        // future refactor accidentally leaks a clone, degrade
        // gracefully instead of panicking: flush through the `Mutex`
        // rather than taking sole ownership.
        match Arc::try_unwrap(file) {
            Ok(mutex) => {
                let mut f = mutex.into_inner();
                f.flush().await.map_err(other_boxed)?;
            }
            Err(shared) => {
                let mut f = shared.lock().await;
                f.flush().await.map_err(other_boxed)?;
            }
        }
        Ok(())
    }

    /// Drive a multipart upload from a fully-buffered `Bytes` body.
    ///
    /// `Bytes::slice` is zero-copy: every part is a refcounted view into
    /// the same underlying allocation, so peak memory stays at roughly
    /// the caller's body size instead of doubling, as it would if each
    /// part were copied out before its upload task is spawned.
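    ///
    /// A std-only analogue of the zero-copy split. `PartView` and
    /// `split_parts` are hypothetical; an `Arc<[u8]>` plus a range plays
    /// the role `Bytes::slice` plays in the real code. Every part holds
    /// a refcounted handle to one shared buffer, so no part bytes are
    /// copied.
    ///
    /// ```
    /// use std::ops::Range;
    /// use std::sync::Arc;
    ///
    /// #[derive(Clone)]
    /// struct PartView {
    ///     buf: Arc<[u8]>,
    ///     range: Range<usize>,
    /// }
    ///
    /// impl PartView {
    ///     fn as_slice(&self) -> &[u8] {
    ///         &self.buf[self.range.clone()]
    ///     }
    /// }
    ///
    /// // Cut `buf` into `part_size` views; the last one may be shorter.
    /// fn split_parts(buf: &Arc<[u8]>, part_size: usize) -> Vec<PartView> {
    ///     assert!(part_size > 0);
    ///     (0..buf.len())
    ///         .step_by(part_size)
    ///         .map(|start| PartView {
    ///             buf: Arc::clone(buf),
    ///             range: start..(start + part_size).min(buf.len()),
    ///         })
    ///         .collect()
    /// }
    /// ```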
    async fn multipart_put_bytes(
        &self,
        key: &str,
        body: Bytes,
        size: u64,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
        let guard = self.start_multipart_upload(key, &opts).await?;
        let progress = opts.progress.clone();
        let result = self
            .upload_parts_with_bodies(key, guard.upload_id(), &parts, progress, |part| {
                slice_bytes_part(&body, part)
            })
            .await;
        self.finish_multipart_upload(guard, result).await
    }

    /// Drive a multipart upload by streaming a local file part-by-part.
    ///
    /// All tasks share one `Arc<std::fs::File>`; per-task
    /// `read_file_part` uses `pread` so reads are concurrent without
    /// offset contention. Sharing one open file description closes
    /// the metadata/upload race that would otherwise let a
    /// concurrent rename or truncate produce parts with inconsistent
    /// content. With `MULTIPART_PUT_MAX_CONCURRENCY = 8` and
    /// `MULTIPART_PUT_PART_SIZE = 16 MiB`, peak memory is bounded at
    /// 128 MiB regardless of file size — acceptable for LFS uploads.
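    ///
    /// A std-only sketch of concurrent positional reads from one shared
    /// handle. It is Unix-only, since `FileExt::read_exact_at` is the
    /// `pread` wrapper. `read_parts` is a hypothetical helper and,
    /// unlike the real code, spawns one thread per part with no
    /// concurrency cap.
    ///
    /// ```
    /// use std::fs::File;
    /// use std::os::unix::fs::FileExt;
    /// use std::sync::Arc;
    /// use std::thread;
    ///
    /// // Read each `(offset, len)` part from the same open file. No
    /// // shared cursor is touched, so the threads never contend on it.
    /// fn read_parts(file: Arc<File>, parts: &[(u64, usize)]) -> std::io::Result<Vec<Vec<u8>>> {
    ///     let handles: Vec<_> = parts
    ///         .iter()
    ///         .map(|&(offset, len)| {
    ///             let file = Arc::clone(&file);
    ///             thread::spawn(move || -> std::io::Result<Vec<u8>> {
    ///                 let mut buf = vec![0u8; len];
    ///                 file.read_exact_at(&mut buf, offset)?;
    ///                 Ok(buf)
    ///             })
    ///         })
    ///         .collect();
    ///     // Join in submission order so results line up with `parts`.
    ///     handles
    ///         .into_iter()
    ///         .map(|h| h.join().expect("reader thread panicked"))
    ///         .collect()
    /// }
    /// ```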
    async fn multipart_put_path(
        &self,
        key: &str,
        file: tokio::fs::File,
        size: u64,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
        let guard = self.start_multipart_upload(key, &opts).await?;
        let progress = opts.progress.clone();
        let file: Arc<std::fs::File> = Arc::new(file.into_std().await);
        let result = self
            .upload_parts_from_file(key, guard.upload_id(), file, &parts, progress)
            .await;
        self.finish_multipart_upload(guard, result).await
    }

    /// Drive a multipart server-side copy via `UploadPartCopy`.
    ///
    /// Each part issues an `UploadPartCopy` request with
    /// `x-amz-copy-source` (the bucket+key, percent-encoded) and
    /// `x-amz-copy-source-range: bytes=<start>-<end>` so the copy
    /// runs entirely server-side — no body crosses the wire.
    /// The destination object's metadata starts empty (matching the
    /// trait contract: copy drops user metadata) because
    /// `CreateMultipartUpload` is invoked without any metadata.
    ///
    /// `src_etag`, if `Some`, is forwarded as
    /// `x-amz-copy-source-if-match` on every part so a mid-copy
    /// source mutation surfaces as `PreconditionFailed` rather than
    /// silently producing a destination with mixed pre/post-mutation
    /// bytes.
    async fn multipart_copy(
        &self,
        src: &str,
        dst: &str,
        size: u64,
        src_etag: Option<&str>,
    ) -> Result<(), ObjectStoreError> {
        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
        // `copy()` uses default opts (no progress, no metadata) so the
        // create-call below also carries no metadata. That preserves
        // the "copy drops user metadata" contract.
        let guard = self
            .start_multipart_upload(dst, &PutOpts::default())
            .await?;
        let copy_source = encode_copy_source(&self.bucket, src);
        let result = self
            .upload_parts_via_copy(src, dst, guard.upload_id(), &copy_source, src_etag, &parts)
            .await;
        // The upload_id lives under `dst` even though the bytes come
        // from `src`; the guard already carries `dst` as its key.
        // Errors mid-copy are reported with `src` as context inside
        // the per-part task (matches single-call `copy()` at the trait
        // surface).
        self.finish_multipart_upload(guard, result).await
    }

    /// Begin a multipart upload, returning a guard that owns the
    /// upload-id and aborts the upload on drop.
    ///
    /// `content_disposition` and `user_metadata` from `PutOpts` flow
    /// onto `CreateMultipartUpload` — the destination object inherits
    /// them on `CompleteMultipartUpload` (`UploadPart`/`UploadPartCopy`
    /// have no metadata fields of their own).
    ///
    /// The returned [`MultipartUploadGuard`] keeps the upload-id alive
    /// for the duration of the upload. If the calling future is
    /// dropped (cancelled, panicked, or the caller picks the other arm
    /// of a `select!`) before [`finish_multipart_upload`] runs, the
    /// guard's [`Drop`] best-effort dispatches `AbortMultipartUpload`
    /// so the upload-id is reclaimed and the caller is not billed for
    /// orphaned parts (issues #169, #171). S3 retains uncompleted
    /// multipart uploads indefinitely without an explicit lifecycle
    /// rule; Azure has no equivalent need (uncommitted blocks auto-
    /// expire after seven days).
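    ///
    /// The abort-on-drop pattern can be sketched with std only.
    /// `AbortOnDrop` is a hypothetical stand-in for
    /// `MultipartUploadGuard`: it records the upload-id in a shared log
    /// where the real guard would dispatch `AbortMultipartUpload`.
    ///
    /// ```
    /// use std::sync::{Arc, Mutex};
    ///
    /// struct AbortOnDrop {
    ///     upload_id: String,
    ///     armed: bool,
    ///     aborted: Arc<Mutex<Vec<String>>>,
    /// }
    ///
    /// impl AbortOnDrop {
    ///     fn new(upload_id: &str, aborted: Arc<Mutex<Vec<String>>>) -> Self {
    ///         Self { upload_id: upload_id.to_owned(), armed: true, aborted }
    ///     }
    ///
    ///     // Successful completion disarms the guard before it drops.
    ///     fn complete(mut self) {
    ///         self.armed = false;
    ///     }
    /// }
    ///
    /// impl Drop for AbortOnDrop {
    ///     fn drop(&mut self) {
    ///         if self.armed {
    ///             self.aborted.lock().unwrap().push(self.upload_id.clone());
    ///         }
    ///     }
    /// }
    /// ```
    ///
    /// Dropping an armed guard (cancellation) records an abort; calling
    /// `complete` first leaves the log untouched.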
    ///
    /// [`finish_multipart_upload`]: Self::finish_multipart_upload
    async fn start_multipart_upload(
        &self,
        key: &str,
        opts: &PutOpts,
    ) -> Result<MultipartUploadGuard, ObjectStoreError> {
        let mut req = self
            .client
            .create_multipart_upload()
            .bucket(&self.bucket)
            .key(key);
        if let Some(cd) = &opts.content_disposition {
            req = req.content_disposition(cd);
        }
        for (k, v) in &opts.user_metadata {
            req = req.metadata(k, v);
        }
        let resp = req.send().await.map_err(|e| classify(e, key))?;
        let upload_id = resp.upload_id().map(str::to_owned).ok_or_else(|| {
            ObjectStoreError::Other(
                format!("CreateMultipartUpload for `{key}` returned no upload-id").into(),
            )
        })?;
        Ok(MultipartUploadGuard::new(
            self.client.clone(),
            self.bucket.clone(),
            key.to_owned(),
            upload_id,
        ))
    }

    /// Spawn parallel `UploadPart` tasks, one per planned part, with
    /// the part body produced eagerly by
    /// `make_body(part) -> Result<Bytes, ObjectStoreError>`. Used by
    /// `multipart_put_bytes` (slices from the in-memory `Bytes`).
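    ///
    /// A std-only sketch of the part-numbering rules these tasks rely
    /// on. The helpers are hypothetical, and whether
    /// `join_completed_parts` sorts exactly this way is not shown here.
    /// Part numbers are 1-based and capped at 10 000, and
    /// `CompleteMultipartUpload` expects parts in ascending part-number
    /// order even though tasks finish in any order.
    ///
    /// ```
    /// const MAX_PARTS: usize = 10_000;
    ///
    /// // 1-based part number for the `idx`-th planned part, if in range.
    /// fn part_number(idx: usize) -> Option<i32> {
    ///     if idx >= MAX_PARTS {
    ///         return None;
    ///     }
    ///     i32::try_from(idx + 1).ok()
    /// }
    ///
    /// // Restore ascending order after out-of-order task completion.
    /// fn sort_completed(mut done: Vec<(i32, String)>) -> Vec<(i32, String)> {
    ///     done.sort_by_key(|(n, _)| *n);
    ///     done
    /// }
    /// ```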
1336    async fn upload_parts_with_bodies<F>(
1337        &self,
1338        key: &str,
1339        upload_id: &str,
1340        parts: &[UploadPart],
1341        progress: Option<ProgressSink>,
1342        make_body: F,
1343    ) -> Result<Vec<CompletedPart>, ObjectStoreError>
1344    where
1345        F: Fn(UploadPart) -> Result<Bytes, ObjectStoreError>,
1346    {
1347        let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
1348        let mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>> = JoinSet::new();
1349        for (idx, part) in parts.iter().enumerate() {
1350            let part = *part;
1351            // S3 caps multipart uploads at S3_MAX_PARTS = 10 000
1352            // (`plan_upload_parts` enforces this), so `idx + 1` always
1353            // fits in i32.
1354            let part_number = i32::try_from(idx + 1)
1355                .expect("plan_upload_parts caps parts <= S3_MAX_PARTS = 10_000");
1356            let body = make_body(part)?;
1357            let client = self.client.clone();
1358            let bucket = self.bucket.clone();
1359            let key = key.to_owned();
1360            let upload_id = upload_id.to_owned();
1361            let semaphore = Arc::clone(&semaphore);
1362            let progress = progress.clone();
1363            tasks.spawn(async move {
1364                let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1365                let resp = client
1366                    .upload_part()
1367                    .bucket(&bucket)
1368                    .key(&key)
1369                    .upload_id(&upload_id)
1370                    .part_number(part_number)
1371                    .body(ByteStream::from(body))
1372                    .customize()
1373                    .config_override(
1374                        aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
1375                    )
1376                    .send()
1377                    .await
1378                    .map_err(|e| classify(e, &key))?;
1379                let etag = resp.e_tag().map(str::to_owned).ok_or_else(|| {
1380                    ObjectStoreError::Other(
1381                        format!("UploadPart for `{key}` part {part_number} returned no ETag")
1382                            .into(),
1383                    )
1384                })?;
1385                if let Some(sink) = &progress {
1386                    sink.report(part.length);
1387                }
1388                Ok(CompletedPart::builder()
1389                    .part_number(part_number)
1390                    .e_tag(etag)
1391                    .build())
1392            });
1393        }
1394        join_completed_parts(tasks, parts.len()).await
1395    }
1396
1397    /// Spawn parallel `UploadPart` tasks that each read their part
1398    /// from the shared `Arc<std::fs::File>` via `pread`. With
1399    /// concurrency 8 and 16 MiB parts, peak memory is 128 MiB.
    async fn upload_parts_from_file(
        &self,
        key: &str,
        upload_id: &str,
        file: Arc<std::fs::File>,
        parts: &[UploadPart],
        progress: Option<ProgressSink>,
    ) -> Result<Vec<CompletedPart>, ObjectStoreError> {
        let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
        let mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>> = JoinSet::new();
        for (idx, part) in parts.iter().enumerate() {
            let part = *part;
            // S3 caps multipart uploads at S3_MAX_PARTS = 10 000
            // (`plan_upload_parts` enforces this), so `idx + 1` always
            // fits in i32.
            let part_number = i32::try_from(idx + 1)
                .expect("plan_upload_parts caps parts <= S3_MAX_PARTS = 10_000");
            let client = self.client.clone();
            let bucket = self.bucket.clone();
            let key = key.to_owned();
            let upload_id = upload_id.to_owned();
            let task_file = Arc::clone(&file);
            let semaphore = Arc::clone(&semaphore);
            let progress = progress.clone();
            tasks.spawn(async move {
                let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
                let body = read_file_part(task_file, part).await?;
                let resp = client
                    .upload_part()
                    .bucket(&bucket)
                    .key(&key)
                    .upload_id(&upload_id)
                    .part_number(part_number)
                    .body(ByteStream::from(body))
                    .customize()
                    .config_override(
                        aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
                    )
                    .send()
                    .await
                    .map_err(|e| classify(e, &key))?;
                let etag = resp.e_tag().map(str::to_owned).ok_or_else(|| {
                    ObjectStoreError::Other(
                        format!("UploadPart for `{key}` part {part_number} returned no ETag")
                            .into(),
                    )
                })?;
                if let Some(sink) = &progress {
                    sink.report(part.length);
                }
                Ok(CompletedPart::builder()
                    .part_number(part_number)
                    .e_tag(etag)
                    .build())
            });
        }
        join_completed_parts(tasks, parts.len()).await
    }

    /// Spawn parallel `UploadPartCopy` tasks for a server-side
    /// multipart copy. No body crosses the wire: each task only
    /// sends the source identifier and a byte range header.
    ///
    /// `src_etag`, if `Some`, is set as `x-amz-copy-source-if-match`
    /// on every part. A mid-copy source mutation then surfaces as
    /// `PreconditionFailed` on the offending part rather than
    /// silently producing a mixed destination.
    async fn upload_parts_via_copy(
        &self,
        src: &str,
        dst: &str,
        upload_id: &str,
        copy_source: &str,
        src_etag: Option<&str>,
        parts: &[UploadPart],
    ) -> Result<Vec<CompletedPart>, ObjectStoreError> {
        let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
        let mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>> = JoinSet::new();
        for (idx, part) in parts.iter().enumerate() {
            let part = *part;
            // S3 caps multipart uploads at S3_MAX_PARTS = 10 000
            // (`plan_upload_parts` enforces this), so `idx + 1` always
            // fits in i32.
            let part_number = i32::try_from(idx + 1)
                .expect("plan_upload_parts caps parts <= S3_MAX_PARTS = 10_000");
            let client = self.client.clone();
            let bucket = self.bucket.clone();
            let dst = dst.to_owned();
            let src_ctx = src.to_owned();
            let upload_id = upload_id.to_owned();
            let copy_source = copy_source.to_owned();
            let src_etag = src_etag.map(str::to_owned);
            let range = format!("bytes={}-{}", part.offset, part.offset + part.length - 1);
            let semaphore = Arc::clone(&semaphore);
            tasks.spawn(async move {
                let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
                // `UploadPartCopy` failures point at the source (a 404
                // or 403 means the source went away or is now denied),
                // so classify against `src_ctx`, mirroring single-call
                // `copy()` at the trait surface.
                //
                // Disable read_timeout for the same reason `put_body`
                // does (lessons_learned.md #2 / issue #26): smithy
                // resolves the connector future at "response-headers
                // received," but `UploadPartCopy` doesn't return until
                // the server-side copy completes, which for a 16 MiB
                // part on a slow region can exceed the 30 s
                // [`READ_TIMEOUT`].
                let mut req = client
                    .upload_part_copy()
                    .bucket(&bucket)
                    .key(&dst)
                    .upload_id(&upload_id)
                    .part_number(part_number)
                    .copy_source(&copy_source)
                    .copy_source_range(&range);
                if let Some(etag) = &src_etag {
                    req = req.copy_source_if_match(etag);
                }
                let resp = req
                    .customize()
                    .config_override(
                        aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
                    )
                    .send()
                    .await
                    .map_err(|e| classify(e, &src_ctx))?;
                let etag = resp
                    .copy_part_result()
                    .and_then(|r| r.e_tag())
                    .map(str::to_owned)
                    .ok_or_else(|| {
                        ObjectStoreError::Other(
                            format!(
                                "UploadPartCopy for `{src_ctx}` → `{dst}` part {part_number} returned no ETag"
                            )
                            .into(),
                        )
                    })?;
                Ok(CompletedPart::builder()
                    .part_number(part_number)
                    .e_tag(etag)
                    .build())
            });
        }
        join_completed_parts(tasks, parts.len()).await
    }

    /// Finalize a multipart upload: complete on success, best-effort
    /// abort on error.
    ///
    /// On success, `complete_multipart_upload` runs and the guard is
    /// disarmed so its [`Drop`] does not fire a redundant abort.
    ///
    /// On a per-part error or join failure, the abort is issued
    /// inline (awaited), so the call returns only after the abort
    /// request has finished, matching the synchronous error semantics
    /// callers had before the RAII guard was introduced. The guard
    /// is disarmed *after* the inline abort completes; if the inline
    /// abort itself panics or is cancelled, the guard's [`Drop`]
    /// fires the abort again on a detached task. A double abort is
    /// harmless: S3 answers the second call with `NoSuchUpload`.
    ///
    /// If `complete_multipart_upload` itself fails, the function
    /// returns the classified error via `?` *before* `disarm()`;
    /// the still-armed guard's [`Drop`] then dispatches the abort
    /// on a detached task. This keeps the function short (no
    /// extra inline-abort branch) and routes the rare
    /// "Complete failed" case through the same Drop path the
    /// future-cancellation case exercises.
    ///
    /// Abort failures are logged via `tracing::warn!`, but the
    /// original error wins; surfacing the abort error would mask
    /// the cause.
    async fn finish_multipart_upload(
        &self,
        mut guard: MultipartUploadGuard,
        parts: Result<Vec<CompletedPart>, ObjectStoreError>,
    ) -> Result<(), ObjectStoreError> {
        match parts {
            Ok(parts) => {
                let multipart = CompletedMultipartUpload::builder()
                    .set_parts(Some(parts))
                    .build();
                self.client
                    .complete_multipart_upload()
                    .bucket(&self.bucket)
                    .key(guard.key())
                    .upload_id(guard.upload_id())
                    .multipart_upload(multipart)
                    .send()
                    .await
                    .map_err(|e| classify(e, guard.key()))?;
                guard.disarm();
                Ok(())
            }
            Err(err) => {
                if let Err(abort_err) = self
                    .client
                    .abort_multipart_upload()
                    .bucket(&self.bucket)
                    .key(guard.key())
                    .upload_id(guard.upload_id())
                    .send()
                    .await
                {
                    tracing::warn!(
                        key = %guard.key(),
                        upload_id = %guard.upload_id(),
                        ?abort_err,
                        "AbortMultipartUpload failed; orphan upload may incur storage cost \
                         until lifecycle expiry",
                    );
                }
                guard.disarm();
                Err(err)
            }
        }
    }
}

/// RAII guard for an in-flight S3 multipart upload.
///
/// Owns the inputs needed to issue `AbortMultipartUpload` without
/// re-borrowing the [`S3Store`]. While `armed`, the guard's [`Drop`]
/// dispatches a best-effort `AbortMultipartUpload` on a detached
/// `tokio::spawn` task, so a future dropped between
/// `CreateMultipartUpload` and `CompleteMultipartUpload` does not
/// orphan the upload-id on the bucket (issues #169, #171).
///
/// Call [`disarm`] once the upload has been completed, or its abort
/// has been issued synchronously, so that [`Drop`] becomes a no-op.
///
/// [`disarm`]: Self::disarm
struct MultipartUploadGuard {
    client: aws_sdk_s3::Client,
    bucket: String,
    key: String,
    upload_id: String,
    armed: bool,
}

impl MultipartUploadGuard {
    fn new(client: aws_sdk_s3::Client, bucket: String, key: String, upload_id: String) -> Self {
        Self {
            client,
            bucket,
            key,
            upload_id,
            armed: true,
        }
    }

    fn upload_id(&self) -> &str {
        &self.upload_id
    }

    fn key(&self) -> &str {
        &self.key
    }

    /// Mark the upload as resolved so [`Drop`] does not fire a
    /// redundant `AbortMultipartUpload`.
    fn disarm(&mut self) {
        self.armed = false;
    }
}

impl Drop for MultipartUploadGuard {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        // `Drop` cannot `.await`, so the abort is issued on a
        // detached `tokio::spawn` task. `Handle::try_current()`
        // returns `Err` if Drop runs outside any runtime (e.g. a
        // test that constructs the guard and immediately drops it);
        // in that case the best we can do is warn-log, since
        // panicking in Drop is forbidden by project rules.
        let Ok(handle) = tokio::runtime::Handle::try_current() else {
            tracing::warn!(
                key = %self.key,
                upload_id = %self.upload_id,
                "MultipartUploadGuard dropped outside a tokio runtime; \
                 cannot dispatch AbortMultipartUpload (orphan upload may \
                 incur storage cost until S3 lifecycle expiry)",
            );
            return;
        };
        // Move owned fields into the detached task so the abort
        // outlives the dropped future. `Client` clones cheaply
        // (internal `Arc`); the strings move via `mem::take`.
        let client = self.client.clone();
        let bucket = std::mem::take(&mut self.bucket);
        let key = std::mem::take(&mut self.key);
        let upload_id = std::mem::take(&mut self.upload_id);
        handle.spawn(async move {
            if let Err(abort_err) = client
                .abort_multipart_upload()
                .bucket(&bucket)
                .key(&key)
                .upload_id(&upload_id)
                .send()
                .await
            {
                tracing::warn!(
                    key = %key,
                    upload_id = %upload_id,
                    ?abort_err,
                    "AbortMultipartUpload (drop-fire) failed; orphan upload may \
                     incur storage cost until S3 lifecycle expiry",
                );
            }
        });
    }
}

/// Drain a `JoinSet` of part-upload tasks into a `Vec<CompletedPart>`,
/// short-circuiting on the first error and sorting the result by
/// `part_number` so the `CompleteMultipartUpload` request honours
/// S3's "parts in ascending part-number order" requirement.
///
/// Returning early drops the `JoinSet`, which aborts every task
/// still in flight.
async fn join_completed_parts(
    mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>>,
    capacity: usize,
) -> Result<Vec<CompletedPart>, ObjectStoreError> {
    let mut completed = Vec::with_capacity(capacity);
    while let Some(joined) = tasks.join_next().await {
        let part = joined.map_err(other_boxed)??;
        completed.push(part);
    }
    completed.sort_by_key(|p| {
        // Each `CompletedPart` here was built via
        // `CompletedPart::builder().part_number(...).e_tag(...).build()`
        // in the spawn loops above, so `part_number` is always set.
        p.part_number()
            .expect("CompletedPart built with explicit part_number")
    });
    Ok(completed)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::url::{AzureAddressing, RemoteFlags};
    use aws_sdk_s3::primitives::DateTime;
    use aws_sdk_s3::types::Object;
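
    // --- MultipartUploadGuard arm/disarm contract (std-only model) ----
    //
    // A minimal sketch of the RAII pattern `MultipartUploadGuard` uses:
    // while armed, `Drop` fires the abort; `disarm()` turns `Drop` into
    // a no-op. `ModelGuard`, its shared `aborts` log, and
    // `model_guard_drop_aborts` are hypothetical stand-ins. The real
    // guard spawns `AbortMultipartUpload` on a detached tokio task
    // instead of recording the upload-id in a Vec.
    struct ModelGuard {
        upload_id: String,
        aborts: std::rc::Rc<std::cell::RefCell<Vec<String>>>,
        armed: bool,
    }

    impl ModelGuard {
        fn disarm(&mut self) {
            self.armed = false;
        }
    }

    impl Drop for ModelGuard {
        fn drop(&mut self) {
            if self.armed {
                // Hypothetical stand-in for the detached abort call.
                self.aborts.borrow_mut().push(std::mem::take(&mut self.upload_id));
            }
        }
    }

    /// Walks the model through the outcomes `finish_multipart_upload`
    /// distinguishes and returns the upload-ids that hit the Drop path.
    fn model_guard_drop_aborts() -> Vec<String> {
        let aborts = std::rc::Rc::new(std::cell::RefCell::new(Vec::<String>::new()));
        let make = |id: &str| ModelGuard {
            upload_id: id.to_owned(),
            aborts: std::rc::Rc::clone(&aborts),
            armed: true,
        };
        // Complete succeeded: disarmed before drop, so no abort.
        let mut completed = make("completed");
        completed.disarm();
        drop(completed);
        // Inline abort already awaited, then disarmed: no second abort.
        let mut aborted_inline = make("aborted-inline");
        aborted_inline.disarm();
        drop(aborted_inline);
        // Future cancelled, or Complete failed via `?`: still armed.
        drop(make("cancelled"));
        aborts.take()
    }

    #[test]
    fn model_guard_only_aborts_when_still_armed() {
        assert_eq!(model_guard_drop_aborts(), vec!["cancelled".to_owned()]);
    }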

    fn parse_endpoint(s: &str) -> Url {
        Url::parse(s).expect("test endpoint URL parses")
    }
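
    // --- join_completed_parts ordering (std-only model) ---------------
    //
    // A minimal sketch of why `join_completed_parts` sorts after
    // draining: part tasks finish in arbitrary order, but
    // `CompleteMultipartUpload` needs ascending part numbers.
    // `model_join_parts` is hypothetical; it uses OS threads in place of
    // the tokio `JoinSet`, `(part_number, etag)` tuples in place of
    // `CompletedPart`, and `String` errors in place of `ObjectStoreError`.
    fn model_join_parts(
        results: Vec<Result<(i32, String), String>>,
    ) -> Result<Vec<(i32, String)>, String> {
        // Spawn in reverse part order so the drain below sees parts out
        // of order, standing in for `JoinSet::join_next`'s completion order.
        let handles: Vec<_> = results
            .into_iter()
            .rev()
            .map(|result| std::thread::spawn(move || result))
            .collect();
        let mut completed = Vec::with_capacity(handles.len());
        for handle in handles {
            // First `?`: the task panicked (the `JoinError` analogue).
            // Second `?`: the part itself failed. Either short-circuits.
            let part = handle.join().map_err(|_| "part task panicked".to_owned())??;
            completed.push(part);
        }
        completed.sort_by_key(|(part_number, _)| *part_number);
        Ok(completed)
    }

    #[test]
    fn model_join_parts_restores_part_order_and_short_circuits() {
        let ok = model_join_parts(vec![
            Ok((1, "e1".to_owned())),
            Ok((2, "e2".to_owned())),
            Ok((3, "e3".to_owned())),
        ])
        .expect("all parts succeed");
        let numbers: Vec<i32> = ok.iter().map(|(n, _)| *n).collect();
        assert_eq!(numbers, vec![1, 2, 3]);
        let err = model_join_parts(vec![Ok((1, "e1".to_owned())), Err("part 2 failed".to_owned())]);
        assert_eq!(err, Err("part 2 failed".to_owned()));
    }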

    // --- object_to_meta -----------------------------------------------

    #[test]
    fn object_to_meta_round_trips_well_formed_object() {
        let modified = DateTime::from_secs(1_700_000_000);
        let obj = Object::builder()
            .key("refs/heads/main/abc.bundle")
            .size(42)
            .last_modified(modified)
            .build();
        let meta = object_to_meta(&obj).expect("conversion succeeds");
        assert_eq!(meta.key, "refs/heads/main/abc.bundle");
        assert_eq!(meta.size, 42);
        assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
    }

    #[test]
    fn object_to_meta_rejects_missing_key() {
        let obj = Object::builder()
            .last_modified(DateTime::from_secs(1_700_000_000))
            .build();
        let err = object_to_meta(&obj).expect_err("missing key must error");
        match err {
            ObjectStoreError::Other(inner) => {
                assert!(
                    inner.to_string().contains("without a key"),
                    "error message names the failure: {inner}"
                );
            }
            other => panic!("expected ObjectStoreError::Other for missing key, got {other:?}"),
        }
    }

    #[test]
    fn object_to_meta_rejects_missing_last_modified() {
        let obj = Object::builder().key("k").size(0).build();
        let err = object_to_meta(&obj).expect_err("missing last_modified must error");
        match err {
            ObjectStoreError::Other(inner) => {
                let msg = inner.to_string();
                assert!(
                    msg.contains("without last_modified"),
                    "names failure: {msg}"
                );
                assert!(msg.contains("`k`"), "includes the key for context: {msg}");
            }
            other => {
                panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
            }
        }
    }
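
    // --- positional part reads (std-only model) -----------------------
    //
    // A minimal sketch of the `pread` pattern `upload_parts_from_file`
    // documents: every part task shares one `Arc<File>` and reads its own
    // `(offset, length)` window with a positional read, so no task moves
    // a shared file cursor and no lock is needed. `model_read_part` is a
    // hypothetical, synchronous stand-in for `read_file_part` (whose body
    // is not shown here). Unix-only, since it uses `FileExt::read_exact_at`.
    #[cfg(unix)]
    fn model_read_part(
        file: &std::sync::Arc<std::fs::File>,
        offset: u64,
        length: u64,
    ) -> std::io::Result<Vec<u8>> {
        use std::os::unix::fs::FileExt;
        let len = usize::try_from(length)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        let mut buf = vec![0u8; len];
        // Fails with `UnexpectedEof` if the window runs past end of file.
        file.read_exact_at(&mut buf, offset)?;
        Ok(buf)
    }

    #[cfg(unix)]
    #[test]
    fn model_read_part_reads_independent_windows() {
        use std::io::Write;
        let path = std::env::temp_dir().join(format!("model-read-part-{}", std::process::id()));
        std::fs::File::create(&path)
            .and_then(|mut f| f.write_all(b"0123456789"))
            .expect("write fixture");
        let file = std::sync::Arc::new(std::fs::File::open(&path).expect("open fixture"));
        assert_eq!(model_read_part(&file, 8, 2).expect("tail window"), b"89".to_vec());
        assert_eq!(model_read_part(&file, 2, 3).expect("middle window"), b"234".to_vec());
        assert!(model_read_part(&file, 8, 5).is_err());
        std::fs::remove_file(&path).expect("remove fixture");
    }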

    // --- head_output_to_meta -------------------------------------------

    #[test]
    fn head_output_to_meta_round_trips_well_formed_response() {
        let modified = DateTime::from_secs(1_700_000_000);
        let meta = head_output_to_meta("k", Some(42), Some(&modified), Some("\"abc\""))
            .expect("conversion succeeds");
        assert_eq!(meta.key, "k");
        assert_eq!(meta.size, 42);
        assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
        assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
    }

    #[test]
    fn head_output_to_meta_preserves_legitimate_zero_size() {
        // Zero-byte lock files are legitimate in this codebase; a
        // `Content-Length: 0` header (i.e. `Some(0)`) must round-trip
        // as `size == 0`, distinct from the missing-header error.
        let modified = DateTime::from_secs(1_700_000_000);
        let meta = head_output_to_meta("LOCK", Some(0), Some(&modified), None)
            .expect("conversion succeeds");
        assert_eq!(meta.size, 0);
    }

    #[test]
    fn head_output_to_meta_rejects_missing_content_length() {
        let modified = DateTime::from_secs(1_700_000_000);
        let err = head_output_to_meta("k", None, Some(&modified), None)
            .expect_err("missing content-length must error");
        match err {
            ObjectStoreError::Other(inner) => {
                let msg = inner.to_string();
                assert!(msg.contains("no content-length"), "names failure: {msg}");
                assert!(msg.contains("`k`"), "includes the key for context: {msg}");
            }
            other => {
                panic!("expected ObjectStoreError::Other for missing content-length, got {other:?}")
            }
        }
    }

    #[test]
    fn head_output_to_meta_rejects_missing_last_modified() {
        let err = head_output_to_meta("k", Some(0), None, None)
            .expect_err("missing last_modified must error");
        match err {
            ObjectStoreError::Other(inner) => {
                let msg = inner.to_string();
                assert!(msg.contains("no last_modified"), "names failure: {msg}");
                assert!(msg.contains("`k`"), "includes the key for context: {msg}");
            }
            other => {
                panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
            }
        }
    }

    #[test]
    fn head_output_to_meta_clamps_negative_size_to_zero() {
        // The SDK types content_length as `Option<i64>`; a (legally
        // impossible) negative value clamps to 0 rather than wrapping
        // to a huge u64. Mirrors `object_to_meta` behavior.
        let modified = DateTime::from_secs(1_700_000_000);
        let meta =
            head_output_to_meta("k", Some(-1), Some(&modified), None).expect("conversion succeeds");
        assert_eq!(meta.size, 0);
    }

    #[test]
    fn object_to_meta_clamps_negative_size_to_zero() {
        // S3 cannot legally return a negative size, but the SDK types
        // it as `i64`. Defensive default: clamp to 0 rather than
        // wrap around to a huge u64.
        let obj = Object::builder()
            .key("k")
            .size(-1)
            .last_modified(DateTime::from_secs(1_700_000_000))
            .build();
        let meta = object_to_meta(&obj).expect("conversion succeeds");
        assert_eq!(meta.size, 0);
    }
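
    // --- negative-size clamping (std-only model) ----------------------
    //
    // A minimal sketch of the defensive `i64` -> `u64` conversion the two
    // clamp tests above pin: negatives map to 0 instead of wrapping
    // (`-1_i64 as u64` is `u64::MAX`). `model_clamp_size` is hypothetical;
    // the real conversion inside `object_to_meta` / `head_output_to_meta`
    // may be written differently.
    fn model_clamp_size(raw: i64) -> u64 {
        // `try_from` fails only for negatives; map that failure to 0.
        u64::try_from(raw).unwrap_or(0)
    }

    #[test]
    fn model_clamp_size_maps_negatives_to_zero() {
        assert_eq!(model_clamp_size(-1), 0);
        assert_eq!(model_clamp_size(42), 42);
        // A bare `as` cast would wrap instead of clamping.
        assert_eq!(-1_i64 as u64, u64::MAX);
    }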

    // --- plan_ranges --------------------------------------------------

    #[test]
    fn plan_ranges_zero_size_yields_empty_vec() {
        assert!(plan_ranges(0, 16).is_empty());
    }

    #[test]
    fn plan_ranges_zero_chunk_yields_empty_vec() {
        assert!(plan_ranges(100, 0).is_empty());
    }

    #[test]
    fn plan_ranges_size_one_byte() {
        assert_eq!(plan_ranges(1, 16), vec![(0, 0)]);
    }

    #[test]
    fn plan_ranges_size_below_chunk() {
        assert_eq!(plan_ranges(10, 16), vec![(0, 9)]);
    }

    #[test]
    fn plan_ranges_size_equals_chunk() {
        assert_eq!(plan_ranges(16, 16), vec![(0, 15)]);
    }

    #[test]
    fn plan_ranges_size_one_byte_above_chunk() {
        assert_eq!(plan_ranges(17, 16), vec![(0, 15), (16, 16)]);
    }

    #[test]
    fn plan_ranges_exact_multiple_of_chunk() {
        assert_eq!(
            plan_ranges(48, 16),
            vec![(0, 15), (16, 31), (32, 47)],
            "three full chunks, no leftover"
        );
    }

    #[test]
    fn plan_ranges_with_partial_final_chunk() {
        assert_eq!(
            plan_ranges(50, 16),
            vec![(0, 15), (16, 31), (32, 47), (48, 49)]
        );
    }

    #[test]
    fn plan_ranges_handles_huge_size_without_overflow() {
        // 6 GiB at 16 MiB chunks → 384 chunks, all valid u64 arithmetic.
        let size = 6u64 * 1024 * 1024 * 1024;
        let chunk = 16u64 * 1024 * 1024;
        let ranges = plan_ranges(size, chunk);
        assert_eq!(ranges.len(), 384);
        assert_eq!(ranges.first().copied(), Some((0, chunk - 1)));
        assert_eq!(ranges.last().copied(), Some((size - chunk, size - 1)));
    }
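
    // --- inclusive byte ranges (std-only model) -----------------------
    //
    // A minimal sketch of the inclusive `(start, end)` plan the tests
    // above pin, and of how one range becomes a `bytes=start-end` header
    // value (both ends inclusive), as the `format!` in
    // `upload_parts_via_copy` builds for `x-amz-copy-source-range`.
    // `model_plan_ranges` and `model_range_header` are hypothetical; the
    // production `plan_ranges` may be implemented differently.
    fn model_plan_ranges(size: u64, chunk: u64) -> Vec<(u64, u64)> {
        if size == 0 || chunk == 0 {
            return Vec::new();
        }
        let mut out = Vec::new();
        let mut start: u64 = 0;
        while start < size {
            // `min` trims the final, possibly partial chunk; the
            // saturating add avoids overflow near `u64::MAX`.
            let end = start.saturating_add(chunk).min(size) - 1;
            out.push((start, end));
            start = end + 1;
        }
        out
    }

    fn model_range_header((start, end): (u64, u64)) -> String {
        format!("bytes={start}-{end}")
    }

    #[test]
    fn model_ranges_match_pinned_cases() {
        assert_eq!(model_plan_ranges(50, 16), vec![(0, 15), (16, 31), (32, 47), (48, 49)]);
        assert_eq!(model_plan_ranges(1, 16), vec![(0, 0)]);
        assert_eq!(model_range_header((16, 31)), "bytes=16-31");
    }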

    // --- normalize_endpoint -------------------------------------------

    #[test]
    fn normalize_endpoint_path_style_strips_bucket_path() {
        let url = parse_endpoint("https://s3.us-west-2.amazonaws.com/my-bucket");
        let out = normalize_endpoint(&url, S3Addressing::PathStyle).unwrap();
        assert_eq!(out.host_str(), Some("s3.us-west-2.amazonaws.com"));
        assert_eq!(out.path(), "/");
        assert!(out.query().is_none());
    }

    #[test]
    fn normalize_endpoint_strips_query_string() {
        // Our URL parser leaves `?addressing=path` etc. on the endpoint;
        // the SDK rejects any query component.
        let url = parse_endpoint("http://127.0.0.1:9000/my-bucket?addressing=path");
        let out = normalize_endpoint(&url, S3Addressing::PathStyle).unwrap();
        assert!(out.query().is_none(), "query must be stripped: {out}");
        assert_eq!(out.path(), "/");
        assert_eq!(out.host_str(), Some("127.0.0.1"));
        assert_eq!(out.port(), Some(9000));
    }

    #[test]
    fn normalize_endpoint_strips_bucket_label_for_virtual_hosted() {
        let url = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
        let out = normalize_endpoint(&url, S3Addressing::VirtualHosted).unwrap();
        assert_eq!(out.host_str(), Some("s3.us-west-2.amazonaws.com"));
        assert_eq!(out.scheme(), "https");
        assert_eq!(out.path(), "/");
    }

    #[test]
    fn normalize_endpoint_virtual_hosted_preserves_port_and_scheme() {
        let url = parse_endpoint("http://my-bucket.s3.example.com:9000/some/path?x=1");
        let out = normalize_endpoint(&url, S3Addressing::VirtualHosted).unwrap();
        assert_eq!(out.scheme(), "http");
        assert_eq!(out.host_str(), Some("s3.example.com"));
        assert_eq!(out.port(), Some(9000));
        assert_eq!(out.path(), "/");
        assert!(out.query().is_none());
    }

    #[test]
    fn normalize_endpoint_dotted_bucket_virtual_hosted() {
        // Bucket name contains dots (e.g. "bucketname.com"). A plain
        // `split_once('.')` would stop at the first dot and produce
        // "com.s3.us-west-2.amazonaws.com" instead of the correct
        // "s3.us-west-2.amazonaws.com".
        let url = parse_endpoint("https://bucketname.com.s3.us-west-2.amazonaws.com/some/path");
        let out = normalize_endpoint(&url, S3Addressing::VirtualHosted).unwrap();
        assert_eq!(out.host_str(), Some("s3.us-west-2.amazonaws.com"));
        assert_eq!(out.path(), "/");
        assert!(out.query().is_none());
    }
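
    // --- dotted-bucket host stripping (std-only model) ----------------
    //
    // A minimal sketch of one way to strip a (possibly dotted) bucket
    // name from a virtual-hosted host: drop every label before the first
    // `s3` or `s3-*` label after label 0. `model_strip_bucket_labels` is
    // hypothetical, not necessarily the rule `normalize_endpoint` uses,
    // and a bucket whose own labels include `s3` would defeat it.
    fn model_strip_bucket_labels(host: &str) -> Option<&str> {
        let mut offset = 0;
        for (i, label) in host.split('.').enumerate() {
            // Label 0 is always (part of) the bucket name, so skip it.
            if i > 0 && (label == "s3" || label.starts_with("s3-")) {
                return Some(&host[offset..]);
            }
            offset += label.len() + 1; // label plus its trailing dot
        }
        None
    }

    #[test]
    fn model_strip_bucket_labels_handles_dotted_buckets() {
        let host = "bucketname.com.s3.us-west-2.amazonaws.com";
        assert_eq!(model_strip_bucket_labels(host), Some("s3.us-west-2.amazonaws.com"));
        // The naive first-dot split keeps part of the bucket name.
        assert_eq!(
            host.split_once('.').map(|(_, rest)| rest),
            Some("com.s3.us-west-2.amazonaws.com")
        );
        // Path-style hosts carry no bucket label to strip.
        assert_eq!(model_strip_bucket_labels("s3.us-west-2.amazonaws.com"), None);
    }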

    // --- resolve_region -----------------------------------------------

    #[test]
    fn resolve_region_flag_takes_precedence() {
        let url = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
        assert_eq!(
            resolve_region(&url, Some("eu-central-1")),
            Some("eu-central-1".to_owned())
        );
    }

    #[test]
    fn resolve_region_extracts_from_virtual_hosted_aws_host() {
        let url = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
        assert_eq!(resolve_region(&url, None), Some("us-west-2".to_owned()));
    }

    #[test]
    fn resolve_region_extracts_from_path_style_aws_host() {
        let url = parse_endpoint("https://s3.eu-west-1.amazonaws.com/my-bucket");
        assert_eq!(resolve_region(&url, None), Some("eu-west-1".to_owned()));
    }

    #[test]
    fn resolve_region_handles_legacy_hyphenated_form() {
        let url = parse_endpoint("https://s3-ap-south-1.amazonaws.com/my-bucket");
        assert_eq!(resolve_region(&url, None), Some("ap-south-1".to_owned()));
    }

    #[test]
    fn resolve_region_legacy_no_segment_returns_none() {
        // `s3.amazonaws.com` has no region segment; return None so the
        // SDK's provider chain picks one from env/profile.
        let url = parse_endpoint("https://s3.amazonaws.com/my-bucket");
        assert_eq!(resolve_region(&url, None), None);
    }

    #[test]
    fn resolve_region_non_aws_host_defaults_to_us_east_1() {
        let url = parse_endpoint("http://localhost:9000/my-bucket");
        assert_eq!(resolve_region(&url, None), Some("us-east-1".to_owned()));
    }

    #[test]
    fn resolve_region_r2_endpoint_defaults_to_us_east_1() {
        let url = parse_endpoint("https://abc123.r2.cloudflarestorage.com/my-bucket");
        assert_eq!(resolve_region(&url, None), Some("us-east-1".to_owned()));
    }

    #[test]
    fn resolve_region_dotted_bucket_virtual_hosted() {
        // Bucket name contains dots. The host has 4+ labels after stripping
        // `.amazonaws.com`; `resolve_region` must still find the region.
        let url = parse_endpoint("https://bucketname.com.s3.us-west-2.amazonaws.com/some/path");
        assert_eq!(resolve_region(&url, None), Some("us-west-2".to_owned()));
    }

    #[test]
    fn resolve_region_china_partition_virtual_hosted() {
        // China partition (`.amazonaws.com.cn`): the suffix list in
        // `crate::url::AWS_HOST_SUFFIXES` is the single source of truth
        // for which suffixes count as AWS. This test pins parity between
        // `check_aws_s3_host` (which accepts the suffix) and
        // `resolve_region` (which must extract the region from it).
        let url = parse_endpoint("https://my-bucket.s3.cn-north-1.amazonaws.com.cn/repo");
        assert_eq!(resolve_region(&url, None), Some("cn-north-1".to_owned()));
    }

    #[test]
    fn resolve_region_china_partition_path_style() {
        let url = parse_endpoint("https://s3.cn-northwest-1.amazonaws.com.cn/my-bucket");
        assert_eq!(
            resolve_region(&url, None),
            Some("cn-northwest-1".to_owned())
        );
    }
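
    // --- region extraction (std-only model) ---------------------------
    //
    // A minimal sketch of the precedence the tests above pin: explicit
    // flag first, then the `s3.<region>` or legacy `s3-<region>` label of
    // an AWS host, then `us-east-1` for non-AWS endpoints; a bare global
    // `s3.amazonaws.com` yields `None`. `MODEL_AWS_SUFFIXES` is a
    // hypothetical hard-coded stand-in for `crate::url::AWS_HOST_SUFFIXES`,
    // `model_resolve_region` is hypothetical too, and special endpoints
    // (dual-stack, accelerate, website) are ignored.
    const MODEL_AWS_SUFFIXES: [&str; 2] = [".amazonaws.com", ".amazonaws.com.cn"];

    fn model_resolve_region(host: &str, flag: Option<&str>) -> Option<String> {
        if let Some(region) = flag {
            return Some(region.to_owned());
        }
        let Some(stem) = MODEL_AWS_SUFFIXES
            .iter()
            .find_map(|suffix| host.strip_suffix(*suffix))
        else {
            // Non-AWS S3-compatible endpoint (localhost, R2, ...).
            return Some("us-east-1".to_owned());
        };
        let labels: Vec<&str> = stem.split('.').collect();
        if let Some(i) = labels.iter().position(|label| *label == "s3") {
            // `s3.<region>`; a trailing bare `s3` has no region.
            return labels.get(i + 1).map(|region| (*region).to_owned());
        }
        // Legacy hyphenated form: `s3-<region>`.
        labels
            .iter()
            .find_map(|label| label.strip_prefix("s3-"))
            .map(str::to_owned)
    }

    #[test]
    fn model_resolve_region_matches_pinned_cases() {
        assert_eq!(
            model_resolve_region("bucketname.com.s3.us-west-2.amazonaws.com", None),
            Some("us-west-2".to_owned())
        );
        assert_eq!(model_resolve_region("s3.amazonaws.com", None), None);
        assert_eq!(model_resolve_region("localhost", None), Some("us-east-1".to_owned()));
    }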

    // --- encode_copy_source -------------------------------------------

    #[test]
    fn encode_copy_source_preserves_slash_between_bucket_and_key() {
        let out = encode_copy_source("my-bucket", "refs/heads/main/abc.bundle");
        assert_eq!(out, "my-bucket/refs/heads/main/abc.bundle");
    }

    #[test]
    fn encode_copy_source_encodes_hash_in_lock_keys() {
        // `LOCK#.lock` from the per-ref locking scheme: `#` is reserved
        // (it starts a URI fragment), so it must be percent-encoded.
        let out = encode_copy_source("my-bucket", "refs/heads/main/LOCK#.lock");
        assert_eq!(out, "my-bucket/refs/heads/main/LOCK%23.lock");
    }

    #[test]
    fn encode_copy_source_encodes_spaces_and_query_chars() {
        let out = encode_copy_source("my-bucket", "weird key?with=stuff");
        assert!(out.contains("%20"), "space encoded: {out}");
        assert!(out.contains("%3F"), "? encoded: {out}");
        assert!(out.contains("%3D"), "= encoded: {out}");
    }

    #[test]
    fn encode_copy_source_passes_unreserved_through() {
        let out = encode_copy_source("my.bucket-name_v1~", "abc-def_ghi.txt");
        assert_eq!(out, "my.bucket-name_v1~/abc-def_ghi.txt");
    }
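
    // --- copy-source percent-encoding (std-only model) ----------------
    //
    // A minimal sketch of percent-encoding a `bucket/key` copy source:
    // RFC 3986 unreserved bytes (`A-Z a-z 0-9 - . _ ~`) and `/` pass
    // through; every other byte, including each UTF-8 byte of a non-ASCII
    // character, becomes an uppercase `%XX` escape.
    // `model_encode_copy_source` is hypothetical; the production
    // `encode_copy_source` may use a different encoder or reserved set.
    fn model_encode_copy_source(bucket: &str, key: &str) -> String {
        let raw = format!("{bucket}/{key}");
        let mut out = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' | b'/' => {
                    out.push(char::from(byte));
                }
                _ => out.push_str(&format!("%{byte:02X}")),
            }
        }
        out
    }

    #[test]
    fn model_encode_copy_source_matches_pinned_cases() {
        assert_eq!(
            model_encode_copy_source("my-bucket", "refs/heads/main/LOCK#.lock"),
            "my-bucket/refs/heads/main/LOCK%23.lock"
        );
        assert_eq!(
            model_encode_copy_source("my-bucket", "weird key?with=stuff"),
            "my-bucket/weird%20key%3Fwith%3Dstuff"
        );
    }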
2105
2106    // --- classify_status_and_code ------------------------------------
2107
2108    #[test]
2109    fn classify_404_status_is_not_found() {
2110        assert!(matches!(
2111            classify_status_and_code(404, None, "k"),
2112            Some(ObjectStoreError::NotFound(s)) if s == "k"
2113        ));
2114    }
2115
2116    #[test]
2117    fn classify_403_status_is_access_denied() {
2118        assert!(matches!(
2119            classify_status_and_code(403, None, "k"),
2120            Some(ObjectStoreError::AccessDenied(s)) if s == "k"
2121        ));
2122    }
2123
2124    #[test]
2125    fn classify_412_status_is_precondition_failed() {
2126        assert!(matches!(
2127            classify_status_and_code(412, None, "k"),
2128            Some(ObjectStoreError::PreconditionFailed(s)) if s == "k"
2129        ));
2130    }
2131
2132    #[test]
2133    fn classify_409_status_is_conflict() {
2134        // The 409 case is critical: AWS S3 returns 409 when two
2135        // If-None-Match: "*" PUTs race even on a key that did not exist
2136        // beforehand. Without this branch, put_if_absent would surface
2137        // racing-write contention as a hard error instead of Ok(false).
2138        assert!(matches!(
2139            classify_status_and_code(409, None, "k"),
2140            Some(ObjectStoreError::Conflict(s)) if s == "k"
2141        ));
2142    }

    #[test]
    fn classify_no_such_key_code_falls_back_to_not_found() {
        assert!(matches!(
            classify_status_and_code(500, Some("NoSuchKey"), "k"),
            Some(ObjectStoreError::NotFound(s)) if s == "k"
        ));
    }

    #[test]
    fn classify_conditional_request_conflict_code_is_conflict() {
        assert!(matches!(
            classify_status_and_code(500, Some("ConditionalRequestConflict"), "k"),
            Some(ObjectStoreError::Conflict(s)) if s == "k"
        ));
    }

    #[test]
    fn classify_entity_too_large_code_is_payload_too_large() {
        // S3 returns HTTP 400 with error code `EntityTooLarge` when a
        // single-PUT body exceeds 5 GiB. Status 400 alone is too broad,
        // so route on the code.
        assert!(matches!(
            classify_status_and_code(400, Some("EntityTooLarge"), "k"),
            Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
                if limit_bytes == SINGLE_PUT_LIMIT_BYTES
        ));
    }

    #[test]
    fn classify_413_status_is_payload_too_large() {
        // Proxies and front doors can return HTTP 413 directly even
        // when S3 itself would answer 400; treat 413 the same way.
        assert!(matches!(
            classify_status_and_code(413, None, "k"),
            Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
                if limit_bytes == SINGLE_PUT_LIMIT_BYTES
        ));
    }

    #[test]
    fn classify_unrecognised_returns_none() {
        assert!(classify_status_and_code(500, Some("InternalError"), "k").is_none());
        assert!(classify_status_and_code(500, None, "k").is_none());
        // A plain 400 with no recognised code must stay unclassified
        // (and fall through to `Other`) so callers see the SDK error
        // chain rather than a misleading `PayloadTooLarge`.
        assert!(classify_status_and_code(400, None, "k").is_none());
        assert!(classify_status_and_code(400, Some("MalformedXML"), "k").is_none());
    }
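
    // Sketch (hypothetical, not `classify_status_and_code` itself): a
    // classifier that agrees with every row pinned by the tests above
    // and nothing more. Recognised statuses are checked first, and the
    // S3 error code is the fallback for statuses that are too broad to
    // classify alone (400, 500). An unrecognised pair returns `None`.
    // `Kind` is an illustrative stand-in for the `ObjectStoreError`
    // variants; the real function also carries the key.
    mod sketch_classify {
        #[derive(Debug, PartialEq, Eq)]
        pub enum Kind {
            AccessDenied,
            NotFound,
            Conflict,
            PreconditionFailed,
            PayloadTooLarge,
        }

        pub fn classify(status: u16, code: Option<&str>) -> Option<Kind> {
            match status {
                403 => return Some(Kind::AccessDenied),
                409 => return Some(Kind::Conflict),
                412 => return Some(Kind::PreconditionFailed),
                413 => return Some(Kind::PayloadTooLarge),
                _ => {}
            }
            // Fallback: route on the S3 error code.
            match code {
                Some("NoSuchKey") => Some(Kind::NotFound),
                Some("ConditionalRequestConflict") => Some(Kind::Conflict),
                Some("EntityTooLarge") => Some(Kind::PayloadTooLarge),
                _ => None,
            }
        }

        #[test]
        fn agrees_with_the_pinned_rows() {
            assert_eq!(classify(403, None), Some(Kind::AccessDenied));
            assert_eq!(classify(409, None), Some(Kind::Conflict));
            assert_eq!(classify(500, Some("NoSuchKey")), Some(Kind::NotFound));
            assert_eq!(classify(400, Some("EntityTooLarge")), Some(Kind::PayloadTooLarge));
            assert_eq!(classify(400, None), None);
            assert_eq!(classify(400, Some("MalformedXML")), None);
            assert_eq!(classify(500, Some("InternalError")), None);
        }
    }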

    // --- from_remote_url constructor branch ---------------------------

    fn azure_url() -> RemoteUrl {
        RemoteUrl::Azure {
            endpoint: parse_endpoint("https://acct.blob.core.windows.net/container"),
            account: "acct".to_owned(),
            container: "container".to_owned(),
            prefix: None,
            addressing: AzureAddressing::VirtualHosted,
            flags: RemoteFlags::default(),
        }
    }

    #[tokio::test]
    async fn from_remote_url_rejects_azure() {
        let result = S3Store::from_remote_url(&azure_url()).await;
        match result {
            Err(ObjectStoreError::Other(_)) => {}
            Err(other) => panic!("expected ObjectStoreError::Other, got {other:?}"),
            Ok(_) => panic!("expected Azure URL to be rejected"),
        }
    }

    // --- ResolvedS3Config (URL → decisions) ---------------------------

    #[test]
    fn resolved_path_style_minio() {
        let endpoint = parse_endpoint("http://127.0.0.1:9000/my-bucket?addressing=path");
        let resolved =
            ResolvedS3Config::from_url_parts(&endpoint, S3Addressing::PathStyle, None, None)
                .expect("resolves");
        assert!(resolved.force_path_style);
        assert_eq!(resolved.endpoint_url.host_str(), Some("127.0.0.1"));
        assert_eq!(resolved.endpoint_url.port(), Some(9000));
        assert_eq!(resolved.endpoint_url.path(), "/");
        assert!(resolved.endpoint_url.query().is_none());
        assert_eq!(resolved.region.as_deref(), Some("us-east-1"));
        assert!(resolved.profile.is_none());
    }

    #[test]
    fn resolved_virtual_hosted_aws_strips_bucket_and_picks_region() {
        let endpoint = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
        let resolved =
            ResolvedS3Config::from_url_parts(&endpoint, S3Addressing::VirtualHosted, None, None)
                .expect("resolves");
        assert!(!resolved.force_path_style);
        assert_eq!(
            resolved.endpoint_url.host_str(),
            Some("s3.us-west-2.amazonaws.com")
        );
        assert!(
            !resolved.endpoint_url.as_str().contains("my-bucket"),
            "bucket label must be stripped: {}",
            resolved.endpoint_url
        );
        assert_eq!(resolved.region.as_deref(), Some("us-west-2"));
    }

    #[test]
    fn resolved_explicit_flags_propagate() {
        let endpoint = parse_endpoint("http://127.0.0.1:9000/my-bucket");
        let resolved = ResolvedS3Config::from_url_parts(
            &endpoint,
            S3Addressing::PathStyle,
            Some("dev-profile"),
            Some("eu-central-1"),
        )
        .expect("resolves");
        assert_eq!(resolved.region.as_deref(), Some("eu-central-1"));
        assert_eq!(resolved.profile.as_deref(), Some("dev-profile"));
    }
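
    // Sketch (hypothetical helpers, not `ResolvedS3Config`): the two
    // URL decisions the three tests above pin. A regional virtual-hosted
    // AWS host `<bucket>.s3.<region>.amazonaws.com` is split into the
    // bucket label and the bucket-less endpoint host. The region then
    // resolves as: explicit flag, else the region parsed from the host,
    // else "us-east-1" (the value the MinIO test expects). Bucket names
    // containing dots, the legacy `s3.amazonaws.com` host, and
    // dual-stack hosts are deliberately out of scope for this sketch.
    mod sketch_s3_url_decisions {
        /// Returns `(bucket, endpoint_host, region)` for a regional
        /// virtual-hosted AWS host, or `None` for anything else.
        pub fn split_virtual_hosted(host: &str) -> Option<(&str, &str, &str)> {
            let (bucket, endpoint_host) = host.split_once('.')?;
            let region = endpoint_host
                .strip_prefix("s3.")?
                .strip_suffix(".amazonaws.com")?;
            if bucket.is_empty() || region.is_empty() || region.contains('.') {
                return None;
            }
            Some((bucket, endpoint_host, region))
        }

        /// Explicit flag wins, then the host-derived region, then the default.
        pub fn resolve_region<'a>(explicit: Option<&'a str>, host: &'a str) -> &'a str {
            explicit
                .or_else(|| split_virtual_hosted(host).map(|(_, _, region)| region))
                .unwrap_or("us-east-1")
        }

        #[test]
        fn mirrors_the_resolved_config_tests() {
            assert_eq!(
                split_virtual_hosted("my-bucket.s3.us-west-2.amazonaws.com"),
                Some(("my-bucket", "s3.us-west-2.amazonaws.com", "us-west-2"))
            );
            assert_eq!(split_virtual_hosted("127.0.0.1"), None);
            assert_eq!(resolve_region(None, "127.0.0.1"), "us-east-1");
            assert_eq!(resolve_region(Some("eu-central-1"), "127.0.0.1"), "eu-central-1");
        }
    }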

    #[tokio::test]
    async fn build_s3_config_round_trips_resolved_decisions() {
        // We can't peek into `aws_sdk_s3::Config` getters reliably across
        // SDK 1.x patch releases, so just confirm the build call accepts
        // every decision shape without panicking. The decisions
        // themselves are tested via `ResolvedS3Config` above.
        //
        // Coverage scope: this test catches a panic during
        // `Builder::build_https()` construction (e.g. a missing TLS
        // provider feature), but does NOT catch a regression that
        // silently drops `.http_client(...)` from the loader chain:
        // that call is optional, so removing it still compiles and
        // returns a config. The constant-pin test below guards the
        // value; only an integration test against a real server with
        // observable connection-pool timing would catch a regression
        // in the wiring itself.
        let endpoint = parse_endpoint("http://127.0.0.1:9000/my-bucket");
        let resolved =
            ResolvedS3Config::from_url_parts(&endpoint, S3Addressing::PathStyle, None, None)
                .expect("resolves");
        let _config = build_s3_config(&resolved).await;
    }

    /// Pin the timeout values. A future copy-paste mistake
    /// (`from_millis` instead of `from_secs`, an accidental zero)
    /// silently disables the very behaviour the constants exist for;
    /// fail fast instead. If a constant is deliberately changed,
    /// update the expected value on the right-hand side together:
    /// the test exists to make such a change deliberate, not to lock
    /// the value forever. See the matching Azure-side test for the
    /// same rationale.
    #[test]
    fn timeout_constants_have_expected_values() {
        assert_eq!(POOL_IDLE_TIMEOUT, Duration::from_secs(30));
        assert_eq!(READ_TIMEOUT, Duration::from_secs(30));
    }

    /// Pin the `should_use_multipart` predicate at and around the
    /// shared threshold (issue #53).
    ///
    /// `put_bytes`, `put_path`, and `copy` route through this
    /// predicate to decide single-PUT vs multipart. The integration
    /// tests `multipart_put_emits_per_part_progress_events` and the
    /// env-gated `multipart_put_path_above_5_gib_round_trips` cover
    /// the dispatch *call* (only multipart emits >= 2 progress
    /// events; only multipart succeeds above 5 GiB); this unit test
    /// pins the predicate's boundary semantics so the constant can't
    /// be moved out from under those tests without something failing.
    #[test]
    fn should_use_multipart_pins_threshold_boundary() {
        use super::super::multipart::MULTIPART_PUT_THRESHOLD;
        assert!(!should_use_multipart(MULTIPART_PUT_THRESHOLD - 1));
        assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD));
        assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD + 1));
        // A 6 GiB body must take the multipart path: this is the
        // failure mode named in the issue (`EntityTooLarge` on bare
        // `PutObject`).
        assert!(should_use_multipart(6 * (1 << 30)));
    }
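
    // Sketch (hypothetical helpers, not this module's
    // `should_use_multipart`): the arithmetic behind the boundary test
    // above. The threshold is a parameter because the real
    // `MULTIPART_PUT_THRESHOLD` value is defined in the `multipart`
    // module; the 100 MiB used below is made up. The S3 limits match
    // the module docs: 5 GiB per single PUT and at most 10 000 parts.
    mod sketch_multipart_math {
        pub const GIB: u64 = 1 << 30;
        pub const SINGLE_PUT_LIMIT: u64 = 5 * GIB;
        pub const MAX_PARTS: u64 = 10_000;

        /// Inclusive at the threshold, matching the boundary test.
        pub fn should_use_multipart(len: u64, threshold: u64) -> bool {
            len >= threshold
        }

        /// Parts needed to upload `len` bytes in `part_size`-byte chunks.
        pub fn part_count(len: u64, part_size: u64) -> u64 {
            len.div_ceil(part_size)
        }

        /// Smallest part size that keeps `len` within `MAX_PARTS` parts.
        pub fn min_part_size(len: u64) -> u64 {
            len.div_ceil(MAX_PARTS)
        }

        #[test]
        fn boundary_and_part_arithmetic() {
            let threshold = 100 * 1024 * 1024; // hypothetical value
            assert!(!should_use_multipart(threshold - 1, threshold));
            assert!(should_use_multipart(threshold, threshold));
            // 6 GiB cannot go out as one PUT...
            let six_gib = 6 * GIB;
            assert!(six_gib > SINGLE_PUT_LIMIT);
            // ...but splits into 768 parts of 8 MiB each.
            assert_eq!(part_count(six_gib, 8 * 1024 * 1024), 768);
            // A 5 TiB object needs parts of roughly 524 MiB or more.
            assert_eq!(min_part_size(5 * (1 << 40)), 549_755_814);
        }
    }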

    /// Tripwire for the `disable_read_timeout()` fix in commit bfec2f4.
    ///
    /// `put_body` overrides the SDK timeout config per-operation so
    /// large bundle uploads are not aborted at [`READ_TIMEOUT`].
    /// A regression that drops `.disable_read_timeout()` from the
    /// override (e.g. a bare `TimeoutConfig::builder().build()`) would
    /// silently reintroduce the upload-abort bug.
    ///
    /// `TimeoutConfig` does not expose the per-field state via getters
    /// (both `Unset` and `Disabled` return `None`), so the assertion
    /// uses the merge semantics: build a base config that *does* set
    /// `read_timeout`, then verify that merging via `take_defaults_from`
    /// keeps `read_timeout` disabled rather than inheriting the base.
    #[test]
    fn put_body_upload_override_disables_read_timeout() {
        let base = TimeoutConfig::builder()
            .read_timeout(Duration::from_secs(99))
            .build();

        // `upload_timeout_config()` is the function `put_body` calls.
        let mut override_cfg = upload_timeout_config();
        let merged = override_cfg.take_defaults_from(&base);

        // If `disable_read_timeout()` is in place, the merged config
        // returns `None` (Disabled). If a regression dropped it, the
        // merged config would inherit `Some(99s)` from the base.
        assert_eq!(
            merged.read_timeout(),
            None,
            "upload override must disable read_timeout, not just leave it Unset",
        );
    }
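
    // Sketch (a toy model, not aws-smithy's actual `TimeoutConfig`
    // internals): why a merge can tell `Disabled` apart from `Unset`
    // even though the getter returns `None` for both, as the doc
    // comment above describes. `Setting` and `take_default_from` are
    // illustrative names. Only `Unset` inherits the base value during a
    // merge; `Disabled` and `Set` survive it.
    mod sketch_tristate_timeout {
        use std::time::Duration;

        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
        pub enum Setting {
            Unset,
            Disabled,
            Set(Duration),
        }

        impl Setting {
            /// Getter shape: `Unset` and `Disabled` both read as `None`.
            pub fn get(self) -> Option<Duration> {
                match self {
                    Setting::Set(d) => Some(d),
                    Setting::Unset | Setting::Disabled => None,
                }
            }

            /// Only an `Unset` value falls back to the base during a merge.
            pub fn take_default_from(self, base: Setting) -> Setting {
                match self {
                    Setting::Unset => base,
                    other => other,
                }
            }
        }

        #[test]
        fn disabled_survives_merge_but_unset_inherits() {
            let base = Setting::Set(Duration::from_secs(99));
            assert_eq!(Setting::Disabled.take_default_from(base).get(), None);
            assert_eq!(
                Setting::Unset.take_default_from(base).get(),
                Some(Duration::from_secs(99))
            );
        }
    }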

    // --- MultipartUploadGuard (#169, #171) ----------------------------

    /// Build a no-network `Client` for guard tests. The endpoint
    /// `127.0.0.1:1` is unreachable, so any request an armed guard's
    /// spawned abort task sends fails at connect time instead of
    /// reaching a real server. Most tests here only construct guards;
    /// the inside-runtime drop test below relies on that connect
    /// failure deliberately.
    fn test_client() -> aws_sdk_s3::Client {
        let conf = aws_sdk_s3::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new("us-east-1"))
            .endpoint_url("http://127.0.0.1:1/")
            .build();
        aws_sdk_s3::Client::from_conf(conf)
    }

    fn make_guard() -> MultipartUploadGuard {
        MultipartUploadGuard::new(
            test_client(),
            "bkt".to_owned(),
            "k".to_owned(),
            "uid".to_owned(),
        )
    }

    #[test]
    fn multipart_upload_guard_exposes_constructor_fields() {
        // The accessors are load-bearing: `finish_multipart_upload`
        // reads them to address the complete/abort calls.
        let mut guard = make_guard();
        assert_eq!(guard.key(), "k");
        assert_eq!(guard.upload_id(), "uid");
        // Disarm so Drop is a no-op (the constructor-fields test
        // should not exercise the spawn-on-Drop path).
        guard.disarm();
    }

    #[test]
    fn multipart_upload_guard_disarmed_drop_outside_runtime_is_silent() {
        // Observable contract: a disarmed guard must Drop without
        // attempting `Handle::try_current()` or `spawn`. We exercise
        // this outside any tokio runtime, where `spawn` would panic
        // and a failed `try_current()` lookup would emit a warn log.
        // Neither may happen on the success path.
        let mut guard = make_guard();
        guard.disarm();
        drop(guard);
    }

    #[test]
    fn multipart_upload_guard_armed_drop_outside_runtime_does_not_panic() {
        // Project rule: Drop must never panic. When dropped armed
        // outside any tokio runtime, the guard logs a warning and
        // returns cleanly rather than panicking on a missing
        // runtime handle.
        let guard = make_guard();
        drop(guard);
    }
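
    // Sketch (hypothetical and std-only, not `MultipartUploadGuard`):
    // the armed/disarmed drop-guard pattern these tests pin. The real
    // guard looks up a tokio runtime and spawns `AbortMultipartUpload`.
    // Here the cleanup is a plain counter bump, so the contract is easy
    // to see: an armed drop runs cleanup exactly once, a disarmed drop
    // does nothing, and Drop itself never panics. `AbortGuard` is an
    // illustrative name.
    mod sketch_drop_guard {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        pub struct AbortGuard {
            armed: bool,
            aborts: Arc<AtomicUsize>,
        }

        impl AbortGuard {
            pub fn new(aborts: Arc<AtomicUsize>) -> Self {
                Self { armed: true, aborts }
            }

            /// Called once the upload has completed successfully.
            pub fn disarm(&mut self) {
                self.armed = false;
            }
        }

        impl Drop for AbortGuard {
            fn drop(&mut self) {
                if self.armed {
                    // Stand-in for "spawn the abort request"; cannot panic.
                    self.aborts.fetch_add(1, Ordering::SeqCst);
                }
            }
        }

        #[test]
        fn only_armed_drops_dispatch_cleanup() {
            let aborts = Arc::new(AtomicUsize::new(0));
            let mut done = AbortGuard::new(Arc::clone(&aborts));
            done.disarm();
            drop(done);
            assert_eq!(aborts.load(Ordering::SeqCst), 0);
            drop(AbortGuard::new(Arc::clone(&aborts)));
            assert_eq!(aborts.load(Ordering::SeqCst), 1);
        }
    }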

    #[tokio::test]
    async fn multipart_upload_guard_armed_drop_inside_runtime_spawns_abort_task() {
        // Production failure mode: a future dropped between
        // `start_multipart_upload` and `finish_multipart_upload`
        // must dispatch `AbortMultipartUpload` on a detached
        // tokio task. This test pins the cheap observable contract:
        //
        // 1. Drop neither panics nor blocks.
        // 2. The spawned abort task is given scheduler turns to reach
        //    `send().await`, where the unreachable endpoint
        //    `127.0.0.1:1` forces a connect failure and exercises the
        //    spawned closure's `Err`-arm warn log. This part is not
        //    asserted: a detached task's outcome cannot fail the test.
        //
        // The companion test below
        // (`..._drop_issues_abort_multipart_upload`) goes further
        // and checks the method and URI of the captured HTTP
        // request, so this test only needs to keep the no-panic /
        // forward-progress contract on the warn-log path.
        //
        // Collecting the task in a `JoinSet` would let us join it and
        // observe completion, but Drop uses a detached `spawn` by
        // design (a `JoinHandle` would require Drop to hold state
        // for the lifetime of the runtime). Yielding to the runtime
        // a handful of times lets the spawned task reach its
        // `send().await` before the test returns and the runtime
        // shuts down.
        let guard = make_guard();
        drop(guard);
        for _ in 0..4 {
            tokio::task::yield_now().await;
        }
    }

    /// Build an `aws_sdk_s3::Client` whose HTTP layer is the
    /// smithy `capture_request` handler: every request the SDK
    /// emits is recorded on the returned `CaptureRequestReceiver`
    /// instead of touching the network. Static test credentials
    /// keep `SigV4` happy so the SDK actually reaches the HTTP
    /// layer (an unsigned chain would short-circuit earlier).
    fn capture_client() -> (
        aws_sdk_s3::Client,
        aws_smithy_http_client::test_util::CaptureRequestReceiver,
    ) {
        use aws_sdk_s3::config::Credentials;
        let (http_client, rx) = aws_smithy_http_client::test_util::capture_request(None);
        let conf = aws_sdk_s3::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new("us-east-1"))
            .credentials_provider(Credentials::new(
                "AKIAIOSFODNN7EXAMPLE",
                "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
                None,
                None,
                "test",
            ))
            .http_client(http_client)
            .force_path_style(true)
            .build();
        (aws_sdk_s3::Client::from_conf(conf), rx)
    }

    #[tokio::test]
    async fn multipart_upload_guard_drop_issues_abort_multipart_upload() {
        // Request-shape contract for issue #173: when an armed
        // `MultipartUploadGuard` is dropped inside a tokio runtime,
        // the detached abort task must issue an
        // `AbortMultipartUpload` request addressing the exact
        // (bucket, key, upload-id) the guard was constructed with.
        //
        // S3's wire form for `AbortMultipartUpload` is
        //   `DELETE /<bucket>/<key>?uploadId=<id>` (path style)
        // so we assert: method=DELETE, the captured URI contains
        // the key, and `uploadId=<id>` appears in the query.
        // We deliberately do NOT couple to the exact host, scheme,
        // or full URL; those are SDK-internal details unrelated
        // to the abort contract.
        let (client, rx) = capture_client();
        let guard = MultipartUploadGuard::new(
            client,
            "test-bucket".to_owned(),
            "test/key.pack".to_owned(),
            "test-upload-id-abc123".to_owned(),
        );
        drop(guard);

        // The detached `tokio::spawn` task must run far enough to
        // submit the request through the capture client. Yielding
        // a handful of times lets the SDK's signing + request
        // pipeline complete; `capture_request` resolves the call
        // synchronously once it sees the request, so no real
        // network wait is needed.
        for _ in 0..16 {
            tokio::task::yield_now().await;
        }

        let request = rx.expect_request();
        assert_eq!(
            request.method(),
            "DELETE",
            "AbortMultipartUpload must be DELETE; got {}",
            request.method(),
        );
        let uri = request.uri();
        assert!(
            uri.contains("test/key.pack"),
            "captured URI must address the guard's key; got {uri}",
        );
        assert!(
            uri.contains("uploadId=test-upload-id-abc123"),
            "captured URI must carry the guard's upload-id in the \
             query string; got {uri}",
        );
    }
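
    // Sketch (hypothetical helper, not the SDK's request serializer):
    // the path-style request target the test above looks for,
    // `DELETE /<bucket>/<key>?uploadId=<id>`. Bytes outside the
    // RFC 3986 unreserved set are percent-encoded, and `/` is kept in
    // the key so a multi-segment key such as `test/key.pack` appears
    // verbatim in the path. `abort_target` is an illustrative name.
    mod sketch_abort_request_target {
        fn encode(s: &str, keep_slash: bool) -> String {
            let mut out = String::with_capacity(s.len());
            for b in s.bytes() {
                let unreserved =
                    b.is_ascii_alphanumeric() || matches!(b, b'-' | b'.' | b'_' | b'~');
                if unreserved || (keep_slash && b == b'/') {
                    out.push(char::from(b));
                } else {
                    out.push_str(&format!("%{b:02X}"));
                }
            }
            out
        }

        /// Returns `(method, path_and_query)` for a path-style abort.
        pub fn abort_target(bucket: &str, key: &str, upload_id: &str) -> (&'static str, String) {
            let path = format!(
                "/{}/{}?uploadId={}",
                encode(bucket, false),
                encode(key, true),
                encode(upload_id, false),
            );
            ("DELETE", path)
        }

        #[test]
        fn matches_the_captured_request_shape() {
            let (method, target) =
                abort_target("test-bucket", "test/key.pack", "test-upload-id-abc123");
            assert_eq!(method, "DELETE");
            assert_eq!(target, "/test-bucket/test/key.pack?uploadId=test-upload-id-abc123");
            // Reserved bytes in the key are escaped.
            assert_eq!(abort_target("b", "a b", "id").1, "/b/a%20b?uploadId=id");
        }
    }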
}