// s4_server/service.rs

//! `s3s::S3` implementation — delegates to `s3s_aws::Proxy` by default, while
//! calling into `s4_codec::CodecRegistry` on the `put_object` / `get_object` paths.
//!
//! ## Coverage (Phase 1, month 2)
//!
//! - With compression hook: `put_object`, `get_object`
//! - Pure delegation (no compression): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
//!   `list_parts`
//! - Unsupported (NotImplemented by default): the other 80+ ops (Tagging / ACL / Lifecycle etc. are Phase 2)
//!
//! ## Architecture
//!
//! - `S4Service<B>` holds a backend (B: S3), an `Arc<CodecRegistry>` and an `Arc<dyn CodecDispatcher>`.
//!   Because the `CodecRegistry` can carry several codecs, a single S4 instance can
//!   transparently GET objects that were written with different codecs
//! - PUT: the dispatcher picks a codec from a sample of the head of the body, the registry
//!   compresses, the manifest is written into the S3 metadata and the request is forwarded to the backend
//! - GET: fetch from the backend → restore the manifest from the metadata → `registry.decompress`
//!   decompresses with the codec named in the manifest → return the original bytes
//!
//! ## Known limitations
//!
//! - **Multipart Upload has no per-part compression yet**: `upload_part` currently passes through untouched.
//!   Per-part compress + manifest aggregation in `complete_multipart_upload` is planned for the second half of Phase 1 month 2.
//! - **The PUT body is collected in memory**: bounded by `max_body_bytes` (default 5 GiB = the S3 single-PUT limit).
//!   Streaming-aware compression is Phase 2.
30
31use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40    write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry, CompressTelemetry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51    supports_streaming_compress, supports_streaming_decompress,
52};
53
/// Maximum number of bytes handed over when sampling the head of a PUT body.
const SAMPLE_BYTES: usize = 4 * 1024;
56
57/// v0.8 #55: stamp the GPU pipeline metrics (`s4_gpu_compress_seconds`,
58/// `s4_gpu_throughput_bytes_per_sec`, `s4_gpu_oom_total`) from a
59/// `CompressTelemetry` returned by `CodecRegistry::compress_with_telemetry`.
60/// CPU codecs (`gpu_seconds = None`) are no-ops here — they're already
61/// covered by the existing `s4_request_latency_seconds` / `s4_bytes_*`
62/// counters in the request-level `record_put` / `record_get` calls.
63#[inline]
64fn stamp_gpu_compress_telemetry(tel: &CompressTelemetry) {
65    if let Some(secs) = tel.gpu_seconds {
66        crate::metrics::record_gpu_compress(tel.codec, secs, tel.bytes_in, tel.bytes_out);
67    }
68    if tel.oom {
69        crate::metrics::record_gpu_oom(tel.codec);
70    }
71}
72
73/// v0.7 #49: percent-encoding set covering everything that is **not** an
74/// `unreserved` character per RFC 3986 §2.3, **plus** we additionally
75/// encode the path-reserved sub-delims that `http::Uri` rejects in a
76/// path segment (`?`, `#`, `%`, control bytes, space, etc.). We
77/// deliberately keep `/` un-encoded because S3 keys legally use `/` as
78/// a logical separator and the rest of the synthetic URI relies on the
79/// path layout `/{bucket}/{key}` round-tripping byte-for-byte.
80const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
81    .add(b' ')
82    .add(b'"')
83    .add(b'#')
84    .add(b'<')
85    .add(b'>')
86    .add(b'?')
87    .add(b'`')
88    .add(b'{')
89    .add(b'}')
90    .add(b'|')
91    .add(b'\\')
92    .add(b'^')
93    .add(b'[')
94    .add(b']')
95    .add(b'%');
96
97/// v0.7 #49: build the synthetic `/{bucket}/{key}` request URI used by
98/// the sidecar / replication helpers when they re-enter the backend
99/// trait without going through the HTTP layer. S3 object keys can
100/// contain spaces, control bytes, and arbitrary Unicode that would
101/// make `format!(...).parse::<http::Uri>()` panic; we percent-encode
102/// the key bytes (RFC 3986 path segment) and the bucket name (defensive
103/// — bucket names are normally DNS-safe, but the helper is the single
104/// choke-point) before splicing them in. If the encoded form *still*
105/// fails to parse (extremely unlikely once everything outside the
106/// unreserved set is escaped) we surface a typed `400 InvalidObjectName`
107/// instead of crashing the worker.
108pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
109    use percent_encoding::utf8_percent_encode;
110    let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
111    let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
112    let raw = format!("/{bucket_enc}/{key_enc}");
113    raw.parse::<http::Uri>().map_err(|e| {
114        // S3 spec uses `InvalidObjectName` (HTTP 400) for keys that
115        // can't be represented in a request URI. The generated
116        // `S3ErrorCode` enum doesn't expose a typed variant for it,
117        // so we round-trip through `from_bytes` which preserves the
118        // canonical wire string while falling back to InvalidArgument
119        // if even that lookup fails (cannot happen at runtime — kept
120        // as a belt-and-suspenders branch so this helper never
121        // panics).
122        let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
123            .unwrap_or(S3ErrorCode::InvalidArgument);
124        S3Error::with_message(
125            code,
126            format!("object key cannot be encoded as a request URI: {e}"),
127        )
128    })
129}
130
/// v0.4 #20: request details snapshotted at the top of a handler —
/// before the backend call consumes the request — so that the matching
/// `record_access` call at end-of-request has what it needs to fill in
/// the structured access-log entry.
struct AccessLogPreamble {
    remote_ip: Option<String>,
    requester: Option<String>,
    request_uri: String,
    user_agent: Option<String>,
}
140
141pub struct S4Service<B: S3> {
142    /// Wrapped in `Arc` so the v0.6 #40 cross-bucket replication
143    /// dispatcher can clone it into a detached `tokio::spawn` task
144    /// (Arc::clone is cheap; backend trait methods take `&self` so no
145    /// other handler is affected by the indirection).
146    backend: Arc<B>,
147    registry: Arc<CodecRegistry>,
148    dispatcher: Arc<dyn CodecDispatcher>,
149    max_body_bytes: usize,
150    policy: Option<crate::policy::SharedPolicy>,
151    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
152    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
153    /// gating "deny if not over TLS" can do their job. Defaults to `false`
154    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
155    secure_transport: bool,
156    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
157    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
158    /// v0.4 #20: optional S3-style access log emitter.
159    access_log: Option<crate::access_log::SharedAccessLog>,
160    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
161    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
162    /// (with the keyring's active key id) after the compress + framing
163    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
164    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
165    /// key in a 1-slot keyring so single-key (v0.4) operators get the
166    /// same behaviour they had before, just on the v2 frame.
167    sse_keyring: Option<crate::sse::SharedSseKeyring>,
168    /// v0.5 #34: optional first-class versioning state machine. When
169    /// `Some(...)`, S4-server itself owns the per-bucket versioning
170    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
171    /// list_object_versions / get_bucket_versioning /
172    /// put_bucket_versioning handlers consult the manager instead of
173    /// passing through. When `None` (default), the legacy
174    /// backend-passthrough behaviour applies so existing v0.4
175    /// deployments are unaffected until they explicitly call
176    /// `with_versioning(...)`.
177    versioning: Option<Arc<crate::versioning::VersioningManager>>,
178    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
179    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
180    /// generate a fresh DEK via the backend, encrypt the body with it
181    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
182    /// S4E4 unwrap the DEK through the same backend before decrypt.
183    /// `kms_default_key_id` is used when the request omits an explicit
184    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
185    /// bucket-default behaviour).
186    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
187    kms_default_key_id: Option<String>,
188    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
189    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
190    /// consult the manager and refuse the operation with HTTP 403
191    /// `AccessDenied` while the object is locked (Compliance until
192    /// expiry, Governance unless the bypass header is set, or any time
193    /// a legal hold is on). PUT also auto-applies the bucket-default
194    /// retention to brand-new objects when configured. When `None`
195    /// (default), the legacy backend-passthrough behaviour applies, so
196    /// existing v0.4 deployments are unaffected until they explicitly
197    /// call `with_object_lock(...)`.
198    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
199    /// v0.6 #38: optional first-class CORS bucket configuration manager.
200    /// When `Some(...)`, S4-server itself owns per-bucket CORS rules and
201    /// `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
202    /// consult the manager instead of passing through to the backend.
203    /// `handle_preflight` (public method on `S4Service`) routes OPTIONS-
204    /// style preflight matching through the same store; the actual HTTP
205    /// OPTIONS routing wire-up at the listener level is a follow-up
206    /// (s3s framework does not surface OPTIONS as a typed handler).
207    cors: Option<Arc<crate::cors::CorsManager>>,
208    /// v0.6 #36: optional first-class S3 Inventory manager. When
209    /// `Some(...)`, S4-server itself owns per-(bucket, id) inventory
210    /// configurations and `put_bucket_inventory_configuration` /
211    /// `get_bucket_inventory_configuration` /
212    /// `list_bucket_inventory_configurations` /
213    /// `delete_bucket_inventory_configuration` consult the manager
214    /// instead of passing through to the backend. The actual periodic
215    /// CSV emission is driven by a tokio task in `main.rs` that calls
216    /// `InventoryManager::run_once_for_test` on a fixed cadence; the
217    /// service handlers below only deal with config-level CRUD.
218    inventory: Option<Arc<crate::inventory::InventoryManager>>,
219    /// v0.6 #35: optional first-class S3 bucket-notification manager.
220    /// When `Some(...)`, S4-server itself owns per-bucket notification
221    /// configurations and `put_bucket_notification_configuration` /
222    /// `get_bucket_notification_configuration` consult the manager
223    /// instead of passing through to the backend. Successful PUT /
224    /// DELETE handlers fire matching destinations on a detached tokio
225    /// task (best-effort; see `crate::notifications::dispatch_event`).
226    notifications: Option<Arc<crate::notifications::NotificationManager>>,
227    /// v0.6 #37: optional first-class S3 Lifecycle configuration
228    /// manager. When `Some(...)`, S4-server itself owns per-bucket
229    /// lifecycle rules and `put_bucket_lifecycle_configuration` /
230    /// `get_bucket_lifecycle_configuration` /
231    /// `delete_bucket_lifecycle` consult the manager instead of
232    /// passing through to the backend. The actual background scanner
233    /// (list_objects_v2 -> evaluate -> delete / metadata-rewrite per
234    /// rule) is a v0.7+ follow-up; the test path
235    /// `S4Service::run_lifecycle_once_for_test` exercises the
236    /// evaluator end-to-end so this v0.6 #37 wiring is enough to ship
237    /// the configuration-management half without putting a
238    /// half-wired bucket-walk in front of users.
239    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
240    /// v0.6 #39: optional first-class object + bucket Tagging manager.
241    /// When `Some(...)`, S4-server itself owns per-(bucket, key) and
242    /// per-bucket tag state — `PutObjectTagging` /
243    /// `GetObjectTagging` / `DeleteObjectTagging` /
244    /// `PutBucketTagging` / `GetBucketTagging` /
245    /// `DeleteBucketTagging` route through the manager (replacing the
246    /// previous backend-passthrough behaviour). `put_object` also
247    /// pre-parses the `x-amz-tagging` header / `Tagging` input field
248    /// so the IAM policy evaluator can gate on
249    /// `s3:RequestObjectTag/<key>` and `s3:ExistingObjectTag/<key>`.
250    /// On a successful PUT the parsed tags are persisted; on a
251    /// successful DELETE the matching tag entry is dropped.
252    tagging: Option<Arc<crate::tagging::TagManager>>,
253    /// v0.6 #40: optional first-class cross-bucket replication manager.
254    /// When `Some(...)`, S4-server itself owns per-bucket replication
255    /// rules; `PutBucketReplication` / `GetBucketReplication` /
256    /// `DeleteBucketReplication` route through the manager (replacing
257    /// the previous backend-passthrough behaviour). On every successful
258    /// `put_object` the manager's rule list is consulted; the
259    /// highest-priority matching enabled rule wins, the per-key status
260    /// is recorded as `Pending`, and the source body and metadata are
261    /// handed to a detached tokio task that PUTs to the destination
262    /// bucket through the same backend. The replica is stamped with
263    /// `x-amz-replication-status: REPLICA` in its metadata; the
264    /// source-side status is updated to `Completed` on success or
265    /// `Failed` after the 3-attempt retry budget is exhausted (drop
266    /// counter bumps in either-side case so dashboards see the loss).
267    /// `head_object` / `get_object` echo the recorded status back as
268    /// `x-amz-replication-status` so consumers can poll progress.
269    /// Limited to single-instance (same `S4Service`) replication; true
270    /// cross-region (multi-instance) is a v0.7+ follow-up.
271    replication: Option<Arc<crate::replication::ReplicationManager>>,
272    /// v0.6 #42: optional MFA-Delete enforcement layer. When `Some(...)`,
273    /// every DELETE / DELETE-version / delete-marker / `PutBucketVersioning`
274    /// request against a bucket whose MFA-Delete state is `Enabled`
275    /// must carry `x-amz-mfa: <serial> <code>` (RFC 6238 6-digit TOTP);
276    /// missing or invalid tokens return HTTP 403 `AccessDenied`. When
277    /// `None` (default), the gate is a no-op so existing v0.4 / v0.5
278    /// deployments are unaffected until they explicitly call
279    /// `with_mfa_delete(...)`.
280    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
281    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
282    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
283    /// or be matched against a configured server-managed keyring/KMS).
284    /// Set by `--compliance-mode strict` after the boot-time
285    /// prerequisite check passes.
286    compliance_strict: bool,
287    /// v0.7 #47: optional SigV4a (asymmetric ECDSA-P256-SHA256) verify
288    /// gate. When `Some(...)`, the listener-side middleware (see
289    /// [`crate::routing::try_sigv4a_verify`]) inspects every incoming
290    /// request and short-circuits SigV4a-signed ones — verifying the
291    /// signature against the credential store and returning 403
292    /// `SignatureDoesNotMatch` / `InvalidAccessKeyId` on failure. Plain
293    /// SigV4 (HMAC-SHA256) requests pass through to s3s untouched. When
294    /// `None`, the middleware is a no-op so the existing SigV4 path is
295    /// unaffected (operators opt in via `--sigv4a-credentials <DIR>`).
296    sigv4a_gate: Option<Arc<SigV4aGate>>,
297    /// v0.8 #54 BUG-5..10: per-`upload_id` side-table that ferries the
298    /// SSE / Tagging / Object-Lock context captured at
299    /// `CreateMultipartUpload` time through to `UploadPart` /
300    /// `CompleteMultipartUpload`. Always-on (no `with_*` flag) — the
301    /// store is gateway-internal and idle when no multipart is in
302    /// flight. See [`crate::multipart_state`] for rationale.
303    multipart_state: Arc<crate::multipart_state::MultipartStateStore>,
304    /// v0.8 #52: plaintext bytes per S4E5 chunk on the SSE-S4 PUT
305    /// path. `0` (default) → use the legacy buffered S4E2 path
306    /// (whole-body AES-GCM tag, GET buffers + verifies before
307    /// emitting). Non-zero → use the chunked S4E5 frame so GET can
308    /// stream-decrypt chunk-by-chunk. Wired by `--sse-chunk-size`
309    /// in `main.rs`. SSE-C and SSE-KMS are intentionally unaffected
310    /// (chunked variants tracked in a follow-up issue).
311    sse_chunk_size: usize,
312}
313
314impl<B: S3> S4Service<B> {
315    /// AWS S3 単発 PUT の API 上限 (5 GiB)
316    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
317
318    pub fn new(
319        backend: B,
320        registry: Arc<CodecRegistry>,
321        dispatcher: Arc<dyn CodecDispatcher>,
322    ) -> Self {
323        Self {
324            backend: Arc::new(backend),
325            registry,
326            dispatcher,
327            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
328            policy: None,
329            secure_transport: false,
330            rate_limits: None,
331            access_log: None,
332            sse_keyring: None,
333            versioning: None,
334            kms: None,
335            kms_default_key_id: None,
336            object_lock: None,
337            cors: None,
338            inventory: None,
339            notifications: None,
340            lifecycle: None,
341            tagging: None,
342            replication: None,
343            mfa_delete: None,
344            compliance_strict: false,
345            sigv4a_gate: None,
346            multipart_state: Arc::new(crate::multipart_state::MultipartStateStore::new()),
347            // v0.8 #52: chunked SSE-S4 disabled by default — opt
348            // in via `S4Service::with_sse_chunk_size(...)` /
349            // `--sse-chunk-size <BYTES>`. Default keeps the legacy
350            // S4E2 buffered path so existing deployments are
351            // bit-for-bit unchanged.
352            sse_chunk_size: 0,
353        }
354    }
355
356    /// v0.7 #47: attach the SigV4a verify gate. Once set, the
357    /// listener-side middleware (`crate::routing::try_sigv4a_verify`)
358    /// short-circuits any incoming `AWS4-ECDSA-P256-SHA256` request,
359    /// verifying it against the supplied credential store and
360    /// returning 403 on failure. Plain SigV4 (HMAC-SHA256) requests
361    /// are unaffected. When the gate is unset (default), the
362    /// middleware skips entirely so existing SigV4 deployments keep
363    /// working.
364    #[must_use]
365    pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
366        self.sigv4a_gate = Some(gate);
367        self
368    }
369
370    /// v0.7 #47: borrow the attached SigV4a gate. Used by `main.rs`
371    /// to snapshot the gate `Arc` before the s3s `ServiceBuilder`
372    /// consumes the `S4Service` (the listener-side middleware needs
373    /// the same `Arc` because s3s' SigV4 verifier rejects SigV4a
374    /// algorithm tokens with "unknown algorithm" — match has to
375    /// happen at the hyper layer instead).
376    #[must_use]
377    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
378        self.sigv4a_gate.as_ref()
379    }
380
381    /// v0.8.2 #62: borrow the multipart state store so `main.rs` can
382    /// snapshot the `Arc` before the s3s `ServiceBuilder` consumes
383    /// the `S4Service`. The background `sweep_stale` task in `main.rs`
384    /// holds this `Arc` and ticks once an hour to drop abandoned
385    /// upload contexts (and their `Zeroizing<[u8; 32]>` SSE-C keys).
386    #[must_use]
387    pub fn multipart_state(&self) -> &Arc<crate::multipart_state::MultipartStateStore> {
388        &self.multipart_state
389    }
390
391    /// v0.6 #39: attach the in-memory object + bucket Tagging manager.
392    /// Once set, `Put/Get/Delete` `Object/Bucket Tagging` route
393    /// through the manager (instead of forwarding to the backend),
394    /// and `put_object`'s `x-amz-tagging` parse path becomes the
395    /// source of `s3:RequestObjectTag/<key>` for the IAM policy
396    /// evaluator. The manager itself is shared via `Arc`.
397    #[must_use]
398    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
399        self.tagging = Some(mgr);
400        self
401    }
402
403    /// v0.6 #39: borrow the attached tagging manager (test /
404    /// introspection — the snapshotter in `main.rs`, when wired,
405    /// will keep its own `Arc` clone).
406    #[must_use]
407    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
408        self.tagging.as_ref()
409    }
410
411    /// v0.6 #36: attach the in-memory S3 Inventory manager. Once set,
412    /// `put_bucket_inventory_configuration` /
413    /// `get_bucket_inventory_configuration` /
414    /// `list_bucket_inventory_configurations` /
415    /// `delete_bucket_inventory_configuration` route through the
416    /// manager. The actual periodic CSV / manifest emission is
417    /// orchestrated by a tokio task started in `main.rs`; the manager
418    /// itself is shared between the handler and the scheduler via
419    /// `Arc`.
420    #[must_use]
421    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
422        self.inventory = Some(mgr);
423        self
424    }
425
426    /// v0.6 #36: borrow the attached inventory manager (test /
427    /// introspection — the background scheduler in `main.rs` keeps its
428    /// own `Arc` clone, so this accessor is for the test path that
429    /// invokes `run_once_for_test` directly).
430    #[must_use]
431    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
432        self.inventory.as_ref()
433    }
434
435    /// v0.6 #37: attach the in-memory S3 Lifecycle configuration
436    /// manager. Once set, `put_bucket_lifecycle_configuration` /
437    /// `get_bucket_lifecycle_configuration` / `delete_bucket_lifecycle`
438    /// route through the manager (replacing the previous backend-
439    /// passthrough behaviour). The actual periodic scanner that walks
440    /// the source bucket and invokes Expiration / Transition /
441    /// NoncurrentExpiration actions is a v0.7+ follow-up — see
442    /// [`Self::run_lifecycle_once_for_test`] for the in-memory test
443    /// path that exercises the evaluator end-to-end.
444    #[must_use]
445    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
446        self.lifecycle = Some(mgr);
447        self
448    }
449
450    /// v0.6 #37: borrow the attached lifecycle manager (test /
451    /// introspection — the background scheduler in `main.rs` keeps its
452    /// own `Arc` clone, so this accessor is for the test path that
453    /// invokes the evaluator directly).
454    #[must_use]
455    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
456        self.lifecycle.as_ref()
457    }
458
459    /// v0.6 #37: synchronous test entry that runs the lifecycle evaluator
460    /// against a caller-provided list of `(key, age, size, tags)` tuples
461    /// and returns the `(key, action)` pairs that should fire. The actual
462    /// backend invocation (S3.delete_object / metadata rewrite) is left
463    /// to the caller — the unit + E2E tests use this to verify the
464    /// evaluator without spawning the (deferred) background scanner.
465    /// Returns an empty `Vec` when no lifecycle manager is attached or
466    /// no rule matches.
467    #[must_use]
468    pub fn run_lifecycle_once_for_test(
469        &self,
470        bucket: &str,
471        objects: &[crate::lifecycle::EvaluateBatchEntry],
472    ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
473        let Some(mgr) = self.lifecycle.as_ref() else {
474            return Vec::new();
475        };
476        crate::lifecycle::evaluate_batch(mgr, bucket, objects)
477    }
478
479    /// v0.6 #35: attach the in-memory bucket-notification manager. Once
480    /// set, `put_bucket_notification_configuration` /
481    /// `get_bucket_notification_configuration` route through the manager
482    /// (replacing the previous backend-passthrough behaviour); successful
483    /// `put_object` / `delete_object` calls fire matching destinations
484    /// on a detached tokio task via
485    /// `crate::notifications::dispatch_event` (best-effort, fire-and-
486    /// forget — failures bump the manager's `dropped_total` counter and
487    /// log at warn but do NOT fail the originating S3 request).
488    #[must_use]
489    pub fn with_notifications(
490        mut self,
491        mgr: Arc<crate::notifications::NotificationManager>,
492    ) -> Self {
493        self.notifications = Some(mgr);
494        self
495    }
496
497    /// v0.6 #35: borrow the attached notifications manager (test /
498    /// introspection — used by the metrics layer to read
499    /// `dropped_total`).
500    #[must_use]
501    pub fn notifications_manager(
502        &self,
503    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
504        self.notifications.as_ref()
505    }
506
507    /// v0.6 #35: internal helper used by the DELETE handlers to fire a
508    /// matching notification on a detached tokio task. No-op when no
509    /// manager is attached or no rule on the bucket matches the given
510    /// (event, key) tuple.
511    fn fire_delete_notification(
512        &self,
513        bucket: &str,
514        key: &str,
515        event: crate::notifications::EventType,
516        version_id: Option<String>,
517    ) {
518        let Some(mgr) = self.notifications.as_ref() else {
519            return;
520        };
521        let dests = mgr.match_destinations(bucket, &event, key);
522        if dests.is_empty() {
523            return;
524        }
525        tokio::spawn(crate::notifications::dispatch_event(
526            Arc::clone(mgr),
527            bucket.to_owned(),
528            key.to_owned(),
529            event,
530            None,
531            None,
532            version_id,
533            format!("S4-{}", uuid::Uuid::new_v4()),
534        ));
535    }
536
537    /// v0.6 #40: attach the in-memory cross-bucket replication manager.
538    /// Once set, `put_bucket_replication` / `get_bucket_replication` /
539    /// `delete_bucket_replication` route through the manager (replacing
540    /// the previous backend-passthrough behaviour); a successful
541    /// `put_object` whose key matches an enabled rule fires a detached
542    /// tokio task that PUTs the same body + metadata to the rule's
543    /// destination bucket, stamping the replica with
544    /// `x-amz-replication-status: REPLICA`. Failures after the retry
545    /// budget bump the manager's `dropped_total` counter and are
546    /// surfaced in the `s4_replication_dropped_total` Prometheus
547    /// counter; successes bump `s4_replication_replicated_total`.
548    #[must_use]
549    pub fn with_replication(
550        mut self,
551        mgr: Arc<crate::replication::ReplicationManager>,
552    ) -> Self {
553        self.replication = Some(mgr);
554        self
555    }
556
557    /// v0.6 #40: borrow the attached replication manager (test /
558    /// introspection — used by the metrics layer to read
559    /// `dropped_total`).
560    #[must_use]
561    pub fn replication_manager(
562        &self,
563    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
564        self.replication.as_ref()
565    }
566
567    /// v0.6 #40: internal helper used by the PUT handlers to fire a
568    /// detached cross-bucket replication task. No-op when no manager
569    /// is attached, the source backend PUT failed, or no rule on the
570    /// source bucket matches the (key, tags) tuple. The `body` is the
571    /// post-compression / post-encryption `Bytes` that was sent to
572    /// the source backend (refcount-cloned), and `metadata` is the
573    /// metadata map that already includes the manifest /
574    /// `s4-encrypted` markers — the replica decodes through the same
575    /// path. The destination PUT runs through `Arc<B>::put_object`.
576    ///
577    /// ## v0.8.2 #61: generation token + shadow-key destination
578    ///
579    /// `pending_version` is the source-side `PutOutcome` minted by the
580    /// caller's versioning branch (or `None` for unversioned /
581    /// suspended buckets). When `pending_version.versioned_response`
582    /// is `true`, the dispatcher writes the destination under the same
583    /// shadow path the source uses (`<key>.__s4ver__/<vid>`) so the
584    /// destination's version chain receives the new version the same
585    /// way `?versionId=` GET resolves it. Closes audit C-1.
586    ///
587    /// The dispatcher also mints a fresh `generation` token before
588    /// spawning, threaded through to [`crate::replication::
589    /// replicate_object`]. Closes audit C-3 — a stale retry of an
590    /// older PUT can no longer overwrite the destination's newer bytes
591    /// because the CAS guard sees the higher stored generation and
592    /// drops its destination write.
593    ///
594    /// ## Asymmetric versioning policy (out of scope)
595    ///
596    /// We assume source + destination buckets share the same
597    /// versioning policy (both Enabled or both Suspended /
598    /// Unversioned). Cross-bucket policy queries would require a
599    /// backend round-trip per replication, which is not worth it for
600    /// the single-instance scope. Operators who configure asymmetric
601    /// versioning will see destination-side `?versionId=` lookups
602    /// miss — documented as out-of-scope until a future per-rule
603    /// `destination_versioning_policy` knob lands.
604    // 8 args is the post-#61 shape: replication needs the
605    // source bucket+key, the canonical tag set for rule-matching,
606    // the post-codec body+metadata for the destination PUT, the
607    // backend-success gate, and the pending version-id for the
608    // shadow-key destination override. A shape struct would just
609    // split the (single) call site so opt for the inline form.
610    #[allow(clippy::too_many_arguments)]
611    fn spawn_replication_if_matched(
612        &self,
613        source_bucket: &str,
614        source_key: &str,
615        request_tags: &Option<crate::tagging::TagSet>,
616        body: &bytes::Bytes,
617        metadata: &Option<std::collections::HashMap<String, String>>,
618        backend_ok: bool,
619        pending_version: Option<&crate::versioning::PutOutcome>,
620    ) where
621        B: Send + Sync + 'static,
622    {
623        if !backend_ok {
624            return;
625        }
626        let Some(mgr) = self.replication.as_ref() else {
627            return;
628        };
629        // Pull the request's tags into the (k, v) shape the matcher
630        // expects. The tagging manager would have the canonical
631        // post-PUT view but at this point in the pipeline it's
632        // already been written above; for the rule-match decision
633        // the request's tags are sufficient (= the tags this PUT
634        // applies, S3 PutObject is full-replace on tags).
635        let object_tags: Vec<(String, String)> = request_tags
636            .as_ref()
637            .map(|ts| ts.iter().cloned().collect())
638            .unwrap_or_default();
639        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
640            return;
641        };
642        // v0.8.2 #61: mint the per-PUT generation BEFORE the eager
643        // Pending stamp so the stamp itself carries the right
644        // generation (the CAS in `record_status_if_newer` would
645        // otherwise see a `generation=0` Pending and accept any
646        // stale retry).
647        let generation = mgr.next_generation();
648        // Eagerly mark the source key as Pending so a HEAD between
649        // the source PUT returning and the spawned task completing
650        // surfaces the in-flight state. CAS-guarded so a slower
651        // older PUT can't downgrade a newer Completed back to Pending.
652        let _ = mgr.record_status_if_newer(
653            source_bucket,
654            source_key,
655            generation,
656            crate::replication::ReplicationStatus::Pending,
657        );
658        // v0.8.2 #61: derive the destination storage key. For a
659        // versioning-Enabled source the destination receives the
660        // same shadow-key path so a `?versionId=<vid>` GET on the
661        // destination resolves through the same lookup the source
662        // uses. Suspended / Unversioned sources keep the logical
663        // key (= `None` override = dispatcher uses `source_key`).
664        let destination_key_override = pending_version
665            .filter(|pv| pv.versioned_response)
666            .map(|pv| versioned_shadow_key(source_key, &pv.version_id));
667        let mgr_cl = Arc::clone(mgr);
668        let backend = Arc::clone(&self.backend);
669        let body_cl = body.clone();
670        let metadata_cl = metadata.clone();
671        let source_bucket_cl = source_bucket.to_owned();
672        let source_key_cl = source_key.to_owned();
673        tokio::spawn(async move {
674            let do_put = move |dest_bucket: String,
675                               dest_key: String,
676                               dest_body: bytes::Bytes,
677                               dest_meta: Option<std::collections::HashMap<String, String>>| {
678                let backend = Arc::clone(&backend);
679                async move {
680                    let req = S3Request {
681                        input: PutObjectInput {
682                            bucket: dest_bucket,
683                            key: dest_key,
684                            body: Some(bytes_to_blob(dest_body)),
685                            metadata: dest_meta,
686                            ..Default::default()
687                        },
688                        method: http::Method::PUT,
689                        uri: "/".parse().unwrap(),
690                        headers: http::HeaderMap::new(),
691                        extensions: http::Extensions::new(),
692                        credentials: None,
693                        region: None,
694                        service: None,
695                        trailing_headers: None,
696                    };
697                    backend
698                        .put_object(req)
699                        .await
700                        .map(|_| ())
701                        .map_err(|e| format!("destination put_object: {e}"))
702                }
703            };
704            crate::replication::replicate_object(
705                rule,
706                source_bucket_cl,
707                source_key_cl,
708                body_cl,
709                metadata_cl,
710                do_put,
711                mgr_cl,
712                generation,
713                destination_key_override,
714            )
715            .await;
716        });
717    }
718
719    /// v0.6 #42: attach the in-memory MFA-Delete enforcement manager.
720    /// Once set, every DELETE / DELETE-version / delete-marker /
721    /// `PutBucketVersioning` request against a bucket whose MFA-Delete
722    /// state is `Enabled` requires a valid `x-amz-mfa: <serial> <code>`
723    /// header (RFC 6238 6-digit TOTP); the gate is a no-op for buckets
724    /// where MFA-Delete is `Disabled` (S3 default).
725    #[must_use]
726    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
727        self.mfa_delete = Some(mgr);
728        self
729    }
730
731    /// v0.6 #42: borrow the attached MFA-Delete manager (test /
732    /// introspection — used by the snapshot path in `main.rs` to call
733    /// `to_json` for restart-recoverable state).
734    #[must_use]
735    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
736        self.mfa_delete.as_ref()
737    }
738
739    /// v0.6 #38: attach the in-memory CORS configuration manager. Once
740    /// set, `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
741    /// route through the manager instead of forwarding to the backend,
742    /// and [`Self::handle_preflight`] becomes useful for the (future)
743    /// listener-side OPTIONS interceptor.
744    #[must_use]
745    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
746        self.cors = Some(mgr);
747        self
748    }
749
750    /// v0.6 #38: Borrow the attached CORS manager (test / introspection).
751    #[must_use]
752    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
753        self.cors.as_ref()
754    }
755
756    /// v0.6 #38: evaluate a CORS preflight request against the bucket's
757    /// configured rules and, if a rule matches, return the headers that
758    /// the (future) listener-side OPTIONS interceptor must put on the
759    /// 200 response: `Access-Control-Allow-Origin`, `Access-Control-
760    /// Allow-Methods`, `Access-Control-Allow-Headers`, optionally
761    /// `Access-Control-Max-Age` and `Access-Control-Expose-Headers`.
762    ///
763    /// Returns `None` when no manager is attached, no config is
764    /// registered for the bucket, or no rule matches the (origin,
765    /// method, headers) triple. The caller is responsible for turning
766    /// `None` into the appropriate 403 response.
767    ///
768    /// **Note:** the OPTIONS routing itself (i.e. wiring this method
769    /// into the hyper-util listener path) is a follow-up — s3s does not
770    /// surface OPTIONS as a typed S3 handler, so this method is
771    /// currently call-able only from inside other handlers and tests.
772    #[must_use]
773    pub fn handle_preflight(
774        &self,
775        bucket: &str,
776        origin: &str,
777        method: &str,
778        request_headers: &[String],
779    ) -> Option<std::collections::HashMap<String, String>> {
780        let mgr = self.cors.as_ref()?;
781        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
782        let mut h = std::collections::HashMap::new();
783        // Echo the matched origin back. If the rule used "*" we still
784        // echo "*" (S3 spec — the spec does not require us to echo the
785        // *requesting* origin when the wildcard matched).
786        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
787            "*".to_string()
788        } else {
789            origin.to_string()
790        };
791        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
792        h.insert(
793            "Access-Control-Allow-Methods".to_string(),
794            rule.allowed_methods.join(", "),
795        );
796        if !rule.allowed_headers.is_empty() {
797            // For the Allow-Headers response, echo back the rule's
798            // pattern list verbatim (S3 echoes the configured list,
799            // including "*" if present). Browsers honour exact-match
800            // rules.
801            h.insert(
802                "Access-Control-Allow-Headers".to_string(),
803                rule.allowed_headers.join(", "),
804            );
805        }
806        if let Some(secs) = rule.max_age_seconds {
807            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
808        }
809        if !rule.expose_headers.is_empty() {
810            h.insert(
811                "Access-Control-Expose-Headers".to_string(),
812                rule.expose_headers.join(", "),
813            );
814        }
815        Some(h)
816    }
817
818    /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
819    /// SSE indicator (server-side encryption header or SSE-C customer
820    /// key); requests without one are rejected with 400 InvalidRequest.
821    /// Boot-time prerequisite checking lives in the binary
822    /// (`validate_compliance_mode`) so this flag is purely the runtime
823    /// switch.
824    #[must_use]
825    pub fn with_compliance_strict(mut self, on: bool) -> Self {
826        self.compliance_strict = on;
827        self
828    }
829
830    /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
831    /// manager. Once set, `delete_object` and overwrite-path
832    /// `put_object` refuse operations on locked keys with HTTP 403
833    /// `AccessDenied`; new PUTs to a bucket with a default retention
834    /// policy auto-create per-object lock state.
835    #[must_use]
836    pub fn with_object_lock(
837        mut self,
838        mgr: Arc<crate::object_lock::ObjectLockManager>,
839    ) -> Self {
840        self.object_lock = Some(mgr);
841        self
842    }
843
844    /// v0.7 #45: borrow the attached Object Lock manager (read-only —
845    /// the lifecycle scanner uses this to skip currently-locked objects
846    /// before issuing `delete_object`, since an Object Lock always wins
847    /// over Lifecycle Expiration in AWS S3 semantics). Mirrors the
848    /// shape of [`Self::lifecycle_manager`] /
849    /// [`Self::tag_manager`] — purely additive accessor, no handler
850    /// behaviour change.
851    #[must_use]
852    pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
853        self.object_lock.as_ref()
854    }
855
856    /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
857    /// when a PUT requests SSE-KMS without naming a specific KMS key
858    /// (operators set this to mirror AWS S3's bucket-default key).
859    #[must_use]
860    pub fn with_kms_backend(
861        mut self,
862        kms: Arc<dyn crate::kms::KmsBackend>,
863        default_key_id: Option<String>,
864    ) -> Self {
865        self.kms = Some(kms);
866        self.kms_default_key_id = default_key_id;
867        self
868    }
869
870    /// v0.5 #34: attach the first-class versioning state machine. Once
871    /// set, this `S4Service` owns the per-bucket versioning state +
872    /// per-(bucket, key) version chain; `put_object` / `get_object` /
873    /// `delete_object` / `list_object_versions` /
874    /// `get_bucket_versioning` / `put_bucket_versioning` consult the
875    /// manager instead of passing through to the backend. The backend
876    /// is still used as the byte store: Suspended / Unversioned buckets
877    /// keep using `<key>` directly (legacy), Enabled buckets redirect
878    /// each version's bytes to a shadow key
879    /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
880    /// PUTs to the same logical key.
881    #[must_use]
882    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
883        self.versioning = Some(mgr);
884        self
885    }
886
887    /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
888    /// Internally wraps it in a 1-slot keyring with id=1 active, so
889    /// new objects ride the v0.5 S4E2 frame while previously-written
890    /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
891    /// fallback path. Operators wanting true rotation should call
892    /// [`Self::with_sse_keyring`] instead.
893    #[must_use]
894    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
895        let keyring = crate::sse::SseKeyring::new(1, key);
896        self.sse_keyring = Some(std::sync::Arc::new(keyring));
897        self
898    }
899
900    /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
901    /// the active key (S4E2 frame stamped with that key's id); GET
902    /// dispatches on the body's magic — S4E1 falls back to trying every
903    /// key in the ring (active first) so v0.4 objects survive a
904    /// migration; S4E2 looks up the explicit key_id from the header.
905    #[must_use]
906    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
907        self.sse_keyring = Some(keyring);
908        self
909    }
910
911    /// v0.8 #52: opt the SSE-S4 PUT path into the chunked S4E5 frame
912    /// (so the matching GET can stream-decrypt chunk-by-chunk
913    /// instead of buffering the entire body before tag verify).
914    /// `bytes` is the plaintext slice size — typically 1 MiB; 0
915    /// disables the path and reverts to the legacy S4E2 buffered
916    /// frame.
917    ///
918    /// SSE-C (S4E3) and SSE-KMS (S4E4) are intentionally untouched:
919    /// the chunked envelopes for those flows are a follow-up issue
920    /// (the customer-key wire surface needs separate version
921    /// negotiation).
922    ///
923    /// Has no effect when `with_sse_keyring` / `with_sse_key` is
924    /// not also set — the chunked path runs only on the SSE-S4
925    /// branch of `put_object`.
926    #[must_use]
927    pub fn with_sse_chunk_size(mut self, bytes: usize) -> Self {
928        self.sse_chunk_size = bytes;
929        self
930    }
931
932    /// v0.4 #20: attach an S3-style access-log emitter. Each completed
933    /// PUT / GET / DELETE / List handler emits one entry into the
934    /// emitter's buffer; a background flusher (started separately, see
935    /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
936    /// rotated `.log` files into the configured directory.
937    #[must_use]
938    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
939        self.access_log = Some(log);
940        self
941    }
942
943    /// Capture the per-request access-log preamble before the request is
944    /// consumed by the backend call. Returns `None` if no access logger
945    /// is configured (cheap early-out so the handler doesn't pay the
946    /// header-clone cost when access logging is off).
947    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
948        self.access_log.as_ref()?;
949        Some(AccessLogPreamble {
950            remote_ip: req
951                .headers
952                .get("x-forwarded-for")
953                .and_then(|v| v.to_str().ok())
954                .and_then(|raw| raw.split(',').next())
955                .map(|s| s.trim().to_owned()),
956            requester: Self::principal_of(req).map(str::to_owned),
957            request_uri: format!("{} {}", req.method, req.uri.path()),
958            user_agent: req
959                .headers
960                .get("user-agent")
961                .and_then(|v| v.to_str().ok())
962                .map(str::to_owned),
963        })
964    }
965
966    /// Internal — called by handlers at end-of-request with a captured
967    /// preamble. Best-effort: swallows the await fast (clones Arc +
968    /// pushes), no error propagation back to the request path.
969    #[allow(clippy::too_many_arguments)]
970    async fn record_access(
971        &self,
972        preamble: Option<AccessLogPreamble>,
973        operation: &'static str,
974        bucket: &str,
975        key: Option<&str>,
976        http_status: u16,
977        bytes_sent: u64,
978        object_size: u64,
979        total_time_ms: u64,
980        error_code: Option<&str>,
981    ) {
982        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
983            return;
984        };
985        log.record(crate::access_log::AccessLogEntry {
986            time: std::time::SystemTime::now(),
987            bucket: bucket.to_owned(),
988            remote_ip: p.remote_ip,
989            requester: p.requester,
990            operation,
991            key: key.map(str::to_owned),
992            request_uri: p.request_uri,
993            http_status,
994            error_code: error_code.map(str::to_owned),
995            bytes_sent,
996            object_size,
997            total_time_ms,
998            user_agent: p.user_agent,
999        })
1000        .await;
1001    }
1002
1003    /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
1004    /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
1005    /// throttle-checked before the policy gate; throttled requests return
1006    /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
1007    /// `s4_rate_limit_throttled_total{principal,bucket}`.
1008    #[must_use]
1009    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
1010        self.rate_limits = Some(rl);
1011        self
1012    }
1013
1014    /// Helper used by request handlers to apply the rate limit. Returns
1015    /// `Ok(())` when allowed (or no rate limiter is configured), or a
1016    /// `SlowDown` S3Error otherwise.
1017    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
1018        let Some(rl) = self.rate_limits.as_ref() else {
1019            return Ok(());
1020        };
1021        let principal_id = Self::principal_of(req);
1022        if !rl.check(principal_id, bucket) {
1023            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
1024            return Err(S3Error::with_message(
1025                S3ErrorCode::SlowDown,
1026                format!("rate-limited: bucket={bucket}"),
1027            ));
1028        }
1029        Ok(())
1030    }
1031
1032    /// Tell the policy evaluator that the listener is reached over TLS
1033    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
1034    /// resolves to `true`. Defaults to `false`.
1035    #[must_use]
1036    pub fn with_secure_transport(mut self, on: bool) -> Self {
1037        self.secure_transport = on;
1038        self
1039    }
1040
1041    #[must_use]
1042    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
1043        self.max_body_bytes = n;
1044        self
1045    }
1046
1047    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
1048    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
1049    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
1050    /// When `None` (the default), no policy enforcement happens.
1051    #[must_use]
1052    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
1053        self.policy = Some(policy);
1054        self
1055    }
1056
1057    /// Pull the SigV4 access key id off the request's credentials, if any.
1058    /// Used as the `principal_id` for policy evaluation.
1059    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
1060        req.credentials.as_ref().map(|c| c.access_key.as_str())
1061    }
1062
1063    /// v0.3 #13: build the per-request policy context from the incoming
1064    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
1065    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
1066    /// production deployments are behind an LB / reverse proxy that sets
1067    /// this), `aws:CurrentTime` from the system clock, and
1068    /// `aws:SecureTransport` from the per-listener TLS flag.
1069    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
1070        let user_agent = req
1071            .headers
1072            .get("user-agent")
1073            .and_then(|v| v.to_str().ok())
1074            .map(str::to_owned);
1075        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
1076        // is the original client. Trim and parse leniently.
1077        let source_ip = req
1078            .headers
1079            .get("x-forwarded-for")
1080            .and_then(|v| v.to_str().ok())
1081            .and_then(|raw| raw.split(',').next())
1082            .and_then(|s| s.trim().parse().ok());
1083        crate::policy::RequestContext {
1084            source_ip,
1085            user_agent,
1086            request_time: Some(std::time::SystemTime::now()),
1087            secure_transport: self.secure_transport,
1088            existing_object_tags: None,
1089            request_object_tags: None,
1090            extra: Default::default(),
1091        }
1092    }
1093
1094    /// Helper used by request handlers to enforce the optional policy.
1095    /// Returns `Ok(())` when allowed (or no policy is configured), or an
1096    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
1097    /// counter on deny.
1098    fn enforce_policy<I>(
1099        &self,
1100        req: &S3Request<I>,
1101        action: &'static str,
1102        bucket: &str,
1103        key: Option<&str>,
1104    ) -> S3Result<()> {
1105        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
1106    }
1107
1108    /// v0.6 #39: variant of [`Self::enforce_policy`] that lets the
1109    /// caller plumb tag context (existing-on-object + on-request) into
1110    /// the policy evaluator. Both arguments default to `None`, in
1111    /// which case the resulting `RequestContext` is identical to
1112    /// [`Self::enforce_policy`]'s — so for handlers that don't deal
1113    /// with tags this is a transparent no-op.
1114    fn enforce_policy_with_extra<I>(
1115        &self,
1116        req: &S3Request<I>,
1117        action: &'static str,
1118        bucket: &str,
1119        key: Option<&str>,
1120        request_tags: Option<&crate::tagging::TagSet>,
1121        existing_tags: Option<&crate::tagging::TagSet>,
1122    ) -> S3Result<()> {
1123        let Some(policy) = self.policy.as_ref() else {
1124            return Ok(());
1125        };
1126        let principal_id = Self::principal_of(req);
1127        let mut ctx = self.request_context(req);
1128        if let Some(t) = request_tags {
1129            ctx.request_object_tags = Some(t.clone());
1130        }
1131        if let Some(t) = existing_tags {
1132            ctx.existing_object_tags = Some(t.clone());
1133        }
1134        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
1135        if decision.allow {
1136            Ok(())
1137        } else {
1138            crate::metrics::record_policy_denial(action, bucket);
1139            tracing::info!(
1140                action,
1141                bucket,
1142                key = ?key,
1143                principal = ?principal_id,
1144                source_ip = ?ctx.source_ip,
1145                user_agent = ?ctx.user_agent,
1146                secure_transport = ctx.secure_transport,
1147                matched_sid = ?decision.matched_sid,
1148                effect = ?decision.matched_effect,
1149                "S4 policy denied request"
1150            );
1151            Err(S3Error::with_message(
1152                S3ErrorCode::AccessDenied,
1153                format!("denied by S4 policy: {action} on bucket={bucket}"),
1154            ))
1155        }
1156    }
1157
1158    /// テスト用: backend を取り戻す (test helper、production では使わない).
1159    /// v0.6 #40 で `backend` が `Arc<B>` 化したので `Arc::try_unwrap` で
1160    /// 1-clone の場合のみ返す。共有されている (= replication dispatcher が
1161    /// 同じ Arc を持っていて未完了) 場合は `Err` を返さず panic させる
1162    /// (test 用途専用 helper の caller 契約を維持)。
1163    pub fn into_backend(self) -> B {
1164        Arc::try_unwrap(self.backend)
1165            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
1166    }
1167
1168    /// 必要 frame だけを backend に Range GET し、frame parse + decompress + slice
1169    /// した結果を返す sidecar fast path。Range request の **帯域節約版**。
1170    async fn partial_range_get(
1171        &self,
1172        req: &S3Request<GetObjectInput>,
1173        plan: s4_codec::index::RangePlan,
1174        client_start: u64,
1175        client_end_exclusive: u64,
1176        total_original: u64,
1177        get_start: Instant,
1178    ) -> S3Result<S3Response<GetObjectOutput>> {
1179        // 必要 byte 範囲だけを backend に partial GET
1180        let backend_range = s3s::dto::Range::Int {
1181            first: plan.byte_start,
1182            last: Some(plan.byte_end_exclusive - 1),
1183        };
1184        let backend_input = GetObjectInput {
1185            bucket: req.input.bucket.clone(),
1186            key: req.input.key.clone(),
1187            range: Some(backend_range),
1188            ..Default::default()
1189        };
1190        let backend_req = S3Request {
1191            input: backend_input,
1192            method: req.method.clone(),
1193            uri: req.uri.clone(),
1194            headers: req.headers.clone(),
1195            extensions: http::Extensions::new(),
1196            credentials: req.credentials.clone(),
1197            region: req.region.clone(),
1198            service: req.service.clone(),
1199            trailing_headers: None,
1200        };
1201        let mut backend_resp = self.backend.get_object(backend_req).await?;
1202        let blob = backend_resp.output.body.take().ok_or_else(|| {
1203            S3Error::with_message(
1204                S3ErrorCode::InternalError,
1205                "backend partial GET returned empty body",
1206            )
1207        })?;
1208        let bytes = collect_blob(blob, self.max_body_bytes)
1209            .await
1210            .map_err(internal("collect partial body"))?;
1211
1212        // frame parse + decompress
1213        let mut combined = BytesMut::new();
1214        for frame in FrameIter::new(bytes) {
1215            let (header, payload) = frame.map_err(|e| {
1216                S3Error::with_message(
1217                    S3ErrorCode::InternalError,
1218                    format!("partial-range frame parse: {e}"),
1219                )
1220            })?;
1221            let chunk_manifest = ChunkManifest {
1222                codec: header.codec,
1223                original_size: header.original_size,
1224                compressed_size: header.compressed_size,
1225                crc32c: header.crc32c,
1226            };
1227            let decompressed = self
1228                .registry
1229                .decompress(payload, &chunk_manifest)
1230                .await
1231                .map_err(internal("partial-range decompress"))?;
1232            combined.extend_from_slice(&decompressed);
1233        }
1234        let combined = combined.freeze();
1235        let sliced = combined
1236            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
1237
1238        // response 組立て
1239        let returned_size = sliced.len() as u64;
1240        backend_resp.output.content_length = Some(returned_size as i64);
1241        backend_resp.output.content_range = Some(format!(
1242            "bytes {client_start}-{}/{total_original}",
1243            client_end_exclusive - 1
1244        ));
1245        backend_resp.output.checksum_crc32 = None;
1246        backend_resp.output.checksum_crc32c = None;
1247        backend_resp.output.checksum_crc64nvme = None;
1248        backend_resp.output.checksum_sha1 = None;
1249        backend_resp.output.checksum_sha256 = None;
1250        backend_resp.output.e_tag = None;
1251        backend_resp.output.body = Some(bytes_to_blob(sliced));
1252        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
1253
1254        let elapsed = get_start.elapsed();
1255        crate::metrics::record_get(
1256            "partial",
1257            plan.byte_end_exclusive - plan.byte_start,
1258            returned_size,
1259            elapsed.as_secs_f64(),
1260            true,
1261        );
1262        info!(
1263            op = "get_object",
1264            bucket = %req.input.bucket,
1265            key = %req.input.key,
1266            bytes_in = plan.byte_end_exclusive - plan.byte_start,
1267            bytes_out = returned_size,
1268            total_object_size = total_original,
1269            range = true,
1270            path = "sidecar-partial",
1271            latency_ms = elapsed.as_millis() as u64,
1272            "S4 partial Range GET via sidecar index"
1273        );
1274        Ok(backend_resp)
1275    }
1276
1277    /// `<key>.s4index` sidecar object を backend に書く。失敗しても本体 PUT は
1278    /// 成功扱いにしたいので、err は warn ログのみ (Range GET の partial path が
1279    /// 使えなくなるが、full read fallback で意味的には正しい結果を返す)。
1280    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
1281        let bytes = encode_index(index);
1282        let len = bytes.len() as i64;
1283        let sidecar = sidecar_key(key);
1284        // v0.7 #49: synthetic re-entry URI must be percent-encoded; if
1285        // the (already legally-arbitrary) S3 key produces something we
1286        // cannot encode at all, drop the sidecar PUT (the GET path
1287        // falls back to a full read on a missing sidecar) instead of
1288        // panicking on `parse().unwrap()`.
1289        let uri = match safe_object_uri(bucket, &sidecar) {
1290            Ok(u) => u,
1291            Err(e) => {
1292                tracing::warn!(
1293                    bucket,
1294                    key,
1295                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
1296                );
1297                return;
1298            }
1299        };
1300        let put_input = PutObjectInput {
1301            bucket: bucket.into(),
1302            key: sidecar,
1303            body: Some(bytes_to_blob(bytes)),
1304            content_length: Some(len),
1305            content_type: Some("application/x-s4-index".into()),
1306            ..Default::default()
1307        };
1308        let put_req = S3Request {
1309            input: put_input,
1310            method: http::Method::PUT,
1311            uri,
1312            headers: http::HeaderMap::new(),
1313            extensions: http::Extensions::new(),
1314            credentials: None,
1315            region: None,
1316            service: None,
1317            trailing_headers: None,
1318        };
1319        if let Err(e) = self.backend.put_object(put_req).await {
1320            tracing::warn!(
1321                bucket,
1322                key,
1323                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
1324            );
1325        }
1326    }
1327
1328    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
1329    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
1330        let sidecar = sidecar_key(key);
1331        // v0.7 #49: same encode-or-bail treatment as write_sidecar.
1332        let uri = safe_object_uri(bucket, &sidecar).ok()?;
1333        let get_input = GetObjectInput {
1334            bucket: bucket.into(),
1335            key: sidecar,
1336            ..Default::default()
1337        };
1338        let get_req = S3Request {
1339            input: get_input,
1340            method: http::Method::GET,
1341            uri,
1342            headers: http::HeaderMap::new(),
1343            extensions: http::Extensions::new(),
1344            credentials: None,
1345            region: None,
1346            service: None,
1347            trailing_headers: None,
1348        };
1349        let resp = self.backend.get_object(get_req).await.ok()?;
1350        let blob = resp.output.body?;
1351        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
1352        decode_index(bytes).ok()
1353    }
1354
1355    /// Multipart object (frame 列) を解凍 → 元 bytes を再構築。
1356    ///
1357    /// **per-frame codec dispatch**: 各 frame header に codec_id が入っているので、
1358    /// frame ごとに registry が違う codec を呼ぶことができる。同一 object 内で
1359    /// 異なる codec が混在していても透過的に解凍可能 (parquet 風 mixed columns 等)。
1360    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
1361        let mut out = BytesMut::new();
1362        for frame in FrameIter::new(bytes) {
1363            let (header, payload) = frame.map_err(|e| {
1364                S3Error::with_message(
1365                    S3ErrorCode::InternalError,
1366                    format!("multipart frame parse: {e}"),
1367                )
1368            })?;
1369            let chunk_manifest = ChunkManifest {
1370                codec: header.codec,
1371                original_size: header.original_size,
1372                compressed_size: header.compressed_size,
1373                crc32c: header.crc32c,
1374            };
1375            let decompressed = self
1376                .registry
1377                .decompress(payload, &chunk_manifest)
1378                .await
1379                .map_err(internal("multipart frame decompress"))?;
1380            out.extend_from_slice(&decompressed);
1381        }
1382        Ok(out.freeze())
1383    }
1384}
1385
1386/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
1387/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
1388/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
1389/// reject the other variants for parity with AWS.
1390fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1391    let rest = s
1392        .strip_prefix("bytes=")
1393        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1394    let (a, b) = rest
1395        .split_once('-')
1396        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1397    let first: u64 = a
1398        .parse()
1399        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1400    let last: u64 = b
1401        .parse()
1402        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1403    if last < first {
1404        return Err(format!("CopySourceRange last < first: {s:?}"));
1405    }
1406    Ok(s3s::dto::Range::Int {
1407        first,
1408        last: Some(last),
1409    })
1410}
1411
/// v0.5 #34: build the backend storage key under which one version of a
/// logical object is stored on an Enabled-versioning bucket.
///
/// The result is `<key>.__s4ver__/<version_id>`. The `.__s4ver__/` infix was
/// chosen because:
/// - it does not occur inside natural `.s4index` / `.s4ver` keys, so listing
///   filters get no false positives
/// - the directory-style separator keeps "browse by prefix" UX intact in the
///   S3 console (all versions roll up under one virtual folder per object)
/// - it stays human-readable in debug logs and `aws s3 ls`
///
/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
/// keys containing `.__s4ver__/` from results so customers don't see internal
/// shadow objects.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    [key, ".__s4ver__/", version_id].concat()
}
1428
/// Report whether `key` contains the marker that [`versioned_shadow_key`]
/// inserts. A plain substring scan; shared by the list_objects filter and
/// the GET passthrough check.
fn is_versioning_shadow_key(key: &str) -> bool {
    const SHADOW_MARKER: &str = ".__s4ver__/";
    key.contains(SHADOW_MARKER)
}
1434
/// v0.6 #42: whole seconds elapsed since the UNIX epoch according to the
/// system wall clock. Fed to `mfa::check_mfa` so the TOTP verifier agrees
/// with the client's authenticator app about "now".
///
/// If the clock reports a time before 1970 (not expected in practice) the
/// result is `0`, so the verifier rejects the code instead of this function
/// panicking.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
1446
1447/// v0.6 #42: translate an `MfaError` into the matching S3 wire error.
1448///
1449/// - `Missing` / `SerialMismatch` / `InvalidCode` → `403 AccessDenied`
1450///   (S3 spec for MFA Delete: every gating failure surfaces as
1451///   `AccessDenied`, not a separate `MFA*` code).
1452/// - `Malformed` → `400 InvalidRequest` (the request itself is
1453///   syntactically broken, not a permission issue).
1454fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1455    match e {
1456        crate::mfa::MfaError::Missing => S3Error::with_message(
1457            S3ErrorCode::AccessDenied,
1458            "MFA token required for this operation",
1459        ),
1460        crate::mfa::MfaError::Malformed => S3Error::with_message(
1461            S3ErrorCode::InvalidRequest,
1462            "malformed x-amz-mfa header",
1463        ),
1464        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1465            S3ErrorCode::AccessDenied,
1466            "MFA serial does not match configured device",
1467        ),
1468        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1469            S3ErrorCode::AccessDenied,
1470            "invalid MFA code",
1471        ),
1472    }
1473}
1474
1475fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1476    metadata
1477        .as_ref()
1478        .and_then(|m| m.get(META_MULTIPART))
1479        .map(|v| v == "true")
1480        .unwrap_or(false)
1481}
1482
// --- Object-format flags (user-metadata keys) ---

