git_remote_object_store/object_store/s3.rs
1//! S3 backend for the [`ObjectStore`] trait.
2//!
3//! `S3Store` wraps `aws-sdk-s3`. The SDK owns `SigV4`, retries, connection
4//! pooling, and timeout policy; this module owns the URL → SDK config
5//! translation, the error-code classifier ([`classify`]), and the
6//! hand-rolled multipart download orchestrator that the SDK does not
7//! provide.
8//!
9//! ## Key composition
10//!
11//! `S3Store` does **not** auto-prepend the [`RemoteUrl`] `prefix`. Trait
12//! keys are byte-prefix per the contract on
13//! [`ObjectStore::list`]; the URL `prefix` is
14//! a repository concern and is composed by callers that build keys like
15//! `<prefix>/refs/.../<sha>.bundle`.
16//!
17//! ## Conditional writes
18//!
19//! [`put_if_absent`][super::ObjectStore::put_if_absent] uses
20//! `If-None-Match: "*"`. S3 returns either 412 (`PreconditionFailed`)
21//! when the key already exists or 409 (`ConditionalRequestConflict`)
22//! when two PUTs race. Both collapse to `Ok(false)`.
23//!
24//! ## Size limits
25//!
26//! AWS caps a single `PutObject` body at [`SINGLE_PUT_LIMIT_BYTES`]
27//! (5 GiB) and a multipart upload at [`S3_MAX_PARTS`] (10 000) parts;
28//! the per-object ceiling is 5 TiB. The helper auto-promotes uploads
29//! above [`super::multipart::MULTIPART_PUT_THRESHOLD`] onto the
30//! multipart path, so callers do not have to reason about the 5 GiB
31//! single-PUT cutoff. The upload path is **not resumable** across
32//! process death — see the README "Known limitations" section.
33//!
34//! ## Atomic `get_to_file`
35//!
36//! Both the small-object and multipart download paths write to a sibling
37//! [`tempfile::NamedTempFile`] and rename on success so a partial
38//! failure cannot leave a corrupt destination for the unbundle step.
39//!
40//! Every GET carries `If-Match: <etag>` derived from the preceding
41//! `HeadObject` call. If the object is overwritten between `head` and
42//! the body download, S3 returns 412 and `get_to_file` retries once
43//! with a fresh `head`/`ETag`. After one retry the 412 propagates as
44//! [`ObjectStoreError::PreconditionFailed`].
45//!
46//! ## HTTP transport tuning
47//!
48//! `aws-sdk-s3`'s default HTTP client keeps idle pooled connections
49//! indefinitely, so a pooled connection to a rotated VIP would wedge
50//! an in-flight request until the OS-level TCP retransmit timeout
51//! fires (~15 minutes on Linux). [`S3Store::from_remote_url`] installs
52//! a custom HTTP client built via [`aws_smithy_http_client::Builder`]
53//! with [`POOL_IDLE_TIMEOUT`] bounded to 30 s, so a rotation costs at
54//! most one short-circuited request rather than minutes of wedged
55//! transfer. Tracking issue: #26.
56//!
57//! Pool-idle alone does not bound a *hot* pooled connection — one that
58//! was used within the last 30 s but has since become stuck — and the
59//! 412 retry in [`ObjectStore::get_to_file`] is a deliberate-server-
60//! response retry, so forcing a fresh connection there does not help.
61//! Instead, the SDK's [`aws_config::timeout::TimeoutConfig`] is given
62//! a [`READ_TIMEOUT`] so a stuck request fails fast and the SDK's
63//! internal retry layer can pick a fresh one. `connect_timeout` is
64//! left at the SDK default (3.1 s, already aggressive). Tracking
65//! issue: #26.
66//!
67//! Note: smithy's `read_timeout` resolves the HTTP connector future at
68//! "response-headers received." That bounds:
69//! - **Uploads** in full — the connector future cannot resolve until
70//! the request body is sent and the response status arrives, so a
71//! stuck upload trips at [`READ_TIMEOUT`]. `put_body` therefore
72//! overrides the timeout per-operation so large bundle uploads are
73//! not cut off at 30 s.
74//! - **Downloads** only up to time-to-first-byte. Once response
75//! headers arrive the future resolves; subsequent body-chunk reads
76//! are not bounded by `read_timeout`. A peer that wedges mid-body
77//! on a GET (e.g. a stuck multipart range) is still subject to the
78//! pool-idle / TCP-keepalive layers, but not to `READ_TIMEOUT`.
79//! Lesson #2 in `docs/development/lessons_learned.md` covers this.
80//!
81//! TCP keepalive (the second knob suggested in #27) is **not** wired
82//! on the S3 path: `aws-smithy-http-client` 1.1.12's public `Builder`
83//! / `ConnectorBuilder` API exposes `pool_idle_timeout` but does not
84//! expose `tcp_keepalive`. The dominant DNS-rotation failure in #26 is
85//! pool reuse of a dead VIP, which `pool_idle_timeout` already fixes;
86//! the gap relative to the Azure backend (which uses `reqwest` and
87//! gets keepalive for free) is documented in `CHANGELOG.md`.
88//!
89//! ## Multipart-upload lifetime safety
90//!
91//! S3 retains uncompleted multipart uploads indefinitely without an
92//! explicit lifecycle rule, so a future dropped between
93//! `CreateMultipartUpload` and `CompleteMultipartUpload` orphans the
94//! upload-id and bills the caller for the staged parts (issues #169,
95//! #171). [`S3Store::start_multipart_upload`] therefore hands back a
96//! [`MultipartUploadGuard`] that owns the upload-id and best-effort
97//! issues `AbortMultipartUpload` on `Drop`; [`finish_multipart_upload`]
98//! is the only call site that may [`disarm`] the guard.
99//!
100//! Future contributors must **not** introduce an early `?`-return
101//! between obtaining the upload-id and constructing the
102//! [`MultipartUploadGuard`] inside `start_multipart_upload`, nor
103//! between `start_multipart_upload` and the matching
104//! `finish_multipart_upload`: a bare upload-id outside the guard
105//! reintroduces the leak the guard exists to prevent. (The
106//! `ok_or_else` for a missing upload-id field on the SDK response is
107//! benign — there is no upload-id to abort.)
108//!
109//! Azure has no equivalent need — uncommitted blocks auto-expire after
110//! seven days (`azure.rs`).
111//!
112//! [`finish_multipart_upload`]: S3Store::finish_multipart_upload
113//! [`disarm`]: MultipartUploadGuard::disarm
114//!
115//! ## Stdout discipline
116//!
117//! Per `.claude/rules/protocol-stdout.md`, this module never writes to
118//! stdout. Diagnostics go through `tracing` (which the helper binaries
119//! configure to write to stderr).
120
121use std::io::SeekFrom;
122use std::path::Path;
123use std::sync::Arc;
124use std::time::Duration;
125
126use aws_config::timeout::TimeoutConfig;
127use aws_config::{BehaviorVersion, Region};
128use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
129use aws_sdk_s3::primitives::{ByteStream, Length};
130use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, MetadataDirective};
131use aws_smithy_http_client::tls::{Provider as TlsProvider, rustls_provider::CryptoMode};
132use aws_smithy_types_convert::date_time::DateTimeExt;
133use bytes::Bytes;
134use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
135use tempfile::NamedTempFile;
136use tokio::io::{AsyncSeekExt, AsyncWriteExt};
137use tokio::sync::{Mutex, Semaphore};
138use tokio::task::JoinSet;
139use url::Url;
140
141use crate::url::{
142 AWS_S3_INFIXES, RemoteUrl, S3Addressing, s3_virtual_hosted_bucket, strip_aws_host_suffix,
143};
144
145use super::error::{network_boxed, other_boxed};
146use super::multipart::{
147 MULTIPART_PUT_MAX_CONCURRENCY, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS, UploadPart,
148 plan_upload_parts, read_file_part, should_use_multipart, slice_bytes_part,
149};
150use super::{
151 GetOpts, ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts, persist_temp,
152};
153
154/// Object-size cutoff above which `get_to_file` switches from a single
155/// streaming GET to parallel ranged GETs.
156pub(crate) const MULTIPART_THRESHOLD: u64 = 25 * 1024 * 1024;
157/// Range size for each ranged GET in the multipart download path.
158pub(crate) const MULTIPART_CHUNK_SIZE: u64 = 16 * 1024 * 1024;
159/// Maximum simultaneous in-flight ranged GETs in the multipart download path.
160pub(crate) const MULTIPART_MAX_CONCURRENCY: usize = 8;
161
162/// S3's hard ceiling on a single `PutObject` body. Reported in
163/// [`ObjectStoreError::PayloadTooLarge`] when the SDK surfaces
164/// `EntityTooLarge` (HTTP 400) or HTTP 413 so the wire-line names the
165/// number rather than dumping an opaque SDK chain.
166pub(crate) const SINGLE_PUT_LIMIT_BYTES: u64 = 5 * (1 << 30);
167
168/// Percent-encode set used for `x-amz-copy-source` keys: every non-
169/// alphanumeric ASCII byte except the path-structural and unreserved
170/// characters (`/`, `.`, `-`, `_`, `~`).
171const COPY_SOURCE_ENCODE: &AsciiSet = &CONTROLS
172 .add(b' ')
173 .add(b'!')
174 .add(b'"')
175 .add(b'#')
176 .add(b'$')
177 .add(b'%')
178 .add(b'&')
179 .add(b'\'')
180 .add(b'(')
181 .add(b')')
182 .add(b'*')
183 .add(b'+')
184 .add(b',')
185 .add(b':')
186 .add(b';')
187 .add(b'<')
188 .add(b'=')
189 .add(b'>')
190 .add(b'?')
191 .add(b'@')
192 .add(b'[')
193 .add(b'\\')
194 .add(b']')
195 .add(b'^')
196 .add(b'`')
197 .add(b'{')
198 .add(b'|')
199 .add(b'}');
200
201/// Bound on how long an idle pooled HTTPS connection lingers before
202/// the smithy connection pool drops it. Short enough that DNS rotation
203/// rarely hits a stale pooled connection; long enough that bursty
204/// fetch / push batches still benefit from connection reuse. See the
205/// module-level "HTTP transport tuning" docs and issue #26.
206pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
207
208/// Timeout applied to every S3 GET, HEAD, LIST, and lock-write request.
209/// Catches a hot pooled connection that has gone silent (e.g. mid-LFS
210/// session when the server VIP rotates). Sized to match
211/// [`POOL_IDLE_TIMEOUT`] — both budgets are "give up and let the SDK
212/// retry pick a fresh connection" budgets.
213///
214/// Note: smithy's `read_timeout` resolves the HTTP connector future at
215/// "response-headers received." For uploads that includes the request
216/// body (the connector cannot resolve until the response arrives), so
217/// [`S3Store::put_body`] overrides the timeout per-operation to keep
218/// large bundle uploads from being cut off at 30 s. For downloads it
219/// is a time-to-first-byte bound only — body chunks streamed after the
220/// headers are not subject to `READ_TIMEOUT`. See the module-level
221/// transport docs and lesson #2 in `docs/development/lessons_learned.md`.
222pub(crate) const READ_TIMEOUT: Duration = Duration::from_secs(30);
223
224/// Per-operation timeout config applied to every S3 PUT (object upload).
225///
226/// `disable_read_timeout()` is the entire point of this helper: a
227/// regression that returns `TimeoutConfig::builder().build()` (all
228/// fields `Unset`) re-introduces issue #26 (large uploads aborted at
229/// 30 s). Pinned by `put_body_upload_override_disables_read_timeout`
230/// so the fix cannot silently revert.
231fn upload_timeout_config() -> TimeoutConfig {
232 TimeoutConfig::builder().disable_read_timeout().build()
233}
234
235/// Production [`ObjectStore`] backed by `aws-sdk-s3`.
236#[derive(Debug)]
237pub struct S3Store {
238 client: aws_sdk_s3::Client,
239 bucket: String,
240}
241
242/// The decisions extracted from a [`RemoteUrl::S3`] before they are
243/// fed into the `aws-sdk-s3` config builder. Factored out so unit
244/// tests can assert each decision without going through the SDK
245/// (whose getters vary across patch releases).
246#[derive(Debug, Clone, PartialEq, Eq)]
247pub(crate) struct ResolvedS3Config {
248 pub(crate) endpoint_url: Url,
249 pub(crate) region: Option<String>,
250 pub(crate) force_path_style: bool,
251 pub(crate) profile: Option<String>,
252}
253
254impl ResolvedS3Config {
255 pub(crate) fn from_url_parts(
256 endpoint: &Url,
257 addressing: S3Addressing,
258 profile: Option<&str>,
259 region_flag: Option<&str>,
260 ) -> Result<Self, ObjectStoreError> {
261 Ok(Self {
262 endpoint_url: normalize_endpoint(endpoint, addressing)?,
263 region: resolve_region(endpoint, region_flag),
264 force_path_style: matches!(addressing, S3Addressing::PathStyle),
265 profile: profile.map(str::to_owned),
266 })
267 }
268}
269
270impl S3Store {
271 /// Build an `S3Store` from a parsed [`RemoteUrl`].
272 ///
273 /// The [`RemoteUrl::S3::prefix`] field is intentionally **not**
274 /// consumed here; callers compose it into keys themselves per the
275 /// module-level docs.
276 ///
277 /// # Errors
278 ///
279 /// Returns [`ObjectStoreError::Other`] if `url` is not the S3 variant
280 /// or if the endpoint URL cannot be normalised for virtual-hosted
281 /// addressing.
282 pub async fn from_remote_url(url: &RemoteUrl) -> Result<Self, ObjectStoreError> {
283 let RemoteUrl::S3 {
284 endpoint,
285 bucket,
286 addressing,
287 flags,
288 ..
289 } = url
290 else {
291 return Err(ObjectStoreError::Other(
292 format!("S3Store::from_remote_url called with non-S3 URL: {url}").into(),
293 ));
294 };
295
296 let resolved = ResolvedS3Config::from_url_parts(
297 endpoint,
298 *addressing,
299 flags.profile.as_deref(),
300 flags.region.as_deref(),
301 )?;
302 let sdk_config = build_s3_config(&resolved).await;
303 let client = aws_sdk_s3::Client::from_conf(sdk_config);
304
305 Ok(Self {
306 client,
307 bucket: bucket.clone(),
308 })
309 }
310
311 /// Verify the bucket is reachable with the configured credentials by
312 /// issuing a single `ListObjectsV2` with `max_keys=1`. Used by
313 /// [`crate::protocol::backend::build`] to fold credential / missing-bucket /
314 /// authorization failures into categorical
315 /// [`crate::protocol::backend::BackendError`] variants before the
316 /// helper REPL runs its first command.
317 pub(crate) async fn probe(&self, prefix: &str) -> Result<(), ObjectStoreError> {
318 self.client
319 .list_objects_v2()
320 .bucket(&self.bucket)
321 .prefix(prefix)
322 .max_keys(1)
323 .send()
324 .await
325 .map_err(|e| classify(e, prefix))?;
326 Ok(())
327 }
328}
329
330/// Build the `aws-sdk-s3` config from a [`ResolvedS3Config`].
331///
332/// 1. Load the AWS SDK provider chain with `BehaviorVersion::latest()`.
333/// 2. Install a custom HTTP client with [`POOL_IDLE_TIMEOUT`] so DNS
334/// rotation does not wedge long-running sessions (#26).
335/// 3. Apply [`READ_TIMEOUT`] so a stuck GET/HEAD/LIST/lock request fails
336/// fast instead of waiting for the OS-level TCP retransmit timeout
337/// (#26). `connect_timeout` is left at the SDK default (3.1 s).
338/// `put_body` overrides this per-operation to allow large uploads.
339/// 4. Apply `endpoint_url`, `profile`, `region` from the resolved decisions.
340/// 5. Override `force_path_style` on the resulting `aws_sdk_s3::Config`.
341pub(crate) async fn build_s3_config(resolved: &ResolvedS3Config) -> aws_sdk_s3::Config {
342 let mut loader = aws_config::defaults(BehaviorVersion::latest())
343 .http_client(
344 aws_smithy_http_client::Builder::new()
345 .tls_provider(TlsProvider::Rustls(CryptoMode::AwsLc))
346 .pool_idle_timeout(POOL_IDLE_TIMEOUT)
347 .build_https(),
348 )
349 .timeout_config(TimeoutConfig::builder().read_timeout(READ_TIMEOUT).build())
350 .endpoint_url(resolved.endpoint_url.as_str());
351 if let Some(p) = &resolved.profile {
352 loader = loader.profile_name(p);
353 }
354 if let Some(r) = &resolved.region {
355 loader = loader.region(Region::new(r.clone()));
356 }
357 let sdk_config = loader.load().await;
358
359 aws_sdk_s3::config::Builder::from(&sdk_config)
360 .force_path_style(resolved.force_path_style)
361 .build()
362}
363
364/// Rewrite the parsed endpoint URL into the form `aws-sdk-s3` expects
365/// as `endpoint_url`: a base of `scheme://host[:port]` with **no path,
366/// query, or fragment**, and with any bucket label stripped from the
367/// host for virtual-hosted addressing.
368///
369/// The SDK rejects an `endpoint_url` that carries a query component
370/// (e.g. our `?addressing=...` flag) and adds the bucket itself —
371/// either as a path segment (`force_path_style(true)`) or as a host
372/// subdomain (`force_path_style(false)`) — so we must strip both
373/// before handing the URL off.
374pub(crate) fn normalize_endpoint(
375 endpoint: &Url,
376 addressing: S3Addressing,
377) -> Result<Url, ObjectStoreError> {
378 let mut rewritten = endpoint.clone();
379 rewritten.set_path("");
380 rewritten.set_query(None);
381 rewritten.set_fragment(None);
382
383 if matches!(addressing, S3Addressing::VirtualHosted) {
384 let host = rewritten
385 .host_str()
386 .ok_or_else(|| ObjectStoreError::Other("endpoint URL has no host".into()))?;
387 // Use `s3_virtual_hosted_bucket` (rightmost-infix rfind) to find the
388 // bucket label length, then strip `bucket.` from the front. This
389 // handles dotted bucket names like `bucketname.com.s3.region.amazonaws.com`
390 // correctly; a plain `split_once('.')` would stop at the first dot
391 // and leave `com.s3.…` as the endpoint instead of `s3.…`.
392 // For non-AWS virtual-hosted endpoints without the `.s3.` infix
393 // (e.g. MinIO with `bucket.minio.example.com`), fall back to
394 // stripping just the leftmost label.
395 let regional_host = s3_virtual_hosted_bucket(host)
396 // `s3_virtual_hosted_bucket` returns the bucket label (a strict
397 // prefix of `host` ending just before the `.s3.` or `.s3-`
398 // infix). The byte at `host[bucket.len()]` is always `.` (ASCII
399 // 0x2E), so slicing at `+ 1` is always a valid UTF-8 boundary.
400 .map(|bucket| host[bucket.len() + 1..].to_owned())
401 .or_else(|| host.split_once('.').map(|(_, rest)| rest.to_owned()))
402 .ok_or_else(|| {
403 ObjectStoreError::Other(
404 format!("virtual-hosted endpoint host `{host}` has no dot separator").into(),
405 )
406 })?;
407 rewritten
408 .set_host(Some(®ional_host))
409 .map_err(other_boxed)?;
410 }
411
412 Ok(rewritten)
413}
414
415/// Resolve the `SigV4` signing region.
416///
417/// Order: `?region=` flag → AWS hostname pattern → `us-east-1` default
418/// for non-AWS hosts → `None` for legacy AWS hosts that don't carry a
419/// region segment (the SDK provider chain takes over).
420pub(crate) fn resolve_region(endpoint: &Url, flag: Option<&str>) -> Option<String> {
421 if let Some(r) = flag {
422 return Some(r.to_owned());
423 }
424 let host = endpoint.host_str()?;
425 // Bare `amazonaws.com` is an AWS host with no leading content (no
426 // region segment), so it short-circuits to `None` like `s3.amazonaws.com`
427 // does — the SDK provider chain picks the region. Everything else
428 // that does not end in an AWS partition suffix is treated as a
429 // third-party S3-compatible endpoint and gets the safe `us-east-1`
430 // default.
431 if host == "amazonaws.com" {
432 return None;
433 }
434 let Some(trimmed) = strip_aws_host_suffix(host) else {
435 return Some("us-east-1".to_owned());
436 };
437 extract_aws_region(trimmed)
438}
439
440/// Parse the AWS region out of an AWS hostname's leading portion (the
441/// host with its [`AWS_HOST_SUFFIXES`] suffix already stripped).
442fn extract_aws_region(trimmed: &str) -> Option<String> {
443 // Patterns we accept (in priority order):
444 // s3 → legacy us-east-1 (no region segment) → None
445 // s3.<region> → path-style regional
446 // s3-<region> → legacy hyphenated form
447 // <bucket>.s3.<region> → simple virtual-hosted (single-label bucket)
448 // <dotted.bucket>.s3.<region> → dotted-bucket virtual-hosted (4+ labels)
449 let labels: Vec<&str> = trimmed.split('.').collect();
450 // The explicit arms below match a fixed label count (1, 2, or 3),
451 // which guarantees the captured `region` is a single dot-free label.
452 // Only the fallback arm operates on the unbounded "4+ labels" shape,
453 // where the captured region could in principle still contain a `.`
454 // (a malformed host); the dot-filter on that arm rejects those.
455 match labels.as_slice() {
456 ["s3"] => None,
457 ["s3", region] => Some((*region).to_owned()),
458 [_bucket, "s3", region] => Some((*region).to_owned()),
459 [head] if head.starts_with("s3-") => Some(head["s3-".len()..].to_owned()),
460 // Dotted-bucket virtual-hosted: find the rightmost service infix
461 // (.s3. or .s3-) and return the segment after it as the region.
462 // e.g. "bucketname.com.s3.us-west-2" → "us-west-2"
463 // Use max by byte position so we pick the rightmost infix when
464 // both `.s3.` and `.s3-` appear in the host.
465 _ => AWS_S3_INFIXES
466 .iter()
467 .filter_map(|infix| {
468 trimmed
469 .rfind(infix)
470 .map(|idx| (idx, trimmed[idx + infix.len()..].to_owned()))
471 })
472 .max_by_key(|(idx, _)| *idx)
473 .map(|(_, region)| region)
474 .filter(|region| !region.is_empty() && !region.contains('.')),
475 }
476}
477
478/// Plan inclusive RFC 7233 byte ranges for a parallel ranged-GET download.
479///
480/// `size = 0` → empty vec (caller writes a zero-length file directly).
481/// Otherwise: full chunks of `chunk_size` bytes, with the final range
482/// covering whatever remainder is left (`(N*chunk, size-1)`).
483pub(crate) fn plan_ranges(size: u64, chunk_size: u64) -> Vec<(u64, u64)> {
484 if size == 0 || chunk_size == 0 {
485 return Vec::new();
486 }
487 let mut ranges = Vec::new();
488 let mut start = 0u64;
489 while start < size {
490 let end = (start + chunk_size - 1).min(size - 1);
491 ranges.push((start, end));
492 start = end + 1;
493 }
494 ranges
495}
496
497/// Encode a `<bucket>/<key>` pair for the `x-amz-copy-source` header.
498///
499/// `aws-sdk-s3` 1.x forwards `copy_source` verbatim; we have to encode
500/// reserved characters (notably `#` in `LOCK#.lock`) ourselves.
501pub(crate) fn encode_copy_source(bucket: &str, key: &str) -> String {
502 let bucket_enc = utf8_percent_encode(bucket, COPY_SOURCE_ENCODE);
503 let key_enc = utf8_percent_encode(key, COPY_SOURCE_ENCODE);
504 format!("{bucket_enc}/{key_enc}")
505}
506
507/// Map a typed `aws-sdk-s3` error into the trait's [`ObjectStoreError`] enum.
508///
509/// `key` is the operation's key/prefix context — it appears in the
510/// resulting [`ObjectStoreError::NotFound`] / [`ObjectStoreError::AccessDenied`] /
511/// [`ObjectStoreError::PreconditionFailed`] / [`ObjectStoreError::Conflict`] payload.
512///
513/// Note that this also covers typed `NotFound` / `NoSuchKey` variants
514/// the SDK constructs from 404 responses: those carry HTTP 404 on
515/// `svc.raw().status()` and so route through the status-based branch
516/// of [`classify_status_and_code`].
517fn classify<E>(err: SdkError<E>, key: &str) -> ObjectStoreError
518where
519 E: std::error::Error + Send + Sync + 'static + ProvideErrorMetadata,
520{
521 if let SdkError::ServiceError(svc) = &err {
522 let status = svc.raw().status().as_u16();
523 let code = svc.err().code();
524 if let Some(mapped) = classify_status_and_code(status, code, key) {
525 return mapped;
526 }
527 }
528 match &err {
529 SdkError::DispatchFailure(_) | SdkError::TimeoutError(_) => network_boxed(err),
530 _ => other_boxed(err),
531 }
532}
533
534/// Convert a single [`aws_sdk_s3::types::Object`] from a
535/// `ListObjectsV2` page into the trait's [`ObjectMeta`].
536///
537/// Extracted so unit tests can drive the missing-key and
538/// missing-last-modified guard branches via `Object`'s builder
539/// without synthesising a full `ListObjectsV2Output`.
540pub(crate) fn object_to_meta(
541 obj: &aws_sdk_s3::types::Object,
542) -> Result<ObjectMeta, ObjectStoreError> {
543 let key = obj
544 .key()
545 .ok_or_else(|| {
546 ObjectStoreError::Other("list_objects_v2 returned an object without a key".into())
547 })?
548 .to_owned();
549 let size = u64::try_from(obj.size().unwrap_or(0)).unwrap_or(0);
550 let last_modified = obj
551 .last_modified()
552 .ok_or_else(|| {
553 ObjectStoreError::Other(
554 format!("list_objects_v2 returned object `{key}` without last_modified").into(),
555 )
556 })?
557 .to_time()
558 .map_err(other_boxed)?;
559 Ok(ObjectMeta {
560 key,
561 size,
562 last_modified,
563 // ListObjectsV2 does return ETags, but they are not consumed
564 // by any current caller; keep `None` to avoid inflating the
565 // per-object metadata for list results.
566 etag: None,
567 })
568}
569
570/// Convert a [`HeadObject`] response's relevant fields into the trait's
571/// [`ObjectMeta`].
572///
573/// Extracted so unit tests can drive the missing-content-length and
574/// missing-last-modified guard branches without standing up a live S3
575/// or constructing a full `HeadObjectOutput` (whose builder is not
576/// trivially mockable).
577///
578/// A missing `Content-Length` is an error rather than silent zero: a
579/// 0-byte size is semantically meaningful in this codebase (lock
580/// files are intentionally empty) and downstream `get_to_file` takes
581/// a fast path on `size == 0` that writes an empty destination file.
582/// Treating "header absent" as 0 would silently produce empty bundles
583/// instead of surfacing the malformed response. Every backend HEAD
584/// must yield `Content-Length`.
585pub(crate) fn head_output_to_meta(
586 key: &str,
587 content_length: Option<i64>,
588 last_modified: Option<&aws_sdk_s3::primitives::DateTime>,
589 etag: Option<&str>,
590) -> Result<ObjectMeta, ObjectStoreError> {
591 let raw_size = content_length.ok_or_else(|| {
592 ObjectStoreError::Other(format!("head_object on `{key}` returned no content-length").into())
593 })?;
594 // `i64` is the SDK's wire type; clamp a (legally impossible) negative
595 // value to 0 rather than wrap to a huge u64. Mirrors `object_to_meta`.
596 let size = u64::try_from(raw_size).unwrap_or(0);
597 let last_modified = last_modified
598 .ok_or_else(|| {
599 ObjectStoreError::Other(
600 format!("head_object on `{key}` returned no last_modified").into(),
601 )
602 })?
603 .to_time()
604 .map_err(other_boxed)?;
605 Ok(ObjectMeta {
606 key: key.to_owned(),
607 size,
608 last_modified,
609 etag: etag.map(str::to_owned),
610 })
611}
612
613/// Pure classifier core (no `SdkError` involvement) so unit tests can
614/// exercise every branch without synthesising SDK error types.
615fn classify_status_and_code(
616 status: u16,
617 code: Option<&str>,
618 key: &str,
619) -> Option<ObjectStoreError> {
620 match status {
621 404 => return Some(ObjectStoreError::NotFound(key.to_owned())),
622 403 => return Some(ObjectStoreError::AccessDenied(key.to_owned())),
623 412 => return Some(ObjectStoreError::PreconditionFailed(key.to_owned())),
624 409 => return Some(ObjectStoreError::Conflict(key.to_owned())),
625 // S3 occasionally surfaces HTTP 413 directly (front-door / proxy
626 // path); the canonical EntityTooLarge response is HTTP 400, but
627 // the status mapping is the same regardless of the SDK code.
628 413 => {
629 return Some(ObjectStoreError::PayloadTooLarge {
630 limit_bytes: SINGLE_PUT_LIMIT_BYTES,
631 });
632 }
633 _ => {}
634 }
635 match code {
636 Some("NoSuchKey" | "NoSuchBucket" | "NotFound") => {
637 Some(ObjectStoreError::NotFound(key.to_owned()))
638 }
639 Some("AccessDenied") => Some(ObjectStoreError::AccessDenied(key.to_owned())),
640 Some("PreconditionFailed") => Some(ObjectStoreError::PreconditionFailed(key.to_owned())),
641 Some("ConditionalRequestConflict") => Some(ObjectStoreError::Conflict(key.to_owned())),
642 // S3 returns HTTP 400 + `EntityTooLarge` when a single-PUT body
643 // exceeds 5 GiB. The status alone is too broad to hang
644 // `PayloadTooLarge` on, so route via the code.
645 Some("EntityTooLarge") => Some(ObjectStoreError::PayloadTooLarge {
646 limit_bytes: SINGLE_PUT_LIMIT_BYTES,
647 }),
648 _ => None,
649 }
650}
651
652#[async_trait::async_trait]
653impl ObjectStore for S3Store {
654 async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
655 let mut out = Vec::new();
656 let mut token: Option<String> = None;
657 loop {
658 let resp = self
659 .client
660 .list_objects_v2()
661 .bucket(&self.bucket)
662 .prefix(prefix)
663 .set_continuation_token(token.take())
664 .send()
665 .await
666 .map_err(|e| classify(e, prefix))?;
667
668 out.reserve(resp.contents().len());
669 for obj in resp.contents() {
670 out.push(object_to_meta(obj)?);
671 }
672
673 if !resp.is_truncated().unwrap_or(false) {
674 break;
675 }
676 // Defensive: a server that signals truncated but omits the
677 // continuation token would loop forever.
678 match resp.next_continuation_token() {
679 Some(t) => token = Some(t.to_owned()),
680 None => break,
681 }
682 }
683 Ok(out)
684 }
685
686 async fn get_to_file(
687 &self,
688 key: &str,
689 dest: &Path,
690 opts: GetOpts,
691 ) -> Result<(), ObjectStoreError> {
692 let parent = dest.parent().ok_or_else(|| {
693 ObjectStoreError::Other(
694 format!("destination `{}` has no parent directory", dest.display()).into(),
695 )
696 })?;
697
698 // Mirror Azure: try once, retry once on 412 (the head→GET race).
699 // After the second attempt any error — including a repeated 412 —
700 // propagates. Encoding retry as an explicit second call keeps every
701 // control-flow path returning a value, so no `unreachable!` is
702 // required.
703 let progress = opts.progress.as_ref();
704 match self.head_then_download(key, dest, parent, progress).await {
705 Err(ObjectStoreError::PreconditionFailed(_)) => {
706 tracing::warn!(key, "object changed between head and GET; retrying");
707 self.head_then_download(key, dest, parent, progress).await
708 }
709 other => other,
710 }
711 }
712
713 async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
714 let resp = self
715 .client
716 .get_object()
717 .bucket(&self.bucket)
718 .key(key)
719 .send()
720 .await
721 .map_err(|e| classify(e, key))?;
722 let aggregated = resp.body.collect().await.map_err(network_boxed)?;
723 Ok(aggregated.into_bytes())
724 }
725
726 /// Issue a `GetObject` with a `Range: bytes=<start>-<end-1>` header.
727 /// HTTP 416 maps to [`ObjectStoreError::RangeNotSatisfiable`] with
728 /// the original requested range (so the wire-line names what the
729 /// caller asked for, not the server's translation). All other
730 /// failures route through [`classify`].
731 ///
732 /// S3 silently truncates a ranged GET to EOF when the requested
733 /// range overruns the object — `start < body.len() <= end` returns
734 /// `start..body.len()` bytes with HTTP 206 and no error. The
735 /// post-flight length check via [`super::verify_range_response_length`]
736 /// elevates that mismatch to [`ObjectStoreError::RangeNotSatisfiable`]
737 /// so callers (notably the packchain reader) cannot mistake a
738 /// truncated slice for the full requested range.
739 async fn get_bytes_range(
740 &self,
741 key: &str,
742 range: std::ops::Range<u64>,
743 ) -> Result<Bytes, ObjectStoreError> {
744 if let Some(empty) = super::precheck_range(key, &range)? {
745 return Ok(empty);
746 }
747 // Inclusive end byte for the HTTP `Range` header (RFC 7233).
748 let inclusive_end = range.end - 1;
749 let result = self
750 .client
751 .get_object()
752 .bucket(&self.bucket)
753 .key(key)
754 .range(format!("bytes={}-{}", range.start, inclusive_end))
755 .send()
756 .await;
757 let resp = match result {
758 Ok(resp) => resp,
759 Err(err) => {
760 if let SdkError::ServiceError(svc) = &err
761 && svc.raw().status().as_u16() == 416
762 {
763 return Err(ObjectStoreError::RangeNotSatisfiable {
764 key: key.to_owned(),
765 requested: range,
766 });
767 }
768 return Err(classify(err, key));
769 }
770 };
771 let aggregated = resp.body.collect().await.map_err(network_boxed)?;
772 super::verify_range_response_length(key, &range, aggregated.into_bytes())
773 }
774
775 async fn put_bytes(
776 &self,
777 key: &str,
778 body: Bytes,
779 opts: PutOpts,
780 ) -> Result<(), ObjectStoreError> {
781 // PutObject rejects bodies > 5 GiB; above [`MULTIPART_PUT_THRESHOLD`]
782 // we hand off to the multipart path which lifts that ceiling and
783 // emits per-part progress events. Below the threshold we keep the
784 // single round trip (no `CreateMultipartUpload` cost). Issue #53.
785 let size = body.len() as u64;
786 if should_use_multipart(size) {
787 return self.multipart_put_bytes(key, body, size, opts).await;
788 }
789 let progress = opts.progress.clone();
790 self.put_body(key, ByteStream::from(body), opts).await?;
791 if let Some(sink) = progress
792 && size > 0
793 {
794 sink.report(size);
795 }
796 Ok(())
797 }
798
799 async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
800 // Open the file once and read size from the open handle. This
801 // closes the metadata/upload race that would let a concurrent
802 // truncate or rename produce a body whose length disagrees
803 // with the size we used for multipart planning. The size-based
804 // multipart dispatch (issue #53) and the post-transfer
805 // progress event both consume the same `body_len`.
806 let file = tokio::fs::File::open(src).await.map_err(other_boxed)?;
807 let body_len = file.metadata().await.map_err(other_boxed)?.len();
808 if should_use_multipart(body_len) {
809 return self.multipart_put_path(key, file, body_len, opts).await;
810 }
811 // Below the threshold: single PutObject. `FsBuilder::file`
812 // consumes our already-open handle so the SDK does not
813 // re-open by path (which would re-introduce the race).
814 let stream = ByteStream::read_from()
815 .file(file)
816 .length(Length::Exact(body_len))
817 .build()
818 .await
819 .map_err(other_boxed)?;
820 let progress = opts.progress.clone();
821 self.put_body(key, stream, opts).await?;
822 if let Some(sink) = progress
823 && body_len > 0
824 {
825 sink.report(body_len);
826 }
827 Ok(())
828 }
829
830 async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
831 let resp = self
832 .client
833 .put_object()
834 .bucket(&self.bucket)
835 .key(key)
836 .if_none_match("*")
837 .body(ByteStream::from(body))
838 .send()
839 .await;
840 match resp.map_err(|e| classify(e, key)) {
841 Ok(_) => Ok(true),
842 Err(ObjectStoreError::PreconditionFailed(_) | ObjectStoreError::Conflict(_)) => {
843 Ok(false)
844 }
845 Err(other) => Err(other),
846 }
847 }
848
849 async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
850 let resp = self
851 .client
852 .head_object()
853 .bucket(&self.bucket)
854 .key(key)
855 .send()
856 .await
857 .map_err(|e| classify(e, key))?;
858 head_output_to_meta(
859 key,
860 resp.content_length(),
861 resp.last_modified(),
862 resp.e_tag(),
863 )
864 }
865
866 async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
867 // `CopyObject` rejects bodies > 5 GiB; above the multipart
868 // threshold we HEAD `src` and use `UploadPartCopy` per part
869 // (issue #53). The HEAD adds one round trip for small copies,
870 // but the only production caller (`Doctor::evict_losing_bundle`)
871 // is a quarantine path on bundles that can be multi-GiB —
872 // paying one HEAD to learn whether to multipart is worth it.
873 //
874 // The HEAD also yields the source `ETag`, which we pass as
875 // `x-amz-copy-source-if-match` on every subsequent
876 // `CopyObject` / `UploadPartCopy`. Without it, a source
877 // mutation between HEAD and copy would silently produce a
878 // destination whose bytes are a mix of the pre- and post-
879 // mutation source. With it, S3 returns 412
880 // (`PreconditionFailed`) and the caller can retry. Azure's
881 // `copy()` already has this property because it routes
882 // through `head_then_download`'s 412 retry.
883 let meta = self.head(src).await?;
884 if should_use_multipart(meta.size) {
885 return self
886 .multipart_copy(src, dst, meta.size, meta.etag.as_deref())
887 .await;
888 }
889 let copy_source = encode_copy_source(&self.bucket, src);
890 // `MetadataDirective::Replace` makes S3 consistent with the Azure
891 // backend (which drops metadata on copy via download-then-reupload):
892 // neither backend preserves user metadata, matching the trait
893 // contract in `ObjectStore::copy`.
894 // Pass `src` as the key context so a 404 surfaces as
895 // `NotFound(src)` — that's what the trait promises.
896 let mut req = self
897 .client
898 .copy_object()
899 .bucket(&self.bucket)
900 .key(dst)
901 .copy_source(copy_source)
902 .metadata_directive(MetadataDirective::Replace);
903 if let Some(etag) = meta.etag.as_deref() {
904 req = req.copy_source_if_match(etag);
905 }
906 req.send().await.map_err(|e| classify(e, src))?;
907 Ok(())
908 }
909
910 async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
911 // S3 DeleteObject is idempotent (returns 204 even for missing
912 // keys), but the trait contract demands `Err(NotFound)` on a
913 // missing key — so HEAD first. Concurrent deletion between this
914 // HEAD and the DELETE will return Ok rather than NotFound;
915 // semantically acceptable since the key existed at some point
916 // during the call.
917 self.head(key).await?;
918 self.client
919 .delete_object()
920 .bucket(&self.bucket)
921 .key(key)
922 .send()
923 .await
924 .map_err(|e| classify(e, key))?;
925 Ok(())
926 }
927
928 /// Presign a `GetObject` request for `key` valid for `ttl`. Used
929 /// by the `bundle-uri` capability (issue #76) to advertise
930 /// time-limited download URLs against private buckets. The
931 /// returned URL carries an `X-Amz-Signature` query parameter
932 /// derived from the SDK's resolved `SigV4` credentials and an
933 /// `X-Amz-Expires=<ttl-seconds>` parameter that the operator
934 /// can use to verify the requested TTL was honoured.
935 ///
936 /// # Errors
937 ///
938 /// Returns [`ObjectStoreError::Other`] when the SDK rejects the
939 /// TTL (AWS caps presigned URLs at 7 days) or when the
940 /// presigning step fails (e.g. credential provider returned no
941 /// credentials).
942 async fn presigned_get_url(
943 &self,
944 key: &str,
945 ttl: std::time::Duration,
946 ) -> Result<String, ObjectStoreError> {
947 let config = aws_sdk_s3::presigning::PresigningConfig::expires_in(ttl).map_err(|e| {
948 ObjectStoreError::Other(format!("PresigningConfig::expires_in({ttl:?}): {e}").into())
949 })?;
950 let presigned = self
951 .client
952 .get_object()
953 .bucket(&self.bucket)
954 .key(key)
955 .presigned(config)
956 .await
957 .map_err(|e| classify(e, key))?;
958 Ok(presigned.uri().to_owned())
959 }
960}
961
962impl S3Store {
963 /// One head→tempfile→download→persist round trip.
964 ///
965 /// Factored out so [`get_to_file`](ObjectStore::get_to_file) can invoke
966 /// it twice: once normally, once more on a 412 retry. Mirrors
967 /// `AzureStore::head_then_download` so both backends share the same
968 /// retry shape.
969 async fn head_then_download(
970 &self,
971 key: &str,
972 dest: &Path,
973 parent: &Path,
974 progress: Option<&ProgressSink>,
975 ) -> Result<(), ObjectStoreError> {
976 let meta = self.head(key).await?;
977 let temp = NamedTempFile::new_in(parent).map_err(other_boxed)?;
978 if meta.size == 0 {
979 // Skip the GET entirely for zero-byte objects (lock files):
980 // `download_single` would issue a plain GET for an empty body
981 // and `download_multipart` would set_len(0) with no ranges —
982 // both correct but a wasted round trip and a wasted file
983 // open, respectively.
984 return persist_temp(temp, dest);
985 }
986
987 if meta.size <= MULTIPART_THRESHOLD {
988 self.download_single(key, temp.path(), meta.etag.as_deref(), progress)
989 .await?;
990 } else {
991 self.download_multipart(key, temp.path(), meta.size, meta.etag.as_deref(), progress)
992 .await?;
993 }
994 persist_temp(temp, dest)
995 }
996
997 /// Common upload helper: sends the given [`ByteStream`] to S3 with
998 /// optional `Content-Disposition` and user metadata from [`PutOpts`].
999 /// Shared by `put_bytes` (in-memory) and `put_path` (streamed from
1000 /// disk).
1001 async fn put_body(
1002 &self,
1003 key: &str,
1004 body: ByteStream,
1005 opts: PutOpts,
1006 ) -> Result<(), ObjectStoreError> {
1007 let mut req = self
1008 .client
1009 .put_object()
1010 .bucket(&self.bucket)
1011 .key(key)
1012 .body(body);
1013 if let Some(cd) = &opts.content_disposition {
1014 req = req.content_disposition(cd);
1015 }
1016 for (k, v) in &opts.user_metadata {
1017 // S3 lowercases user-metadata keys on storage and limits the
1018 // combined header set to ~2 KB; ASCII only (RFC 2047 encode
1019 // non-ASCII upstream).
1020 req = req.metadata(k, v);
1021 }
1022 // Disable read_timeout for this operation: smithy's read_timeout
1023 // resolves the HTTP connector future at "response-headers received,"
1024 // which for uploads includes the entire request body upload. The
1025 // global READ_TIMEOUT (30 s) would otherwise abort any bundle
1026 // upload that takes longer than 30 s. GET/HEAD/LIST operations keep
1027 // the timeout via the client-level config; uploads opt out here.
1028 //
1029 // Caveat: smithy's `MergeTimeoutConfig::merge_iter` treats an
1030 // override whose `has_timeouts()` is false (no field in `Set` state
1031 // — only `Disabled` and `Unset` count as "no timeouts") as a no-op
1032 // and skips inheriting from the client-level config. So this
1033 // override does NOT inherit `connect_timeout` (or any future
1034 // client-level timeout) from the SDK's config. That is fine for the
1035 // current use case — the only timeout we configure at the client
1036 // level is `read_timeout`, which we explicitly want to disable —
1037 // but a future client-level `connect_timeout` would have to be
1038 // duplicated here to take effect on uploads.
1039 req.customize()
1040 .config_override(
1041 aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
1042 )
1043 .send()
1044 .await
1045 .map_err(|e| classify(e, key))?;
1046 Ok(())
1047 }
1048
1049 /// Stream a small (<= [`MULTIPART_THRESHOLD`]) object directly to the
1050 /// temp-file path. Caller is responsible for `persist`-ing the file.
1051 ///
1052 /// When `etag` is `Some`, the request carries `If-Match` so S3
1053 /// returns 412 if the object was overwritten since the `head` call.
1054 /// When `progress` is `Some`, fires once per SDK body chunk read
1055 /// off the wire — chunk sizes follow the SDK's internal aggregation
1056 /// (typically 1 MiB-ish for HTTPS).
1057 async fn download_single(
1058 &self,
1059 key: &str,
1060 temp_path: &Path,
1061 etag: Option<&str>,
1062 progress: Option<&ProgressSink>,
1063 ) -> Result<(), ObjectStoreError> {
1064 let mut req = self.client.get_object().bucket(&self.bucket).key(key);
1065 if let Some(etag) = etag {
1066 req = req.if_match(etag);
1067 }
1068 let mut resp = req.send().await.map_err(|e| classify(e, key))?;
1069
1070 let mut file = tokio::fs::OpenOptions::new()
1071 .write(true)
1072 .truncate(true)
1073 .open(temp_path)
1074 .await
1075 .map_err(other_boxed)?;
1076
1077 while let Some(chunk) = resp.body.next().await {
1078 let bytes = chunk.map_err(network_boxed)?;
1079 let chunk_len = bytes.len() as u64;
1080 file.write_all(&bytes).await.map_err(other_boxed)?;
1081 if let Some(sink) = progress
1082 && chunk_len > 0
1083 {
1084 sink.report(chunk_len);
1085 }
1086 }
1087 file.flush().await.map_err(other_boxed)?;
1088 Ok(())
1089 }
1090
1091 /// Download a large object via parallel ranged GETs, writing each
1092 /// range at its absolute offset into the pre-allocated temp file.
1093 ///
1094 /// When `etag` is `Some`, every ranged GET carries `If-Match` so
1095 /// S3 returns 412 if the object is overwritten mid-download. When
1096 /// `progress` is `Some`, fires once per completed range with the
1097 /// range's byte count — events arrive out of order, matching the
1098 /// concurrent-GET schedule, but cumulative bytes equal `size` after
1099 /// the last event.
1100 async fn download_multipart(
1101 &self,
1102 key: &str,
1103 temp_path: &Path,
1104 size: u64,
1105 etag: Option<&str>,
1106 progress: Option<&ProgressSink>,
1107 ) -> Result<(), ObjectStoreError> {
1108 let async_file = tokio::fs::OpenOptions::new()
1109 .write(true)
1110 .truncate(false)
1111 .open(temp_path)
1112 .await
1113 .map_err(other_boxed)?;
1114 async_file.set_len(size).await.map_err(other_boxed)?;
1115
1116 let file = Arc::new(Mutex::new(async_file));
1117 let semaphore = Arc::new(Semaphore::new(MULTIPART_MAX_CONCURRENCY));
1118 let mut tasks: JoinSet<Result<(), ObjectStoreError>> = JoinSet::new();
1119
1120 let etag_owned = etag.map(str::to_owned);
1121 let progress_owned = progress.cloned();
1122 for (start, end) in plan_ranges(size, MULTIPART_CHUNK_SIZE) {
1123 let client = self.client.clone();
1124 let bucket = self.bucket.clone();
1125 let key = key.to_owned();
1126 let etag = etag_owned.clone();
1127 let file = Arc::clone(&file);
1128 let semaphore = Arc::clone(&semaphore);
1129 let progress = progress_owned.clone();
1130 tasks.spawn(async move {
1131 let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1132 let mut req = client
1133 .get_object()
1134 .bucket(&bucket)
1135 .key(&key)
1136 .range(format!("bytes={start}-{end}"));
1137 if let Some(etag) = &etag {
1138 req = req.if_match(etag);
1139 }
1140 let resp = req.send().await.map_err(|e| classify(e, &key))?;
1141 let bytes = resp
1142 .body
1143 .collect()
1144 .await
1145 .map_err(network_boxed)?
1146 .into_bytes();
1147 let expected = end - start + 1;
1148 if bytes.len() as u64 != expected {
1149 return Err(ObjectStoreError::Other(
1150 format!(
1151 "range bytes={start}-{end} returned {} bytes, expected {expected}",
1152 bytes.len()
1153 )
1154 .into(),
1155 ));
1156 }
1157 let chunk_len = bytes.len() as u64;
1158 let mut f = file.lock().await;
1159 f.seek(SeekFrom::Start(start)).await.map_err(other_boxed)?;
1160 f.write_all(&bytes).await.map_err(other_boxed)?;
1161 drop(f);
1162 if let Some(sink) = &progress {
1163 sink.report(chunk_len);
1164 }
1165 Ok(())
1166 });
1167 }
1168
1169 while let Some(joined) = tasks.join_next().await {
1170 joined.map_err(other_boxed)??;
1171 }
1172
1173 // All spawned tasks have been joined above — each task's
1174 // captured `Arc` clone was dropped when its closure
1175 // completed, so this is the only outstanding reference. If
1176 // some future refactor accidentally leaks a clone, surface a
1177 // structured error rather than aborting the process: flush via
1178 // the `Mutex` instead of taking sole ownership.
1179 match Arc::try_unwrap(file) {
1180 Ok(mutex) => {
1181 let mut f = mutex.into_inner();
1182 f.flush().await.map_err(other_boxed)?;
1183 }
1184 Err(shared) => {
1185 let mut f = shared.lock().await;
1186 f.flush().await.map_err(other_boxed)?;
1187 }
1188 }
1189 Ok(())
1190 }
1191
1192 /// Drive a multipart upload from a fully-buffered `Bytes` body.
1193 ///
1194 /// `Bytes::slice` is zero-copy — every part borrows into the same
1195 /// underlying allocation, so peak memory equals the caller's body
1196 /// rather than `body × parts`.
1197 async fn multipart_put_bytes(
1198 &self,
1199 key: &str,
1200 body: Bytes,
1201 size: u64,
1202 opts: PutOpts,
1203 ) -> Result<(), ObjectStoreError> {
1204 let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
1205 let guard = self.start_multipart_upload(key, &opts).await?;
1206 let progress = opts.progress.clone();
1207 let result = self
1208 .upload_parts_with_bodies(key, guard.upload_id(), &parts, progress, |part| {
1209 slice_bytes_part(&body, part)
1210 })
1211 .await;
1212 self.finish_multipart_upload(guard, result).await
1213 }
1214
1215 /// Drive a multipart upload by streaming a local file part-by-part.
1216 ///
1217 /// All tasks share one `Arc<std::fs::File>`; per-task
1218 /// `read_file_part` uses `pread` so reads are concurrent without
1219 /// offset contention. Sharing one open file description closes
1220 /// the metadata/upload race that would otherwise let a
1221 /// concurrent rename or truncate produce parts with inconsistent
1222 /// content. With `MULTIPART_PUT_MAX_CONCURRENCY = 8` and
1223 /// `MULTIPART_PUT_PART_SIZE = 16 MiB`, peak memory is bounded at
1224 /// 128 MiB regardless of file size — acceptable for LFS uploads.
1225 async fn multipart_put_path(
1226 &self,
1227 key: &str,
1228 file: tokio::fs::File,
1229 size: u64,
1230 opts: PutOpts,
1231 ) -> Result<(), ObjectStoreError> {
1232 let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
1233 let guard = self.start_multipart_upload(key, &opts).await?;
1234 let progress = opts.progress.clone();
1235 let file: Arc<std::fs::File> = Arc::new(file.into_std().await);
1236 let result = self
1237 .upload_parts_from_file(key, guard.upload_id(), file, &parts, progress)
1238 .await;
1239 self.finish_multipart_upload(guard, result).await
1240 }
1241
1242 /// Drive a multipart server-side copy via `UploadPartCopy`.
1243 ///
1244 /// Each part issues an `UploadPartCopy` request with
1245 /// `x-amz-copy-source` (the bucket+key, percent-encoded) and
1246 /// `x-amz-copy-source-range: bytes=<start>-<end>` so the copy
1247 /// runs entirely server-side — no body crosses the wire.
1248 /// The destination object's metadata starts empty (matching the
1249 /// trait contract: copy drops user metadata) because
1250 /// `CreateMultipartUpload` is invoked without any metadata.
1251 ///
1252 /// `src_etag`, if `Some`, is forwarded as
1253 /// `x-amz-copy-source-if-match` on every part so a mid-copy
1254 /// source mutation surfaces as `PreconditionFailed` rather than
1255 /// silently producing a destination with mixed pre/post-mutation
1256 /// bytes.
1257 async fn multipart_copy(
1258 &self,
1259 src: &str,
1260 dst: &str,
1261 size: u64,
1262 src_etag: Option<&str>,
1263 ) -> Result<(), ObjectStoreError> {
1264 let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, S3_MAX_PARTS);
1265 // `copy()` uses default opts (no progress, no metadata) so the
1266 // create-call below also carries no metadata. That preserves
1267 // the "copy drops user metadata" contract.
1268 let guard = self
1269 .start_multipart_upload(dst, &PutOpts::default())
1270 .await?;
1271 let copy_source = encode_copy_source(&self.bucket, src);
1272 let result = self
1273 .upload_parts_via_copy(src, dst, guard.upload_id(), ©_source, src_etag, &parts)
1274 .await;
1275 // The upload_id lives under `dst` even though the bytes come
1276 // from `src`; the guard already carries `dst` as its key.
1277 // Errors mid-copy are reported with `src` as context inside
1278 // the per-part task (matches single-call `copy()` at the trait
1279 // surface).
1280 self.finish_multipart_upload(guard, result).await
1281 }
1282
1283 /// Begin a multipart upload, returning a guard that owns the
1284 /// upload-id and aborts the upload on drop.
1285 ///
1286 /// `content_disposition` and `user_metadata` from `PutOpts` flow
1287 /// onto `CreateMultipartUpload` — the destination object inherits
1288 /// them on `CompleteMultipartUpload` (`UploadPart`/`UploadPartCopy`
1289 /// have no metadata fields of their own).
1290 ///
1291 /// The returned [`MultipartUploadGuard`] keeps the upload-id alive
1292 /// for the duration of the upload. If the calling future is
1293 /// dropped (cancelled, panicked, or the caller picks the other arm
1294 /// of a `select!`) before [`finish_multipart_upload`] runs, the
1295 /// guard's [`Drop`] best-effort dispatches `AbortMultipartUpload`
1296 /// so the upload-id is reclaimed and the caller is not billed for
1297 /// orphaned parts (issues #169, #171). S3 retains uncompleted
1298 /// multipart uploads indefinitely without an explicit lifecycle
1299 /// rule; Azure has no equivalent need (uncommitted blocks auto-
1300 /// expire after seven days).
1301 ///
1302 /// [`finish_multipart_upload`]: Self::finish_multipart_upload
1303 async fn start_multipart_upload(
1304 &self,
1305 key: &str,
1306 opts: &PutOpts,
1307 ) -> Result<MultipartUploadGuard, ObjectStoreError> {
1308 let mut req = self
1309 .client
1310 .create_multipart_upload()
1311 .bucket(&self.bucket)
1312 .key(key);
1313 if let Some(cd) = &opts.content_disposition {
1314 req = req.content_disposition(cd);
1315 }
1316 for (k, v) in &opts.user_metadata {
1317 req = req.metadata(k, v);
1318 }
1319 let resp = req.send().await.map_err(|e| classify(e, key))?;
1320 let upload_id = resp.upload_id().map(str::to_owned).ok_or_else(|| {
1321 ObjectStoreError::Other(
1322 format!("CreateMultipartUpload for `{key}` returned no upload-id").into(),
1323 )
1324 })?;
1325 Ok(MultipartUploadGuard::new(
1326 self.client.clone(),
1327 self.bucket.clone(),
1328 key.to_owned(),
1329 upload_id,
1330 ))
1331 }
1332
1333 /// Spawn parallel `UploadPart` tasks, one per planned part, with
1334 /// the part body produced by `make_body(part) -> Bytes`. Used by
1335 /// `multipart_put_bytes` (slice from in-memory `Bytes`).
1336 async fn upload_parts_with_bodies<F>(
1337 &self,
1338 key: &str,
1339 upload_id: &str,
1340 parts: &[UploadPart],
1341 progress: Option<ProgressSink>,
1342 make_body: F,
1343 ) -> Result<Vec<CompletedPart>, ObjectStoreError>
1344 where
1345 F: Fn(UploadPart) -> Result<Bytes, ObjectStoreError>,
1346 {
1347 let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
1348 let mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>> = JoinSet::new();
1349 for (idx, part) in parts.iter().enumerate() {
1350 let part = *part;
1351 // S3 caps multipart uploads at S3_MAX_PARTS = 10 000
1352 // (`plan_upload_parts` enforces this), so `idx + 1` always
1353 // fits in i32.
1354 let part_number = i32::try_from(idx + 1)
1355 .expect("plan_upload_parts caps parts <= S3_MAX_PARTS = 10_000");
1356 let body = make_body(part)?;
1357 let client = self.client.clone();
1358 let bucket = self.bucket.clone();
1359 let key = key.to_owned();
1360 let upload_id = upload_id.to_owned();
1361 let semaphore = Arc::clone(&semaphore);
1362 let progress = progress.clone();
1363 tasks.spawn(async move {
1364 let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1365 let resp = client
1366 .upload_part()
1367 .bucket(&bucket)
1368 .key(&key)
1369 .upload_id(&upload_id)
1370 .part_number(part_number)
1371 .body(ByteStream::from(body))
1372 .customize()
1373 .config_override(
1374 aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
1375 )
1376 .send()
1377 .await
1378 .map_err(|e| classify(e, &key))?;
1379 let etag = resp.e_tag().map(str::to_owned).ok_or_else(|| {
1380 ObjectStoreError::Other(
1381 format!("UploadPart for `{key}` part {part_number} returned no ETag")
1382 .into(),
1383 )
1384 })?;
1385 if let Some(sink) = &progress {
1386 sink.report(part.length);
1387 }
1388 Ok(CompletedPart::builder()
1389 .part_number(part_number)
1390 .e_tag(etag)
1391 .build())
1392 });
1393 }
1394 join_completed_parts(tasks, parts.len()).await
1395 }
1396
1397 /// Spawn parallel `UploadPart` tasks that each read their part
1398 /// from the shared `Arc<std::fs::File>` via `pread`. With
1399 /// concurrency 8 and 16 MiB parts, peak memory is 128 MiB.
1400 async fn upload_parts_from_file(
1401 &self,
1402 key: &str,
1403 upload_id: &str,
1404 file: Arc<std::fs::File>,
1405 parts: &[UploadPart],
1406 progress: Option<ProgressSink>,
1407 ) -> Result<Vec<CompletedPart>, ObjectStoreError> {
1408 let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
1409 let mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>> = JoinSet::new();
1410 for (idx, part) in parts.iter().enumerate() {
1411 let part = *part;
1412 // S3 caps multipart uploads at S3_MAX_PARTS = 10 000
1413 // (`plan_upload_parts` enforces this), so `idx + 1` always
1414 // fits in i32.
1415 let part_number = i32::try_from(idx + 1)
1416 .expect("plan_upload_parts caps parts <= S3_MAX_PARTS = 10_000");
1417 let client = self.client.clone();
1418 let bucket = self.bucket.clone();
1419 let key = key.to_owned();
1420 let upload_id = upload_id.to_owned();
1421 let task_file = Arc::clone(&file);
1422 let semaphore = Arc::clone(&semaphore);
1423 let progress = progress.clone();
1424 tasks.spawn(async move {
1425 let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1426 let body = read_file_part(task_file, part).await?;
1427 let resp = client
1428 .upload_part()
1429 .bucket(&bucket)
1430 .key(&key)
1431 .upload_id(&upload_id)
1432 .part_number(part_number)
1433 .body(ByteStream::from(body))
1434 .customize()
1435 .config_override(
1436 aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
1437 )
1438 .send()
1439 .await
1440 .map_err(|e| classify(e, &key))?;
1441 let etag = resp.e_tag().map(str::to_owned).ok_or_else(|| {
1442 ObjectStoreError::Other(
1443 format!("UploadPart for `{key}` part {part_number} returned no ETag")
1444 .into(),
1445 )
1446 })?;
1447 if let Some(sink) = &progress {
1448 sink.report(part.length);
1449 }
1450 Ok(CompletedPart::builder()
1451 .part_number(part_number)
1452 .e_tag(etag)
1453 .build())
1454 });
1455 }
1456 join_completed_parts(tasks, parts.len()).await
1457 }
1458
1459 /// Spawn parallel `UploadPartCopy` tasks for a server-side
1460 /// multipart copy. No body crosses the wire — each task only
1461 /// sends the source identifier and a byte range header.
1462 ///
1463 /// `src_etag`, if `Some`, is set as `x-amz-copy-source-if-match`
1464 /// on every part. A mid-copy source mutation then surfaces as
1465 /// `PreconditionFailed` on the offending part rather than
1466 /// silently producing a mixed destination.
1467 async fn upload_parts_via_copy(
1468 &self,
1469 src: &str,
1470 dst: &str,
1471 upload_id: &str,
1472 copy_source: &str,
1473 src_etag: Option<&str>,
1474 parts: &[UploadPart],
1475 ) -> Result<Vec<CompletedPart>, ObjectStoreError> {
1476 let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
1477 let mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>> = JoinSet::new();
1478 for (idx, part) in parts.iter().enumerate() {
1479 let part = *part;
1480 // S3 caps multipart uploads at S3_MAX_PARTS = 10 000
1481 // (`plan_upload_parts` enforces this), so `idx + 1` always
1482 // fits in i32.
1483 let part_number = i32::try_from(idx + 1)
1484 .expect("plan_upload_parts caps parts <= S3_MAX_PARTS = 10_000");
1485 let client = self.client.clone();
1486 let bucket = self.bucket.clone();
1487 let dst = dst.to_owned();
1488 let src_ctx = src.to_owned();
1489 let upload_id = upload_id.to_owned();
1490 let copy_source = copy_source.to_owned();
1491 let src_etag = src_etag.map(str::to_owned);
1492 let range = format!("bytes={}-{}", part.offset, part.offset + part.length - 1);
1493 let semaphore = Arc::clone(&semaphore);
1494 tasks.spawn(async move {
1495 let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1496 // `UploadPartCopy` failures point at the source (a 404
1497 // or 403 means the source went away or is now denied)
1498 // so classify against `src_ctx`, mirroring single-call
1499 // `copy()` at the trait surface.
1500 // Disable read_timeout for the same reason `put_body`
1501 // does (lessons_learned.md #2 / issue #26): smithy
1502 // resolves the connector future at "response-headers
1503 // received," but `UploadPartCopy` doesn't return until
1504 // the server-side copy completes — which for a 16 MiB
1505 // part on a slow region can exceed the 30 s
1506 // [`READ_TIMEOUT`].
1507 let mut req = client
1508 .upload_part_copy()
1509 .bucket(&bucket)
1510 .key(&dst)
1511 .upload_id(&upload_id)
1512 .part_number(part_number)
1513 .copy_source(©_source)
1514 .copy_source_range(&range);
1515 if let Some(etag) = &src_etag {
1516 req = req.copy_source_if_match(etag);
1517 }
1518 let resp = req
1519 .customize()
1520 .config_override(
1521 aws_sdk_s3::config::Builder::new().timeout_config(upload_timeout_config()),
1522 )
1523 .send()
1524 .await
1525 .map_err(|e| classify(e, &src_ctx))?;
1526 let etag = resp
1527 .copy_part_result()
1528 .and_then(|r| r.e_tag())
1529 .map(str::to_owned)
1530 .ok_or_else(|| {
1531 ObjectStoreError::Other(
1532 format!(
1533 "UploadPartCopy for `{src_ctx}` → `{dst}` part {part_number} returned no ETag"
1534 )
1535 .into(),
1536 )
1537 })?;
1538 Ok(CompletedPart::builder()
1539 .part_number(part_number)
1540 .e_tag(etag)
1541 .build())
1542 });
1543 }
1544 join_completed_parts(tasks, parts.len()).await
1545 }
1546
1547 /// Finalize a multipart upload: complete on success, best-effort
1548 /// abort on error.
1549 ///
1550 /// On success, `complete_multipart_upload` runs and the guard is
1551 /// disarmed so its [`Drop`] does not fire a redundant abort.
1552 ///
1553 /// On a per-part error or join failure, the abort is issued
1554 /// inline (awaited) so the call returns only after the upload-id
1555 /// has been released — matching the synchronous error semantics
1556 /// callers had before the RAII guard was introduced. The guard
1557 /// is disarmed *after* the inline abort completes; if the inline
1558 /// abort itself panics or is cancelled, the guard's [`Drop`]
1559 /// fires the abort again on a detached task. A double-abort is
1560 /// harmless — S3 returns `NoSuchUpload` on the second call.
1561 ///
1562 /// If `complete_multipart_upload` itself fails, the function
1563 /// returns the classified error via `?` *before* `disarm()`;
1564 /// the still-armed guard's [`Drop`] then dispatches the abort
1565 /// on a detached task. This keeps the function short — no
1566 /// extra inline-abort branch — and folds the rare
1567 /// "Complete failed" case onto the same Drop path the
1568 /// future-cancellation case exercises.
1569 ///
1570 /// Abort failures are logged via `tracing::warn` but the
1571 /// original error wins; surfacing the abort error would mask
1572 /// the cause.
1573 async fn finish_multipart_upload(
1574 &self,
1575 mut guard: MultipartUploadGuard,
1576 parts: Result<Vec<CompletedPart>, ObjectStoreError>,
1577 ) -> Result<(), ObjectStoreError> {
1578 match parts {
1579 Ok(parts) => {
1580 let multipart = CompletedMultipartUpload::builder()
1581 .set_parts(Some(parts))
1582 .build();
1583 self.client
1584 .complete_multipart_upload()
1585 .bucket(&self.bucket)
1586 .key(guard.key())
1587 .upload_id(guard.upload_id())
1588 .multipart_upload(multipart)
1589 .send()
1590 .await
1591 .map_err(|e| classify(e, guard.key()))?;
1592 guard.disarm();
1593 Ok(())
1594 }
1595 Err(err) => {
1596 if let Err(abort_err) = self
1597 .client
1598 .abort_multipart_upload()
1599 .bucket(&self.bucket)
1600 .key(guard.key())
1601 .upload_id(guard.upload_id())
1602 .send()
1603 .await
1604 {
1605 tracing::warn!(
1606 key = %guard.key(),
1607 upload_id = %guard.upload_id(),
1608 ?abort_err,
1609 "AbortMultipartUpload failed; orphan upload may incur storage cost \
1610 until lifecycle expiry",
1611 );
1612 }
1613 guard.disarm();
1614 Err(err)
1615 }
1616 }
1617 }
1618}
1619
1620/// RAII guard for an in-flight S3 multipart upload.
1621///
1622/// Owns the inputs needed to issue `AbortMultipartUpload` without
1623/// re-borrowing the [`S3Store`]. While `armed`, the guard's [`Drop`]
1624/// best-effort dispatches `AbortMultipartUpload` on a detached
1625/// `tokio::spawn` task so a future dropped between
1626/// `CreateMultipartUpload` and `CompleteMultipartUpload` does not
1627/// orphan the upload-id on the bucket (issues #169, #171).
1628///
1629/// Call [`disarm`] once the upload has been completed or its abort
1630/// has been issued synchronously so [`Drop`] becomes a no-op.
1631///
1632/// [`disarm`]: Self::disarm
1633struct MultipartUploadGuard {
1634 client: aws_sdk_s3::Client,
1635 bucket: String,
1636 key: String,
1637 upload_id: String,
1638 armed: bool,
1639}
1640
1641impl MultipartUploadGuard {
1642 fn new(client: aws_sdk_s3::Client, bucket: String, key: String, upload_id: String) -> Self {
1643 Self {
1644 client,
1645 bucket,
1646 key,
1647 upload_id,
1648 armed: true,
1649 }
1650 }
1651
1652 fn upload_id(&self) -> &str {
1653 &self.upload_id
1654 }
1655
1656 fn key(&self) -> &str {
1657 &self.key
1658 }
1659
1660 /// Mark the upload as resolved so [`Drop`] does not fire a
1661 /// redundant `AbortMultipartUpload`.
1662 fn disarm(&mut self) {
1663 self.armed = false;
1664 }
1665}
1666
1667impl Drop for MultipartUploadGuard {
1668 fn drop(&mut self) {
1669 if !self.armed {
1670 return;
1671 }
1672 // `Drop` cannot `.await`, so the abort is issued on a
1673 // detached `tokio::spawn` task. `Handle::try_current()`
1674 // returns `Err` if Drop runs outside any runtime (e.g. a
1675 // test that constructs the guard and immediately drops it);
1676 // in that case the best we can do is warn-log — panicking
1677 // in Drop is forbidden by project rules.
1678 let Ok(handle) = tokio::runtime::Handle::try_current() else {
1679 tracing::warn!(
1680 key = %self.key,
1681 upload_id = %self.upload_id,
1682 "MultipartUploadGuard dropped outside a tokio runtime; \
1683 cannot dispatch AbortMultipartUpload (orphan upload may \
1684 incur storage cost until S3 lifecycle expiry)",
1685 );
1686 return;
1687 };
1688 // Move owned fields into the detached task so the abort
1689 // outlives the dropped future. `Client` clones cheaply
1690 // (internal `Arc`); the strings move via `mem::take`.
1691 let client = self.client.clone();
1692 let bucket = std::mem::take(&mut self.bucket);
1693 let key = std::mem::take(&mut self.key);
1694 let upload_id = std::mem::take(&mut self.upload_id);
1695 handle.spawn(async move {
1696 if let Err(abort_err) = client
1697 .abort_multipart_upload()
1698 .bucket(&bucket)
1699 .key(&key)
1700 .upload_id(&upload_id)
1701 .send()
1702 .await
1703 {
1704 tracing::warn!(
1705 key = %key,
1706 upload_id = %upload_id,
1707 ?abort_err,
1708 "AbortMultipartUpload (drop-fire) failed; orphan upload may \
1709 incur storage cost until S3 lifecycle expiry",
1710 );
1711 }
1712 });
1713 }
1714}
1715
1716/// Drain a `JoinSet` of part-upload tasks into a `Vec<CompletedPart>`,
1717/// short-circuiting on the first error and sorting the result by
1718/// `part_number` so the `CompleteMultipartUpload` request honours
1719/// S3's "parts in part-number order" requirement.
1720async fn join_completed_parts(
1721 mut tasks: JoinSet<Result<CompletedPart, ObjectStoreError>>,
1722 capacity: usize,
1723) -> Result<Vec<CompletedPart>, ObjectStoreError> {
1724 let mut completed = Vec::with_capacity(capacity);
1725 while let Some(joined) = tasks.join_next().await {
1726 let part = joined.map_err(other_boxed)??;
1727 completed.push(part);
1728 }
1729 completed.sort_by_key(|p| {
1730 // Each `CompletedPart` here was built via
1731 // `CompletedPart::builder().part_number(...).e_tag(...).build()`
1732 // in the spawn loops above, so `part_number` is always set.
1733 p.part_number()
1734 .expect("CompletedPart built with explicit part_number")
1735 });
1736 Ok(completed)
1737}
1738
1739#[cfg(test)]
1740mod tests {
1741 use super::*;
1742 use crate::url::{AzureAddressing, RemoteFlags};
1743 use aws_sdk_s3::primitives::DateTime;
1744 use aws_sdk_s3::types::Object;
1745
1746 fn parse_endpoint(s: &str) -> Url {
1747 Url::parse(s).expect("test endpoint URL parses")
1748 }
1749
1750 // --- object_to_meta -----------------------------------------------
1751
1752 #[test]
1753 fn object_to_meta_round_trips_well_formed_object() {
1754 let modified = DateTime::from_secs(1_700_000_000);
1755 let obj = Object::builder()
1756 .key("refs/heads/main/abc.bundle")
1757 .size(42)
1758 .last_modified(modified)
1759 .build();
1760 let meta = object_to_meta(&obj).expect("conversion succeeds");
1761 assert_eq!(meta.key, "refs/heads/main/abc.bundle");
1762 assert_eq!(meta.size, 42);
1763 assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
1764 }
1765
1766 #[test]
1767 fn object_to_meta_rejects_missing_key() {
1768 let obj = Object::builder()
1769 .last_modified(DateTime::from_secs(1_700_000_000))
1770 .build();
1771 let err = object_to_meta(&obj).expect_err("missing key must error");
1772 match err {
1773 ObjectStoreError::Other(inner) => {
1774 assert!(
1775 inner.to_string().contains("without a key"),
1776 "error message names the failure: {inner}"
1777 );
1778 }
1779 other => panic!("expected ObjectStoreError::Other for missing key, got {other:?}"),
1780 }
1781 }
1782
1783 #[test]
1784 fn object_to_meta_rejects_missing_last_modified() {
1785 let obj = Object::builder().key("k").size(0).build();
1786 let err = object_to_meta(&obj).expect_err("missing last_modified must error");
1787 match err {
1788 ObjectStoreError::Other(inner) => {
1789 let msg = inner.to_string();
1790 assert!(
1791 msg.contains("without last_modified"),
1792 "names failure: {msg}"
1793 );
1794 assert!(msg.contains("`k`"), "includes the key for context: {msg}");
1795 }
1796 other => {
1797 panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
1798 }
1799 }
1800 }
1801
1802 // --- head_output_to_meta -------------------------------------------
1803
1804 #[test]
1805 fn head_output_to_meta_round_trips_well_formed_response() {
1806 let modified = DateTime::from_secs(1_700_000_000);
1807 let meta = head_output_to_meta("k", Some(42), Some(&modified), Some("\"abc\""))
1808 .expect("conversion succeeds");
1809 assert_eq!(meta.key, "k");
1810 assert_eq!(meta.size, 42);
1811 assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
1812 assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
1813 }
1814
1815 #[test]
1816 fn head_output_to_meta_preserves_legitimate_zero_size() {
1817 // Zero-byte lock files are legitimate in this codebase; a
1818 // `Content-Length: 0` header (i.e. `Some(0)`) must round-trip
1819 // as `size == 0`, distinct from the missing-header error.
1820 let modified = DateTime::from_secs(1_700_000_000);
1821 let meta = head_output_to_meta("LOCK", Some(0), Some(&modified), None)
1822 .expect("conversion succeeds");
1823 assert_eq!(meta.size, 0);
1824 }
1825
1826 #[test]
1827 fn head_output_to_meta_rejects_missing_content_length() {
1828 let modified = DateTime::from_secs(1_700_000_000);
1829 let err = head_output_to_meta("k", None, Some(&modified), None)
1830 .expect_err("missing content-length must error");
1831 match err {
1832 ObjectStoreError::Other(inner) => {
1833 let msg = inner.to_string();
1834 assert!(msg.contains("no content-length"), "names failure: {msg}");
1835 assert!(msg.contains("`k`"), "includes the key for context: {msg}");
1836 }
1837 other => {
1838 panic!("expected ObjectStoreError::Other for missing content-length, got {other:?}")
1839 }
1840 }
1841 }
1842
1843 #[test]
1844 fn head_output_to_meta_rejects_missing_last_modified() {
1845 let err = head_output_to_meta("k", Some(0), None, None)
1846 .expect_err("missing last_modified must error");
1847 match err {
1848 ObjectStoreError::Other(inner) => {
1849 let msg = inner.to_string();
1850 assert!(msg.contains("no last_modified"), "names failure: {msg}");
1851 assert!(msg.contains("`k`"), "includes the key for context: {msg}");
1852 }
1853 other => {
1854 panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
1855 }
1856 }
1857 }
1858
1859 #[test]
1860 fn head_output_to_meta_clamps_negative_size_to_zero() {
1861 // The SDK types content_length as `Option<i64>`; a (legally
1862 // impossible) negative value clamps to 0 rather than wrapping
1863 // to a huge u64. Mirrors `object_to_meta` behavior.
1864 let modified = DateTime::from_secs(1_700_000_000);
1865 let meta =
1866 head_output_to_meta("k", Some(-1), Some(&modified), None).expect("conversion succeeds");
1867 assert_eq!(meta.size, 0);
1868 }
1869
1870 #[test]
1871 fn object_to_meta_clamps_negative_size_to_zero() {
1872 // S3 cannot legally return a negative size, but the SDK types
1873 // it as `i64`. Defensive default: clamp to 0 rather than
1874 // sign-extend to a huge u64.
1875 let obj = Object::builder()
1876 .key("k")
1877 .size(-1)
1878 .last_modified(DateTime::from_secs(1_700_000_000))
1879 .build();
1880 let meta = object_to_meta(&obj).expect("conversion succeeds");
1881 assert_eq!(meta.size, 0);
1882 }
1883
1884 // --- plan_ranges --------------------------------------------------
1885
1886 #[test]
1887 fn plan_ranges_zero_size_yields_empty_vec() {
1888 assert!(plan_ranges(0, 16).is_empty());
1889 }
1890
1891 #[test]
1892 fn plan_ranges_zero_chunk_yields_empty_vec() {
1893 assert!(plan_ranges(100, 0).is_empty());
1894 }
1895
1896 #[test]
1897 fn plan_ranges_size_one_byte() {
1898 assert_eq!(plan_ranges(1, 16), vec![(0, 0)]);
1899 }
1900
1901 #[test]
1902 fn plan_ranges_size_below_chunk() {
1903 assert_eq!(plan_ranges(10, 16), vec![(0, 9)]);
1904 }
1905
1906 #[test]
1907 fn plan_ranges_size_equals_chunk() {
1908 assert_eq!(plan_ranges(16, 16), vec![(0, 15)]);
1909 }
1910
1911 #[test]
1912 fn plan_ranges_size_one_byte_above_chunk() {
1913 assert_eq!(plan_ranges(17, 16), vec![(0, 15), (16, 16)]);
1914 }
1915
1916 #[test]
1917 fn plan_ranges_exact_multiple_of_chunk() {
1918 assert_eq!(
1919 plan_ranges(48, 16),
1920 vec![(0, 15), (16, 31), (32, 47)],
1921 "three full chunks, no leftover"
1922 );
1923 }
1924
1925 #[test]
1926 fn plan_ranges_with_partial_final_chunk() {
1927 assert_eq!(
1928 plan_ranges(50, 16),
1929 vec![(0, 15), (16, 31), (32, 47), (48, 49)]
1930 );
1931 }
1932
1933 #[test]
1934 fn plan_ranges_handles_huge_size_without_overflow() {
1935 // 6 GiB at 16 MiB chunks → 384 chunks, all valid u64 arithmetic.
1936 let size = 6u64 * 1024 * 1024 * 1024;
1937 let chunk = 16u64 * 1024 * 1024;
1938 let ranges = plan_ranges(size, chunk);
1939 assert_eq!(ranges.len(), 384);
1940 assert_eq!(ranges.first().copied(), Some((0, chunk - 1)));
1941 assert_eq!(ranges.last().copied(), Some((size - chunk, size - 1)));
1942 }
1943
1944 // --- normalize_endpoint -------------------------------------------
1945
1946 #[test]
1947 fn normalize_endpoint_path_style_strips_bucket_path() {
1948 let url = parse_endpoint("https://s3.us-west-2.amazonaws.com/my-bucket");
1949 let out = normalize_endpoint(&url, S3Addressing::PathStyle).unwrap();
1950 assert_eq!(out.host_str(), Some("s3.us-west-2.amazonaws.com"));
1951 assert_eq!(out.path(), "/");
1952 assert!(out.query().is_none());
1953 }
1954
1955 #[test]
1956 fn normalize_endpoint_strips_query_string() {
1957 // Our URL parser leaves `?addressing=path` etc. on the endpoint;
1958 // the SDK rejects any query component.
1959 let url = parse_endpoint("http://127.0.0.1:9000/my-bucket?addressing=path");
1960 let out = normalize_endpoint(&url, S3Addressing::PathStyle).unwrap();
1961 assert!(out.query().is_none(), "query must be stripped: {out}");
1962 assert_eq!(out.path(), "/");
1963 assert_eq!(out.host_str(), Some("127.0.0.1"));
1964 assert_eq!(out.port(), Some(9000));
1965 }
1966
1967 #[test]
1968 fn normalize_endpoint_strips_bucket_label_for_virtual_hosted() {
1969 let url = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
1970 let out = normalize_endpoint(&url, S3Addressing::VirtualHosted).unwrap();
1971 assert_eq!(out.host_str(), Some("s3.us-west-2.amazonaws.com"));
1972 assert_eq!(out.scheme(), "https");
1973 assert_eq!(out.path(), "/");
1974 }
1975
1976 #[test]
1977 fn normalize_endpoint_virtual_hosted_preserves_port_and_scheme() {
1978 let url = parse_endpoint("http://my-bucket.s3.example.com:9000/some/path?x=1");
1979 let out = normalize_endpoint(&url, S3Addressing::VirtualHosted).unwrap();
1980 assert_eq!(out.scheme(), "http");
1981 assert_eq!(out.host_str(), Some("s3.example.com"));
1982 assert_eq!(out.port(), Some(9000));
1983 assert_eq!(out.path(), "/");
1984 assert!(out.query().is_none());
1985 }
1986
1987 #[test]
1988 fn normalize_endpoint_dotted_bucket_virtual_hosted() {
1989 // Bucket name contains dots (e.g. "bucketname.com"). A plain
1990 // `split_once('.')` would stop at the first dot and produce
1991 // "com.s3.us-west-2.amazonaws.com" instead of the correct
1992 // "s3.us-west-2.amazonaws.com".
1993 let url = parse_endpoint("https://bucketname.com.s3.us-west-2.amazonaws.com/some/path");
1994 let out = normalize_endpoint(&url, S3Addressing::VirtualHosted).unwrap();
1995 assert_eq!(out.host_str(), Some("s3.us-west-2.amazonaws.com"));
1996 assert_eq!(out.path(), "/");
1997 assert!(out.query().is_none());
1998 }
1999
2000 // --- resolve_region -----------------------------------------------
2001
2002 #[test]
2003 fn resolve_region_flag_takes_precedence() {
2004 let url = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
2005 assert_eq!(
2006 resolve_region(&url, Some("eu-central-1")),
2007 Some("eu-central-1".to_owned())
2008 );
2009 }
2010
2011 #[test]
2012 fn resolve_region_extracts_from_virtual_hosted_aws_host() {
2013 let url = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
2014 assert_eq!(resolve_region(&url, None), Some("us-west-2".to_owned()));
2015 }
2016
2017 #[test]
2018 fn resolve_region_extracts_from_path_style_aws_host() {
2019 let url = parse_endpoint("https://s3.eu-west-1.amazonaws.com/my-bucket");
2020 assert_eq!(resolve_region(&url, None), Some("eu-west-1".to_owned()));
2021 }
2022
2023 #[test]
2024 fn resolve_region_handles_legacy_hyphenated_form() {
2025 let url = parse_endpoint("https://s3-ap-south-1.amazonaws.com/my-bucket");
2026 assert_eq!(resolve_region(&url, None), Some("ap-south-1".to_owned()));
2027 }
2028
2029 #[test]
2030 fn resolve_region_legacy_no_segment_returns_none() {
2031 // s3.amazonaws.com (no region segment) — let the SDK's provider
2032 // chain pick from env/profile.
2033 let url = parse_endpoint("https://s3.amazonaws.com/my-bucket");
2034 assert_eq!(resolve_region(&url, None), None);
2035 }
2036
2037 #[test]
2038 fn resolve_region_non_aws_host_defaults_to_us_east_1() {
2039 let url = parse_endpoint("http://localhost:9000/my-bucket");
2040 assert_eq!(resolve_region(&url, None), Some("us-east-1".to_owned()));
2041 }
2042
2043 #[test]
2044 fn resolve_region_r2_endpoint_defaults_to_us_east_1() {
2045 let url = parse_endpoint("https://abc123.r2.cloudflarestorage.com/my-bucket");
2046 assert_eq!(resolve_region(&url, None), Some("us-east-1".to_owned()));
2047 }
2048
2049 #[test]
2050 fn resolve_region_dotted_bucket_virtual_hosted() {
2051 // Bucket name contains dots. The host has 4+ labels after stripping
2052 // `.amazonaws.com`; `resolve_region` must still find the region.
2053 let url = parse_endpoint("https://bucketname.com.s3.us-west-2.amazonaws.com/some/path");
2054 assert_eq!(resolve_region(&url, None), Some("us-west-2".to_owned()));
2055 }
2056
2057 #[test]
2058 fn resolve_region_china_partition_virtual_hosted() {
2059 // China partition (`.amazonaws.com.cn`) — the suffix list in
2060 // `crate::url::AWS_HOST_SUFFIXES` is the single source of truth
2061 // for which suffixes count as AWS. This test pins parity between
2062 // `check_aws_s3_host` (which accepts the suffix) and
2063 // `resolve_region` (which must extract the region from it).
2064 let url = parse_endpoint("https://my-bucket.s3.cn-north-1.amazonaws.com.cn/repo");
2065 assert_eq!(resolve_region(&url, None), Some("cn-north-1".to_owned()));
2066 }
2067
2068 #[test]
2069 fn resolve_region_china_partition_path_style() {
2070 let url = parse_endpoint("https://s3.cn-northwest-1.amazonaws.com.cn/my-bucket");
2071 assert_eq!(
2072 resolve_region(&url, None),
2073 Some("cn-northwest-1".to_owned())
2074 );
2075 }
2076
2077 // --- encode_copy_source -------------------------------------------
2078
2079 #[test]
2080 fn encode_copy_source_preserves_slash_between_bucket_and_key() {
2081 let out = encode_copy_source("my-bucket", "refs/heads/main/abc.bundle");
2082 assert_eq!(out, "my-bucket/refs/heads/main/abc.bundle");
2083 }
2084
2085 #[test]
2086 fn encode_copy_source_encodes_hash_in_lock_keys() {
2087 // LOCK#.lock from the per-ref locking scheme — # is reserved.
2088 let out = encode_copy_source("my-bucket", "refs/heads/main/LOCK#.lock");
2089 assert_eq!(out, "my-bucket/refs/heads/main/LOCK%23.lock");
2090 }
2091
2092 #[test]
2093 fn encode_copy_source_encodes_spaces_and_query_chars() {
2094 let out = encode_copy_source("my-bucket", "weird key?with=stuff");
2095 assert!(out.contains("%20"), "space encoded: {out}");
2096 assert!(out.contains("%3F"), "? encoded: {out}");
2097 assert!(out.contains("%3D"), "= encoded: {out}");
2098 }
2099
2100 #[test]
2101 fn encode_copy_source_passes_unreserved_through() {
2102 let out = encode_copy_source("my.bucket-name_v1~", "abc-def_ghi.txt");
2103 assert_eq!(out, "my.bucket-name_v1~/abc-def_ghi.txt");
2104 }
2105
2106 // --- classify_status_and_code ------------------------------------
2107
2108 #[test]
2109 fn classify_404_status_is_not_found() {
2110 assert!(matches!(
2111 classify_status_and_code(404, None, "k"),
2112 Some(ObjectStoreError::NotFound(s)) if s == "k"
2113 ));
2114 }
2115
2116 #[test]
2117 fn classify_403_status_is_access_denied() {
2118 assert!(matches!(
2119 classify_status_and_code(403, None, "k"),
2120 Some(ObjectStoreError::AccessDenied(s)) if s == "k"
2121 ));
2122 }
2123
2124 #[test]
2125 fn classify_412_status_is_precondition_failed() {
2126 assert!(matches!(
2127 classify_status_and_code(412, None, "k"),
2128 Some(ObjectStoreError::PreconditionFailed(s)) if s == "k"
2129 ));
2130 }
2131
2132 #[test]
2133 fn classify_409_status_is_conflict() {
2134 // The 409 case is critical: AWS S3 returns 409 when two
2135 // If-None-Match: "*" PUTs race even on a key that did not exist
2136 // beforehand. Without this branch, put_if_absent would surface
2137 // racing-write contention as a hard error instead of Ok(false).
2138 assert!(matches!(
2139 classify_status_and_code(409, None, "k"),
2140 Some(ObjectStoreError::Conflict(s)) if s == "k"
2141 ));
2142 }
2143
2144 #[test]
2145 fn classify_no_such_key_code_falls_back_to_not_found() {
2146 assert!(matches!(
2147 classify_status_and_code(500, Some("NoSuchKey"), "k"),
2148 Some(ObjectStoreError::NotFound(s)) if s == "k"
2149 ));
2150 }
2151
2152 #[test]
2153 fn classify_conditional_request_conflict_code_is_conflict() {
2154 assert!(matches!(
2155 classify_status_and_code(500, Some("ConditionalRequestConflict"), "k"),
2156 Some(ObjectStoreError::Conflict(s)) if s == "k"
2157 ));
2158 }
2159
2160 #[test]
2161 fn classify_entity_too_large_code_is_payload_too_large() {
2162 // S3 returns HTTP 400 + `EntityTooLarge` when a single-PUT body
2163 // exceeds 5 GiB. Status 400 alone is too broad; route via code.
2164 assert!(matches!(
2165 classify_status_and_code(400, Some("EntityTooLarge"), "k"),
2166 Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
2167 if limit_bytes == SINGLE_PUT_LIMIT_BYTES
2168 ));
2169 }
2170
2171 #[test]
2172 fn classify_413_status_is_payload_too_large() {
2173 // Front-door / proxy paths can surface HTTP 413 directly even
2174 // when the canonical S3 response is 400; treat 413 the same.
2175 assert!(matches!(
2176 classify_status_and_code(413, None, "k"),
2177 Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
2178 if limit_bytes == SINGLE_PUT_LIMIT_BYTES
2179 ));
2180 }
2181
2182 #[test]
2183 fn classify_unrecognised_returns_none() {
2184 assert!(classify_status_and_code(500, Some("InternalError"), "k").is_none());
2185 assert!(classify_status_and_code(500, None, "k").is_none());
2186 // Plain 400 with no recognised code stays in `Other` so callers
2187 // see the SDK chain rather than a misleading PayloadTooLarge.
2188 assert!(classify_status_and_code(400, None, "k").is_none());
2189 assert!(classify_status_and_code(400, Some("MalformedXML"), "k").is_none());
2190 }
2191
2192 // --- from_remote_url constructor branch ---------------------------
2193
2194 fn azure_url() -> RemoteUrl {
2195 RemoteUrl::Azure {
2196 endpoint: parse_endpoint("https://acct.blob.core.windows.net/container"),
2197 account: "acct".to_owned(),
2198 container: "container".to_owned(),
2199 prefix: None,
2200 addressing: AzureAddressing::VirtualHosted,
2201 flags: RemoteFlags::default(),
2202 }
2203 }
2204
2205 #[tokio::test]
2206 async fn from_remote_url_rejects_azure() {
2207 let result = S3Store::from_remote_url(&azure_url()).await;
2208 match result {
2209 Err(ObjectStoreError::Other(_)) => {}
2210 Err(other) => panic!("expected ObjectStoreError::Other, got {other:?}"),
2211 Ok(_) => panic!("expected Azure URL to be rejected"),
2212 }
2213 }
2214
2215 // --- ResolvedS3Config (URL → decisions) ---------------------------
2216
2217 #[test]
2218 fn resolved_path_style_minio() {
2219 let endpoint = parse_endpoint("http://127.0.0.1:9000/my-bucket?addressing=path");
2220 let resolved =
2221 ResolvedS3Config::from_url_parts(&endpoint, S3Addressing::PathStyle, None, None)
2222 .expect("resolves");
2223 assert!(resolved.force_path_style);
2224 assert_eq!(resolved.endpoint_url.host_str(), Some("127.0.0.1"));
2225 assert_eq!(resolved.endpoint_url.port(), Some(9000));
2226 assert_eq!(resolved.endpoint_url.path(), "/");
2227 assert!(resolved.endpoint_url.query().is_none());
2228 assert_eq!(resolved.region.as_deref(), Some("us-east-1"));
2229 assert!(resolved.profile.is_none());
2230 }
2231
2232 #[test]
2233 fn resolved_virtual_hosted_aws_strips_bucket_and_picks_region() {
2234 let endpoint = parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/");
2235 let resolved =
2236 ResolvedS3Config::from_url_parts(&endpoint, S3Addressing::VirtualHosted, None, None)
2237 .expect("resolves");
2238 assert!(!resolved.force_path_style);
2239 assert_eq!(
2240 resolved.endpoint_url.host_str(),
2241 Some("s3.us-west-2.amazonaws.com")
2242 );
2243 assert!(
2244 !resolved.endpoint_url.as_str().contains("my-bucket"),
2245 "bucket label must be stripped: {}",
2246 resolved.endpoint_url
2247 );
2248 assert_eq!(resolved.region.as_deref(), Some("us-west-2"));
2249 }
2250
2251 #[test]
2252 fn resolved_explicit_flags_propagate() {
2253 let endpoint = parse_endpoint("http://127.0.0.1:9000/my-bucket");
2254 let resolved = ResolvedS3Config::from_url_parts(
2255 &endpoint,
2256 S3Addressing::PathStyle,
2257 Some("dev-profile"),
2258 Some("eu-central-1"),
2259 )
2260 .expect("resolves");
2261 assert_eq!(resolved.region.as_deref(), Some("eu-central-1"));
2262 assert_eq!(resolved.profile.as_deref(), Some("dev-profile"));
2263 }
2264
2265 #[tokio::test]
2266 async fn build_s3_config_round_trips_resolved_decisions() {
2267 // We can't peek into aws_sdk_s3::Config getters reliably across
2268 // SDK 1.x patch releases, so just confirm the build call accepts
2269 // every decision shape without panicking. The decisions
2270 // themselves are tested via `ResolvedS3Config` above.
2271 //
2272 // Coverage scope: this test catches a panic during
2273 // `Builder::build_https()` construction (e.g. a missing TLS
2274 // provider feature), but does NOT catch a regression that
2275 // silently drops `.http_client(...)` from the loader chain —
2276 // that call is optional, so removing it still compiles and
2277 // returns a config. The constant-pin test below guards the
2278 // value; only an integration test against a real server with
2279 // observable connection-pool timing would catch a regression
2280 // in the wiring itself.
2281 let endpoint = parse_endpoint("http://127.0.0.1:9000/my-bucket");
2282 let resolved =
2283 ResolvedS3Config::from_url_parts(&endpoint, S3Addressing::PathStyle, None, None)
2284 .expect("resolves");
2285 let _config = build_s3_config(&resolved).await;
2286 }
2287
2288 /// Pin the timeout values. A future copy-paste mistake
2289 /// (`from_millis` instead of `from_secs`, an accidental zero)
2290 /// silently disables the very behaviour the constants exist for;
2291 /// fail fast instead. If a constant is deliberately changed,
2292 /// update the expected value on the right-hand side together —
2293 /// the test exists to make such a change deliberate, not to lock
2294 /// the value forever. See the matching Azure-side test for the
2295 /// same rationale.
2296 #[test]
2297 fn timeout_constants_have_expected_values() {
2298 assert_eq!(POOL_IDLE_TIMEOUT, Duration::from_secs(30));
2299 assert_eq!(READ_TIMEOUT, Duration::from_secs(30));
2300 }
2301
2302 /// Pin the `should_use_multipart` predicate at and around the
2303 /// shared threshold (issue #53).
2304 ///
2305 /// `put_bytes`, `put_path`, and `copy` route through this
2306 /// predicate to decide single-PUT vs multipart. The integration
2307 /// tests `multipart_put_emits_per_part_progress_events` and the
2308 /// env-gated `multipart_put_path_above_5_gib_round_trips` cover
2309 /// the dispatch *call* (only multipart emits >= 2 progress
2310 /// events; only multipart succeeds above 5 GiB); this unit test
2311 /// pins the predicate's boundary semantics so the constant can't
2312 /// be moved out from under those tests without something failing.
2313 #[test]
2314 fn should_use_multipart_pins_threshold_boundary() {
2315 use super::super::multipart::MULTIPART_PUT_THRESHOLD;
2316 assert!(!should_use_multipart(MULTIPART_PUT_THRESHOLD - 1));
2317 assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD));
2318 assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD + 1));
2319 // A 6 GiB body must take the multipart path: this is the
2320 // failure mode named in the issue (`EntityTooLarge` on bare
2321 // `PutObject`).
2322 assert!(should_use_multipart(6 * (1 << 30)));
2323 }
2324
2325 /// Tripwire for the `disable_read_timeout()` fix in commit bfec2f4.
2326 ///
2327 /// `put_body` overrides the SDK timeout config per-operation so
2328 /// large bundle uploads are not aborted at [`READ_TIMEOUT`].
2329 /// A regression that drops `.disable_read_timeout()` from the
2330 /// override (e.g. a bare `TimeoutConfig::builder().build()`) would
2331 /// re-introduce the upload-abort bug silently.
2332 ///
2333 /// `TimeoutConfig` does not expose the per-field state via getters
2334 /// (both `Unset` and `Disabled` return `None`), so the assertion
2335 /// uses the merge semantics: build a base config that *does* set
2336 /// `read_timeout`, then verify that merging via `take_defaults_from`
2337 /// keeps `read_timeout` disabled rather than inheriting the base.
2338 #[test]
2339 fn put_body_upload_override_disables_read_timeout() {
2340 let base = TimeoutConfig::builder()
2341 .read_timeout(Duration::from_secs(99))
2342 .build();
2343
2344 // `upload_timeout_config()` is the function `put_body` calls.
2345 let mut override_cfg = upload_timeout_config();
2346 let merged = override_cfg.take_defaults_from(&base);
2347
2348 // If `disable_read_timeout()` is in place, the merged config
2349 // returns `None` (Disabled). If a regression dropped it, the
2350 // merged config would inherit `Some(99s)` from the base.
2351 assert_eq!(
2352 merged.read_timeout(),
2353 None,
2354 "upload override must disable read_timeout, not just leave it Unset",
2355 );
2356 }
2357
2358 // --- MultipartUploadGuard (#169, #171) ----------------------------
2359
2360 /// Build a no-network `Client` for guard tests. The Client is
2361 /// only used to construct guards; no requests are issued from
2362 /// within these tests (we never let an armed guard's spawned
2363 /// abort actually reach the SDK).
2364 fn test_client() -> aws_sdk_s3::Client {
2365 let conf = aws_sdk_s3::Config::builder()
2366 .behavior_version(BehaviorVersion::latest())
2367 .region(Region::new("us-east-1"))
2368 .endpoint_url("http://127.0.0.1:1/")
2369 .build();
2370 aws_sdk_s3::Client::from_conf(conf)
2371 }
2372
2373 fn make_guard() -> MultipartUploadGuard {
2374 MultipartUploadGuard::new(
2375 test_client(),
2376 "bkt".to_owned(),
2377 "k".to_owned(),
2378 "uid".to_owned(),
2379 )
2380 }
2381
2382 #[test]
2383 fn multipart_upload_guard_exposes_constructor_fields() {
2384 // The accessors are load-bearing: `finish_multipart_upload`
2385 // reads them to address the complete/abort calls.
2386 let mut guard = make_guard();
2387 assert_eq!(guard.key(), "k");
2388 assert_eq!(guard.upload_id(), "uid");
2389 // Disarm so Drop is a no-op (the constructor-fields test
2390 // should not exercise the spawn-on-Drop path).
2391 guard.disarm();
2392 }
2393
2394 #[test]
2395 fn multipart_upload_guard_disarmed_drop_outside_runtime_is_silent() {
2396 // Observable contract: a disarmed guard must Drop without
2397 // attempting `Handle::try_current()` or `spawn`. We exercise
2398 // this outside any tokio runtime — `spawn` would panic there
2399 // and a `try_current()` lookup would warn-log. Neither must
2400 // happen on the success path.
2401 let mut guard = make_guard();
2402 guard.disarm();
2403 drop(guard);
2404 }
2405
2406 #[test]
2407 fn multipart_upload_guard_armed_drop_outside_runtime_does_not_panic() {
2408 // Project rule: Drop must never panic. When dropped armed
2409 // outside any tokio runtime, the guard logs a warn and
2410 // returns cleanly rather than panicking on a missing
2411 // runtime handle.
2412 let guard = make_guard();
2413 drop(guard);
2414 }
2415
2416 #[tokio::test]
2417 async fn multipart_upload_guard_armed_drop_inside_runtime_spawns_abort_task() {
2418 // Production failure mode: a future dropped between
2419 // `start_multipart_upload` and `finish_multipart_upload`
2420 // must dispatch `AbortMultipartUpload` on a detached
2421 // tokio task. This test pins the cheap observable contract:
2422 //
2423 // 1. Drop neither panics nor blocks.
2424 // 2. The spawned abort task makes forward progress (the
2425 // unreachable endpoint `127.0.0.1:1` forces a connect
2426 // failure inside `send().await`, exercising the spawned
2427 // closure's `Err`-arm warn-log).
2428 //
2429 // The companion test below
2430 // (`..._drop_issues_abort_multipart_upload`) goes further
2431 // and byte-equality-checks the captured HTTP request, so
2432 // this test only needs to keep the no-panic / forward-
2433 // progress contract on the warn-log path.
2434 //
2435 // Yielding `JoinSet`-style would let us join the task and
2436 // observe completion, but Drop uses a detached `spawn` by
2437 // design (a `JoinHandle` would require Drop to hold state
2438 // for the lifetime of the runtime). Yielding the runtime
2439 // a handful of times lets the spawned task reach its
2440 // `send().await` before the test returns and the runtime
2441 // tears down.
2442 let guard = make_guard();
2443 drop(guard);
2444 for _ in 0..4 {
2445 tokio::task::yield_now().await;
2446 }
2447 }
2448
2449 /// Build an `aws_sdk_s3::Client` whose HTTP layer is the
2450 /// smithy `capture_request` handler — every request the SDK
2451 /// emits is recorded on the returned `CaptureRequestReceiver`
2452 /// instead of touching the network. Static test credentials
2453 /// keep `SigV4` happy so the SDK actually reaches the HTTP
2454 /// layer (an unsigned chain would short-circuit earlier).
2455 fn capture_client() -> (
2456 aws_sdk_s3::Client,
2457 aws_smithy_http_client::test_util::CaptureRequestReceiver,
2458 ) {
2459 use aws_sdk_s3::config::Credentials;
2460 let (http_client, rx) = aws_smithy_http_client::test_util::capture_request(None);
2461 let conf = aws_sdk_s3::Config::builder()
2462 .behavior_version(BehaviorVersion::latest())
2463 .region(Region::new("us-east-1"))
2464 .credentials_provider(Credentials::new(
2465 "AKIAIOSFODNN7EXAMPLE",
2466 "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
2467 None,
2468 None,
2469 "test",
2470 ))
2471 .http_client(http_client)
2472 .force_path_style(true)
2473 .build();
2474 (aws_sdk_s3::Client::from_conf(conf), rx)
2475 }
2476
2477 #[tokio::test]
2478 async fn multipart_upload_guard_drop_issues_abort_multipart_upload() {
2479 // Byte-equality contract for issue #173: when an armed
2480 // `MultipartUploadGuard` is dropped inside a tokio runtime,
2481 // the detached abort task must issue an
2482 // `AbortMultipartUpload` request addressing the exact
2483 // (bucket, key, upload-id) the guard was constructed with.
2484 //
2485 // S3's wire form for `AbortMultipartUpload` is
2486 // `DELETE /<bucket>/<key>?uploadId=<id>` (path style)
2487 // so we assert: method=DELETE, the captured URI contains
2488 // the key, and `uploadId=<id>` appears in the query.
2489 // We deliberately do NOT couple to the exact host, scheme,
2490 // or full URL — those are SDK-internal details unrelated
2491 // to the abort contract.
2492 let (client, rx) = capture_client();
2493 let guard = MultipartUploadGuard::new(
2494 client,
2495 "test-bucket".to_owned(),
2496 "test/key.pack".to_owned(),
2497 "test-upload-id-abc123".to_owned(),
2498 );
2499 drop(guard);
2500
2501 // The detached `tokio::spawn` task must run far enough to
2502 // submit the request through the capture client. Yielding
2503 // a handful of times lets the SDK's signing + request
2504 // pipeline complete; `capture_request` resolves the call
2505 // synchronously once it sees the request, so no real
2506 // network wait is needed.
2507 for _ in 0..16 {
2508 tokio::task::yield_now().await;
2509 }
2510
2511 let request = rx.expect_request();
2512 assert_eq!(
2513 request.method(),
2514 "DELETE",
2515 "AbortMultipartUpload must be DELETE; got {}",
2516 request.method(),
2517 );
2518 let uri = request.uri();
2519 assert!(
2520 uri.contains("test/key.pack"),
2521 "captured URI must address the guard's key; got {uri}",
2522 );
2523 assert!(
2524 uri.contains("uploadId=test-upload-id-abc123"),
2525 "captured URI must carry the guard's upload-id in the \
2526 query string; got {uri}",
2527 );
2528 }
2529}