/// Marks an object whose body was written through Multipart Upload using the
/// per-part frame format. The GET path checks this flag to decide whether to
/// run the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object whose body is in the S4F2 framed format.
/// Legacy v0.1 single-PUT objects are raw compressed bytes and do not carry
/// this flag. The GET path checks it to route the body through the framed
/// path (the same `FrameIter` parse that multipart objects use).
const META_FRAMED: &str = "s4-framed";

// --- ChunkManifest fields (user-metadata keys) ---

/// Codec name of the stored body (`ChunkManifest::codec`).
const META_CODEC: &str = "s4-codec";
/// Size before compression (`ChunkManifest::original_size`), stored as a
/// decimal string.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Size after compression (`ChunkManifest::compressed_size`), stored as a
/// decimal string.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// Checksum recorded in the manifest (`ChunkManifest::crc32c`), stored as a
/// decimal string.
const META_CRC32C: &str = "s4-crc32c";
1494
1495fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1496    metadata
1497        .as_ref()
1498        .and_then(|m| m.get(META_FRAMED))
1499        .map(|v| v == "true")
1500        .unwrap_or(false)
1501}
1502
1503/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
1504fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1505    metadata
1506        .as_ref()
1507        .and_then(|m| m.get("s4-encrypted"))
1508        .map(|v| v == "aes-256-gcm")
1509        .unwrap_or(false)
1510}
1511
1512/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
1513/// contract is "all three or none" — partial sets are a 400.
1514///
1515/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
1516/// no encryption), `Ok(Some(material))` on validated client key, and
1517/// `Err` for malformed or partial inputs.
1518fn extract_sse_c_material(
1519    algorithm: &Option<String>,
1520    key: &Option<String>,
1521    md5: &Option<String>,
1522) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1523    match (algorithm, key, md5) {
1524        (None, None, None) => Ok(None),
1525        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1526            .map(Some)
1527            .map_err(sse_c_error_to_s3),
1528        _ => Err(S3Error::with_message(
1529            S3ErrorCode::InvalidRequest,
1530            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1531        )),
1532    }
1533}
1534
1535/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
1536/// Returns the key-id to wrap under, falling back to the gateway default.
1537fn extract_kms_key_id(
1538    sse: &Option<ServerSideEncryption>,
1539    sse_kms_key_id: &Option<String>,
1540    gateway_default: Option<&str>,
1541) -> Option<String> {
1542    let asks_for_kms = sse
1543        .as_ref()
1544        .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1545        .unwrap_or(false);
1546    if !asks_for_kms {
1547        return None;
1548    }
1549    sse_kms_key_id
1550        .clone()
1551        .or_else(|| gateway_default.map(str::to_owned))
1552}
1553
1554/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
1555/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
1556/// transient KMS outage (503). Other variants are 500 InternalError.
1557fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1558    use crate::kms::KmsError as K;
1559    match e {
1560        K::KeyNotFound { key_id } => S3Error::with_message(
1561            S3ErrorCode::InvalidArgument,
1562            format!("KMS key not found: {key_id}"),
1563        ),
1564        K::BackendUnavailable { message } => S3Error::with_message(
1565            S3ErrorCode::ServiceUnavailable,
1566            format!("KMS backend unavailable: {message}"),
1567        ),
1568        other => S3Error::with_message(
1569            S3ErrorCode::InternalError,
1570            format!("KMS error: {other}"),
1571        ),
1572    }
1573}
1574
1575/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
1576/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
1577/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
1578fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1579    use crate::sse::SseError as E;
1580    match e {
1581        E::WrongCustomerKey => S3Error::with_message(
1582            S3ErrorCode::AccessDenied,
1583            "SSE-C key does not match the key used at PUT time",
1584        ),
1585        E::InvalidCustomerKey { reason } => S3Error::with_message(
1586            S3ErrorCode::InvalidArgument,
1587            format!("SSE-C: {reason}"),
1588        ),
1589        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1590            S3ErrorCode::InvalidArgument,
1591            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1592        ),
1593        E::CustomerKeyRequired => S3Error::with_message(
1594            S3ErrorCode::InvalidRequest,
1595            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1596        ),
1597        E::CustomerKeyUnexpected => S3Error::with_message(
1598            S3ErrorCode::InvalidRequest,
1599            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1600        ),
1601        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1602    }
1603}
1604
1605fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1606    let m = metadata.as_ref()?;
1607    let codec = m
1608        .get(META_CODEC)
1609        .and_then(|s| s.parse::<CodecKind>().ok())?;
1610    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1611    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1612    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1613    Some(ChunkManifest {
1614        codec,
1615        original_size,
1616        compressed_size,
1617        crc32c,
1618    })
1619}
1620
1621fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1622    let meta = metadata.get_or_insert_with(Default::default);
1623    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1624    meta.insert(
1625        META_ORIGINAL_SIZE.into(),
1626        manifest.original_size.to_string(),
1627    );
1628    meta.insert(
1629        META_COMPRESSED_SIZE.into(),
1630        manifest.compressed_size.to_string(),
1631    );
1632    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1633}
1634
1635fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1636    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1637}
1638
1639/// v0.6 #41: map a `select::SelectError` to the S3 error surface. AWS
1640/// uses a domain-specific `InvalidSqlExpression` code for parse / unsupported
1641/// errors, but s3s 0.13 doesn't expose that as a typed variant — we
1642/// fall back to the well-known `InvalidRequest` 400 with a descriptive
1643/// message that includes the original error context.
1644fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1645    use crate::select::SelectError;
1646    match e {
1647        SelectError::Parse(msg) => S3Error::with_message(
1648            S3ErrorCode::InvalidRequest,
1649            format!("SQL parse error: {msg}"),
1650        ),
1651        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1652            S3ErrorCode::InvalidRequest,
1653            format!("unsupported SQL feature: {msg}"),
1654        ),
1655        SelectError::RowEval(msg) => S3Error::with_message(
1656            S3ErrorCode::InvalidRequest,
1657            format!("SQL row evaluation error: {msg}"),
1658        ),
1659        SelectError::InputFormat(msg) => S3Error::with_message(
1660            S3ErrorCode::InvalidRequest,
1661            format!("{fmt} input format error: {msg}"),
1662        ),
1663    }
1664}
1665
1666/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
1667/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
1668/// (including missing) is treated as `false`.
1669fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1670    headers
1671        .get("x-amz-bypass-governance-retention")
1672        .and_then(|v| v.to_str().ok())
1673        .map(|s| s.eq_ignore_ascii_case("true"))
1674        .unwrap_or(false)
1675}
1676
1677/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
1678/// as an RFC3339 string and re-parsing through `chrono`. The string format
1679/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
1680/// s4-server) into our direct deps. Returns `None` if the format/parse fails
1681/// or the value is outside `chrono`'s supported range.
1682fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1683    let mut buf = Vec::new();
1684    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1685    let s = std::str::from_utf8(&buf).ok()?;
1686    chrono::DateTime::parse_from_rfc3339(s)
1687        .ok()
1688        .map(|dt| dt.with_timezone(&chrono::Utc))
1689}
1690
1691/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
1692/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
1693fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1694    // chrono's RFC3339 output format matches s3s' parser ("...Z" with
1695    // optional sub-second precision). Fall back to UNIX_EPOCH if anything
1696    // unexpected happens — we never produce malformed strings, so this
1697    // branch is unreachable in practice.
1698    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1699    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1700}
1701
1702/// v0.6 #39: convert our internal [`crate::tagging::TagSet`] into the
1703/// s3s `Vec<Tag>` wire shape used on `GetObject/BucketTaggingOutput`.
1704/// Both halves of every pair land in the `Some(_)` slot — AWS marks
1705/// the field optional but always populates it on response.
1706fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1707    set.iter()
1708        .map(|(k, v)| Tag {
1709            key: Some(k.clone()),
1710            value: Some(v.clone()),
1711        })
1712        .collect()
1713}
1714
1715/// v0.6 #39: inverse of [`tagset_to_aws`] for input handlers. Missing
1716/// keys / values become empty strings (mirrors AWS, which rejects
1717/// `<Key/>` with InvalidTag at the parser layer; downstream
1718/// `TagSet::validate` then enforces our size limits).
1719fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1720    let pairs = tags
1721        .iter()
1722        .map(|t| {
1723            (
1724                t.key.clone().unwrap_or_default(),
1725                t.value.clone().unwrap_or_default(),
1726            )
1727        })
1728        .collect();
1729    crate::tagging::TagSet::from_pairs(pairs)
1730}
1731
1732/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
1733/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
1734/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
1735pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1736    if total == 0 {
1737        return Err("cannot range-get zero-length object".into());
1738    }
1739    match range {
1740        s3s::dto::Range::Int { first, last } => {
1741            let start = *first;
1742            let end_inclusive = match last {
1743                Some(l) => (*l).min(total - 1),
1744                None => total - 1,
1745            };
1746            if start > end_inclusive || start >= total {
1747                return Err(format!(
1748                    "range bytes={start}-{:?} out of object size {total}",
1749                    last
1750                ));
1751            }
1752            Ok((start, end_inclusive + 1))
1753        }
1754        s3s::dto::Range::Suffix { length } => {
1755            let len = (*length).min(total);
1756            Ok((total - len, total))
1757        }
1758    }
1759}
1760
1761#[async_trait::async_trait]
1762impl<B: S3> S3 for S4Service<B> {
1763    // === 圧縮を挟む path (PUT) ===
1764    #[tracing::instrument(
1765        name = "s4.put_object",
1766        skip(self, req),
1767        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1768    )]
1769    async fn put_object(
1770        &self,
1771        mut req: S3Request<PutObjectInput>,
1772    ) -> S3Result<S3Response<PutObjectOutput>> {
1773        let put_start = Instant::now();
1774        let put_bucket = req.input.bucket.clone();
1775        let put_key = req.input.key.clone();
1776        let access_preamble = self.access_log_preamble(&req);
1777        self.enforce_rate_limit(&req, &put_bucket)?;
1778        // v0.6 #39: parse `x-amz-tagging` (URL-encoded query string) so
1779        // the IAM policy gate sees the request's tags via
1780        // `s3:RequestObjectTag/<key>`. `existing_object_tags` is also
1781        // resolved from the Tagging manager (when wired) so
1782        // `s3:ExistingObjectTag/<key>` works on overwrite.
1783        let request_tags: Option<crate::tagging::TagSet> = req
1784            .input
1785            .tagging
1786            .as_deref()
1787            .map(crate::tagging::parse_tagging_header)
1788            .transpose()
1789            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1790        let existing_tags: Option<crate::tagging::TagSet> = self
1791            .tagging
1792            .as_ref()
1793            .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1794        self.enforce_policy_with_extra(
1795            &req,
1796            "s3:PutObject",
1797            &put_bucket,
1798            Some(&put_key),
1799            request_tags.as_ref(),
1800            existing_tags.as_ref(),
1801        )?;
1802        // v0.5 #30: an Object Lock-protected key cannot be overwritten by
1803        // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
1804        // bucket PUTs are exempt because they materialise a fresh
1805        // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
1806        // locked version's bytes are untouched. The check mirrors the
1807        // delete path (Compliance never bypassable, Governance via the
1808        // bypass header, legal hold never).
1809        if let Some(mgr) = self.object_lock.as_ref()
1810            && let Some(state) = mgr.get(&put_bucket, &put_key)
1811        {
1812            let bucket_versioned_enabled = self
1813                .versioning
1814                .as_ref()
1815                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1816                .unwrap_or(false);
1817            if !bucket_versioned_enabled {
1818                let bypass = parse_bypass_governance_header(&req.headers);
1819                let now = chrono::Utc::now();
1820                if !state.can_delete(now, bypass) {
1821                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1822                    return Err(S3Error::with_message(
1823                        S3ErrorCode::AccessDenied,
1824                        "Access Denied because object protected by object lock",
1825                    ));
1826                }
1827            }
1828        }
1829        // v0.5 #30: per-PUT explicit retention / legal hold (S3
1830        // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
1831        // `x-amz-object-lock-legal-hold`). Captured before the body
1832        // moves into the backend; persisted into the manager only on
1833        // backend success below.
1834        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1835            .input
1836            .object_lock_mode
1837            .as_ref()
1838            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1839        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1840            .input
1841            .object_lock_retain_until_date
1842            .as_ref()
1843            .and_then(timestamp_to_chrono_utc);
1844        let explicit_legal_hold_on: Option<bool> = req
1845            .input
1846            .object_lock_legal_hold_status
1847            .as_ref()
1848            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1849        if let Some(blob) = req.input.body.take() {
1850            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
1851            // compress fast path、そうでなければ従来の collect-then-compress。
1852            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1853                .await
1854                .map_err(internal("peek put sample"))?;
1855            let sample_len = sample.len().min(SAMPLE_BYTES);
1856            // v0.8 #56: pass the request's Content-Length (when present) so
1857            // the sampling dispatcher can promote large objects to a GPU
1858            // codec. Chunked transfers (no Content-Length) keep CPU.
1859            let total_size_hint = req.input.content_length.and_then(|n| u64::try_from(n).ok());
1860            let kind = self
1861                .dispatcher
1862                .pick_with_size_hint(&sample[..sample_len], total_size_hint)
1863                .await;
1864
1865            // Passthrough buys nothing from S4F2 wrapping (no compression =
1866            // no per-chunk frame to skip past) and the +28-byte header
1867            // overhead breaks size-sensitive callers that expect a true
1868            // pass-through. So passthrough always uses the legacy raw-blob
1869            // path; only compressing codecs go through the framed path.
1870            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1871            let (compressed, manifest, is_framed) = if use_framed {
1872                // streaming fast path: input は memory に collect しない
1873                let chained = chain_sample_with_rest(sample, rest_stream);
1874                debug!(
1875                    bucket = ?req.input.bucket,
1876                    key = ?req.input.key,
1877                    codec = kind.as_str(),
1878                    path = "streaming-framed",
1879                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1880                );
1881                // v0.4 #16: pick the chunk size based on the request's
1882                // Content-Length when known, falling back to the 4 MiB
1883                // default for chunked transfers.
1884                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1885                let (body, manifest) = streaming_compress_to_frames(
1886                    chained,
1887                    Arc::clone(&self.registry),
1888                    kind,
1889                    chunk_size,
1890                )
1891                .await
1892                .map_err(internal("streaming framed compress"))?;
1893                (body, manifest, true)
1894            } else {
1895                // GPU codec 等で streaming-aware でないものは bytes-buffered path
1896                // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1897                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1898                    .await
1899                    .map_err(internal("collect put body (buffered path)"))?;
1900                debug!(
1901                    bucket = ?req.input.bucket,
1902                    key = ?req.input.key,
1903                    bytes = bytes.len(),
1904                    codec = kind.as_str(),
1905                    path = "buffered",
1906                    "S4 put_object: compressing (buffered, raw blob)"
1907                );
1908                // v0.8 #55: telemetry-returning compress so we can stamp
1909                // GPU-pipeline Prometheus metrics (`s4_gpu_compress_seconds`,
1910                // throughput gauge, OOM counter) for nvcomp / dietgpu codecs.
1911                // CPU codecs come back with `gpu_seconds = None` and the
1912                // stamp helper short-circuits — no extra cost on CPU path.
1913                let (compress_res, tel) =
1914                    self.registry.compress_with_telemetry(bytes, kind).await;
1915                stamp_gpu_compress_telemetry(&tel);
1916                let (body, m) = compress_res.map_err(internal("registry compress"))?;
1917                (body, m, false)
1918            };
1919
1920            write_manifest(&mut req.input.metadata, &manifest);
1921            if is_framed {
1922                // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1923                req.input
1924                    .metadata
1925                    .get_or_insert_with(Default::default)
1926                    .insert(META_FRAMED.into(), "true".into());
1927            }
1928            // 重要: content_length を圧縮後サイズで更新する。
1929            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1930            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1931            req.input.content_length = Some(compressed.len() as i64);
1932            // body を書き換えたので、客側が送ってきた original body 用の
1933            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1934            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1935            // ChunkManifest.crc32c で担保している。
1936            req.input.checksum_algorithm = None;
1937            req.input.checksum_crc32 = None;
1938            req.input.checksum_crc32c = None;
1939            req.input.checksum_crc64nvme = None;
1940            req.input.checksum_sha1 = None;
1941            req.input.checksum_sha256 = None;
1942            req.input.content_md5 = None;
1943            let original_size = manifest.original_size;
1944            let compressed_size = manifest.compressed_size;
1945            let codec_label = manifest.codec.as_str();
1946            // framed body は GET 側で sidecar partial-fetch を効かせるため
1947            // build_index_from_body で sidecar を組み立てて backend に PUT する。
1948            let sidecar_index = if is_framed {
1949                s4_codec::index::build_index_from_body(&compressed).ok()
1950            } else {
1951                None
1952            };
1953            // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
1954            // Precedence:
1955            //   - SSE-C headers present → per-request customer key (S4E3)
1956            //   - server-managed keyring configured → active key (S4E2)
1957            //   - neither → no encryption (raw compressed body)
1958            // The `s4-encrypted: aes-256-gcm` metadata flag is set in
1959            // both encrypted modes; the on-disk frame magic distinguishes
1960            // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
1961            // v0.7 #48 BUG-2/3 fix: take() the SSE fields off req.input
1962            // so the encryption headers are NOT forwarded to the
1963            // backend. S4 owns the encrypt-then-store contract; if we
1964            // leave the headers in place, real S3-compat backends
1965            // (MinIO / AWS) try to apply their own SSE on top and
1966            // either reject (MinIO requires HTTPS for SSE-C) or fail
1967            // (MinIO has no KMS configured). MemoryBackend ignored
1968            // these so mock tests passed.
1969            let sse_c_alg = req.input.sse_customer_algorithm.take();
1970            let sse_c_key = req.input.sse_customer_key.take();
1971            let sse_c_md5 = req.input.sse_customer_key_md5.take();
1972            let sse_header = req.input.server_side_encryption.take();
1973            let sse_kms_key = req.input.ssekms_key_id.take();
1974            let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1975            // v0.5 #28: SSE-KMS request? Resolves to None unless the
1976            // request asks for `aws:kms` AND a key id is available
1977            // (explicit header or gateway default). When set, we'll
1978            // generate a per-object DEK below.
1979            let kms_key_id = extract_kms_key_id(
1980                &sse_header,
1981                &sse_kms_key,
1982                self.kms_default_key_id.as_deref(),
1983            );
1984            // v0.5 #32: in compliance-strict mode, every PUT must
1985            // declare SSE — either client-supplied (SSE-C), KMS, or by
1986            // virtue of a server-side keyring being configured (which
1987            // applies SSE-S4 to every PUT automatically). Requests that
1988            // would otherwise land as plain compressed bytes are
1989            // rejected with 400 InvalidRequest.
1990            if self.compliance_strict
1991                && sse_c_material.is_none()
1992                && kms_key_id.is_none()
1993                && self.sse_keyring.is_none()
1994                && sse_header.as_ref().map(|s| s.as_str())
1995                    != Some(ServerSideEncryption::AES256)
1996            {
1997                return Err(S3Error::with_message(
1998                    S3ErrorCode::InvalidRequest,
1999                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
2000                     (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
2001                ));
2002            }
2003            // SSE-C and SSE-KMS are mutually exclusive on a single PUT
2004            // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
2005            if sse_c_material.is_some() && kms_key_id.is_some() {
2006                return Err(S3Error::with_message(
2007                    S3ErrorCode::InvalidArgument,
2008                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
2009                ));
2010            }
2011            // KMS path needs to call generate_dek().await before the
2012            // body_to_send branch; capture the result here.
2013            //
2014            // v0.8.1 #58: the plaintext DEK lives in three places
2015            // during one PUT:
2016            //
2017            //   1. The `Zeroizing<Vec<u8>>` returned by `generate_dek`
2018            //      — wiped when the binding `dek` falls out of scope at
2019            //      the end of this `if`-arm.
2020            //   2. The stack `[u8; 32]` we copy into for `SseSource::Kms`
2021            //      — wrapped in `Zeroizing<[u8; 32]>` so it's wiped when
2022            //      the outer `kms_wrap` `Option` is dropped at the end
2023            //      of `put_object`.
2024            //   3. AES-GCM internal key state inside the `aes-gcm`
2025            //      crate during `encrypt_with_source` — out of scope
2026            //      for this fix; tracked separately in v0.8.2.
2027            let kms_wrap: Option<(zeroize::Zeroizing<[u8; 32]>, crate::kms::WrappedDek)> =
2028                if let Some(ref key_id) = kms_key_id {
2029                let kms = self.kms.as_ref().ok_or_else(|| {
2030                    S3Error::with_message(
2031                        S3ErrorCode::InvalidRequest,
2032                        "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
2033                    )
2034                })?;
2035                // `dek` is `Zeroizing<Vec<u8>>`; deref + slice access
2036                // works unchanged via `Deref<Target=Vec<u8>>`.
2037                let (dek, wrapped) = kms
2038                    .generate_dek(key_id)
2039                    .await
2040                    .map_err(kms_error_to_s3)?;
2041                if dek.len() != 32 {
2042                    return Err(S3Error::with_message(
2043                        S3ErrorCode::InternalError,
2044                        format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
2045                    ));
2046                }
2047                let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
2048                    zeroize::Zeroizing::new([0u8; 32]);
2049                dek_arr.copy_from_slice(&dek);
2050                // `dek` (the `Zeroizing<Vec<u8>>`) is dropped at the
2051                // end of this scope, wiping the heap allocation.
2052                Some((dek_arr, wrapped))
2053            } else {
2054                None
2055            };
2056            // v0.7 #48 BUG-4 fix: stamp the SSE *type* into metadata
2057            // alongside `s4-encrypted` so HEAD (which doesn't fetch the
2058            // body) can echo the correct `x-amz-server-side-encryption`
2059            // value. Without this, HEAD on an SSE-KMS object would not
2060            // echo `aws:kms` because the frame magic is only available
2061            // on the body (which HEAD doesn't read).
2062            let body_to_send = if let Some(ref m) = sse_c_material {
2063                let meta = req.input.metadata.get_or_insert_with(Default::default);
2064                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2065                meta.insert("s4-sse-type".into(), "AES256".into());
2066                meta.insert("s4-sse-c-key-md5".into(),
2067                    base64::engine::general_purpose::STANDARD.encode(m.key_md5));
2068                crate::sse::encrypt_with_source(
2069                    &compressed,
2070                    crate::sse::SseSource::CustomerKey {
2071                        key: &m.key,
2072                        key_md5: &m.key_md5,
2073                    },
2074                )
2075            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
2076                let meta = req.input.metadata.get_or_insert_with(Default::default);
2077                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2078                meta.insert("s4-sse-type".into(), "aws:kms".into());
2079                meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
2080                // v0.8.1 #58: `dek` is `&Zeroizing<[u8; 32]>`; `SseSource::Kms`
2081                // wants `&[u8; 32]`. Rust auto-derefs `&Zeroizing<T>` to
2082                // `&T` here via `Deref<Target=T>`, so the binding picks
2083                // up the inner array reference without copying. The array
2084                // stays in the `Zeroizing` wrapper that owns it and gets
2085                // wiped when `kms_wrap` drops at the end of `put_object`.
2086                let dek_ref: &[u8; 32] = dek;
2087                crate::sse::encrypt_with_source(
2088                    &compressed,
2089                    crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
2090                )
2091            } else if let Some(keyring) = self.sse_keyring.as_ref() {
2092                // SSE-S4 is server-driven transparent encryption; the
2093                // client didn't ask for SSE. We stamp `s4-encrypted`
2094                // (internal flag the GET path needs) but deliberately
2095                // do NOT stamp `s4-sse-type` — that lights up the HEAD
2096                // echo of `x-amz-server-side-encryption: AES256`,
2097                // which would falsely advertise AWS-style SSE-S3
2098                // semantics the operator didn't request.
2099                let meta = req.input.metadata.get_or_insert_with(Default::default);
2100                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2101                // v0.8 #52: when `--sse-chunk-size > 0` is configured,
2102                // emit the chunked S4E5 frame so the matching GET can
2103                // stream-decrypt instead of buffering 5 GiB before
2104                // emitting a byte. Falls back to the buffered S4E2
2105                // frame at chunk_size=0 (default) so existing
2106                // deployments are bit-for-bit unchanged.
2107                if self.sse_chunk_size > 0 {
2108                    crate::sse::encrypt_v2_chunked(
2109                        &compressed,
2110                        keyring,
2111                        self.sse_chunk_size,
2112                    )
2113                    .map_err(|e| {
2114                        S3Error::with_message(
2115                            S3ErrorCode::InternalError,
2116                            format!("SSE-S4 chunked encrypt failed: {e}"),
2117                        )
2118                    })?
2119                } else {
2120                    crate::sse::encrypt_v2(&compressed, keyring)
2121                }
2122            } else {
2123                compressed.clone()
2124            };
2125            // v0.6 #40: capture the about-to-be-sent body + metadata so
2126            // the replication dispatcher (run after the source PUT
2127            // succeeds) can hand the same backend bytes to the
2128            // destination bucket. `Bytes` clone is cheap (refcounted).
2129            let replication_body = body_to_send.clone();
2130            let replication_metadata = req.input.metadata.clone();
2131            // v0.7 #48 BUG-1 fix: SSE encryption (S4E1/E2/E3/E4 frames)
2132            // makes the body longer than the post-compression bytes
2133            // (header + nonce + tag overhead). The earlier
2134            // content_length stamp at compressed.len() is now stale, so
2135            // re-stamp from the actual bytes about to be sent or the
2136            // backend (real S3 / MinIO) rejects with
2137            // `StreamLengthMismatch`. MemoryBackend never validated
2138            // this, which is why mock-only tests passed.
2139            req.input.content_length = Some(body_to_send.len() as i64);
2140            req.input.body = Some(bytes_to_blob(body_to_send));
2141            // v0.5 #34: pre-allocate a version-id when the bucket is
2142            // Enabled, then redirect the backend storage key to the
2143            // shadow path so older versions survive newer PUTs.
2144            // Suspended / Unversioned buckets keep using the plain
2145            // `<key>` (S3 spec: Suspended overwrites the same backend
2146            // object). Pre-allocation (instead of recording after PUT)
2147            // ensures the shadow key + the response's
2148            // `x-amz-version-id` use the same vid.
2149            let pending_version: Option<crate::versioning::PutOutcome> = self
2150                .versioning
2151                .as_ref()
2152                .map(|mgr| mgr.state(&put_bucket))
2153                .map(|state| match state {
2154                    crate::versioning::VersioningState::Enabled => {
2155                        crate::versioning::PutOutcome {
2156                            version_id: crate::versioning::VersioningManager::new_version_id(),
2157                            versioned_response: true,
2158                        }
2159                    }
2160                    crate::versioning::VersioningState::Suspended
2161                    | crate::versioning::VersioningState::Unversioned => {
2162                        crate::versioning::PutOutcome {
2163                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2164                            versioned_response: false,
2165                        }
2166                    }
2167                });
2168            if let Some(ref pv) = pending_version
2169                && pv.versioned_response
2170            {
2171                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2172            }
2173            let mut backend_resp = self.backend.put_object(req).await;
2174            if let Some(idx) = sidecar_index
2175                && backend_resp.is_ok()
2176                && idx.entries.len() > 1
2177            {
2178                // 1 chunk しかない (small object) なら sidecar は意味がない (=
2179                // partial fetch しても full body と同じ範囲) ので省略。
2180                // Sidecar は user-visible key で書く (latest version の
2181                // partial fetch path 用)。Old versions の Range GET は今 task
2182                // の scope 外 (full read fallback でも意味的には正しい)。
2183                self.write_sidecar(&put_bucket, &put_key, &idx).await;
2184            }
2185            // v0.5 #34: commit the new version into the manager only on
2186            // backend success. Use the pre-allocated vid so the response
2187            // header and the chain entry agree.
2188            if let (Some(mgr), Some(pv), Ok(resp)) = (
2189                self.versioning.as_ref(),
2190                pending_version.as_ref(),
2191                backend_resp.as_mut(),
2192            ) {
2193                let etag = resp
2194                    .output
2195                    .e_tag
2196                    .clone()
2197                    .map(ETag::into_value)
2198                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
2199                let now = chrono::Utc::now();
2200                mgr.commit_put_with_version(
2201                    &put_bucket,
2202                    &put_key,
2203                    crate::versioning::VersionEntry {
2204                        version_id: pv.version_id.clone(),
2205                        etag,
2206                        size: original_size,
2207                        is_delete_marker: false,
2208                        created_at: now,
2209                    },
2210                );
2211                if pv.versioned_response {
2212                    resp.output.version_id = Some(pv.version_id.clone());
2213                }
2214            }
2215            // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
2216            // so the client knows the server actually applied the
2217            // requested algorithm and which key fingerprint matched.
2218            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
2219                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2220                resp.output.sse_customer_key_md5 = Some(
2221                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2222                );
2223            }
2224            // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
2225            // the backend returned (AWS KMS returns the ARN even when
2226            // the request used an alias).
2227            if let (Some((_, wrapped)), Ok(resp)) =
2228                (kms_wrap.as_ref(), backend_resp.as_mut())
2229            {
2230                resp.output.server_side_encryption =
2231                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
2232                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
2233            }
2234            // v0.5 #30: persist any per-PUT explicit retention / legal
2235            // hold the client supplied, then auto-apply the bucket
2236            // default (no-op when state is already populated). The
2237            // explicit fields take precedence — the bucket-default
2238            // helper bails out as soon as it sees any retention.
2239            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2240                if explicit_lock_mode.is_some()
2241                    || explicit_retain_until.is_some()
2242                    || explicit_legal_hold_on.is_some()
2243                {
2244                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2245                    if let Some(m) = explicit_lock_mode {
2246                        state.mode = Some(m);
2247                    }
2248                    if let Some(u) = explicit_retain_until {
2249                        state.retain_until = Some(u);
2250                    }
2251                    if let Some(lh) = explicit_legal_hold_on {
2252                        state.legal_hold_on = lh;
2253                    }
2254                    mgr.set(&put_bucket, &put_key, state);
2255                }
2256                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2257            }
2258            let _ = (original_size, compressed_size); // mute unused warnings
2259            let elapsed = put_start.elapsed();
2260            crate::metrics::record_put(
2261                codec_label,
2262                original_size,
2263                compressed_size,
2264                elapsed.as_secs_f64(),
2265                backend_resp.is_ok(),
2266            );
2267            // v0.4 #20: structured access-log entry (best-effort).
2268            self.record_access(
2269                access_preamble,
2270                "REST.PUT.OBJECT",
2271                &put_bucket,
2272                Some(&put_key),
2273                if backend_resp.is_ok() { 200 } else { 500 },
2274                compressed_size,
2275                original_size,
2276                elapsed.as_millis() as u64,
2277                backend_resp.as_ref().err().map(|e| e.code().as_str()),
2278            )
2279            .await;
2280            info!(
2281                op = "put_object",
2282                bucket = %put_bucket,
2283                key = %put_key,
2284                codec = codec_label,
2285                bytes_in = original_size,
2286                bytes_out = compressed_size,
2287                ratio = format!(
2288                    "{:.3}",
2289                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
2290                ),
2291                latency_ms = elapsed.as_millis() as u64,
2292                ok = backend_resp.is_ok(),
2293                "S4 put completed"
2294            );
2295            // v0.6 #35: fire bucket-notification destinations (best-effort,
2296            // detached). Skipped when no manager is attached or when the
2297            // bucket has no rule matching `s3:ObjectCreated:Put` for this
2298            // key.
2299            if backend_resp.is_ok()
2300                && let Some(mgr) = self.notifications.as_ref()
2301            {
2302                let dests = mgr.match_destinations(
2303                    &put_bucket,
2304                    &crate::notifications::EventType::ObjectCreatedPut,
2305                    &put_key,
2306                );
2307                if !dests.is_empty() {
2308                    let etag = backend_resp
2309                        .as_ref()
2310                        .ok()
2311                        .and_then(|r| r.output.e_tag.clone())
2312                        .map(ETag::into_value);
2313                    let version_id = pending_version
2314                        .as_ref()
2315                        .filter(|pv| pv.versioned_response)
2316                        .map(|pv| pv.version_id.clone());
2317                    tokio::spawn(crate::notifications::dispatch_event(
2318                        Arc::clone(mgr),
2319                        put_bucket.clone(),
2320                        put_key.clone(),
2321                        crate::notifications::EventType::ObjectCreatedPut,
2322                        Some(original_size),
2323                        etag,
2324                        version_id,
2325                        format!("S4-{}", uuid::Uuid::new_v4()),
2326                    ));
2327                }
2328            }
2329            // v0.6 #39: persist parsed `x-amz-tagging` tags into the
2330            // tagging manager on a successful PUT. AWS PutObject's
2331            // tagging is a full-replace operation (not a merge), so
2332            // any pre-existing entry for `(bucket, key)` is overwritten.
2333            if backend_resp.is_ok()
2334                && let (Some(mgr), Some(tags)) =
2335                    (self.tagging.as_ref(), request_tags.clone())
2336            {
2337                mgr.put_object_tags(&put_bucket, &put_key, tags);
2338            }
2339            // v0.6 #40: cross-bucket replication fire-point. On
2340            // successful source PUT, consult the replication manager;
2341            // when an enabled rule matches, mark the source key
2342            // `Pending` and spawn a detached task that PUTs the same
2343            // backend bytes + metadata to the rule's destination
2344            // bucket. The dispatcher itself records `Completed` /
2345            // `Failed` and bumps the drop counter on retry-budget
2346            // exhaustion.
2347            self.spawn_replication_if_matched(
2348                &put_bucket,
2349                &put_key,
2350                &request_tags,
2351                &replication_body,
2352                &replication_metadata,
2353                backend_resp.is_ok(),
2354                pending_version.as_ref(),
2355            );
2356            return backend_resp;
2357        }
2358        // Body-less PUT (rare: zero-length object). Mirror the body-full
2359        // versioning hooks so list_object_versions / GET-by-version still see
2360        // empty-body objects in the chain.
2361        let pending_version: Option<crate::versioning::PutOutcome> = self
2362            .versioning
2363            .as_ref()
2364            .map(|mgr| mgr.state(&put_bucket))
2365            .map(|state| match state {
2366                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2367                    version_id: crate::versioning::VersioningManager::new_version_id(),
2368                    versioned_response: true,
2369                },
2370                _ => crate::versioning::PutOutcome {
2371                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2372                    versioned_response: false,
2373                },
2374            });
2375        if let Some(ref pv) = pending_version
2376            && pv.versioned_response
2377        {
2378            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2379        }
2380        let mut backend_resp = self.backend.put_object(req).await;
2381        if let (Some(mgr), Some(pv), Ok(resp)) = (
2382            self.versioning.as_ref(),
2383            pending_version.as_ref(),
2384            backend_resp.as_mut(),
2385        ) {
2386            let etag = resp
2387                .output
2388                .e_tag
2389                .clone()
2390                .map(ETag::into_value)
2391                .unwrap_or_default();
2392            let now = chrono::Utc::now();
2393            mgr.commit_put_with_version(
2394                &put_bucket,
2395                &put_key,
2396                crate::versioning::VersionEntry {
2397                    version_id: pv.version_id.clone(),
2398                    etag,
2399                    size: 0,
2400                    is_delete_marker: false,
2401                    created_at: now,
2402                },
2403            );
2404            if pv.versioned_response {
2405                resp.output.version_id = Some(pv.version_id.clone());
2406            }
2407        }
2408        // v0.5 #30: same explicit-then-default lock-state commit as the
2409        // body-bearing branch above, so a zero-length PUT also picks up
2410        // bucket-default retention.
2411        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2412            if explicit_lock_mode.is_some()
2413                || explicit_retain_until.is_some()
2414                || explicit_legal_hold_on.is_some()
2415            {
2416                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2417                if let Some(m) = explicit_lock_mode {
2418                    state.mode = Some(m);
2419                }
2420                if let Some(u) = explicit_retain_until {
2421                    state.retain_until = Some(u);
2422                }
2423                if let Some(lh) = explicit_legal_hold_on {
2424                    state.legal_hold_on = lh;
2425                }
2426                mgr.set(&put_bucket, &put_key, state);
2427            }
2428            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2429        }
2430        // v0.6 #35: same notification fire-point as the body-bearing PUT
2431        // branch above (zero-length objects still match `ObjectCreated:Put`
2432        // rules per the AWS event taxonomy).
2433        if backend_resp.is_ok()
2434            && let Some(mgr) = self.notifications.as_ref()
2435        {
2436            let dests = mgr.match_destinations(
2437                &put_bucket,
2438                &crate::notifications::EventType::ObjectCreatedPut,
2439                &put_key,
2440            );
2441            if !dests.is_empty() {
2442                let etag = backend_resp
2443                    .as_ref()
2444                    .ok()
2445                    .and_then(|r| r.output.e_tag.clone())
2446                    .map(ETag::into_value);
2447                let version_id = pending_version
2448                    .as_ref()
2449                    .filter(|pv| pv.versioned_response)
2450                    .map(|pv| pv.version_id.clone());
2451                tokio::spawn(crate::notifications::dispatch_event(
2452                    Arc::clone(mgr),
2453                    put_bucket.clone(),
2454                    put_key.clone(),
2455                    crate::notifications::EventType::ObjectCreatedPut,
2456                    Some(0),
2457                    etag,
2458                    version_id,
2459                    format!("S4-{}", uuid::Uuid::new_v4()),
2460                ));
2461            }
2462        }
2463        // v0.6 #39: persist parsed `x-amz-tagging` for the body-less
2464        // (zero-length) PUT branch too — same shape as the body-bearing
2465        // branch above.
2466        if backend_resp.is_ok()
2467            && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2468        {
2469            mgr.put_object_tags(&put_bucket, &put_key, tags);
2470        }
2471        // v0.6 #40: cross-bucket replication for the zero-length PUT
2472        // branch — same shape as the body-bearing branch above.
2473        // v0.8.2 #61: pass `pending_version` so a versioned source's
2474        // destination receives the same shadow-key path.
2475        self.spawn_replication_if_matched(
2476            &put_bucket,
2477            &put_key,
2478            &request_tags,
2479            &bytes::Bytes::new(),
2480            &None,
2481            backend_resp.is_ok(),
2482            pending_version.as_ref(),
2483        );
2484        backend_resp
2485    }
2486
2487    // === 圧縮を解く path (GET) ===
2488    #[tracing::instrument(
2489        name = "s4.get_object",
2490        skip(self, req),
2491        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
2492    )]
2493    async fn get_object(
2494        &self,
2495        mut req: S3Request<GetObjectInput>,
2496    ) -> S3Result<S3Response<GetObjectOutput>> {
2497        let get_start = Instant::now();
2498        let get_bucket = req.input.bucket.clone();
2499        let get_key = req.input.key.clone();
2500        self.enforce_rate_limit(&req, &get_bucket)?;
2501        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
2502        // Range request の事前検出 (decompress 後 slice する path に使う)。
2503        let range_request = req.input.range.take();
2504        // v0.5 #27: pull SSE-C material from the input headers before
2505        // the request is moved into the backend. A header parse error
2506        // fails fast (no body fetch). The material is consumed below
2507        // when decrypting an S4E3-framed body; the SSE-C headers on
2508        // `req.input` are cleared so the backend doesn't see them.
2509        let sse_c_alg = req.input.sse_customer_algorithm.take();
2510        let sse_c_key = req.input.sse_customer_key.take();
2511        let sse_c_md5 = req.input.sse_customer_key_md5.take();
2512        let get_sse_c_material =
2513            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
2514
2515        // v0.5 #34: route the GET through the VersioningManager when
2516        // attached AND the bucket is in a versioning-aware state.
2517        // Resolves which version to fetch (explicit `?versionId=` query
2518        // param vs. chain latest), translates a delete-marker into 404
2519        // NoSuchKey, and rewrites the backend storage key to the shadow
2520        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
2521        // versions. `resolved_version_id` is stamped onto the response
2522        // so clients see a coherent `x-amz-version-id` header.
2523        //
2524        // When the bucket is Unversioned (or no manager attached), the
2525        // chain-resolution step is skipped and the request flows
2526        // through the existing single-key path unchanged.
2527        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
2528            Some(mgr)
2529                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
2530            {
2531                let req_vid = req.input.version_id.take();
2532                let entry = match req_vid.as_deref() {
2533                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
2534                        || S3Error::with_message(
2535                            S3ErrorCode::NoSuchVersion,
2536                            format!("no such version: {vid}"),
2537                        ),
2538                    )?,
2539                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
2540                        S3Error::with_message(
2541                            S3ErrorCode::NoSuchKey,
2542                            format!("no such key: {get_key}"),
2543                        )
2544                    })?,
2545                };
2546                if entry.is_delete_marker {
2547                    // S3 spec: GET without versionId on a
2548                    // delete-marker latest → 404 NoSuchKey + the
2549                    // response carries `x-amz-delete-marker: true`.
2550                    // GET with explicit versionId pointing at a delete
2551                    // marker → 405 MethodNotAllowed; we surface
2552                    // NoSuchKey here for both since s3s collapses them
2553                    // into the same not-found error path.
2554                    return Err(S3Error::with_message(
2555                        S3ErrorCode::NoSuchKey,
2556                        format!("delete marker is the current version of {get_key}"),
2557                    ));
2558                }
2559                if entry.version_id != crate::versioning::NULL_VERSION_ID {
2560                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
2561                }
2562                Some(entry.version_id)
2563            }
2564            _ => None,
2565        };
2566
2567        // ====== Range GET の partial-fetch fast path (sidecar index 利用) ======
2568        // sidecar `<key>.s4index` が存在し、multipart-framed object であれば
2569        // 必要 frame だけを backend に Range GET し帯域節約する。
2570        if let Some(ref r) = range_request
2571            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
2572        {
2573            let total = index.total_original_size();
2574            let (start, end_exclusive) = match resolve_range(r, total) {
2575                Ok(v) => v,
2576                Err(e) => {
2577                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
2578                }
2579            };
2580            if let Some(plan) = index.lookup_range(start, end_exclusive) {
2581                return self
2582                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
2583                    .await;
2584            }
2585        }
2586        let mut resp = self.backend.get_object(req).await?;
2587        // v0.5 #34: stamp the resolved version-id so the client sees a
2588        // coherent `x-amz-version-id` header (only for chains owned by
2589        // the manager — Unversioned buckets / no-manager paths never
2590        // set this).
2591        if let Some(ref vid) = resolved_version_id {
2592            resp.output.version_id = Some(vid.clone());
2593        }
2594        let is_multipart = is_multipart_object(&resp.output.metadata);
2595        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
2596        // v0.2 #4: framed-v2 single-PUT は多 frame parse が必要なので
2597        // multipart と同じ path に流す。
2598        let needs_frame_parse = is_multipart || is_framed_v2;
2599        let manifest_opt = extract_manifest(&resp.output.metadata);
2600
2601        if !needs_frame_parse && manifest_opt.is_none() {
2602            // S4 が書いていないオブジェクトは透過 (raw bucket pre-existing object 等)
2603            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
2604            return Ok(resp);
2605        }
2606
2607        if let Some(blob) = resp.output.body.take() {
2608            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
2609            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
2610            // before any frame parse / streaming decompress. Encrypted
2611            // bodies are opaque to the codec; this also forces the
2612            // buffered path because AES-GCM needs the full body for tag
2613            // verify. SSE-C uses the per-request customer key, SSE-S4
2614            // falls back to the configured keyring.
2615            let blob = if is_sse_encrypted(&resp.output.metadata) {
2616                let body = collect_blob(blob, self.max_body_bytes)
2617                    .await
2618                    .map_err(internal("collect SSE-encrypted body"))?;
2619                // v0.5 #28: peek the frame magic to route the right
2620                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
2621                // through the KMS backend (async). S4E1/E2/E3 take
2622                // the sync path (keyring or customer key).
2623                //
2624                // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): the chunked
2625                // SSE-S4 frames take the *streaming* path — we hand
2626                // the response body a per-chunk verify-and-emit
2627                // Stream so the client sees chunk 0 plaintext after
2628                // one chunk-worth of AES-GCM verify (vs. waiting
2629                // for the whole body's tag), and the gateway no
2630                // longer needs to materialize the full plaintext
2631                // in memory before responding. SSE-C is out of
2632                // scope for the chunked path (chunked S4E3 is a
2633                // follow-up), so this branch requires the SSE-S4
2634                // keyring to be wired and `get_sse_c_material` to
2635                // be absent — otherwise we surface a clear
2636                // misconfiguration error instead of silently
2637                // falling through to the buffered chunked path.
2638                if matches!(
2639                    crate::sse::peek_magic(&body),
2640                    Some("S4E5") | Some("S4E6")
2641                ) && get_sse_c_material.is_none()
2642                {
2643                    let keyring_arc = self.sse_keyring.clone().ok_or_else(|| {
2644                        S3Error::with_message(
2645                            S3ErrorCode::InvalidRequest,
2646                            "object is SSE-S4 encrypted (S4E5/S4E6) but no --sse-s4-key is configured on this gateway",
2647                        )
2648                    })?;
2649                    let body_len = body.len() as u64;
2650                    let stream =
2651                        crate::sse::decrypt_chunked_stream(body, keyring_arc.as_ref());
2652                    // Stream is `'static` (the keyring borrow is
2653                    // consumed up front; the cipher lives inside
2654                    // the stream state — see decrypt_chunked_stream
2655                    // doc), so we can move it straight into a
2656                    // StreamingBlob without lifetime gymnastics.
2657                    use futures::StreamExt;
2658                    let mapped = stream.map(|r| {
2659                        r.map_err(|e| std::io::Error::other(format!(
2660                            "SSE-S4 chunked decrypt: {e}"
2661                        )))
2662                    });
2663                    use s3s::dto::StreamingBlob;
2664                    resp.output.body = Some(StreamingBlob::wrap(mapped));
2665                    // Plaintext content_length is unknown until all
2666                    // chunks have been verified; null it out so the
2667                    // ByteStream wrapper reports `unknown` to the
2668                    // HTTP layer (which then emits chunked transfer-
2669                    // encoding) rather than lying about the size.
2670                    resp.output.content_length = None;
2671                    // The backend's checksums + ETag describe the
2672                    // encrypted body (S4E5/S4E6 wire format), not
2673                    // the plaintext we're about to stream — clear them
2674                    // so the AWS SDK doesn't fail the GET with a
2675                    // ChecksumMismatch on a successful round-trip.
2676                    // Mirrors the streaming-zstd path at L1180-1185.
2677                    resp.output.checksum_crc32 = None;
2678                    resp.output.checksum_crc32c = None;
2679                    resp.output.checksum_crc64nvme = None;
2680                    resp.output.checksum_sha1 = None;
2681                    resp.output.checksum_sha256 = None;
2682                    resp.output.e_tag = None;
2683                    let elapsed = get_start.elapsed();
2684                    crate::metrics::record_get(
2685                        "sse-s4-chunked",
2686                        body_len,
2687                        body_len,
2688                        elapsed.as_secs_f64(),
2689                        true,
2690                    );
2691                    return Ok(resp);
2692                }
2693                let plain = match crate::sse::peek_magic(&body) {
2694                    Some("S4E4") => {
2695                        let kms = self.kms.as_ref().ok_or_else(|| {
2696                            S3Error::with_message(
2697                                S3ErrorCode::InvalidRequest,
2698                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
2699                            )
2700                        })?;
2701                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
2702                        crate::sse::decrypt_with_kms(&body, kms_ref)
2703                            .await
2704                            .map_err(|e| match e {
2705                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
2706                                other => S3Error::with_message(
2707                                    S3ErrorCode::InternalError,
2708                                    format!("SSE-KMS decrypt failed: {other}"),
2709                                ),
2710                            })?
2711                    }
2712                    _ => {
2713                        if let Some(ref m) = get_sse_c_material {
2714                            crate::sse::decrypt(
2715                                &body,
2716                                crate::sse::SseSource::CustomerKey {
2717                                    key: &m.key,
2718                                    key_md5: &m.key_md5,
2719                                },
2720                            )
2721                            .map_err(sse_c_error_to_s3)?
2722                        } else {
2723                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
2724                                S3Error::with_message(
2725                                    S3ErrorCode::InvalidRequest,
2726                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
2727                                )
2728                            })?;
2729                            crate::sse::decrypt(&body, keyring).map_err(|e| {
2730                                S3Error::with_message(
2731                                    S3ErrorCode::InternalError,
2732                                    format!("SSE-S4 decrypt failed: {e}"),
2733                                )
2734                            })?
2735                        }
2736                    }
2737                };
2738                // v0.5 #28: parse out the on-disk wrapped DEK's key id
2739                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
2740                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
2741                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
2742                {
2743                    resp.output.server_side_encryption = Some(
2744                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2745                    );
2746                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
2747                }
2748                bytes_to_blob(plain)
2749            } else if let Some(ref m) = get_sse_c_material {
2750                // Client sent SSE-C headers for an unencrypted object —
2751                // mirror AWS S3's 400 InvalidRequest.
2752                let _ = m;
2753                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
2754            } else {
2755                blob
2756            };
2757            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
2758            // tell the client that the supplied key was the one used.
2759            if let Some(ref m) = get_sse_c_material {
2760                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2761                resp.output.sse_customer_key_md5 = Some(
2762                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2763                );
2764            }
2765            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
2766            // 大規模 object (e.g. 5 GB) を memory に collect すると OOM するので、
2767            // codec が streaming-aware なら body を chunk-by-chunk で decompress して
2768            // 即座に client に流す。
2769            //
2770            // ただし Range request 時は streaming できない (slice するため total bytes
2771            // が必要) → buffered path に fall through。
2772            if range_request.is_none()
2773                && !needs_frame_parse
2774                && let Some(ref m) = manifest_opt
2775                && supports_streaming_decompress(m.codec)
2776                && m.codec == CodecKind::CpuZstd
2777            {
2778                let decompressed_blob = cpu_zstd_decompress_stream(blob);
2779                resp.output.content_length = Some(m.original_size as i64);
2780                resp.output.checksum_crc32 = None;
2781                resp.output.checksum_crc32c = None;
2782                resp.output.checksum_crc64nvme = None;
2783                resp.output.checksum_sha1 = None;
2784                resp.output.checksum_sha256 = None;
2785                resp.output.e_tag = None;
2786                resp.output.body = Some(decompressed_blob);
2787                let elapsed = get_start.elapsed();
2788                crate::metrics::record_get(
2789                    m.codec.as_str(),
2790                    m.compressed_size,
2791                    m.original_size,
2792                    elapsed.as_secs_f64(),
2793                    true,
2794                );
2795                info!(
2796                    op = "get_object",
2797                    bucket = %get_bucket,
2798                    key = %get_key,
2799                    codec = m.codec.as_str(),
2800                    bytes_in = m.compressed_size,
2801                    bytes_out = m.original_size,
2802                    path = "streaming",
2803                    setup_latency_ms = elapsed.as_millis() as u64,
2804                    "S4 get started (streaming)"
2805                );
2806                return Ok(resp);
2807            }
2808            // Passthrough: そのまま流す (Range なしの場合のみ streaming)
2809            if range_request.is_none()
2810                && !needs_frame_parse
2811                && let Some(ref m) = manifest_opt
2812                && m.codec == CodecKind::Passthrough
2813            {
2814                resp.output.content_length = Some(m.original_size as i64);
2815                resp.output.checksum_crc32 = None;
2816                resp.output.checksum_crc32c = None;
2817                resp.output.checksum_crc64nvme = None;
2818                resp.output.checksum_sha1 = None;
2819                resp.output.checksum_sha256 = None;
2820                resp.output.e_tag = None;
2821                resp.output.body = Some(blob);
2822                debug!("S4 get_object: passthrough streaming");
2823                return Ok(resp);
2824            }
2825
2826            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
2827            let bytes = collect_blob(blob, self.max_body_bytes)
2828                .await
2829                .map_err(internal("collect get body"))?;
2830
2831            let decompressed = if needs_frame_parse {
2832                // multipart objects と framed-v2 single-PUT objects は同じ
2833                // S4F2 frame 列なので decompress_multipart で統一処理
2834                self.decompress_multipart(bytes).await?
2835            } else {
2836                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
2837                self.registry
2838                    .decompress(bytes, manifest)
2839                    .await
2840                    .map_err(internal("registry decompress"))?
2841            };
2842
2843            // Range request があれば slice。なければ full body を返す。
2844            let total_size = decompressed.len() as u64;
2845            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
2846                let (start, end) = resolve_range(r, total_size)
2847                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
2848                let sliced = decompressed.slice(start as usize..end as usize);
2849                resp.output.content_range = Some(format!(
2850                    "bytes {start}-{}/{total_size}",
2851                    end.saturating_sub(1)
2852                ));
2853                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
2854            } else {
2855                (decompressed, None)
2856            };
2857            // 解凍後の真のサイズを返す (S3 client は content_length を信頼するので
2858            // 圧縮 size のままだと downstream が body を途中で切ってしまう)
2859            resp.output.content_length = Some(final_bytes.len() as i64);
2860            // 圧縮済 bytes の checksum を返すと AWS SDK 側で StreamingError
2861            // (ChecksumMismatch) になる。ETag も backend が返した「圧縮済 bytes の
2862            // MD5/checksum」なので意味的にズレる — クリアして S4 自身の crc32c
2863            // (manifest 内 / frame 内) で integrity を保証する設計にする。
2864            resp.output.checksum_crc32 = None;
2865            resp.output.checksum_crc32c = None;
2866            resp.output.checksum_crc64nvme = None;
2867            resp.output.checksum_sha1 = None;
2868            resp.output.checksum_sha256 = None;
2869            resp.output.e_tag = None;
2870            let returned_size = final_bytes.len() as u64;
2871            let codec_label = manifest_opt
2872                .as_ref()
2873                .map(|m| m.codec.as_str())
2874                .unwrap_or("multipart");
2875            resp.output.body = Some(bytes_to_blob(final_bytes));
2876            if let Some(status) = status_override {
2877                resp.status = Some(status);
2878            }
2879            let elapsed = get_start.elapsed();
2880            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
2881            info!(
2882                op = "get_object",
2883                bucket = %get_bucket,
2884                key = %get_key,
2885                codec = codec_label,
2886                bytes_out = returned_size,
2887                total_object_size = total_size,
2888                range = range_request.is_some(),
2889                path = "buffered",
2890                latency_ms = elapsed.as_millis() as u64,
2891                "S4 get completed (buffered)"
2892            );
2893        }
2894        // v0.6 #40: echo the recorded `x-amz-replication-status` so
2895        // consumers can poll progress (PENDING / COMPLETED / FAILED).
2896        if let Some(mgr) = self.replication.as_ref()
2897            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
2898        {
2899            resp.output.replication_status =
2900                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2901        }
2902        Ok(resp)
2903    }
2904
2905    // === passthrough delegations ===
2906    async fn head_bucket(
2907        &self,
2908        req: S3Request<HeadBucketInput>,
2909    ) -> S3Result<S3Response<HeadBucketOutput>> {
2910        self.backend.head_bucket(req).await
2911    }
2912    async fn list_buckets(
2913        &self,
2914        req: S3Request<ListBucketsInput>,
2915    ) -> S3Result<S3Response<ListBucketsOutput>> {
2916        self.backend.list_buckets(req).await
2917    }
2918    async fn create_bucket(
2919        &self,
2920        req: S3Request<CreateBucketInput>,
2921    ) -> S3Result<S3Response<CreateBucketOutput>> {
2922        self.backend.create_bucket(req).await
2923    }
2924    async fn delete_bucket(
2925        &self,
2926        req: S3Request<DeleteBucketInput>,
2927    ) -> S3Result<S3Response<DeleteBucketOutput>> {
2928        self.backend.delete_bucket(req).await
2929    }
2930    async fn head_object(
2931        &self,
2932        req: S3Request<HeadObjectInput>,
2933    ) -> S3Result<S3Response<HeadObjectOutput>> {
2934        // v0.6 #40: capture bucket/key before req is consumed so the
2935        // replication-status echo can look the entry up.
2936        let head_bucket = req.input.bucket.clone();
2937        let head_key = req.input.key.clone();
2938        let mut resp = self.backend.head_object(req).await?;
2939        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2940            // 客側には decompress 後の意味のある content_length / checksum を返す。
2941            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
2942            // (S4 は manifest 内の crc32c で integrity を担保する)。
2943            resp.output.content_length = Some(manifest.original_size as i64);
2944            resp.output.checksum_crc32 = None;
2945            resp.output.checksum_crc32c = None;
2946            resp.output.checksum_crc64nvme = None;
2947            resp.output.checksum_sha1 = None;
2948            resp.output.checksum_sha256 = None;
2949            resp.output.e_tag = None;
2950        }
2951        // v0.6 #40: echo `x-amz-replication-status` (PENDING / COMPLETED
2952        // / FAILED) so consumers can poll progress without a GET.
2953        if let Some(mgr) = self.replication.as_ref()
2954            && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2955        {
2956            resp.output.replication_status =
2957                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2958        }
2959        // v0.7 #48 BUG-4 fix: HEAD must echo SSE indicators so SDKs
2960        // and pipelines see the same posture they got on PUT. The PUT
2961        // path stamps `s4-sse-type` metadata for exactly this — HEAD
2962        // doesn't fetch the body, so it can't peek frame magic.
2963        if let Some(meta) = resp.output.metadata.as_ref()
2964            && let Some(sse_type) = meta.get("s4-sse-type")
2965        {
2966            {
2967                match sse_type.as_str() {
2968                    "aws:kms" => {
2969                        resp.output.server_side_encryption = Some(
2970                            ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2971                        );
2972                        if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2973                            resp.output.ssekms_key_id = Some(key_id.clone());
2974                        }
2975                    }
2976                    _ => {
2977                        resp.output.server_side_encryption = Some(
2978                            ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2979                        );
2980                        if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2981                            resp.output.sse_customer_algorithm =
2982                                Some(crate::sse::SSE_C_ALGORITHM.into());
2983                            resp.output.sse_customer_key_md5 = Some(md5.clone());
2984                        }
2985                    }
2986                }
2987            }
2988        }
2989        Ok(resp)
2990    }
    /// Deletes a single object, running S4's gatekeeping before anything is
    /// removed.
    ///
    /// Checks run in this order, each returning early on failure:
    /// 1. rate limit (`enforce_rate_limit`),
    /// 2. bucket policy for `s3:DeleteObject` (`enforce_policy`),
    /// 3. MFA Delete, when an MFA manager is configured and enabled for the bucket,
    /// 4. object lock, when a lock manager is configured and has state for the key.
    ///
    /// After the checks the request is routed by versioning state:
    /// - versioning manager present and bucket not `Unversioned`: the delete
    ///   is recorded in the version chain and a synthesized
    ///   `DeleteObjectOutput` is returned. Backend deletes issued on these
    ///   paths are best-effort — their result is discarded.
    /// - otherwise: the original request is forwarded to the backend (errors
    ///   propagate), then lock state and tags for the key are cleared and a
    ///   best-effort delete of the sidecar object is attempted.
    ///
    /// Every successful path fires a delete notification.
    ///
    /// # Errors
    /// Errors from the rate-limit / policy checks, the mapped MFA error,
    /// `AccessDenied` when the object lock forbids the delete, or the backend
    /// error from the legacy (unversioned) delete.
    async fn delete_object(
        &self,
        mut req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        // Owned copies: `req` is mutated and eventually moved further down.
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // v0.6 #42: MFA Delete enforcement. When the bucket has
        // MFA-Delete = Enabled, every DELETE / DELETE-version /
        // delete-marker form needs `x-amz-mfa: <serial> <code>` (RFC 6238
        // 6-digit TOTP). Runs *before* the WORM / versioning routers so
        // a missing token is denied for free regardless of which delete
        // path the request would otherwise take.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && mgr.is_enabled(&bucket)
        {
            let header = req.input.mfa.as_deref();
            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
                crate::metrics::record_mfa_delete_denial(&bucket);
                return Err(mfa_error_to_s3(e));
            }
        }
        // v0.5 #30: refuse the delete while a WORM lock is in effect.
        // Compliance can never be bypassed; Governance can be overridden
        // via `x-amz-bypass-governance-retention: true`; legal hold
        // never. The check happens before the versioning router so a
        // locked object can't be soft-deleted (delete-marker push) on an
        // Enabled bucket either — S3 spec says lock applies to all
        // delete forms.
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&bucket, &key)
        {
            // A missing bypass header is treated as "no bypass".
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let now = chrono::Utc::now();
            if !state.can_delete(now, bypass) {
                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Access Denied because object protected by object lock",
                ));
            }
        }
        // v0.5 #34: route DELETE through the VersioningManager when the
        // bucket is in a versioning-aware state.
        //
        // - Enabled bucket, no version_id → push a delete marker into
        //   the chain. NO backend object is touched (older versions
        //   stay reachable via specific-version GET).
        // - Enabled / Suspended bucket, with version_id → physical
        //   delete. Backend bytes at the shadow key (or `<key>` for
        //   `null`) are removed; chain entry is dropped. If the deleted
        //   entry was a delete marker, no backend bytes exist for it
        //   (record-only).
        // - Suspended bucket, no version_id → push a "null" delete
        //   marker (S3 spec); backend bytes at `<key>` are physically
        //   removed (same as legacy).
        // - Unversioned bucket → fall through to legacy passthrough.
        if let Some(mgr) = self.versioning.as_ref() {
            let state = mgr.state(&bucket);
            if state != crate::versioning::VersioningState::Unversioned {
                // Moves the version id out of the request, leaving `None`
                // behind in `req.input`.
                let req_vid = req.input.version_id.take();
                if let Some(vid) = req_vid {
                    // Specific-version DELETE: touch backend bytes only
                    // when the entry was a real version (not a delete
                    // marker, which has no backend bytes).
                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
                    // The `null` version is addressed by the plain key; any
                    // other version id maps to its shadow key.
                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
                        key.clone()
                    } else {
                        versioned_shadow_key(&key, &vid)
                    };
                    // No recorded outcome counts as "not a real version",
                    // so the backend is left untouched in that case.
                    let was_real_version = outcome
                        .as_ref()
                        .map(|o| !o.is_delete_marker)
                        .unwrap_or(false);
                    if was_real_version {
                        // Best-effort backend cleanup; missing bytes
                        // are not an error (e.g. shadow key already
                        // GC'd).
                        let backend_input = DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: backend_target,
                            ..Default::default()
                        };
                        // Fresh request for the backend: reuses the caller's
                        // URI, headers, credentials, region and service, with
                        // empty extensions and no trailing headers.
                        let backend_req = S3Request {
                            input: backend_input,
                            method: http::Method::DELETE,
                            uri: req.uri.clone(),
                            headers: req.headers.clone(),
                            extensions: http::Extensions::new(),
                            credentials: req.credentials.clone(),
                            region: req.region.clone(),
                            service: req.service.clone(),
                            trailing_headers: None,
                        };
                        // Result deliberately discarded (best-effort).
                        let _ = self.backend.delete_object(backend_req).await;
                    }
                    let mut output = DeleteObjectOutput {
                        version_id: Some(vid.clone()),
                        ..Default::default()
                    };
                    // Flag the response only when the removed chain entry
                    // was itself a delete marker.
                    if let Some(o) = outcome.as_ref()
                        && o.is_delete_marker
                    {
                        output.delete_marker = Some(true);
                    }
                    // v0.6 #35: specific-version DELETE always counts as
                    // a hard `ObjectRemoved:Delete` event (the chain
                    // entry, marker or not, is gone after this call).
                    self.fire_delete_notification(
                        &bucket,
                        &key,
                        crate::notifications::EventType::ObjectRemovedDelete,
                        Some(vid.clone()),
                    );
                    return Ok(S3Response::new(output));
                }
                // No version_id: record a delete marker (state-aware).
                let outcome = mgr.record_delete(&bucket, &key);
                if state == crate::versioning::VersioningState::Suspended {
                    // Suspended buckets also evict the prior `<key>`
                    // bytes (the previous null version is gone too).
                    let backend_input = DeleteObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    };
                    // Same request shape as the specific-version cleanup
                    // above, targeting the plain key.
                    let backend_req = S3Request {
                        input: backend_input,
                        method: http::Method::DELETE,
                        uri: req.uri.clone(),
                        headers: req.headers.clone(),
                        extensions: http::Extensions::new(),
                        credentials: req.credentials.clone(),
                        region: req.region.clone(),
                        service: req.service.clone(),
                        trailing_headers: None,
                    };
                    // Result deliberately discarded (best-effort).
                    let _ = self.backend.delete_object(backend_req).await;
                }
                let output = DeleteObjectOutput {
                    delete_marker: Some(true),
                    version_id: outcome.version_id.clone(),
                    ..Default::default()
                };
                // v0.6 #35: versioned bucket DELETE without a version-id
                // creates a delete marker — the dedicated AWS event
                // taxonomy entry. Suspended-state buckets also push a
                // (null) marker, so the same event fires there.
                self.fire_delete_notification(
                    &bucket,
                    &key,
                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
                    outcome.version_id,
                );
                return Ok(S3Response::new(output));
            }
        }
        // Legacy / Unversioned path: physical delete on the backend +
        // best-effort sidecar cleanup (mirrors v0.4 behaviour).
        // Unlike the versioned branches, a backend failure here is
        // propagated to the caller.
        let resp = self.backend.delete_object(req).await?;
        // v0.5 #30: drop any per-object lock state once the delete has
        // succeeded so the freed key can be re-armed by a future PUT
        // under the bucket default. Reaching here implies the lock had
        // already passed `can_delete` above, so this is purely cleanup.
        if let Some(mgr) = self.object_lock.as_ref() {
            mgr.clear(&bucket, &key);
        }
        // v0.6 #39: drop any object-level tag set on physical delete —
        // the freed key starts a fresh tag history if a future PUT
        // re-creates it. (Versioned-delete branches above return early
        // and do NOT touch tags, mirroring AWS where tag state is
        // attached to the logical key, not the version chain.)
        if let Some(mgr) = self.tagging.as_ref() {
            mgr.delete_object_tags(&bucket, &key);
        }
        let sidecar = sidecar_key(&key);
        // v0.7 #49: skip the sidecar DELETE if the key + sidecar suffix
        // can't be encoded into a request URI — the primary delete
        // already succeeded and a stale sidecar is harmless (Range GET
        // re-validates the underlying object on next read).
        if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
            let sidecar_input = DeleteObjectInput {
                bucket: bucket.clone(),
                key: sidecar,
                ..Default::default()
            };
            // `req` was moved into the backend call above, so this request
            // is built from scratch with empty headers and no credentials,
            // region or service.
            let sidecar_req = S3Request {
                input: sidecar_input,
                method: http::Method::DELETE,
                uri,
                headers: http::HeaderMap::new(),
                extensions: http::Extensions::new(),
                credentials: None,
                region: None,
                service: None,
                trailing_headers: None,
            };
            // Result deliberately discarded (best-effort).
            let _ = self.backend.delete_object(sidecar_req).await;
        }
        // v0.6 #35: legacy unversioned-bucket hard delete fires the
        // canonical `ObjectRemoved:Delete` event.
        self.fire_delete_notification(
            &bucket,
            &key,
            crate::notifications::EventType::ObjectRemovedDelete,
            None,
        );
        Ok(resp)
    }
3202    async fn delete_objects(
3203        &self,
3204        req: S3Request<DeleteObjectsInput>,
3205    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
3206        // v0.6 #42: MFA Delete applies once to the whole batch (S3 spec:
3207        // when MFA-Delete is on the bucket, a missing / invalid token
3208        // fails the entire DeleteObjects request, not per-object).
3209        if let Some(mgr) = self.mfa_delete.as_ref()
3210            && mgr.is_enabled(&req.input.bucket)
3211        {
3212            let header = req.input.mfa.as_deref();
3213            if let Err(e) =
3214                crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
3215            {
3216                crate::metrics::record_mfa_delete_denial(&req.input.bucket);
3217                return Err(mfa_error_to_s3(e));
3218            }
3219        }
3220        self.backend.delete_objects(req).await
3221    }
3222    async fn copy_object(
3223        &self,
3224        mut req: S3Request<CopyObjectInput>,
3225    ) -> S3Result<S3Response<CopyObjectOutput>> {
3226        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
3227        let dst_bucket = req.input.bucket.clone();
3228        let dst_key = req.input.key.clone();
3229        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
3230        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3231            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
3232        }
3233        // S4-aware copy: source object に s4-* metadata がある場合、それを
3234        // destination に確実に preserve する。
3235        //
3236        // - MetadataDirective::COPY (default): backend が source metadata を
3237        //   そのまま copy するので S4 metadata も自動で渡る。介入不要
3238        // - MetadataDirective::REPLACE: 客が指定した metadata で source を
3239        //   上書き → s4-* metadata が消えると destination は decompress 不能に
3240        //   なる (silent corruption)。S4 が source metadata を HEAD で取得し、
3241        //   s4-* fields を input.metadata に強制 merge する
3242        let needs_merge = req
3243            .input
3244            .metadata_directive
3245            .as_ref()
3246            .map(|d| d.as_str() == MetadataDirective::REPLACE)
3247            .unwrap_or(false);
3248        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3249            let head_input = HeadObjectInput {
3250                bucket: bucket.to_string(),
3251                key: key.to_string(),
3252                ..Default::default()
3253            };
3254            let head_req = S3Request {
3255                input: head_input,
3256                method: req.method.clone(),
3257                uri: req.uri.clone(),
3258                headers: req.headers.clone(),
3259                extensions: http::Extensions::new(),
3260                credentials: req.credentials.clone(),
3261                region: req.region.clone(),
3262                service: req.service.clone(),
3263                trailing_headers: None,
3264            };
3265            if let Ok(head) = self.backend.head_object(head_req).await
3266                && let Some(src_meta) = head.output.metadata.as_ref()
3267            {
3268                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
3269                for key in [
3270                    META_CODEC,
3271                    META_ORIGINAL_SIZE,
3272                    META_COMPRESSED_SIZE,
3273                    META_CRC32C,
3274                    META_MULTIPART,
3275                    META_FRAMED,
3276                ] {
3277                    if let Some(v) = src_meta.get(key) {
3278                        // 客が同じ key を指定していたら preserve しない (= 上書き許可)
3279                        // していたら何もしない。指定していなければ insert
3280                        dest_meta
3281                            .entry(key.to_string())
3282                            .or_insert_with(|| v.clone());
3283                    }
3284                }
3285                debug!(
3286                    src_bucket = %bucket,
3287                    src_key = %key,
3288                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
3289                );
3290            }
3291        }
3292        self.backend.copy_object(req).await
3293    }
3294    async fn list_objects(
3295        &self,
3296        req: S3Request<ListObjectsInput>,
3297    ) -> S3Result<S3Response<ListObjectsOutput>> {
3298        self.enforce_rate_limit(&req, &req.input.bucket)?;
3299        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3300        let mut resp = self.backend.list_objects(req).await?;
3301        // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
3302        // — v0.5 #34) を顧客から隠す。
3303        if let Some(contents) = resp.output.contents.as_mut() {
3304            contents.retain(|o| {
3305                o.key
3306                    .as_ref()
3307                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3308                    .unwrap_or(true)
3309            });
3310        }
3311        Ok(resp)
3312    }
3313    async fn list_objects_v2(
3314        &self,
3315        req: S3Request<ListObjectsV2Input>,
3316    ) -> S3Result<S3Response<ListObjectsV2Output>> {
3317        self.enforce_rate_limit(&req, &req.input.bucket)?;
3318        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3319        let mut resp = self.backend.list_objects_v2(req).await?;
3320        if let Some(contents) = resp.output.contents.as_mut() {
3321            let before = contents.len();
3322            contents.retain(|o| {
3323                o.key
3324                    .as_ref()
3325                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3326                    .unwrap_or(true)
3327            });
3328            // key_count も補正 (S3 spec compliance)
3329            if let Some(kc) = resp.output.key_count.as_mut() {
3330                *kc -= (before - contents.len()) as i32;
3331            }
3332        }
3333        Ok(resp)
3334    }
3335    /// v0.4 #17: filter S4-internal sidecars from versioned listings.
3336    /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
3337    /// attached AND the bucket is in a versioning-aware state, build
3338    /// the `Versions` / `DeleteMarkers` arrays directly from the
3339    /// in-memory chain (paginated + ordered the S3 way: key asc,
3340    /// version newest-first inside each key). Otherwise fall back to
3341    /// passthrough + sidecar-filter (legacy v0.4 behaviour).
3342    async fn list_object_versions(
3343        &self,
3344        req: S3Request<ListObjectVersionsInput>,
3345    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3346        self.enforce_rate_limit(&req, &req.input.bucket)?;
3347        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3348        // v0.5 #34: VersioningManager-owned path.
3349        if let Some(mgr) = self.versioning.as_ref()
3350            && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3351        {
3352            let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3353            let page = mgr.list_versions(
3354                &req.input.bucket,
3355                req.input.prefix.as_deref(),
3356                req.input.key_marker.as_deref(),
3357                req.input.version_id_marker.as_deref(),
3358                max_keys,
3359            );
3360            let versions: Vec<ObjectVersion> = page
3361                .versions
3362                .into_iter()
3363                .map(|e| ObjectVersion {
3364                    key: Some(e.key),
3365                    version_id: Some(e.version_id),
3366                    is_latest: Some(e.is_latest),
3367                    e_tag: Some(ETag::Strong(e.etag)),
3368                    size: Some(e.size as i64),
3369                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3370                    ..Default::default()
3371                })
3372                .collect();
3373            let delete_markers: Vec<DeleteMarkerEntry> = page
3374                .delete_markers
3375                .into_iter()
3376                .map(|e| DeleteMarkerEntry {
3377                    key: Some(e.key),
3378                    version_id: Some(e.version_id),
3379                    is_latest: Some(e.is_latest),
3380                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3381                    ..Default::default()
3382                })
3383                .collect();
3384            let output = ListObjectVersionsOutput {
3385                name: Some(req.input.bucket.clone()),
3386                prefix: req.input.prefix.clone(),
3387                key_marker: req.input.key_marker.clone(),
3388                version_id_marker: req.input.version_id_marker.clone(),
3389                max_keys: req.input.max_keys,
3390                versions: if versions.is_empty() {
3391                    None
3392                } else {
3393                    Some(versions)
3394                },
3395                delete_markers: if delete_markers.is_empty() {
3396                    None
3397                } else {
3398                    Some(delete_markers)
3399                },
3400                is_truncated: Some(page.is_truncated),
3401                next_key_marker: page.next_key_marker,
3402                next_version_id_marker: page.next_version_id_marker,
3403                ..Default::default()
3404            };
3405            return Ok(S3Response::new(output));
3406        }
3407        // Legacy passthrough path (v0.4 #17 sidecar filter retained).
3408        let mut resp = self.backend.list_object_versions(req).await?;
3409        if let Some(versions) = resp.output.versions.as_mut() {
3410            versions.retain(|v| {
3411                v.key
3412                    .as_ref()
3413                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3414                    .unwrap_or(true)
3415            });
3416        }
3417        if let Some(markers) = resp.output.delete_markers.as_mut() {
3418            markers.retain(|m| {
3419                m.key
3420                    .as_ref()
3421                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3422                    .unwrap_or(true)
3423            });
3424        }
3425        Ok(resp)
3426    }
3427
3428    async fn create_multipart_upload(
3429        &self,
3430        mut req: S3Request<CreateMultipartUploadInput>,
3431    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
3432        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
3433        // frame parse を起動するため、object metadata に flag を立てる。
3434        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
3435        let codec_kind = self.registry.default_kind();
3436        let meta = req.input.metadata.get_or_insert_with(Default::default);
3437        meta.insert(META_MULTIPART.into(), "true".into());
3438        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
3439        // v0.8 #54 BUG-10 fix: take() the SSE request fields off
3440        // `req.input` so they are NOT forwarded to the backend on
3441        // CreateMultipartUpload. Same root cause as v0.7 #48 BUG-2/3 on
3442        // single-PUT — MinIO rejects SSE-C with "HTTPS required" and
3443        // SSE-KMS with "KMS not configured" when the headers reach it.
3444        // S4 owns the encrypt-then-store contract; we capture the
3445        // recipe in `multipart_state` here and apply it on Complete.
3446        let sse_c_alg = req.input.sse_customer_algorithm.take();
3447        let sse_c_key = req.input.sse_customer_key.take();
3448        let sse_c_md5 = req.input.sse_customer_key_md5.take();
3449        let sse_header = req.input.server_side_encryption.take();
3450        let sse_kms_key = req.input.ssekms_key_id.take();
3451        // Strip the encryption-context too — leaving it would make
3452        // MinIO try to validate it against a non-existent KMS key.
3453        let _ = req.input.ssekms_encryption_context.take();
3454        let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
3455        let kms_key_id = extract_kms_key_id(
3456            &sse_header,
3457            &sse_kms_key,
3458            self.kms_default_key_id.as_deref(),
3459        );
3460        // SSE-C / SSE-KMS exclusivity (mirrors put_object L1870).
3461        if sse_c_material.is_some() && kms_key_id.is_some() {
3462            return Err(S3Error::with_message(
3463                S3ErrorCode::InvalidArgument,
3464                "SSE-C and SSE-KMS cannot be used together on the same multipart upload",
3465            ));
3466        }
3467        let sse_mode = if let Some(ref m) = sse_c_material {
3468            // v0.8.2 #62 (H-6 audit fix): wrap the customer-supplied
3469            // 32-byte key in `Zeroizing` so abandoned uploads (or
3470            // normal Complete/Abort) wipe the key bytes on drop. The
3471            // `key_md5` is the public fingerprint and stays as a
3472            // bare `[u8; 16]`.
3473            crate::multipart_state::MultipartSseMode::SseC {
3474                key: zeroize::Zeroizing::new(m.key),
3475                key_md5: m.key_md5,
3476            }
3477        } else if let Some(ref kid) = kms_key_id {
3478            // KMS pre-flight: fail at Create rather than at Complete if
3479            // the gateway has no KMS backend wired (mirrors the
3480            // put_object L1879 check).
3481            if self.kms.is_none() {
3482                return Err(S3Error::with_message(
3483                    S3ErrorCode::InvalidRequest,
3484                    "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
3485                ));
3486            }
3487            crate::multipart_state::MultipartSseMode::SseKms { key_id: kid.clone() }
3488        } else if self.sse_keyring.is_some() {
3489            // SSE-S4: server-driven transparent encryption. Activates
3490            // whenever the gateway has a keyring configured AND the
3491            // client didn't pick a different SSE mode.
3492            crate::multipart_state::MultipartSseMode::SseS4
3493        } else {
3494            crate::multipart_state::MultipartSseMode::None
3495        };
3496        // v0.8 #54 BUG-9 fix: parse the Tagging header on Create. The
3497        // single-PUT path does this on PutObject; the multipart path
3498        // captures it now and commits via TagManager on Complete.
3499        let request_tags: Option<crate::tagging::TagSet> = req
3500            .input
3501            .tagging
3502            .as_deref()
3503            .map(crate::tagging::parse_tagging_header)
3504            .transpose()
3505            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
3506        // Strip the `Tagging` field off the input so the backend
3507        // doesn't try to apply it (no-op on MinIO but keeps the wire
3508        // clean).
3509        let _ = req.input.tagging.take();
3510        // Object Lock recipe (BUG-7 — captured here, applied on Complete).
3511        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
3512            .input
3513            .object_lock_mode
3514            .as_ref()
3515            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
3516        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
3517            .input
3518            .object_lock_retain_until_date
3519            .as_ref()
3520            .and_then(timestamp_to_chrono_utc);
3521        let explicit_legal_hold_on: bool = req
3522            .input
3523            .object_lock_legal_hold_status
3524            .as_ref()
3525            .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
3526            .unwrap_or(false);
3527        let bucket = req.input.bucket.clone();
3528        let key = req.input.key.clone();
3529        debug!(
3530            bucket = %bucket,
3531            key = %key,
3532            codec = codec_kind.as_str(),
3533            sse = ?sse_mode,
3534            "S4 create_multipart_upload: marking object for per-part compression"
3535        );
3536        let mut resp = self.backend.create_multipart_upload(req).await?;
3537        // Stash the per-upload context only after the backend handed
3538        // us an upload_id (failed Creates leave nothing in the store).
3539        if let Some(upload_id) = resp.output.upload_id.as_ref() {
3540            self.multipart_state.put(
3541                upload_id,
3542                crate::multipart_state::MultipartUploadContext {
3543                    bucket,
3544                    key,
3545                    sse: sse_mode.clone(),
3546                    tags: request_tags,
3547                    object_lock_mode: explicit_lock_mode,
3548                    object_lock_retain_until: explicit_retain_until,
3549                    object_lock_legal_hold: explicit_legal_hold_on,
3550                },
3551            );
3552        }
3553        // SSE-C / SSE-KMS response echo (mirrors put_object L2036-L2050).
3554        match &sse_mode {
3555            crate::multipart_state::MultipartSseMode::SseC { key_md5, .. } => {
3556                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
3557                resp.output.sse_customer_key_md5 = Some(
3558                    base64::engine::general_purpose::STANDARD.encode(key_md5),
3559                );
3560            }
3561            crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
3562                resp.output.server_side_encryption = Some(
3563                    ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
3564                );
3565                resp.output.ssekms_key_id = Some(key_id.clone());
3566            }
3567            _ => {}
3568        }
3569        Ok(resp)
3570    }
3571
3572    async fn upload_part(
3573        &self,
3574        mut req: S3Request<UploadPartInput>,
3575    ) -> S3Result<S3Response<UploadPartOutput>> {
3576        // 各 part を圧縮して frame header 付きで forward。GET 時に
3577        // `decompress_multipart` が frame iter で順に解凍する。
3578        // **per-part codec dispatch**: dispatcher が body 先頭 sample から
3579        // codec を選ぶので、parquet 風の mixed-content multipart で part ごとに
3580        // 最適 codec を使える (整数列 part → Bitcomp、text 列 part → zstd 等)。
3581        //
3582        // v0.8 #54 BUG-5/BUG-10 fix: lookup the per-upload SSE
3583        // context captured by `create_multipart_upload` and (a) strip
3584        // any SSE-C request headers off `req.input` so the backend
3585        // doesn't see them — same root cause as v0.7 #48 BUG-2/3 on
3586        // single-PUT; MinIO refuses SSE-C parts over HTTP — and (b)
3587        // observe that an upload context exists for `upload_id`. The
3588        // actual encrypt happens once at `complete_multipart_upload`
3589        // time on the assembled body (the per-part-encrypt approach
3590        // would require a matching multi-segment decrypt path on GET;
3591        // encrypting the whole assembled body keeps the GET path's
3592        // `is_sse_encrypted` branch in get_object L2429 working
3593        // unchanged).
3594        let sse_ctx = self
3595            .multipart_state
3596            .get(req.input.upload_id.as_str());
3597        // v0.8.2 #62 (H-1 audit fix): SSE-C key consistency check.
3598        // The AWS S3 spec requires the same SSE-C key headers on
3599        // every UploadPart and rejects mismatches with 400. Prior to
3600        // #62 we silently stripped the headers (BUG-10 fix) without
3601        // validating them, allowing a client to send part 1 under
3602        // key-A and part 2 under key-B; both got stored, then
3603        // re-encrypted with key-A on Complete — the client thinks
3604        // part 2 is under key-B but a GET with key-B would in fact
3605        // hit the part-1 ciphertext that was actually encrypted with
3606        // key-A. That would either decrypt successfully (silent
3607        // corruption: client lost track of which key encrypts what)
3608        // or fail in a confusing way. Validate the per-part headers
3609        // now and reject with 400 InvalidArgument on mismatch /
3610        // omission / partial supply, matching real-S3 behaviour.
3611        if let Some(ref ctx) = sse_ctx {
3612            if let crate::multipart_state::MultipartSseMode::SseC { key_md5: ctx_md5, .. } =
3613                &ctx.sse
3614            {
3615                let alg = req.input.sse_customer_algorithm.take();
3616                let key_b64 = req.input.sse_customer_key.take();
3617                let md5_b64 = req.input.sse_customer_key_md5.take();
3618                match (alg, key_b64, md5_b64) {
3619                    (Some(a), Some(k), Some(m)) => {
3620                        // Parse + validate; if the per-part headers
3621                        // are themselves malformed (algorithm not
3622                        // AES256, MD5 mismatch, key not 32 bytes)
3623                        // surface the same 400 the single-PUT path
3624                        // would. Then compare the parsed MD5 to the
3625                        // upload-context's MD5; mismatch is a
3626                        // different-key UploadPart and must reject.
3627                        let part_material =
3628                            crate::sse::parse_customer_key_headers(&a, &k, &m)
3629                                .map_err(sse_c_error_to_s3)?;
3630                        if part_material.key_md5 != *ctx_md5 {
3631                            return Err(S3Error::with_message(
3632                                S3ErrorCode::InvalidArgument,
3633                                "SSE-C key on UploadPart does not match the key supplied on CreateMultipartUpload",
3634                            ));
3635                        }
3636                        // OK — same key as Create. Headers are
3637                        // already taken off `req.input` so the
3638                        // backend never sees them.
3639                    }
3640                    (None, None, None) => {
3641                        // AWS S3 spec: SSE-C headers MUST be replayed
3642                        // on every UploadPart of an SSE-C multipart.
3643                        // Real-S3 returns 400 InvalidRequest in this
3644                        // case; mirror that.
3645                        return Err(S3Error::with_message(
3646                            S3ErrorCode::InvalidRequest,
3647                            "SSE-C requires customer-key headers on every UploadPart (CreateMultipartUpload was SSE-C)",
3648                        ));
3649                    }
3650                    _ => {
3651                        // Partial header set (e.g. algorithm + key
3652                        // but no MD5) — same handling as the
3653                        // single-PUT `extract_sse_c_material` helper.
3654                        return Err(S3Error::with_message(
3655                            S3ErrorCode::InvalidRequest,
3656                            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
3657                        ));
3658                    }
3659                }
3660            } else {
3661                // CreateMultipartUpload was non-SSE-C (None / SseS4 /
3662                // SseKms). A part that arrives carrying SSE-C headers
3663                // is either a confused client or an attempt to
3664                // smuggle SSE-C around the gateway-internal SSE
3665                // recipe. Reject with 400 InvalidRequest rather than
3666                // silently strip — the strip would let the client
3667                // believe the part was encrypted under their key
3668                // when in fact the upload's encryption recipe is
3669                // whatever the Create captured.
3670                if req.input.sse_customer_algorithm.is_some()
3671                    || req.input.sse_customer_key.is_some()
3672                    || req.input.sse_customer_key_md5.is_some()
3673                {
3674                    return Err(S3Error::with_message(
3675                        S3ErrorCode::InvalidRequest,
3676                        "UploadPart sent SSE-C headers but CreateMultipartUpload was not SSE-C",
3677                    ));
3678                }
3679            }
3680        } else {
3681            // No upload context registered (gateway crashed between
3682            // Create and Part, or pre-#62 abandoned-upload restore).
3683            // We can't check key consistency in this case — strip
3684            // the headers and let the request through unchanged so
3685            // the backend's `NoSuchUpload` reply (or whatever it
3686            // chooses to do) flows back to the client.
3687            let _ = req.input.sse_customer_algorithm.take();
3688            let _ = req.input.sse_customer_key.take();
3689            let _ = req.input.sse_customer_key_md5.take();
3690        }
3691        let _sse_ctx = sse_ctx;
3692        if let Some(blob) = req.input.body.take() {
3693            let bytes = collect_blob(blob, self.max_body_bytes)
3694                .await
3695                .map_err(internal("collect upload_part body"))?;
3696            let sample_len = bytes.len().min(SAMPLE_BYTES);
3697            // v0.8 #56: full part body is already in memory here; use its
3698            // length as the size hint so the dispatcher can promote to GPU
3699            // if it's big enough.
3700            let codec_kind = self
3701                .dispatcher
3702                .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
3703                .await;
3704            let original_size = bytes.len() as u64;
3705            // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
3706            let (compress_res, tel) = self
3707                .registry
3708                .compress_with_telemetry(bytes, codec_kind)
3709                .await;
3710            stamp_gpu_compress_telemetry(&tel);
3711            let (compressed, manifest) = compress_res.map_err(internal("registry compress part"))?;
3712            let header = FrameHeader {
3713                codec: codec_kind,
3714                original_size,
3715                compressed_size: compressed.len() as u64,
3716                crc32c: manifest.crc32c,
3717            };
3718            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3719            write_frame(&mut framed, header, &compressed);
3720            // v0.2 #5: heuristic-based padding skip for likely-final parts.
3721            //
3722            // AWS SDK / aws-cli / boto3 always send the final (and only the
3723            // final) part below the configured part_size. So if the raw user
3724            // part is already smaller than S3's 5 MiB multipart minimum, this
3725            // is overwhelmingly likely to be the final part — and the final
3726            // part is exempt from S3's size constraint. Skipping padding here
3727            // saves up to ~5 MiB per object on highly compressible workloads.
3728            //
3729            // If a misbehaving client sends a tiny **non-final** part, S3
3730            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
3731            // identical outcome to a vanilla S3 PUT, just earlier than
3732            // padding-then-complete would catch it.
3733            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3734            if !likely_final {
3735                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3736            }
3737            let framed_bytes = framed.freeze();
3738            let new_len = framed_bytes.len() as i64;
3739            // 同じ wire 互換問題が multipart にもある (content-length / checksum)
3740            req.input.content_length = Some(new_len);
3741            req.input.checksum_algorithm = None;
3742            req.input.checksum_crc32 = None;
3743            req.input.checksum_crc32c = None;
3744            req.input.checksum_crc64nvme = None;
3745            req.input.checksum_sha1 = None;
3746            req.input.checksum_sha256 = None;
3747            req.input.content_md5 = None;
3748            req.input.body = Some(bytes_to_blob(framed_bytes));
3749            debug!(
3750                part_number = ?req.input.part_number,
3751                upload_id = ?req.input.upload_id,
3752                original_size,
3753                framed_size = new_len,
3754                "S4 upload_part: framed compressed payload"
3755            );
3756        }
3757        self.backend.upload_part(req).await
3758    }
3759    async fn complete_multipart_upload(
3760        &self,
3761        mut req: S3Request<CompleteMultipartUploadInput>,
3762    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3763        let bucket = req.input.bucket.clone();
3764        let key = req.input.key.clone();
3765        let upload_id = req.input.upload_id.clone();
3766        // v0.8.1 #59: serialise concurrent Complete invocations on the
3767        // same `(bucket, key)`. The race window the lock closes is the
3768        // GET-assembled-body → encrypt → PUT-encrypted-body triple
3769        // below (BUG-5 fix); without serialisation, two Completes for
3770        // different `upload_id` but the same logical key could each
3771        // read the other's plaintext assembled body and overwrite the
3772        // peer's encrypted result. The guard is held to function exit
3773        // (drop on `Ok` / `Err`), covering version-id mint, object-
3774        // lock apply, tagging persist, and replication enqueue too.
3775        let completion_lock = self.multipart_state.completion_lock(&bucket, &key);
3776        let _completion_guard = completion_lock.lock().await;
3777        // v0.8 #54 — fetch the per-upload context captured on Create.
3778        // `None` means an abandoned / unknown upload_id (gateway
3779        // crashed between Create and Complete, or pre-v0.8 state
3780        // restore); we still let the backend do its thing for
3781        // transparency, but we can't apply any SSE / version / lock /
3782        // tag / replication post-processing because we never captured
3783        // the recipe.
3784        let ctx = self.multipart_state.get(upload_id.as_str());
3785        // v0.8 #54 BUG-10 fix: same SSE-C header strip as upload_part
3786        // — some clients (boto3 / aws-sdk-cpp older versions) replay
3787        // the SSE-C triple on Complete too, and MinIO will choke if
3788        // they reach the backend.
3789        let _ = req.input.sse_customer_algorithm.take();
3790        let _ = req.input.sse_customer_key.take();
3791        let _ = req.input.sse_customer_key_md5.take();
3792        let mut resp = self.backend.complete_multipart_upload(req).await?;
3793        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
3794        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
3795        // partial fetch path が利用可能になる (Range request の帯域節約)。
3796        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
3797        // できれば爆速になるので 1 回の cost は payback される
3798        //
3799        // v0.8 #54 BUG-5..9: this same fetch is the choke-point for
3800        // the SSE encrypt re-PUT + versioning shadow-key rewrite +
3801        // replication source-bytes capture, so we GET once and reuse
3802        // the bytes for every post-processing step.
3803        let assembled_body: Option<bytes::Bytes> =
3804            if let Ok(uri) = safe_object_uri(&bucket, &key) {
3805                let get_input = GetObjectInput {
3806                    bucket: bucket.clone(),
3807                    key: key.clone(),
3808                    ..Default::default()
3809                };
3810                let get_req = S3Request {
3811                    input: get_input,
3812                    method: http::Method::GET,
3813                    uri,
3814                    headers: http::HeaderMap::new(),
3815                    extensions: http::Extensions::new(),
3816                    credentials: None,
3817                    region: None,
3818                    service: None,
3819                    trailing_headers: None,
3820                };
3821                match self.backend.get_object(get_req).await {
3822                    Ok(get_resp) => match get_resp.output.body {
3823                        Some(blob) => collect_blob(blob, self.max_body_bytes).await.ok(),
3824                        None => None,
3825                    },
3826                    Err(_) => None,
3827                }
3828            } else {
3829                None
3830            };
3831        // Sidecar build (existing behaviour, gated on assembled body).
3832        if let Some(ref body) = assembled_body
3833            && let Ok(index) = build_index_from_body(body)
3834        {
3835            self.write_sidecar(&bucket, &key, &index).await;
3836        }
3837        // From here on, post-processing depends on the context —
3838        // short-circuit when the upload had no captured recipe
3839        // (legacy / crashed-Create / pre-v0.8 state restore).
3840        if let Some(ctx) = ctx {
3841            // v0.8 #54 BUG-6 fix: mint a version-id when the bucket
3842            // is versioning-Enabled. The single-PUT path does this in
3843            // `put_object` ~L1968; multipart was the missing branch.
3844            // We mint here (post-Complete, before any re-PUT) so the
3845            // same vid threads into both the shadow-key rewrite and
3846            // the VersionEntry the manager records.
3847            let pending_version: Option<crate::versioning::PutOutcome> = self
3848                .versioning
3849                .as_ref()
3850                .map(|mgr| mgr.state(&bucket))
3851                .map(|state| match state {
3852                    crate::versioning::VersioningState::Enabled => {
3853                        crate::versioning::PutOutcome {
3854                            version_id: crate::versioning::VersioningManager::new_version_id(),
3855                            versioned_response: true,
3856                        }
3857                    }
3858                    crate::versioning::VersioningState::Suspended
3859                    | crate::versioning::VersioningState::Unversioned => {
3860                        crate::versioning::PutOutcome {
3861                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
3862                            versioned_response: false,
3863                        }
3864                    }
3865                });
3866            // v0.8 #54 BUG-5 fix: encrypt the assembled framed body
3867            // and re-PUT it to the backend so the on-disk bytes are
3868            // SSE-encrypted. The single-PUT path does this body-by-
3869            // body inside `put_object` (L1907-L1942); for multipart,
3870            // encrypt-per-part would require a multi-segment decrypt
3871            // path on GET — we instead do a single encrypt over the
3872            // assembled framed body so the existing GET decrypt
3873            // branch (`is_sse_encrypted` → `decrypt(body, source)` →
3874            // FrameIter) handles it unchanged.
3875            //
3876            // The cost is one extra round-trip per Complete for SSE-
3877            // enabled multipart (already-paid for the sidecar build).
3878            // For single-instance gateways pointing at a co-located
3879            // backend this is negligible; cross-region operators
3880            // would benefit from per-part encrypt + multi-segment
3881            // decrypt as a follow-up.
3882            let needs_re_put = matches!(
3883                ctx.sse,
3884                crate::multipart_state::MultipartSseMode::SseS4
3885                    | crate::multipart_state::MultipartSseMode::SseC { .. }
3886                    | crate::multipart_state::MultipartSseMode::SseKms { .. }
3887            ) || pending_version
3888                .as_ref()
3889                .map(|pv| pv.versioned_response)
3890                .unwrap_or(false);
3891            // Snapshot replication body in advance so we can pass it
3892            // to the spawn helper after the (possibly absent) re-PUT.
3893            let replication_body = assembled_body.clone();
3894            let mut applied_metadata: Option<std::collections::HashMap<String, String>> = None;
3895            if needs_re_put && let Some(body) = assembled_body {
3896                // v0.8.1 #58: same Zeroizing pattern as put_object's
3897                // single-PUT KMS branch — DEK plaintext lives in
3898                // `Zeroizing<[u8; 32]>` for the lifetime of this
3899                // Complete handler, then is wiped on drop.
3900                let kms_wrap: Option<(
3901                    zeroize::Zeroizing<[u8; 32]>,
3902                    crate::kms::WrappedDek,
3903                )> = if let crate::multipart_state::MultipartSseMode::SseKms {
3904                    ref key_id,
3905                } = ctx.sse
3906                {
3907                    let kms = self.kms.as_ref().ok_or_else(|| {
3908                        S3Error::with_message(
3909                            S3ErrorCode::InvalidRequest,
3910                            "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
3911                        )
3912                    })?;
3913                    let (dek, wrapped) = kms
3914                        .generate_dek(key_id)
3915                        .await
3916                        .map_err(kms_error_to_s3)?;
3917                    if dek.len() != 32 {
3918                        return Err(S3Error::with_message(
3919                            S3ErrorCode::InternalError,
3920                            format!(
3921                                "KMS backend returned a DEK of {} bytes (expected 32)",
3922                                dek.len()
3923                            ),
3924                        ));
3925                    }
3926                    let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
3927                        zeroize::Zeroizing::new([0u8; 32]);
3928                    dek_arr.copy_from_slice(&dek);
3929                    // `dek` (Zeroizing<Vec<u8>>) is dropped at scope end.
3930                    Some((dek_arr, wrapped))
3931                } else {
3932                    None
3933                };
3934                // Build the new metadata map: re-fetch via HEAD so
3935                // the multipart / codec markers the backend stamped
3936                // on Create flow through unchanged, then layer the
3937                // SSE markers on top.
3938                let head_req = S3Request {
3939                    input: HeadObjectInput {
3940                        bucket: bucket.clone(),
3941                        key: key.clone(),
3942                        ..Default::default()
3943                    },
3944                    method: http::Method::HEAD,
3945                    uri: safe_object_uri(&bucket, &key)?,
3946                    headers: http::HeaderMap::new(),
3947                    extensions: http::Extensions::new(),
3948                    credentials: None,
3949                    region: None,
3950                    service: None,
3951                    trailing_headers: None,
3952                };
3953                let mut new_metadata: std::collections::HashMap<String, String> =
3954                    match self.backend.head_object(head_req).await {
3955                        Ok(h) => h.output.metadata.unwrap_or_default(),
3956                        Err(_) => std::collections::HashMap::new(),
3957                    };
3958                let new_body = match &ctx.sse {
3959                    crate::multipart_state::MultipartSseMode::SseC { key, key_md5 } => {
3960                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3961                        new_metadata.insert("s4-sse-type".into(), "AES256".into());
3962                        new_metadata.insert(
3963                            "s4-sse-c-key-md5".into(),
3964                            base64::engine::general_purpose::STANDARD.encode(key_md5),
3965                        );
3966                        // v0.8.2 #62: `key` is `&Zeroizing<[u8; 32]>`;
3967                        // auto-deref through one explicit binding so
3968                        // `SseSource::CustomerKey` gets the `&[u8; 32]`
3969                        // it expects (mirrors the SSE-KMS DEK shape
3970                        // a few lines down).
3971                        let key_ref: &[u8; 32] = key;
3972                        crate::sse::encrypt_with_source(
3973                            &body,
3974                            crate::sse::SseSource::CustomerKey { key: key_ref, key_md5 },
3975                        )
3976                    }
3977                    crate::multipart_state::MultipartSseMode::SseKms { .. } => {
3978                        let (dek, wrapped) = kms_wrap
3979                            .as_ref()
3980                            .expect("SseKms branch implies kms_wrap is Some");
3981                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3982                        new_metadata.insert("s4-sse-type".into(), "aws:kms".into());
3983                        new_metadata
3984                            .insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
3985                        // v0.8.1 #58: auto-deref from `&Zeroizing<[u8; 32]>`
3986                        // to `&[u8; 32]` (same shape as the put_object
3987                        // single-PUT branch).
3988                        let dek_ref: &[u8; 32] = dek;
3989                        crate::sse::encrypt_with_source(
3990                            &body,
3991                            crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
3992                        )
3993                    }
3994                    crate::multipart_state::MultipartSseMode::SseS4 => {
3995                        let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
3996                            S3Error::with_message(
3997                                S3ErrorCode::InternalError,
3998                                "SSE-S4 captured at Create but keyring missing at Complete",
3999                            )
4000                        })?;
4001                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
4002                        // SSE-S4 deliberately omits `s4-sse-type` so
4003                        // HEAD doesn't falsely advertise AWS-style
4004                        // SSE-S3 (matches the put_object L1929-L1939
4005                        // comment).
4006                        // v0.8 #52: same chunk_size dispatch as the
4007                        // single-PUT branch — multipart Complete
4008                        // re-encrypts the assembled body, so honoring
4009                        // the chunked path here is required to keep
4010                        // GET streaming on multipart-uploaded objects.
4011                        if self.sse_chunk_size > 0 {
4012                            crate::sse::encrypt_v2_chunked(
4013                                &body,
4014                                keyring,
4015                                self.sse_chunk_size,
4016                            )
4017                            .map_err(|e| {
4018                                S3Error::with_message(
4019                                    S3ErrorCode::InternalError,
4020                                    format!(
4021                                        "SSE-S4 chunked encrypt failed at Complete: {e}"
4022                                    ),
4023                                )
4024                            })?
4025                        } else {
4026                            crate::sse::encrypt_v2(&body, keyring)
4027                        }
4028                    }
4029                    crate::multipart_state::MultipartSseMode::None => body.clone(),
4030                };
4031                // v0.8 #54 BUG-6 fix: write the re-PUT under the
4032                // shadow key so the version chain doesn't overwrite
4033                // the previous version on a versioned bucket. The
4034                // original (unshadowed) key was assembled by the
4035                // backend on Complete; we delete it after the shadow
4036                // PUT lands.
4037                let put_target_key = if let Some(pv) = pending_version.as_ref() {
4038                    if pv.versioned_response {
4039                        versioned_shadow_key(&key, &pv.version_id)
4040                    } else {
4041                        key.clone()
4042                    }
4043                } else {
4044                    key.clone()
4045                };
4046                let new_body_len = new_body.len() as i64;
4047                let put_req = S3Request {
4048                    input: PutObjectInput {
4049                        bucket: bucket.clone(),
4050                        key: put_target_key.clone(),
4051                        body: Some(bytes_to_blob(new_body.clone())),
4052                        metadata: Some(new_metadata.clone()),
4053                        content_length: Some(new_body_len),
4054                        ..Default::default()
4055                    },
4056                    method: http::Method::PUT,
4057                    uri: safe_object_uri(&bucket, &put_target_key)?,
4058                    headers: http::HeaderMap::new(),
4059                    extensions: http::Extensions::new(),
4060                    credentials: None,
4061                    region: None,
4062                    service: None,
4063                    trailing_headers: None,
4064                };
4065                self.backend.put_object(put_req).await?;
4066                // If we rewrote the storage key (versioning shadow),
4067                // we must drop the original (unshadowed) Complete-
4068                // assembled bytes so subsequent listings don't see a
4069                // duplicate.
4070                if put_target_key != key {
4071                    let del_req = S3Request {
4072                        input: DeleteObjectInput {
4073                            bucket: bucket.clone(),
4074                            key: key.clone(),
4075                            ..Default::default()
4076                        },
4077                        method: http::Method::DELETE,
4078                        uri: safe_object_uri(&bucket, &key)?,
4079                        headers: http::HeaderMap::new(),
4080                        extensions: http::Extensions::new(),
4081                        credentials: None,
4082                        region: None,
4083                        service: None,
4084                        trailing_headers: None,
4085                    };
4086                    let _ = self.backend.delete_object(del_req).await;
4087                }
4088                applied_metadata = Some(new_metadata);
4089            }
4090            // v0.8 #54 BUG-6 commit: register the new version with
4091            // the VersioningManager so list_object_versions /
4092            // GET ?versionId= see it.
4093            if let (Some(mgr), Some(pv)) = (self.versioning.as_ref(), pending_version.as_ref()) {
4094                let etag = resp
4095                    .output
4096                    .e_tag
4097                    .clone()
4098                    .map(ETag::into_value)
4099                    .unwrap_or_default();
4100                let now = chrono::Utc::now();
4101                mgr.commit_put_with_version(
4102                    &bucket,
4103                    &key,
4104                    crate::versioning::VersionEntry {
4105                        version_id: pv.version_id.clone(),
4106                        etag,
4107                        size: replication_body
4108                            .as_ref()
4109                            .map(|b| b.len() as u64)
4110                            .unwrap_or(0),
4111                        is_delete_marker: false,
4112                        created_at: now,
4113                    },
4114                );
4115                if pv.versioned_response {
4116                    resp.output.version_id = Some(pv.version_id.clone());
4117                }
4118            }
4119            // v0.8 #54 BUG-7 fix: persist any per-upload Object Lock
4120            // recipe + auto-apply the bucket default. Mirrors the
4121            // put_object L2057-L2074 block.
4122            if let Some(mgr) = self.object_lock.as_ref() {
4123                if ctx.object_lock_mode.is_some()
4124                    || ctx.object_lock_retain_until.is_some()
4125                    || ctx.object_lock_legal_hold
4126                {
4127                    let mut state = mgr.get(&bucket, &key).unwrap_or_default();
4128                    if let Some(m) = ctx.object_lock_mode {
4129                        state.mode = Some(m);
4130                    }
4131                    if let Some(u) = ctx.object_lock_retain_until {
4132                        state.retain_until = Some(u);
4133                    }
4134                    if ctx.object_lock_legal_hold {
4135                        state.legal_hold_on = true;
4136                    }
4137                    mgr.set(&bucket, &key, state);
4138                }
4139                mgr.apply_default_on_put(&bucket, &key, chrono::Utc::now());
4140            }
4141            // v0.8 #54 BUG-9 fix: persist the captured tags via the
4142            // TagManager so GetObjectTagging returns them.
4143            if let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), ctx.tags.as_ref()) {
4144                mgr.put_object_tags(&bucket, &key, tags.clone());
4145            }
4146            // SSE-C / SSE-KMS response echo. The
4147            // CompleteMultipartUploadOutput only exposes
4148            // `server_side_encryption` + `ssekms_key_id` (no
4149            // sse_customer_* — those round-tripped on Create / parts).
4150            match &ctx.sse {
4151                crate::multipart_state::MultipartSseMode::SseC { .. } => {
4152                    resp.output.server_side_encryption = Some(
4153                        ServerSideEncryption::from_static(ServerSideEncryption::AES256),
4154                    );
4155                }
4156                crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
4157                    resp.output.server_side_encryption = Some(
4158                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
4159                    );
4160                    resp.output.ssekms_key_id = Some(key_id.clone());
4161                }
4162                _ => {}
4163            }
4164            // v0.8 #54 BUG-8 fix: fire cross-bucket replication just
4165            // like put_object L2165 does. We hand the dispatcher the
4166            // assembled body bytes (post-encrypt where applicable, so
4167            // the destination ends up byte-identical to the source's
4168            // on-disk shape) plus the metadata that was actually
4169            // committed.
4170            let replication_body_bytes = replication_body.unwrap_or_default();
4171            // v0.8.2 #61: thread the multipart-Complete `pending_version`
4172            // through so a versioning-Enabled source's destination
4173            // receives the same shadow-key path (mirror of the
4174            // single-PUT branch above).
4175            self.spawn_replication_if_matched(
4176                &bucket,
4177                &key,
4178                &ctx.tags,
4179                &replication_body_bytes,
4180                &applied_metadata,
4181                true,
4182                pending_version.as_ref(),
4183            );
4184            self.multipart_state.remove(upload_id.as_str());
4185        }
4186        // v0.8.1 #59 janitor: best-effort sweep of stale completion
4187        // locks while we are still on the critical path of a single
4188        // Complete (so steady-state workloads of unique keys don't
4189        // accumulate `DashMap` entries). The sweep only retires
4190        // entries whose `Arc::strong_count == 1`, so any other in-
4191        // flight Complete on a different key keeps its lock alive.
4192        // Our own `_completion_guard` keeps `bucket`/`key`'s entry
4193        // alive across this call; it's reaped on the next Complete or
4194        // the next caller-driven prune.
4195        self.multipart_state.prune_completion_locks();
4196        Ok(resp)
4197    }
4198    async fn abort_multipart_upload(
4199        &self,
4200        req: S3Request<AbortMultipartUploadInput>,
4201    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
4202        // v0.8 #54: drop the per-upload state (SSE-C key bytes / tag
4203        // set) promptly so an aborted upload doesn't leak the
4204        // customer's key into a long-running gateway's RSS.
4205        self.multipart_state.remove(req.input.upload_id.as_str());
4206        self.backend.abort_multipart_upload(req).await
4207    }
4208    async fn list_multipart_uploads(
4209        &self,
4210        req: S3Request<ListMultipartUploadsInput>,
4211    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
4212        self.backend.list_multipart_uploads(req).await
4213    }
4214    async fn list_parts(
4215        &self,
4216        req: S3Request<ListPartsInput>,
4217    ) -> S3Result<S3Response<ListPartsOutput>> {
4218        self.backend.list_parts(req).await
4219    }
4220
4221    // =========================================================================
4222    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
4223    // 持たないので、backend (= AWS S3) の動作と完全に同一。
4224    //
4225    // 既知の制限事項:
4226    // - copy_object / upload_part_copy: source object が S4-compressed の場合、
4227    //   backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
    //   copied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
4229    //   経由で正しく decompress できる。MetadataDirective REPLACE で上書き
4230    //   されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
4231    // - list_object_versions: versioning enabled bucket では各 version も S4
4232    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
4233    // =========================================================================
4234
4235    // ---- Object ACL / tagging / attributes ----
4236    async fn get_object_acl(
4237        &self,
4238        req: S3Request<GetObjectAclInput>,
4239    ) -> S3Result<S3Response<GetObjectAclOutput>> {
4240        self.backend.get_object_acl(req).await
4241    }
4242    async fn put_object_acl(
4243        &self,
4244        req: S3Request<PutObjectAclInput>,
4245    ) -> S3Result<S3Response<PutObjectAclOutput>> {
4246        self.backend.put_object_acl(req).await
4247    }
4248    // v0.6 #39: object tagging — when a `TagManager` is attached the
4249    // configuration / per-(bucket, key) state lives in the manager and
4250    // these handlers serve directly from it; when no manager is
4251    // attached they fall back to the backend (legacy passthrough so
4252    // v0.5 deployments are unaffected).
4253    async fn get_object_tagging(
4254        &self,
4255        req: S3Request<GetObjectTaggingInput>,
4256    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
4257        let Some(mgr) = self.tagging.as_ref() else {
4258            return self.backend.get_object_tagging(req).await;
4259        };
4260        let tags = mgr
4261            .get_object_tags(&req.input.bucket, &req.input.key)
4262            .unwrap_or_default();
4263        Ok(S3Response::new(GetObjectTaggingOutput {
4264            tag_set: tagset_to_aws(&tags),
4265            ..Default::default()
4266        }))
4267    }
4268    async fn put_object_tagging(
4269        &self,
4270        req: S3Request<PutObjectTaggingInput>,
4271    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
4272        let Some(mgr) = self.tagging.as_ref() else {
4273            return self.backend.put_object_tagging(req).await;
4274        };
4275        let bucket = req.input.bucket.clone();
4276        let key = req.input.key.clone();
4277        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4278            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4279        })?;
4280        // v0.6 #39: gate via IAM policy with both the request tags
4281        // (`s3:RequestObjectTag/<key>`) and any existing tags on the
4282        // target object (`s3:ExistingObjectTag/<key>`).
4283        let existing = mgr.get_object_tags(&bucket, &key);
4284        self.enforce_policy_with_extra(
4285            &req,
4286            "s3:PutObjectTagging",
4287            &bucket,
4288            Some(&key),
4289            Some(&parsed),
4290            existing.as_ref(),
4291        )?;
4292        mgr.put_object_tags(&bucket, &key, parsed);
4293        Ok(S3Response::new(PutObjectTaggingOutput::default()))
4294    }
4295    async fn delete_object_tagging(
4296        &self,
4297        req: S3Request<DeleteObjectTaggingInput>,
4298    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
4299        let Some(mgr) = self.tagging.as_ref() else {
4300            return self.backend.delete_object_tagging(req).await;
4301        };
4302        let bucket = req.input.bucket.clone();
4303        let key = req.input.key.clone();
4304        let existing = mgr.get_object_tags(&bucket, &key);
4305        self.enforce_policy_with_extra(
4306            &req,
4307            "s3:DeleteObjectTagging",
4308            &bucket,
4309            Some(&key),
4310            None,
4311            existing.as_ref(),
4312        )?;
4313        mgr.delete_object_tags(&bucket, &key);
4314        Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
4315    }
4316    async fn get_object_attributes(
4317        &self,
4318        req: S3Request<GetObjectAttributesInput>,
4319    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
4320        self.backend.get_object_attributes(req).await
4321    }
4322    async fn restore_object(
4323        &self,
4324        req: S3Request<RestoreObjectInput>,
4325    ) -> S3Result<S3Response<RestoreObjectOutput>> {
4326        self.backend.restore_object(req).await
4327    }
4328    async fn upload_part_copy(
4329        &self,
4330        req: S3Request<UploadPartCopyInput>,
4331    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
4332        // v0.2 #6: byte-range aware copy when the source is S4-framed.
4333        //
4334        // For a framed source (multipart upload OR single-PUT framed-v2),
4335        // a naive byte-range passthrough would copy compressed bytes that
4336        // don't align with S4 frame boundaries — silently corrupting the
4337        // result. Instead we GET the source through S4 (which handles
4338        // decompression + Range), re-compress + re-frame as a new part,
4339        // and forward as upload_part. For non-framed sources (S4-untouched
4340        // raw objects), passthrough is correct and we keep the original
4341        // (cheaper) code path.
4342        let CopySource::Bucket {
4343            bucket: src_bucket,
4344            key: src_key,
4345            ..
4346        } = &req.input.copy_source
4347        else {
4348            return self.backend.upload_part_copy(req).await;
4349        };
4350        let src_bucket = src_bucket.to_string();
4351        let src_key = src_key.to_string();
4352
4353        // Probe metadata to decide whether the source needs S4-aware copy.
4354        let head_input = HeadObjectInput {
4355            bucket: src_bucket.clone(),
4356            key: src_key.clone(),
4357            ..Default::default()
4358        };
4359        let head_req = S3Request {
4360            input: head_input,
4361            method: http::Method::HEAD,
4362            uri: req.uri.clone(),
4363            headers: req.headers.clone(),
4364            extensions: http::Extensions::new(),
4365            credentials: req.credentials.clone(),
4366            region: req.region.clone(),
4367            service: req.service.clone(),
4368            trailing_headers: None,
4369        };
4370        let needs_s4_copy = match self.backend.head_object(head_req).await {
4371            Ok(h) => {
4372                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
4373            }
4374            Err(_) => false,
4375        };
4376        if !needs_s4_copy {
4377            return self.backend.upload_part_copy(req).await;
4378        }
4379
4380        // Resolve the optional source byte range to pass to GET.
4381        let source_range = req
4382            .input
4383            .copy_source_range
4384            .as_ref()
4385            .map(|r| parse_copy_source_range(r))
4386            .transpose()
4387            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
4388
4389        // GET source via S4 (handles decompression + sidecar partial fetch
4390        // when range is present). The result is the requested user-visible
4391        // byte range, fully decompressed.
4392        let mut get_input = GetObjectInput {
4393            bucket: src_bucket.clone(),
4394            key: src_key.clone(),
4395            ..Default::default()
4396        };
4397        get_input.range = source_range;
4398        let get_req = S3Request {
4399            input: get_input,
4400            method: http::Method::GET,
4401            uri: req.uri.clone(),
4402            headers: req.headers.clone(),
4403            extensions: http::Extensions::new(),
4404            credentials: req.credentials.clone(),
4405            region: req.region.clone(),
4406            service: req.service.clone(),
4407            trailing_headers: None,
4408        };
4409        let get_resp = self.get_object(get_req).await?;
4410        let blob = get_resp.output.body.ok_or_else(|| {
4411            S3Error::with_message(
4412                S3ErrorCode::InternalError,
4413                "upload_part_copy: empty body from source GET",
4414            )
4415        })?;
4416        let bytes = collect_blob(blob, self.max_body_bytes)
4417            .await
4418            .map_err(internal("collect upload_part_copy source body"))?;
4419
4420        // Compress + frame as a fresh part (mirrors upload_part path).
4421        let sample_len = bytes.len().min(SAMPLE_BYTES);
4422        // v0.8 #56: same size-hint promotion as the upload_part path.
4423        let codec_kind = self
4424            .dispatcher
4425            .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
4426            .await;
4427        let original_size = bytes.len() as u64;
4428        // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
4429        let (compress_res, tel) = self
4430            .registry
4431            .compress_with_telemetry(bytes, codec_kind)
4432            .await;
4433        stamp_gpu_compress_telemetry(&tel);
4434        let (compressed, manifest) =
4435            compress_res.map_err(internal("registry compress upload_part_copy"))?;
4436        let header = FrameHeader {
4437            codec: codec_kind,
4438            original_size,
4439            compressed_size: compressed.len() as u64,
4440            crc32c: manifest.crc32c,
4441        };
4442        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
4443        write_frame(&mut framed, header, &compressed);
4444        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
4445        if !likely_final {
4446            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
4447        }
4448        let framed_bytes = framed.freeze();
4449        let framed_len = framed_bytes.len() as i64;
4450
4451        // Forward as upload_part to the destination multipart upload.
4452        let part_input = UploadPartInput {
4453            bucket: req.input.bucket.clone(),
4454            key: req.input.key.clone(),
4455            part_number: req.input.part_number,
4456            upload_id: req.input.upload_id.clone(),
4457            body: Some(bytes_to_blob(framed_bytes)),
4458            content_length: Some(framed_len),
4459            ..Default::default()
4460        };
4461        let part_req = S3Request {
4462            input: part_input,
4463            method: http::Method::PUT,
4464            uri: req.uri.clone(),
4465            headers: req.headers.clone(),
4466            extensions: http::Extensions::new(),
4467            credentials: req.credentials.clone(),
4468            region: req.region.clone(),
4469            service: req.service.clone(),
4470            trailing_headers: None,
4471        };
4472        let upload_resp = self.backend.upload_part(part_req).await?;
4473
4474        let copy_output = UploadPartCopyOutput {
4475            copy_part_result: Some(CopyPartResult {
4476                e_tag: upload_resp.output.e_tag.clone(),
4477                ..Default::default()
4478            }),
4479            ..Default::default()
4480        };
4481        Ok(S3Response::new(copy_output))
4482    }
4483
4484    // ---- Object lock / retention / legal hold (v0.5 #30) ----
4485    //
4486    // When an `ObjectLockManager` is attached the configuration / per-object
4487    // state lives in the manager and these handlers serve directly from it;
4488    // when no manager is attached they fall back to the backend (legacy
4489    // passthrough so v0.4 deployments are unaffected).
4490    async fn get_object_lock_configuration(
4491        &self,
4492        req: S3Request<GetObjectLockConfigurationInput>,
4493    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
4494        if let Some(mgr) = self.object_lock.as_ref() {
4495            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
4496                ObjectLockConfiguration {
4497                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
4498                        ObjectLockEnabled::ENABLED,
4499                    )),
4500                    rule: Some(ObjectLockRule {
4501                        default_retention: Some(DefaultRetention {
4502                            days: Some(d.retention_days as i32),
4503                            mode: Some(ObjectLockRetentionMode::from_static(
4504                                match d.mode {
4505                                    crate::object_lock::LockMode::Governance => {
4506                                        ObjectLockRetentionMode::GOVERNANCE
4507                                    }
4508                                    crate::object_lock::LockMode::Compliance => {
4509                                        ObjectLockRetentionMode::COMPLIANCE
4510                                    }
4511                                },
4512                            )),
4513                            years: None,
4514                        }),
4515                    }),
4516                }
4517            });
4518            let output = GetObjectLockConfigurationOutput {
4519                object_lock_configuration: cfg,
4520            };
4521            return Ok(S3Response::new(output));
4522        }
4523        self.backend.get_object_lock_configuration(req).await
4524    }
4525    async fn put_object_lock_configuration(
4526        &self,
4527        req: S3Request<PutObjectLockConfigurationInput>,
4528    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
4529        if let Some(mgr) = self.object_lock.as_ref() {
4530            let bucket = req.input.bucket.clone();
4531            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
4532                && let Some(rule) = cfg.rule.as_ref()
4533                && let Some(d) = rule.default_retention.as_ref()
4534            {
4535                let mode = d
4536                    .mode
4537                    .as_ref()
4538                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
4539                    .ok_or_else(|| {
4540                        S3Error::with_message(
4541                            S3ErrorCode::InvalidRequest,
4542                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
4543                        )
4544                    })?;
4545                // S3 spec: exactly one of Days / Years (we accept Days
4546                // outright and convert Years → Days for storage; Years
4547                // is just a UX shorthand on the wire).
4548                let days: u32 = match (d.days, d.years) {
4549                    (Some(d), None) if d > 0 => d as u32,
4550                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
4551                    _ => {
4552                        return Err(S3Error::with_message(
4553                            S3ErrorCode::InvalidRequest,
4554                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
4555                        ));
4556                    }
4557                };
4558                mgr.set_bucket_default(
4559                    &bucket,
4560                    crate::object_lock::BucketObjectLockDefault {
4561                        mode,
4562                        retention_days: days,
4563                    },
4564                );
4565            }
4566            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
4567        }
4568        self.backend.put_object_lock_configuration(req).await
4569    }
4570    async fn get_object_legal_hold(
4571        &self,
4572        req: S3Request<GetObjectLegalHoldInput>,
4573    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
4574        if let Some(mgr) = self.object_lock.as_ref() {
4575            let on = mgr
4576                .get(&req.input.bucket, &req.input.key)
4577                .map(|s| s.legal_hold_on)
4578                .unwrap_or(false);
4579            let status = ObjectLockLegalHoldStatus::from_static(if on {
4580                ObjectLockLegalHoldStatus::ON
4581            } else {
4582                ObjectLockLegalHoldStatus::OFF
4583            });
4584            let output = GetObjectLegalHoldOutput {
4585                legal_hold: Some(ObjectLockLegalHold {
4586                    status: Some(status),
4587                }),
4588            };
4589            return Ok(S3Response::new(output));
4590        }
4591        self.backend.get_object_legal_hold(req).await
4592    }
4593    async fn put_object_legal_hold(
4594        &self,
4595        req: S3Request<PutObjectLegalHoldInput>,
4596    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
4597        if let Some(mgr) = self.object_lock.as_ref() {
4598            let on = req
4599                .input
4600                .legal_hold
4601                .as_ref()
4602                .and_then(|h| h.status.as_ref())
4603                .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
4604                .unwrap_or(false);
4605            mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
4606            return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
4607        }
4608        self.backend.put_object_legal_hold(req).await
4609    }
4610    async fn get_object_retention(
4611        &self,
4612        req: S3Request<GetObjectRetentionInput>,
4613    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
4614        if let Some(mgr) = self.object_lock.as_ref() {
4615            let retention = mgr
4616                .get(&req.input.bucket, &req.input.key)
4617                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
4618                .map(|s| {
4619                    let mode = s.mode.map(|m| {
4620                        ObjectLockRetentionMode::from_static(match m {
4621                            crate::object_lock::LockMode::Governance => {
4622                                ObjectLockRetentionMode::GOVERNANCE
4623                            }
4624                            crate::object_lock::LockMode::Compliance => {
4625                                ObjectLockRetentionMode::COMPLIANCE
4626                            }
4627                        })
4628                    });
4629                    let until = s.retain_until.map(chrono_utc_to_timestamp);
4630                    ObjectLockRetention {
4631                        mode,
4632                        retain_until_date: until,
4633                    }
4634                });
4635            let output = GetObjectRetentionOutput { retention };
4636            return Ok(S3Response::new(output));
4637        }
4638        self.backend.get_object_retention(req).await
4639    }
4640    async fn put_object_retention(
4641        &self,
4642        req: S3Request<PutObjectRetentionInput>,
4643    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
4644        if let Some(mgr) = self.object_lock.as_ref() {
4645            let bucket = req.input.bucket.clone();
4646            let key = req.input.key.clone();
4647            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
4648            let retention = req.input.retention.as_ref().ok_or_else(|| {
4649                S3Error::with_message(
4650                    S3ErrorCode::InvalidRequest,
4651                    "PutObjectRetention requires a Retention element",
4652                )
4653            })?;
4654            let new_mode = retention
4655                .mode
4656                .as_ref()
4657                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
4658            let new_until = retention
4659                .retain_until_date
4660                .as_ref()
4661                .map(timestamp_to_chrono_utc)
4662                .unwrap_or(None);
4663            let now = chrono::Utc::now();
4664            let existing = mgr.get(&bucket, &key).unwrap_or_default();
4665            // S3 immutability rules:
4666            //   - Compliance is one-way: once set, mode cannot move to
4667            //     Governance, and retain-until cannot be shortened.
4668            //   - Governance can be lengthened freely; shortened only
4669            //     with bypass=true.
4670            if let Some(existing_mode) = existing.mode
4671                && existing_mode == crate::object_lock::LockMode::Compliance
4672                && existing.is_locked(now)
4673            {
4674                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
4675                    return Err(S3Error::with_message(
4676                        S3ErrorCode::AccessDenied,
4677                        "Cannot downgrade Compliance retention to Governance while lock is active",
4678                    ));
4679                }
4680                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
4681                    && next < prev
4682                {
4683                    return Err(S3Error::with_message(
4684                        S3ErrorCode::AccessDenied,
4685                        "Cannot shorten Compliance retention while lock is active",
4686                    ));
4687                }
4688            }
4689            if let Some(existing_mode) = existing.mode
4690                && existing_mode == crate::object_lock::LockMode::Governance
4691                && existing.is_locked(now)
4692                && !bypass
4693                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
4694                && next < prev
4695            {
4696                return Err(S3Error::with_message(
4697                    S3ErrorCode::AccessDenied,
4698                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
4699                ));
4700            }
4701            let mut state = existing;
4702            if new_mode.is_some() {
4703                state.mode = new_mode;
4704            }
4705            if new_until.is_some() {
4706                state.retain_until = new_until;
4707            }
4708            mgr.set(&bucket, &key, state);
4709            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
4710        }
4711        self.backend.put_object_retention(req).await
4712    }
4713
4714    // ---- Versioning ----
4715    // list_object_versions is implemented above in the compression-hook
4716    // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
4717    // VersioningManager is attached (v0.5 #34), serves chains directly
4718    // from the in-memory index.
4719    async fn get_bucket_versioning(
4720        &self,
4721        req: S3Request<GetBucketVersioningInput>,
4722    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
4723        // v0.5 #34: when a VersioningManager is attached, the bucket's
4724        // versioning state lives in the manager (= S4-server's
4725        // authoritative source). Pass-through hits the backend only
4726        // when no manager is configured (legacy v0.4 behaviour).
4727        if let Some(mgr) = self.versioning.as_ref() {
4728            let output = match mgr.state(&req.input.bucket).as_aws_status() {
4729                Some(s) => GetBucketVersioningOutput {
4730                    status: Some(BucketVersioningStatus::from(s.to_owned())),
4731                    ..Default::default()
4732                },
4733                None => GetBucketVersioningOutput::default(),
4734            };
4735            return Ok(S3Response::new(output));
4736        }
4737        self.backend.get_bucket_versioning(req).await
4738    }
4739    async fn put_bucket_versioning(
4740        &self,
4741        req: S3Request<PutBucketVersioningInput>,
4742    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
4743        // v0.6 #42: MFA gating on the `PutBucketVersioning` request
4744        // itself. S3 spec: when the request body carries an
4745        // `MfaDelete` element (either `Enabled` or `Disabled`), the
4746        // request must include a valid `x-amz-mfa` token — both for
4747        // the *first* enable (so the operator can't quietly side-step
4748        // the gate by never enabling it) and for any subsequent
4749        // change (so a leaked credential alone can't disable MFA
4750        // Delete to bypass it on subsequent DELETEs). Requests that
4751        // omit the `MfaDelete` element entirely (i.e. they flip only
4752        // `Status`) skip this gate, matching AWS.
4753        if let Some(mgr) = self.mfa_delete.as_ref()
4754            && let Some(target_enabled) = req
4755                .input
4756                .versioning_configuration
4757                .mfa_delete
4758                .as_ref()
4759                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
4760        {
4761            let bucket = req.input.bucket.clone();
4762            let header = req.input.mfa.as_deref();
4763            let secret = mgr.lookup_secret(&bucket);
4764            let verified = match (header, secret.as_ref()) {
4765                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
4766                    Ok((serial, code)) => {
4767                        serial == s.serial
4768                            && crate::mfa::verify_totp(
4769                                &s.secret_base32,
4770                                &code,
4771                                current_unix_secs(),
4772                            )
4773                    }
4774                    Err(_) => false,
4775                },
4776                _ => false,
4777            };
4778            if !verified {
4779                crate::metrics::record_mfa_delete_denial(&bucket);
4780                let err = if header.is_none() {
4781                    crate::mfa::MfaError::Missing
4782                } else {
4783                    crate::mfa::MfaError::InvalidCode
4784                };
4785                return Err(mfa_error_to_s3(err));
4786            }
4787            mgr.set_bucket_state(&bucket, target_enabled);
4788        }
4789        // v0.5 #34: stash the new state in the manager, then forward to
4790        // the backend so any downstream that *also* tracks state
4791        // (e.g. a real S3 backend) stays in sync. Manager-attached but
4792        // backend rejection is treated as a soft-fail (state is still
4793        // owned by the manager).
4794        if let Some(mgr) = self.versioning.as_ref() {
4795            let new_state = match req
4796                .input
4797                .versioning_configuration
4798                .status
4799                .as_ref()
4800                .map(|s| s.as_str())
4801            {
4802                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
4803                    crate::versioning::VersioningState::Enabled
4804                }
4805                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
4806                    crate::versioning::VersioningState::Suspended
4807                }
4808                _ => crate::versioning::VersioningState::Unversioned,
4809            };
4810            mgr.set_state(&req.input.bucket, new_state);
4811            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
4812        }
4813        self.backend.put_bucket_versioning(req).await
4814    }
4815
4816    // ---- Bucket location ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_location` and returns its result unchanged.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
4823
4824    // ---- Bucket policy ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_policy` and returns its result unchanged.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `put_bucket_policy` and returns its result unchanged.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `delete_bucket_policy` and returns its result unchanged.
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_policy_status` and returns its result unchanged.
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }
4849
4850    // ---- Bucket ACL ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_acl` and returns its result unchanged.
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `put_bucket_acl` and returns its result unchanged.
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
4863
4864    // ---- Bucket CORS (v0.6 #38) ----
4865    async fn get_bucket_cors(
4866        &self,
4867        req: S3Request<GetBucketCorsInput>,
4868    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
4869        if let Some(mgr) = self.cors.as_ref() {
4870            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4871                S3Error::with_message(
4872                    S3ErrorCode::NoSuchCORSConfiguration,
4873                    "The CORS configuration does not exist".to_string(),
4874                )
4875            })?;
4876            let rules: Vec<CORSRule> = cfg
4877                .rules
4878                .into_iter()
4879                .map(|r| CORSRule {
4880                    allowed_headers: if r.allowed_headers.is_empty() {
4881                        None
4882                    } else {
4883                        Some(r.allowed_headers)
4884                    },
4885                    allowed_methods: r.allowed_methods,
4886                    allowed_origins: r.allowed_origins,
4887                    expose_headers: if r.expose_headers.is_empty() {
4888                        None
4889                    } else {
4890                        Some(r.expose_headers)
4891                    },
4892                    id: r.id,
4893                    max_age_seconds: r.max_age_seconds.map(|s| s as i32),
4894                })
4895                .collect();
4896            return Ok(S3Response::new(GetBucketCorsOutput {
4897                cors_rules: Some(rules),
4898            }));
4899        }
4900        self.backend.get_bucket_cors(req).await
4901    }
4902    async fn put_bucket_cors(
4903        &self,
4904        req: S3Request<PutBucketCorsInput>,
4905    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4906        if let Some(mgr) = self.cors.as_ref() {
4907            let cfg = crate::cors::CorsConfig {
4908                rules: req
4909                    .input
4910                    .cors_configuration
4911                    .cors_rules
4912                    .into_iter()
4913                    .map(|r| crate::cors::CorsRule {
4914                        allowed_origins: r.allowed_origins,
4915                        allowed_methods: r.allowed_methods,
4916                        allowed_headers: r.allowed_headers.unwrap_or_default(),
4917                        expose_headers: r.expose_headers.unwrap_or_default(),
4918                        max_age_seconds: r.max_age_seconds.and_then(|s| {
4919                            if s < 0 { None } else { Some(s as u32) }
4920                        }),
4921                        id: r.id,
4922                    })
4923                    .collect(),
4924            };
4925            mgr.put(&req.input.bucket, cfg);
4926            return Ok(S3Response::new(PutBucketCorsOutput::default()));
4927        }
4928        self.backend.put_bucket_cors(req).await
4929    }
4930    async fn delete_bucket_cors(
4931        &self,
4932        req: S3Request<DeleteBucketCorsInput>,
4933    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4934        if let Some(mgr) = self.cors.as_ref() {
4935            mgr.delete(&req.input.bucket);
4936            return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4937        }
4938        self.backend.delete_bucket_cors(req).await
4939    }
4940
4941    // ---- Bucket lifecycle (v0.6 #37) ----
4942    async fn get_bucket_lifecycle_configuration(
4943        &self,
4944        req: S3Request<GetBucketLifecycleConfigurationInput>,
4945    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4946        if let Some(mgr) = self.lifecycle.as_ref() {
4947            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4948                S3Error::with_message(
4949                    S3ErrorCode::NoSuchLifecycleConfiguration,
4950                    "The lifecycle configuration does not exist".to_string(),
4951                )
4952            })?;
4953            let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4954            return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4955                rules: Some(rules),
4956                transition_default_minimum_object_size: None,
4957            }));
4958        }
4959        self.backend.get_bucket_lifecycle_configuration(req).await
4960    }
4961    async fn put_bucket_lifecycle_configuration(
4962        &self,
4963        req: S3Request<PutBucketLifecycleConfigurationInput>,
4964    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4965        if let Some(mgr) = self.lifecycle.as_ref() {
4966            let bucket = req.input.bucket.clone();
4967            let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4968            let cfg = dto_lifecycle_to_internal(&dto_cfg);
4969            mgr.put(&bucket, cfg);
4970            return Ok(S3Response::new(
4971                PutBucketLifecycleConfigurationOutput::default(),
4972            ));
4973        }
4974        self.backend.put_bucket_lifecycle_configuration(req).await
4975    }
4976    async fn delete_bucket_lifecycle(
4977        &self,
4978        req: S3Request<DeleteBucketLifecycleInput>,
4979    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4980        if let Some(mgr) = self.lifecycle.as_ref() {
4981            mgr.delete(&req.input.bucket);
4982            return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4983        }
4984        self.backend.delete_bucket_lifecycle(req).await
4985    }
4986
4987    // ---- Bucket tagging (v0.6 #39) ----
4988    async fn get_bucket_tagging(
4989        &self,
4990        req: S3Request<GetBucketTaggingInput>,
4991    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
4992        let Some(mgr) = self.tagging.as_ref() else {
4993            return self.backend.get_bucket_tagging(req).await;
4994        };
4995        let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
4996        Ok(S3Response::new(GetBucketTaggingOutput {
4997            tag_set: tagset_to_aws(&tags),
4998        }))
4999    }
5000    async fn put_bucket_tagging(
5001        &self,
5002        req: S3Request<PutBucketTaggingInput>,
5003    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
5004        let Some(mgr) = self.tagging.as_ref() else {
5005            return self.backend.put_bucket_tagging(req).await;
5006        };
5007        let bucket = req.input.bucket.clone();
5008        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
5009            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
5010        })?;
5011        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
5012        mgr.put_bucket_tags(&bucket, parsed);
5013        Ok(S3Response::new(PutBucketTaggingOutput::default()))
5014    }
5015    async fn delete_bucket_tagging(
5016        &self,
5017        req: S3Request<DeleteBucketTaggingInput>,
5018    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
5019        let Some(mgr) = self.tagging.as_ref() else {
5020            return self.backend.delete_bucket_tagging(req).await;
5021        };
5022        let bucket = req.input.bucket.clone();
5023        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
5024        mgr.delete_bucket_tags(&bucket);
5025        Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
5026    }
5027
5028    // ---- Bucket encryption ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_encryption` and returns its result unchanged.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `put_bucket_encryption` and returns its result unchanged.
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `delete_bucket_encryption` and returns its result unchanged.
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }
5047
5048    // ---- Bucket logging ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_logging` and returns its result unchanged.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `put_bucket_logging` and returns its result unchanged.
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
5061
5062    // ---- Bucket notification (v0.6 #35) ----
5063    //
5064    // When a `NotificationManager` is attached, S4 itself owns per-bucket
5065    // notification configurations and the PUT / GET handlers route through
5066    // the manager. The wire DTO's queue / topic configurations map onto
5067    // S4's `Destination::Sqs` / `Destination::Sns`; LambdaFunction and
5068    // EventBridge configurations are accepted on PUT but silently dropped
5069    // (out of scope for v0.6 #35). When no manager is attached the legacy
5070    // backend-passthrough behaviour applies.
5071    async fn get_bucket_notification_configuration(
5072        &self,
5073        req: S3Request<GetBucketNotificationConfigurationInput>,
5074    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
5075        if let Some(mgr) = self.notifications.as_ref() {
5076            let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
5077            let dto = notif_to_dto(&cfg);
5078            return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
5079                event_bridge_configuration: dto.event_bridge_configuration,
5080                lambda_function_configurations: dto.lambda_function_configurations,
5081                queue_configurations: dto.queue_configurations,
5082                topic_configurations: dto.topic_configurations,
5083            }));
5084        }
5085        self.backend
5086            .get_bucket_notification_configuration(req)
5087            .await
5088    }
5089    async fn put_bucket_notification_configuration(
5090        &self,
5091        req: S3Request<PutBucketNotificationConfigurationInput>,
5092    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
5093        if let Some(mgr) = self.notifications.as_ref() {
5094            let cfg = notif_from_dto(&req.input.notification_configuration);
5095            mgr.put(&req.input.bucket, cfg);
5096            return Ok(S3Response::new(
5097                PutBucketNotificationConfigurationOutput::default(),
5098            ));
5099        }
5100        self.backend
5101            .put_bucket_notification_configuration(req)
5102            .await
5103    }
5104
5105    // ---- Bucket request payment ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_request_payment` and returns its result unchanged.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `put_bucket_request_payment` and returns its result unchanged.
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }
5118
5119    // ---- Bucket website ----
    /// Pure delegation: forwards the request to the backend's
    /// `get_bucket_website` and returns its result unchanged.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `put_bucket_website` and returns its result unchanged.
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    /// Pure delegation: forwards the request to the backend's
    /// `delete_bucket_website` and returns its result unchanged.
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
5138
5139    // ---- Bucket replication (v0.6 #40) ----
5140    async fn get_bucket_replication(
5141        &self,
5142        req: S3Request<GetBucketReplicationInput>,
5143    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
5144        if let Some(mgr) = self.replication.as_ref() {
5145            return match mgr.get(&req.input.bucket) {
5146                Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
5147                    replication_configuration: Some(replication_to_dto(&cfg)),
5148                })),
5149                None => Err(S3Error::with_message(
5150                    S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
5151                    format!("no replication configuration on bucket {}", req.input.bucket),
5152                )),
5153            };
5154        }
5155        self.backend.get_bucket_replication(req).await
5156    }
5157    async fn put_bucket_replication(
5158        &self,
5159        req: S3Request<PutBucketReplicationInput>,
5160    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
5161        if let Some(mgr) = self.replication.as_ref() {
5162            let cfg = replication_from_dto(&req.input.replication_configuration);
5163            mgr.put(&req.input.bucket, cfg);
5164            return Ok(S3Response::new(PutBucketReplicationOutput::default()));
5165        }
5166        self.backend.put_bucket_replication(req).await
5167    }
5168    async fn delete_bucket_replication(
5169        &self,
5170        req: S3Request<DeleteBucketReplicationInput>,
5171    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
5172        if let Some(mgr) = self.replication.as_ref() {
5173            mgr.delete(&req.input.bucket);
5174            return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
5175        }
5176        self.backend.delete_bucket_replication(req).await
5177    }
5178
5179    // ---- Bucket accelerate ----
5180    async fn get_bucket_accelerate_configuration(
5181        &self,
5182        req: S3Request<GetBucketAccelerateConfigurationInput>,
5183    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
5184        self.backend.get_bucket_accelerate_configuration(req).await
5185    }
5186    async fn put_bucket_accelerate_configuration(
5187        &self,
5188        req: S3Request<PutBucketAccelerateConfigurationInput>,
5189    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
5190        self.backend.put_bucket_accelerate_configuration(req).await
5191    }
5192
5193    // ---- Bucket ownership controls ----
5194    async fn get_bucket_ownership_controls(
5195        &self,
5196        req: S3Request<GetBucketOwnershipControlsInput>,
5197    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
5198        self.backend.get_bucket_ownership_controls(req).await
5199    }
5200    async fn put_bucket_ownership_controls(
5201        &self,
5202        req: S3Request<PutBucketOwnershipControlsInput>,
5203    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
5204        self.backend.put_bucket_ownership_controls(req).await
5205    }
5206    async fn delete_bucket_ownership_controls(
5207        &self,
5208        req: S3Request<DeleteBucketOwnershipControlsInput>,
5209    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
5210        self.backend.delete_bucket_ownership_controls(req).await
5211    }
5212
5213    // ---- Public access block ----
5214    async fn get_public_access_block(
5215        &self,
5216        req: S3Request<GetPublicAccessBlockInput>,
5217    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
5218        self.backend.get_public_access_block(req).await
5219    }
5220    async fn put_public_access_block(
5221        &self,
5222        req: S3Request<PutPublicAccessBlockInput>,
5223    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
5224        self.backend.put_public_access_block(req).await
5225    }
5226    async fn delete_public_access_block(
5227        &self,
5228        req: S3Request<DeletePublicAccessBlockInput>,
5229    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
5230        self.backend.delete_public_access_block(req).await
5231    }
5232
5233    // ====================================================================
5234    // v0.6 #41: S3 Select — server-side SQL filter on object body.
5235    //
5236    // Fetch the object via the regular `get_object` path (so SSE-C /
5237    // SSE-S4 / SSE-KMS / S4 codec all decompress + decrypt transparently),
5238    // run a small SQL subset (CSV + JSON Lines, equality / inequality /
5239    // LIKE / AND / OR / NOT) over the in-memory body, and stream the
5240    // matched rows back as AWS event-stream `Records` + `Stats` + `End`
5241    // frames.
5242    //
5243    // Limitations (deliberate, documented):
5244    //   - Parquet input is rejected with NotImplemented.
5245    //   - Aggregates / GROUP BY / JOIN / ORDER BY / LIMIT are rejected at
5246    //     parse time as InvalidRequest (s3s 0.13 doesn't expose AWS's
5247    //     domain-specific `InvalidSqlExpression` code).
5248    //   - The body is fully buffered before SQL evaluation (S3 Select
5249    //     streaming-during-evaluation is v0.7 scope).
5250    //   - GPU-accelerated WHERE evaluation is stubbed out (always None).
5251    async fn select_object_content(
5252        &self,
5253        req: S3Request<SelectObjectContentInput>,
5254    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
5255        use crate::select::{
5256            EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
5257            run_select_jsonlines,
5258        };
5259
5260        let select_bucket = req.input.bucket.clone();
5261        let select_key = req.input.key.clone();
5262        self.enforce_rate_limit(&req, &select_bucket)?;
5263        self.enforce_policy(
5264            &req,
5265            "s3:GetObject",
5266            &select_bucket,
5267            Some(&select_key),
5268        )?;
5269
5270        let request = req.input.request;
5271        let sql = request.expression.clone();
5272        if request.expression_type.as_str() != "SQL" {
5273            return Err(S3Error::with_message(
5274                S3ErrorCode::InvalidExpressionType,
5275                format!(
5276                    "ExpressionType must be SQL, got: {}",
5277                    request.expression_type.as_str()
5278                ),
5279            ));
5280        }
5281
5282        let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
5283            SelectInputFormat::JsonLines
5284        } else if let Some(csv) = request.input_serialization.csv.as_ref() {
5285            let has_header = csv
5286                .file_header_info
5287                .as_ref()
5288                .map(|h| {
5289                    let s = h.as_str();
5290                    s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
5291                })
5292                .unwrap_or(false);
5293            let delim = csv
5294                .field_delimiter
5295                .as_deref()
5296                .and_then(|s| s.chars().next())
5297                .unwrap_or(',');
5298            SelectInputFormat::Csv {
5299                has_header,
5300                delimiter: delim,
5301            }
5302        } else if request.input_serialization.parquet.is_some() {
5303            return Err(S3Error::with_message(
5304                S3ErrorCode::NotImplemented,
5305                "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
5306            ));
5307        } else {
5308            return Err(S3Error::with_message(
5309                S3ErrorCode::InvalidRequest,
5310                "InputSerialization requires exactly one of CSV / JSON / Parquet",
5311            ));
5312        };
5313        if let Some(ct) = request.input_serialization.compression_type.as_ref()
5314            && !ct.as_str().eq_ignore_ascii_case("NONE")
5315        {
5316            return Err(S3Error::with_message(
5317                S3ErrorCode::NotImplemented,
5318                format!(
5319                    "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
5320                    ct.as_str()
5321                ),
5322            ));
5323        }
5324
5325        let output_format = if request.output_serialization.json.is_some() {
5326            SelectOutputFormat::Json
5327        } else if request.output_serialization.csv.is_some() {
5328            SelectOutputFormat::Csv
5329        } else {
5330            return Err(S3Error::with_message(
5331                S3ErrorCode::InvalidRequest,
5332                "OutputSerialization requires exactly one of CSV / JSON",
5333            ));
5334        };
5335
5336        let get_input = GetObjectInput {
5337            bucket: select_bucket.clone(),
5338            key: select_key.clone(),
5339            sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
5340            sse_customer_key: req.input.sse_customer_key.clone(),
5341            sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
5342            ..Default::default()
5343        };
5344        let get_req = S3Request {
5345            input: get_input,
5346            method: http::Method::GET,
5347            uri: format!("/{}/{}", select_bucket, select_key)
5348                .parse()
5349                .map_err(|e| {
5350                    S3Error::with_message(
5351                        S3ErrorCode::InternalError,
5352                        format!("constructing inner GET URI: {e}"),
5353                    )
5354                })?,
5355            headers: http::HeaderMap::new(),
5356            extensions: http::Extensions::new(),
5357            credentials: req.credentials.clone(),
5358            region: req.region.clone(),
5359            service: req.service.clone(),
5360            trailing_headers: None,
5361        };
5362        let mut get_resp = self.get_object(get_req).await?;
5363        let blob = get_resp.output.body.take().ok_or_else(|| {
5364            S3Error::with_message(
5365                S3ErrorCode::InternalError,
5366                "Select: object body was empty after GET",
5367            )
5368        })?;
5369        let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
5370            .await
5371            .map_err(internal("collect Select body"))?;
5372        let scanned = body_bytes.len() as u64;
5373
5374        let matched_payload = match input_format {
5375            SelectInputFormat::JsonLines => {
5376                run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
5377                    |e| select_error_to_s3(e, "JSON Lines"),
5378                )?
5379            }
5380            SelectInputFormat::Csv { .. } => {
5381                run_select_csv(&sql, &body_bytes, input_format, output_format)
5382                    .map_err(|e| select_error_to_s3(e, "CSV"))?
5383            }
5384        };
5385
5386        let returned = matched_payload.len() as u64;
5387        let processed = scanned;
5388        let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
5389        if !matched_payload.is_empty() {
5390            events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
5391                payload: Some(bytes::Bytes::from(matched_payload)),
5392            })));
5393        }
5394        events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
5395            details: Some(Stats {
5396                bytes_scanned: Some(scanned as i64),
5397                bytes_processed: Some(processed as i64),
5398                bytes_returned: Some(returned as i64),
5399            }),
5400        })));
5401        events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
5402        // Touch EventStreamWriter so the public API stays linked into the
5403        // build (the actual wire framing is delegated to s3s).
5404        let _writer = EventStreamWriter::new();
5405
5406        let stream =
5407            SelectObjectContentEventStream::new(futures::stream::iter(events));
5408        let output = SelectObjectContentOutput {
5409            payload: Some(stream),
5410        };
5411        Ok(S3Response::new(output))
5412    }
5413
5414    // ---- Bucket Inventory configuration (v0.6 #36) ----
5415    //
5416    // When an `InventoryManager` is attached, S4-server owns the
5417    // configuration store and these handlers no longer pass through to
5418    // the backend. The mapping between the s3s-typed
5419    // `InventoryConfiguration` and the inventory module's internal
5420    // `InventoryConfig` is intentionally lossy: only the fields S4
5421    // actually uses for periodic CSV emission survive the round trip
5422    // (id, source bucket, destination bucket / prefix, format, included
    // versions, schedule frequency). Optional fields, encryption, and
    // filter prefixes are accepted on PUT but not stored; GET renders
    // them as absent in an otherwise well-formed
    // `InventoryConfiguration`, so clients should not expect those
    // settings to round-trip.
5427    async fn put_bucket_inventory_configuration(
5428        &self,
5429        req: S3Request<PutBucketInventoryConfigurationInput>,
5430    ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
5431        if let Some(mgr) = self.inventory.as_ref() {
5432            let cfg = inv_from_dto(
5433                &req.input.bucket,
5434                &req.input.id,
5435                &req.input.inventory_configuration,
5436            );
5437            mgr.put(cfg);
5438            return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
5439        }
5440        self.backend.put_bucket_inventory_configuration(req).await
5441    }
5442
5443    async fn get_bucket_inventory_configuration(
5444        &self,
5445        req: S3Request<GetBucketInventoryConfigurationInput>,
5446    ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
5447        if let Some(mgr) = self.inventory.as_ref() {
5448            let cfg = mgr.get(&req.input.bucket, &req.input.id);
5449            if let Some(cfg) = cfg {
5450                let out = GetBucketInventoryConfigurationOutput {
5451                    inventory_configuration: Some(inv_to_dto(&cfg)),
5452                };
5453                return Ok(S3Response::new(out));
5454            }
5455            // AWS returns `NoSuchConfiguration` (404) when the id has no
5456            // matching inventory configuration on the bucket. The
5457            // generated `S3ErrorCode` enum doesn't expose a typed variant
5458            // for this code, so we round-trip through `from_bytes` which
5459            // wraps unknown codes as `Custom(...)` (= the AWS-canonical
5460            // error-code string survives into the XML response envelope).
5461            let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
5462                .unwrap_or(S3ErrorCode::NoSuchKey);
5463            return Err(S3Error::with_message(
5464                code,
5465                format!(
5466                    "no inventory configuration with id={} on bucket={}",
5467                    req.input.id, req.input.bucket
5468                ),
5469            ));
5470        }
5471        self.backend.get_bucket_inventory_configuration(req).await
5472    }
5473
5474    async fn list_bucket_inventory_configurations(
5475        &self,
5476        req: S3Request<ListBucketInventoryConfigurationsInput>,
5477    ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
5478        if let Some(mgr) = self.inventory.as_ref() {
5479            let list = mgr.list_for_bucket(&req.input.bucket);
5480            let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
5481            let out = ListBucketInventoryConfigurationsOutput {
5482                continuation_token: req.input.continuation_token.clone(),
5483                inventory_configuration_list: if dto_list.is_empty() {
5484                    None
5485                } else {
5486                    Some(dto_list)
5487                },
5488                is_truncated: Some(false),
5489                next_continuation_token: None,
5490            };
5491            return Ok(S3Response::new(out));
5492        }
5493        self.backend.list_bucket_inventory_configurations(req).await
5494    }
5495
5496    async fn delete_bucket_inventory_configuration(
5497        &self,
5498        req: S3Request<DeleteBucketInventoryConfigurationInput>,
5499    ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
5500        if let Some(mgr) = self.inventory.as_ref() {
5501            mgr.delete(&req.input.bucket, &req.input.id);
5502            return Ok(S3Response::new(
5503                DeleteBucketInventoryConfigurationOutput::default(),
5504            ));
5505        }
5506        self.backend.delete_bucket_inventory_configuration(req).await
5507    }
5508}
5509
5510// ---------------------------------------------------------------------------
5511// v0.6 #36: Convert between the s3s-typed `InventoryConfiguration` (the wire
5512// surface) and our internal `crate::inventory::InventoryConfig`. Only the
5513// fields S4 actually uses for CSV emission survive the round trip; the
5514// missing fields (filter prefix, optional fields, encryption) are dropped on
5515// PUT and re-rendered as the AWS-default shape on GET so the client sees a
5516// well-formed `InventoryConfiguration`.
5517// ---------------------------------------------------------------------------
5518
5519fn inv_from_dto(
5520    bucket: &str,
5521    id: &str,
5522    dto: &InventoryConfiguration,
5523) -> crate::inventory::InventoryConfig {
5524    let frequency_hours = match dto.schedule.frequency.as_str() {
5525        "Weekly" => 24 * 7,
5526        // Daily is the default; anything S4 doesn't recognise (incl.
5527        // empty, which is the s3s-default) maps to Daily so the
5528        // operator's PUT doesn't silently turn into a no-op cadence.
5529        _ => 24,
5530    };
5531    // Parquet/ORC are not supported (issue #36 scope); we still accept
5532    // the PUT so callers don't fail-loud, but we record CSV and rely on
5533    // the operator catching the discrepancy on GET.
5534    let format = crate::inventory::InventoryFormat::Csv;
5535    crate::inventory::InventoryConfig {
5536        id: id.to_owned(),
5537        bucket: bucket.to_owned(),
5538        destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
5539        destination_prefix: dto
5540            .destination
5541            .s3_bucket_destination
5542            .prefix
5543            .clone()
5544            .unwrap_or_default(),
5545        frequency_hours,
5546        format,
5547        included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
5548            dto.included_object_versions.as_str(),
5549        ),
5550    }
5551}
5552
5553fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
5554    InventoryConfiguration {
5555        id: cfg.id.clone(),
5556        is_enabled: true,
5557        included_object_versions: InventoryIncludedObjectVersions::from(
5558            cfg.included_object_versions.as_aws_str().to_owned(),
5559        ),
5560        destination: InventoryDestination {
5561            s3_bucket_destination: InventoryS3BucketDestination {
5562                account_id: None,
5563                bucket: cfg.destination_bucket.clone(),
5564                encryption: None,
5565                format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
5566                prefix: if cfg.destination_prefix.is_empty() {
5567                    None
5568                } else {
5569                    Some(cfg.destination_prefix.clone())
5570                },
5571            },
5572        },
5573        schedule: InventorySchedule {
5574            // `frequency_hours == 168` -> Weekly; everything else maps to
5575            // Daily for the wire response (the manager keeps the precise
5576            // hour count internally for due-checking).
5577            frequency: InventoryFrequency::from(
5578                if cfg.frequency_hours == 24 * 7 {
5579                    "Weekly"
5580                } else {
5581                    "Daily"
5582                }
5583                .to_owned(),
5584            ),
5585        },
5586        filter: None,
5587        optional_fields: None,
5588    }
5589}
5590
5591// ---------------------------------------------------------------------------
5592// v0.6 #35: Convert between the s3s-typed `NotificationConfiguration` (the
5593// wire surface) and our internal `crate::notifications::NotificationConfig`.
5594//
5595// We support TopicConfiguration (-> Destination::Sns) and QueueConfiguration
5596// (-> Destination::Sqs). LambdaFunction and EventBridge configurations are
5597// silently dropped on PUT (out of scope for v0.6 #35); the GET response only
5598// surfaces topic / queue rules.
5599//
5600// The webhook destination has no AWS-native wire form: operators configure
5601// webhooks via the JSON snapshot file (`--notifications-state-file`) or by
5602// poking `NotificationManager::put` directly from a custom binary. This
5603// keeps the wire surface AWS-compatible while still letting the always-
5604// available `Webhook` destination be reachable.
5605// ---------------------------------------------------------------------------
5606
5607fn notif_from_dto(
5608    dto: &NotificationConfiguration,
5609) -> crate::notifications::NotificationConfig {
5610    let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
5611    if let Some(topics) = dto.topic_configurations.as_ref() {
5612        for (idx, t) in topics.iter().enumerate() {
5613            let events = events_from_dto(&t.events);
5614            let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
5615            rules.push(crate::notifications::NotificationRule {
5616                id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
5617                events,
5618                destination: crate::notifications::Destination::Sns {
5619                    topic_arn: t.topic_arn.clone(),
5620                },
5621                filter_prefix: prefix,
5622                filter_suffix: suffix,
5623            });
5624        }
5625    }
5626    if let Some(queues) = dto.queue_configurations.as_ref() {
5627        for (idx, q) in queues.iter().enumerate() {
5628            let events = events_from_dto(&q.events);
5629            let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
5630            rules.push(crate::notifications::NotificationRule {
5631                id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
5632                events,
5633                destination: crate::notifications::Destination::Sqs {
5634                    queue_arn: q.queue_arn.clone(),
5635                },
5636                filter_prefix: prefix,
5637                filter_suffix: suffix,
5638            });
5639        }
5640    }
5641    crate::notifications::NotificationConfig { rules }
5642}
5643
5644fn notif_to_dto(
5645    cfg: &crate::notifications::NotificationConfig,
5646) -> NotificationConfiguration {
5647    let mut topics: Vec<TopicConfiguration> = Vec::new();
5648    let mut queues: Vec<QueueConfiguration> = Vec::new();
5649    for rule in &cfg.rules {
5650        let events: Vec<Event> = rule
5651            .events
5652            .iter()
5653            .map(|e| Event::from(e.as_aws_str().to_owned()))
5654            .collect();
5655        let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
5656        match &rule.destination {
5657            crate::notifications::Destination::Sns { topic_arn } => {
5658                topics.push(TopicConfiguration {
5659                    events,
5660                    filter,
5661                    id: Some(rule.id.clone()),
5662                    topic_arn: topic_arn.clone(),
5663                });
5664            }
5665            crate::notifications::Destination::Sqs { queue_arn } => {
5666                queues.push(QueueConfiguration {
5667                    events,
5668                    filter,
5669                    id: Some(rule.id.clone()),
5670                    queue_arn: queue_arn.clone(),
5671                });
5672            }
5673            // Webhook destinations have no AWS wire equivalent — they
5674            // round-trip through the JSON snapshot only. Skip them on the
5675            // GET surface (an SDK consumer wouldn't know what to do with
5676            // them anyway).
5677            crate::notifications::Destination::Webhook { .. } => {}
5678        }
5679    }
5680    NotificationConfiguration {
5681        event_bridge_configuration: None,
5682        lambda_function_configurations: None,
5683        queue_configurations: if queues.is_empty() { None } else { Some(queues) },
5684        topic_configurations: if topics.is_empty() { None } else { Some(topics) },
5685    }
5686}
5687
5688fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
5689    events
5690        .iter()
5691        .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
5692        .collect()
5693}
5694
5695fn filter_from_dto(
5696    f: Option<&NotificationConfigurationFilter>,
5697) -> (Option<String>, Option<String>) {
5698    let Some(f) = f else {
5699        return (None, None);
5700    };
5701    let Some(key) = f.key.as_ref() else {
5702        return (None, None);
5703    };
5704    let Some(rules) = key.filter_rules.as_ref() else {
5705        return (None, None);
5706    };
5707    let mut prefix = None;
5708    let mut suffix = None;
5709    for r in rules {
5710        let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
5711        let value = r.value.clone();
5712        match name.as_deref() {
5713            Some("prefix") => prefix = value,
5714            Some("suffix") => suffix = value,
5715            _ => {}
5716        }
5717    }
5718    (prefix, suffix)
5719}
5720
5721fn filter_to_dto(
5722    prefix: Option<&str>,
5723    suffix: Option<&str>,
5724) -> Option<NotificationConfigurationFilter> {
5725    if prefix.is_none() && suffix.is_none() {
5726        return None;
5727    }
5728    let mut rules: Vec<FilterRule> = Vec::new();
5729    if let Some(p) = prefix {
5730        rules.push(FilterRule {
5731            name: Some(FilterRuleName::from("prefix".to_owned())),
5732            value: Some(p.to_owned()),
5733        });
5734    }
5735    if let Some(s) = suffix {
5736        rules.push(FilterRule {
5737            name: Some(FilterRuleName::from("suffix".to_owned())),
5738            value: Some(s.to_owned()),
5739        });
5740    }
5741    Some(NotificationConfigurationFilter {
5742        key: Some(S3KeyFilter {
5743            filter_rules: Some(rules),
5744        }),
5745    })
5746}
5747
5748// ---------------------------------------------------------------------------
5749// v0.6 #40: Convert between the s3s-typed `ReplicationConfiguration` (the
5750// wire surface) and our internal `crate::replication::ReplicationConfig`.
5751// AWS's `ReplicationRuleFilter` is a sum type — `Prefix | Tag | And { Prefix,
5752// Tags }`; we flatten it into the single `(prefix, tag-vec)` representation
5753// the matcher needs. Sub-blocks v0.6 #40 does not implement
5754// (DeleteMarkerReplication / SourceSelectionCriteria / ReplicationTime /
5755// Metrics / EncryptionConfiguration) round-trip as `None` on GET — operators
5756// who set them on PUT see them silently dropped, mirroring "feature not
5757// supported in this release" semantics.
5758// ---------------------------------------------------------------------------
5759
5760fn replication_from_dto(
5761    dto: &ReplicationConfiguration,
5762) -> crate::replication::ReplicationConfig {
5763    let rules = dto
5764        .rules
5765        .iter()
5766        .enumerate()
5767        .map(|(idx, r)| {
5768            let id = r
5769                .id
5770                .as_ref()
5771                .map(|s| s.as_str().to_owned())
5772                .unwrap_or_else(|| format!("rule-{idx}"));
5773            let priority = r.priority.unwrap_or(0).max(0) as u32;
5774            let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
5775            let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
5776            let destination_bucket = r.destination.bucket.clone();
5777            let destination_storage_class = r
5778                .destination
5779                .storage_class
5780                .as_ref()
5781                .map(|s| s.as_str().to_owned());
5782            crate::replication::ReplicationRule {
5783                id,
5784                priority,
5785                status_enabled,
5786                filter,
5787                destination_bucket,
5788                destination_storage_class,
5789            }
5790        })
5791        .collect();
5792    crate::replication::ReplicationConfig {
5793        role: dto.role.clone(),
5794        rules,
5795    }
5796}
5797
5798fn replication_to_dto(
5799    cfg: &crate::replication::ReplicationConfig,
5800) -> ReplicationConfiguration {
5801    let rules = cfg
5802        .rules
5803        .iter()
5804        .map(|r| {
5805            let status = if r.status_enabled {
5806                ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
5807            } else {
5808                ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
5809            };
5810            let destination = Destination {
5811                access_control_translation: None,
5812                account: None,
5813                bucket: r.destination_bucket.clone(),
5814                encryption_configuration: None,
5815                metrics: None,
5816                replication_time: None,
5817                storage_class: r
5818                    .destination_storage_class
5819                    .as_ref()
5820                    .map(|s| StorageClass::from(s.clone())),
5821            };
5822            let filter = Some(replication_filter_to_dto(&r.filter));
5823            ReplicationRule {
5824                delete_marker_replication: None,
5825                destination,
5826                existing_object_replication: None,
5827                filter,
5828                id: Some(r.id.clone()),
5829                prefix: None,
5830                priority: Some(r.priority as i32),
5831                source_selection_criteria: None,
5832                status,
5833            }
5834        })
5835        .collect();
5836    ReplicationConfiguration {
5837        role: cfg.role.clone(),
5838        rules,
5839    }
5840}
5841
5842fn replication_filter_from_dto(
5843    f: Option<&ReplicationRuleFilter>,
5844    rule_level_prefix: Option<&str>,
5845) -> crate::replication::ReplicationFilter {
5846    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
5847    let mut tags: Vec<(String, String)> = Vec::new();
5848    if let Some(f) = f {
5849        if let Some(p) = f.prefix.as_ref()
5850            && prefix.is_none()
5851        {
5852            prefix = Some(p.clone());
5853        }
5854        if let Some(t) = f.tag.as_ref()
5855            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5856        {
5857            tags.push((k.clone(), v.clone()));
5858        }
5859        if let Some(and) = f.and.as_ref() {
5860            if let Some(p) = and.prefix.as_ref()
5861                && prefix.is_none()
5862            {
5863                prefix = Some(p.clone());
5864            }
5865            if let Some(ts) = and.tags.as_ref() {
5866                for t in ts {
5867                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5868                        tags.push((k.clone(), v.clone()));
5869                    }
5870                }
5871            }
5872        }
5873    }
5874    crate::replication::ReplicationFilter { prefix, tags }
5875}
5876
5877fn replication_filter_to_dto(
5878    f: &crate::replication::ReplicationFilter,
5879) -> ReplicationRuleFilter {
5880    if f.tags.is_empty() {
5881        ReplicationRuleFilter {
5882            and: None,
5883            prefix: f.prefix.clone(),
5884            tag: None,
5885        }
5886    } else if f.tags.len() == 1 && f.prefix.is_none() {
5887        let (k, v) = &f.tags[0];
5888        ReplicationRuleFilter {
5889            and: None,
5890            prefix: None,
5891            tag: Some(Tag {
5892                key: Some(k.clone()),
5893                value: Some(v.clone()),
5894            }),
5895        }
5896    } else {
5897        let tags: Vec<Tag> = f
5898            .tags
5899            .iter()
5900            .map(|(k, v)| Tag {
5901                key: Some(k.clone()),
5902                value: Some(v.clone()),
5903            })
5904            .collect();
5905        ReplicationRuleFilter {
5906            and: Some(ReplicationRuleAndOperator {
5907                prefix: f.prefix.clone(),
5908                tags: Some(tags),
5909            }),
5910            prefix: None,
5911            tag: None,
5912        }
5913    }
5914}
5915
5916// ---------------------------------------------------------------------------
5917// v0.6 #37: Convert between the s3s-typed `BucketLifecycleConfiguration`
5918// (the wire surface) and our internal `crate::lifecycle::LifecycleConfig`.
5919// The internal representation flattens AWS's "Filter | And" disjunction
5920// into a single `LifecycleFilter` struct of optional fields plus a tag
5921// vector. Fields S4's evaluator does not consume
5922// (`expired_object_delete_marker`, `noncurrent_version_transitions`,
5923// `transition_default_minimum_object_size`, the storage class on the
5924// noncurrent expiration) are dropped on PUT and re-rendered as their
5925// AWS-default shape on GET so the client always sees a well-formed
5926// configuration.
5927// ---------------------------------------------------------------------------
5928
5929fn dto_lifecycle_to_internal(
5930    dto: &BucketLifecycleConfiguration,
5931) -> crate::lifecycle::LifecycleConfig {
5932    crate::lifecycle::LifecycleConfig {
5933        rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5934    }
5935}
5936
5937fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
5938    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
5939    let filter = rule
5940        .filter
5941        .as_ref()
5942        .map(dto_filter_to_internal)
5943        .unwrap_or_default();
5944    let expiration_days = rule
5945        .expiration
5946        .as_ref()
5947        .and_then(|e| e.days)
5948        .and_then(|d| u32::try_from(d).ok());
5949    let expiration_date = rule
5950        .expiration
5951        .as_ref()
5952        .and_then(|e| e.date.as_ref())
5953        .and_then(timestamp_to_chrono_utc);
5954    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
5955        .transitions
5956        .as_ref()
5957        .map(|ts| {
5958            ts.iter()
5959                .filter_map(|t| {
5960                    let days = u32::try_from(t.days?).ok()?;
5961                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
5962                    Some(crate::lifecycle::TransitionRule {
5963                        days,
5964                        storage_class,
5965                    })
5966                })
5967                .collect()
5968        })
5969        .unwrap_or_default();
5970    let noncurrent_version_expiration_days = rule
5971        .noncurrent_version_expiration
5972        .as_ref()
5973        .and_then(|n| n.noncurrent_days)
5974        .and_then(|d| u32::try_from(d).ok());
5975    let abort_incomplete_multipart_upload_days = rule
5976        .abort_incomplete_multipart_upload
5977        .as_ref()
5978        .and_then(|a| a.days_after_initiation)
5979        .and_then(|d| u32::try_from(d).ok());
5980    crate::lifecycle::LifecycleRule {
5981        id: rule.id.clone().unwrap_or_default(),
5982        status,
5983        filter,
5984        expiration_days,
5985        expiration_date,
5986        transitions,
5987        noncurrent_version_expiration_days,
5988        abort_incomplete_multipart_upload_days,
5989    }
5990}
5991
5992fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
5993    let mut prefix = filter.prefix.clone();
5994    let mut tags: Vec<(String, String)> = Vec::new();
5995    let mut size_gt: Option<u64> = filter
5996        .object_size_greater_than
5997        .and_then(|n| u64::try_from(n).ok());
5998    let mut size_lt: Option<u64> = filter
5999        .object_size_less_than
6000        .and_then(|n| u64::try_from(n).ok());
6001    if let Some(t) = &filter.tag
6002        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
6003    {
6004        tags.push((k.clone(), v.clone()));
6005    }
6006    if let Some(and) = &filter.and {
6007        if prefix.is_none() {
6008            prefix = and.prefix.clone();
6009        }
6010        if size_gt.is_none() {
6011            size_gt = and
6012                .object_size_greater_than
6013                .and_then(|n| u64::try_from(n).ok());
6014        }
6015        if size_lt.is_none() {
6016            size_lt = and
6017                .object_size_less_than
6018                .and_then(|n| u64::try_from(n).ok());
6019        }
6020        if let Some(ts) = &and.tags {
6021            for t in ts {
6022                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
6023                    tags.push((k.clone(), v.clone()));
6024                }
6025            }
6026        }
6027    }
6028    crate::lifecycle::LifecycleFilter {
6029        prefix,
6030        tags,
6031        object_size_greater_than: size_gt,
6032        object_size_less_than: size_lt,
6033    }
6034}
6035
6036fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
6037    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
6038        Some(LifecycleExpiration {
6039            date: rule.expiration_date.map(chrono_utc_to_timestamp),
6040            days: rule.expiration_days.map(|d| d as i32),
6041            expired_object_delete_marker: None,
6042        })
6043    } else {
6044        None
6045    };
6046    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
6047        None
6048    } else {
6049        Some(
6050            rule.transitions
6051                .iter()
6052                .map(|t| Transition {
6053                    date: None,
6054                    days: Some(t.days as i32),
6055                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
6056                })
6057                .collect(),
6058        )
6059    };
6060    let noncurrent_version_expiration =
6061        rule.noncurrent_version_expiration_days
6062            .map(|d| NoncurrentVersionExpiration {
6063                newer_noncurrent_versions: None,
6064                noncurrent_days: Some(d as i32),
6065            });
6066    let abort_incomplete_multipart_upload =
6067        rule.abort_incomplete_multipart_upload_days
6068            .map(|d| AbortIncompleteMultipartUpload {
6069                days_after_initiation: Some(d as i32),
6070            });
6071    let filter = if rule.filter.tags.is_empty()
6072        && rule.filter.object_size_greater_than.is_none()
6073        && rule.filter.object_size_less_than.is_none()
6074    {
6075        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
6076            and: None,
6077            object_size_greater_than: None,
6078            object_size_less_than: None,
6079            prefix: Some(p.clone()),
6080            tag: None,
6081        })
6082    } else if rule.filter.tags.len() == 1
6083        && rule.filter.prefix.is_none()
6084        && rule.filter.object_size_greater_than.is_none()
6085        && rule.filter.object_size_less_than.is_none()
6086    {
6087        let (k, v) = rule.filter.tags[0].clone();
6088        Some(LifecycleRuleFilter {
6089            and: None,
6090            object_size_greater_than: None,
6091            object_size_less_than: None,
6092            prefix: None,
6093            tag: Some(Tag {
6094                key: Some(k),
6095                value: Some(v),
6096            }),
6097        })
6098    } else {
6099        let tags = if rule.filter.tags.is_empty() {
6100            None
6101        } else {
6102            Some(
6103                rule.filter
6104                    .tags
6105                    .iter()
6106                    .map(|(k, v)| Tag {
6107                        key: Some(k.clone()),
6108                        value: Some(v.clone()),
6109                    })
6110                    .collect(),
6111            )
6112        };
6113        Some(LifecycleRuleFilter {
6114            and: Some(LifecycleRuleAndOperator {
6115                object_size_greater_than: rule
6116                    .filter
6117                    .object_size_greater_than
6118                    .and_then(|n| i64::try_from(n).ok()),
6119                object_size_less_than: rule
6120                    .filter
6121                    .object_size_less_than
6122                    .and_then(|n| i64::try_from(n).ok()),
6123                prefix: rule.filter.prefix.clone(),
6124                tags,
6125            }),
6126            object_size_greater_than: None,
6127            object_size_less_than: None,
6128            prefix: None,
6129            tag: None,
6130        })
6131    };
6132    LifecycleRule {
6133        abort_incomplete_multipart_upload,
6134        expiration,
6135        filter,
6136        id: if rule.id.is_empty() {
6137            None
6138        } else {
6139            Some(rule.id.clone())
6140        },
6141        noncurrent_version_expiration,
6142        noncurrent_version_transitions: None,
6143        prefix: None,
6144        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
6145        transitions,
6146    }
6147}
6148
6149// (timestamp <-> chrono helpers `timestamp_to_chrono_utc` /
6150// `chrono_utc_to_timestamp` are defined earlier in this file for the
6151// tagging/notifications work; the lifecycle DTO converters reuse them.)
6152
6153// ---------------------------------------------------------------------------
6154// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
6155//
6156// Kept as a self-contained block at the bottom of the file so it doesn't
6157// touch the existing `S4Service` struct, `new()`, or any of the per-op
6158// handlers above. The hook is wired in by the binary at server-build time
6159// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
6160//
6161// Lifecycle:
6162//   1. `SigV4aGate::new(store)` is constructed once at boot from the
6163//      operator-supplied credential directory.
6164//   2. For each incoming request, `SigV4aGate::pre_route(&req,
6165//      &requested_region, &canonical_request_bytes)` is invoked BEFORE
6166//      the request hits the S3 framework. If the request claims SigV4a
6167//      and verifies, control returns to the framework. Otherwise a 403
//      (`InvalidAccessKeyId` or `SignatureDoesNotMatch`) is produced.
6169//   3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
6170// ---------------------------------------------------------------------------
6171
6172/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
6173///
6174/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
6175/// `pre_route` entry point that returns `Ok(())` for both
6176/// "request is plain SigV4 — pass through" and "request is SigV4a and
6177/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
6178/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
6179#[derive(Debug, Clone)]
6180pub struct SigV4aGate {
6181    store: crate::sigv4a::SharedSigV4aCredentialStore,
6182}
6183
6184impl SigV4aGate {
6185    #[must_use]
6186    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
6187        Self { store }
6188    }
6189
6190    /// Inspect an incoming HTTP request. Behaviour:
6191    ///
6192    /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
6193    ///   prefix) → returns `Ok(())`; the framework's existing SigV4
6194    ///   path handles the request.
6195    /// - SigV4a + valid signature + region match → `Ok(())`.
6196    /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
6197    /// - SigV4a + bad signature / region mismatch → `Err` with
6198    ///   `SignatureDoesNotMatch`.
6199    ///
6200    /// `canonical_request_bytes` is the SigV4a string-to-sign (or
6201    /// canonical-request bytes; the caller decides) that the framework
6202    /// has already produced for this request. Keeping it as a parameter
6203    /// instead of rebuilding it inside the hook avoids duplicating the
6204    /// canonicalisation logic.
6205    pub fn pre_route<B>(
6206        &self,
6207        req: &http::Request<B>,
6208        requested_region: &str,
6209        canonical_request_bytes: &[u8],
6210    ) -> Result<(), SigV4aGateError> {
6211        if !crate::sigv4a::detect(req) {
6212            return Ok(());
6213        }
6214        let auth_hdr = req
6215            .headers()
6216            .get(http::header::AUTHORIZATION)
6217            .and_then(|v| v.to_str().ok())
6218            .ok_or(SigV4aGateError::MissingAuthorization)?;
6219        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
6220            .ok_or(SigV4aGateError::MalformedAuthorization)?;
6221        let region_set = req
6222            .headers()
6223            .get(crate::sigv4a::REGION_SET_HEADER)
6224            .and_then(|v| v.to_str().ok())
6225            .unwrap_or("*");
6226        let key = self
6227            .store
6228            .get(&parsed.access_key_id)
6229            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
6230        crate::sigv4a::verify(
6231            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
6232            &parsed.signature_der,
6233            key,
6234            region_set,
6235            requested_region,
6236        )
6237        .map_err(SigV4aGateError::Verify)?;
6238        Ok(())
6239    }
6240}
6241
6242/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
6243/// HTTP 403 with one of the two AWS-standard error codes
6244/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`).
6245#[derive(Debug, thiserror::Error)]
6246pub enum SigV4aGateError {
6247    #[error("missing Authorization header")]
6248    MissingAuthorization,
6249    #[error("malformed SigV4a Authorization header")]
6250    MalformedAuthorization,
6251    #[error("unknown SigV4a access-key-id: {0}")]
6252    UnknownAccessKey(String),
6253    #[error("SigV4a verification failed: {0}")]
6254    Verify(#[source] crate::sigv4a::SigV4aError),
6255}
6256
6257impl SigV4aGateError {
6258    /// AWS S3 error code that should accompany a 403 response.
6259    #[must_use]
6260    pub fn s3_error_code(&self) -> &'static str {
6261        match self {
6262            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
6263            _ => "SignatureDoesNotMatch",
6264        }
6265    }
6266}
6267
#[cfg(test)]
mod tests {
    use super::*;

    /// A manifest written into metadata must be read back field-for-field.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let want = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut metadata: Option<Metadata> = None;
        write_manifest(&mut metadata, &want);

        let got = extract_manifest(&metadata).expect("manifest must round-trip");
        assert_eq!(got.codec, want.codec);
        assert_eq!(got.original_size, want.original_size);
        assert_eq!(got.compressed_size, want.compressed_size);
        assert_eq!(got.crc32c, want.crc32c);
    }

    /// No metadata at all means no manifest.
    #[test]
    fn missing_metadata_yields_none() {
        let absent: Option<Metadata> = None;
        assert!(extract_manifest(&absent).is_none());
    }

    /// Metadata carrying only the codec entry is not a complete manifest.
    #[test]
    fn partial_metadata_yields_none() {
        let mut codec_only = Metadata::new();
        codec_only.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(codec_only)).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").unwrap();
        let s3s::dto::Range::Int { first, last } = parsed else {
            panic!("expected Int range");
        };
        assert_eq!(first, 10);
        assert_eq!(last, Some(20));
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let message = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(message.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let message = parse_copy_source_range("10-20").unwrap_err();
        assert!(message.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        // The S3 upload_part_copy spec only allows the closed N-M form
        // for this header; open-ended and suffix ranges must be refused.
        for header in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(header).is_err());
        }
    }

    // v0.7 #49: every legal S3 key (spaces, slashes, control characters,
    // raw UTF-8) must go through `safe_object_uri` without a panic, unlike
    // the earlier `format!(...).parse().unwrap()` call sites.

    #[test]
    fn safe_object_uri_basic_ascii() {
        let built = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(built.path(), "/bucket/key");
    }

    #[test]
    fn safe_object_uri_encodes_spaces() {
        let built = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        let path = built.path();
        // Under RFC 3986 path-segment encoding a space becomes %20.
        assert!(
            path.contains("%20"),
            "expected percent-encoded space, got {}",
            path
        );
        assert!(path.starts_with("/bucket/"));
    }

    #[test]
    fn safe_object_uri_preserves_slashes() {
        // '/' is a legal logical separator inside S3 keys; escaping it
        // would change the hierarchy the synthetic URI appears to have.
        let built =
            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(built.path(), "/bucket/key/with/slashes");
    }

    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        // A newline may come back encoded (Ok) or rejected (Err); the
        // only requirement is that the helper does not panic.
        let _ = safe_object_uri("bucket", "key\n");
    }

    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        // RTL override, BOM and plain Japanese text: none may panic.
        for key in ["rtl\u{202E}override", "\u{FEFF}bom-key", "日本語キー"] {
            let _ = safe_object_uri("bucket", key);
        }
    }

    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        // One-byte keys for every value 0x00..=0xFF. Bytes 0x80..=0xFF are
        // not valid UTF-8 alone, so each byte is passed through
        // `String::from_utf8_lossy` to give the helper a genuine `&str`.
        for byte in u8::MIN..=u8::MAX {
            let key = String::from_utf8_lossy(&[byte]).into_owned();
            let _ = safe_object_uri("bucket", &key);
        }
    }

    /// v0.8.1 #58: smoke test for the DEK-handling shape that the SSE-KMS
    /// branches of `put_object` and `complete_multipart_upload` use:
    /// generate_dek → length check → copy into a stack `[u8; 32]` →
    /// reborrow as `&[u8; 32]` for `SseSource`. No full `S4Service` is
    /// started.
    ///
    /// It guards against a regression where the `Zeroizing` wrapper is
    /// dropped before the stack copy lands (for example a refactor to
    /// `let dek = kms.generate_dek(...).await?.0; drop(dek); ...`) or
    /// where the reborrow is rewritten into something that no longer
    /// compiles.
    #[tokio::test]
    async fn kms_dek_lifetime_within_function_scope() {
        use crate::kms::{KmsBackend, LocalKms};
        use std::collections::HashMap;
        use std::path::PathBuf;
        use zeroize::Zeroizing;

        let keks = HashMap::from([("scope".to_string(), [33u8; 32])]);
        let kms = LocalKms::from_keks(PathBuf::from("/tmp/kms-scope-test"), keks);

        // Same shape as the KMS branch of put_object.
        let (dek, wrapped) = kms.generate_dek("scope").await.unwrap();
        assert_eq!(dek.len(), 32);
        let mut dek_arr: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
        dek_arr.copy_from_slice(&dek);

        // The reborrow performed where `SseSource` is built: a
        // `&Zeroizing<[u8; 32]>` auto-derefs to `&[u8; 32]`.
        let dek_ref: &[u8; 32] = &dek_arr;
        // The reborrow must see the same bytes.
        assert_eq!(dek_ref, &*dek_arr);
        // The wrapped key keeps the key id it was requested with.
        assert_eq!(wrapped.key_id, "scope");

        // Both `dek` and `dek_arr` are dropped when this scope ends. The
        // wipe cannot be asserted here (that would mean reading freed
        // memory), so the test only checks that this call shape compiles
        // and runs; the `zeroize` crate tests the wipe itself.
    }
}