Skip to main content

s4_server/
service.rs

1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//!   `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//!   複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//!   manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//!   manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//!   Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40    write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry, CompressTelemetry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51    supports_streaming_compress, supports_streaming_decompress,
52};
53
54/// PUT body の先頭 sampling で渡す最大 byte 数。
55const SAMPLE_BYTES: usize = 4096;
56
57/// v0.8 #55: stamp the GPU pipeline metrics (`s4_gpu_compress_seconds`,
58/// `s4_gpu_throughput_bytes_per_sec`, `s4_gpu_oom_total`) from a
59/// `CompressTelemetry` returned by `CodecRegistry::compress_with_telemetry`.
60/// CPU codecs (`gpu_seconds = None`) are no-ops here — they're already
61/// covered by the existing `s4_request_latency_seconds` / `s4_bytes_*`
62/// counters in the request-level `record_put` / `record_get` calls.
63#[inline]
64fn stamp_gpu_compress_telemetry(tel: &CompressTelemetry) {
65    if let Some(secs) = tel.gpu_seconds {
66        crate::metrics::record_gpu_compress(tel.codec, secs, tel.bytes_in, tel.bytes_out);
67    }
68    if tel.oom {
69        crate::metrics::record_gpu_oom(tel.codec);
70    }
71}
72
73/// v0.7 #49: percent-encoding set covering everything that is **not** an
74/// `unreserved` character per RFC 3986 §2.3, **plus** we additionally
75/// encode the path-reserved sub-delims that `http::Uri` rejects in a
76/// path segment (`?`, `#`, `%`, control bytes, space, etc.). We
77/// deliberately keep `/` un-encoded because S3 keys legally use `/` as
78/// a logical separator and the rest of the synthetic URI relies on the
79/// path layout `/{bucket}/{key}` round-tripping byte-for-byte.
80const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
81    .add(b' ')
82    .add(b'"')
83    .add(b'#')
84    .add(b'<')
85    .add(b'>')
86    .add(b'?')
87    .add(b'`')
88    .add(b'{')
89    .add(b'}')
90    .add(b'|')
91    .add(b'\\')
92    .add(b'^')
93    .add(b'[')
94    .add(b']')
95    .add(b'%');
96
97/// v0.7 #49: build the synthetic `/{bucket}/{key}` request URI used by
98/// the sidecar / replication helpers when they re-enter the backend
99/// trait without going through the HTTP layer. S3 object keys can
100/// contain spaces, control bytes, and arbitrary Unicode that would
101/// make `format!(...).parse::<http::Uri>()` panic; we percent-encode
102/// the key bytes (RFC 3986 path segment) and the bucket name (defensive
103/// — bucket names are normally DNS-safe, but the helper is the single
104/// choke-point) before splicing them in. If the encoded form *still*
105/// fails to parse (extremely unlikely once everything outside the
106/// unreserved set is escaped) we surface a typed `400 InvalidObjectName`
107/// instead of crashing the worker.
108pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
109    use percent_encoding::utf8_percent_encode;
110    let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
111    let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
112    let raw = format!("/{bucket_enc}/{key_enc}");
113    raw.parse::<http::Uri>().map_err(|e| {
114        // S3 spec uses `InvalidObjectName` (HTTP 400) for keys that
115        // can't be represented in a request URI. The generated
116        // `S3ErrorCode` enum doesn't expose a typed variant for it,
117        // so we round-trip through `from_bytes` which preserves the
118        // canonical wire string while falling back to InvalidArgument
119        // if even that lookup fails (cannot happen at runtime — kept
120        // as a belt-and-suspenders branch so this helper never
121        // panics).
122        let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
123            .unwrap_or(S3ErrorCode::InvalidArgument);
124        S3Error::with_message(
125            code,
126            format!("object key cannot be encoded as a request URI: {e}"),
127        )
128    })
129}
130
131/// v0.4 #20: captured at the start of a handler, before the request is
132/// consumed by the backend call, so the matching `record_access` at
133/// end-of-request can fill in the structured access log entry.
134struct AccessLogPreamble {
135    remote_ip: Option<String>,
136    requester: Option<String>,
137    request_uri: String,
138    user_agent: Option<String>,
139}
140
141pub struct S4Service<B: S3> {
142    /// Wrapped in `Arc` so the v0.6 #40 cross-bucket replication
143    /// dispatcher can clone it into a detached `tokio::spawn` task
144    /// (Arc::clone is cheap; backend trait methods take `&self` so no
145    /// other handler is affected by the indirection).
146    backend: Arc<B>,
147    registry: Arc<CodecRegistry>,
148    dispatcher: Arc<dyn CodecDispatcher>,
149    max_body_bytes: usize,
150    policy: Option<crate::policy::SharedPolicy>,
151    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
152    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
153    /// gating "deny if not over TLS" can do their job. Defaults to `false`
154    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
155    secure_transport: bool,
156    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
157    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
158    /// v0.4 #20: optional S3-style access log emitter.
159    access_log: Option<crate::access_log::SharedAccessLog>,
160    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
161    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
162    /// (with the keyring's active key id) after the compress + framing
163    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
164    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
165    /// key in a 1-slot keyring so single-key (v0.4) operators get the
166    /// same behaviour they had before, just on the v2 frame.
167    sse_keyring: Option<crate::sse::SharedSseKeyring>,
168    /// v0.5 #34: optional first-class versioning state machine. When
169    /// `Some(...)`, S4-server itself owns the per-bucket versioning
170    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
171    /// list_object_versions / get_bucket_versioning /
172    /// put_bucket_versioning handlers consult the manager instead of
173    /// passing through. When `None` (default), the legacy
174    /// backend-passthrough behaviour applies so existing v0.4
175    /// deployments are unaffected until they explicitly call
176    /// `with_versioning(...)`.
177    versioning: Option<Arc<crate::versioning::VersioningManager>>,
178    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
179    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
180    /// generate a fresh DEK via the backend, encrypt the body with it
181    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
182    /// S4E4 unwrap the DEK through the same backend before decrypt.
183    /// `kms_default_key_id` is used when the request omits an explicit
184    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
185    /// bucket-default behaviour).
186    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
187    kms_default_key_id: Option<String>,
188    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
189    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
190    /// consult the manager and refuse the operation with HTTP 403
191    /// `AccessDenied` while the object is locked (Compliance until
192    /// expiry, Governance unless the bypass header is set, or any time
193    /// a legal hold is on). PUT also auto-applies the bucket-default
194    /// retention to brand-new objects when configured. When `None`
195    /// (default), the legacy backend-passthrough behaviour applies, so
196    /// existing v0.4 deployments are unaffected until they explicitly
197    /// call `with_object_lock(...)`.
198    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
199    /// v0.6 #38: optional first-class CORS bucket configuration manager.
200    /// When `Some(...)`, S4-server itself owns per-bucket CORS rules and
201    /// `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
202    /// consult the manager instead of passing through to the backend.
203    /// `handle_preflight` (public method on `S4Service`) routes OPTIONS-
204    /// style preflight matching through the same store; the actual HTTP
205    /// OPTIONS routing wire-up at the listener level is a follow-up
206    /// (s3s framework does not surface OPTIONS as a typed handler).
207    cors: Option<Arc<crate::cors::CorsManager>>,
208    /// v0.6 #36: optional first-class S3 Inventory manager. When
209    /// `Some(...)`, S4-server itself owns per-(bucket, id) inventory
210    /// configurations and `put_bucket_inventory_configuration` /
211    /// `get_bucket_inventory_configuration` /
212    /// `list_bucket_inventory_configurations` /
213    /// `delete_bucket_inventory_configuration` consult the manager
214    /// instead of passing through to the backend. The actual periodic
215    /// CSV emission is driven by a tokio task in `main.rs` that calls
216    /// `InventoryManager::run_once_for_test` on a fixed cadence; the
217    /// service handlers below only deal with config-level CRUD.
218    inventory: Option<Arc<crate::inventory::InventoryManager>>,
219    /// v0.6 #35: optional first-class S3 bucket-notification manager.
220    /// When `Some(...)`, S4-server itself owns per-bucket notification
221    /// configurations and `put_bucket_notification_configuration` /
222    /// `get_bucket_notification_configuration` consult the manager
223    /// instead of passing through to the backend. Successful PUT /
224    /// DELETE handlers fire matching destinations on a detached tokio
225    /// task (best-effort; see `crate::notifications::dispatch_event`).
226    notifications: Option<Arc<crate::notifications::NotificationManager>>,
227    /// v0.6 #37: optional first-class S3 Lifecycle configuration
228    /// manager. When `Some(...)`, S4-server itself owns per-bucket
229    /// lifecycle rules and `put_bucket_lifecycle_configuration` /
230    /// `get_bucket_lifecycle_configuration` /
231    /// `delete_bucket_lifecycle` consult the manager instead of
232    /// passing through to the backend. The actual background scanner
233    /// (list_objects_v2 -> evaluate -> delete / metadata-rewrite per
234    /// rule) is a v0.7+ follow-up; the test path
235    /// `S4Service::run_lifecycle_once_for_test` exercises the
236    /// evaluator end-to-end so this v0.6 #37 wiring is enough to ship
237    /// the configuration-management half without putting a
238    /// half-wired bucket-walk in front of users.
239    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
240    /// v0.6 #39: optional first-class object + bucket Tagging manager.
241    /// When `Some(...)`, S4-server itself owns per-(bucket, key) and
242    /// per-bucket tag state — `PutObjectTagging` /
243    /// `GetObjectTagging` / `DeleteObjectTagging` /
244    /// `PutBucketTagging` / `GetBucketTagging` /
245    /// `DeleteBucketTagging` route through the manager (replacing the
246    /// previous backend-passthrough behaviour). `put_object` also
247    /// pre-parses the `x-amz-tagging` header / `Tagging` input field
248    /// so the IAM policy evaluator can gate on
249    /// `s3:RequestObjectTag/<key>` and `s3:ExistingObjectTag/<key>`.
250    /// On a successful PUT the parsed tags are persisted; on a
251    /// successful DELETE the matching tag entry is dropped.
252    tagging: Option<Arc<crate::tagging::TagManager>>,
253    /// v0.6 #40: optional first-class cross-bucket replication manager.
254    /// When `Some(...)`, S4-server itself owns per-bucket replication
255    /// rules; `PutBucketReplication` / `GetBucketReplication` /
256    /// `DeleteBucketReplication` route through the manager (replacing
257    /// the previous backend-passthrough behaviour). On every successful
258    /// `put_object` the manager's rule list is consulted; the
259    /// highest-priority matching enabled rule wins, the per-key status
260    /// is recorded as `Pending`, and the source body and metadata are
261    /// handed to a detached tokio task that PUTs to the destination
262    /// bucket through the same backend. The replica is stamped with
263    /// `x-amz-replication-status: REPLICA` in its metadata; the
264    /// source-side status is updated to `Completed` on success or
265    /// `Failed` after the 3-attempt retry budget is exhausted (drop
266    /// counter bumps in either-side case so dashboards see the loss).
267    /// `head_object` / `get_object` echo the recorded status back as
268    /// `x-amz-replication-status` so consumers can poll progress.
269    /// Limited to single-instance (same `S4Service`) replication; true
270    /// cross-region (multi-instance) is a v0.7+ follow-up.
271    replication: Option<Arc<crate::replication::ReplicationManager>>,
272    /// v0.6 #42: optional MFA-Delete enforcement layer. When `Some(...)`,
273    /// every DELETE / DELETE-version / delete-marker / `PutBucketVersioning`
274    /// request against a bucket whose MFA-Delete state is `Enabled`
275    /// must carry `x-amz-mfa: <serial> <code>` (RFC 6238 6-digit TOTP);
276    /// missing or invalid tokens return HTTP 403 `AccessDenied`. When
277    /// `None` (default), the gate is a no-op so existing v0.4 / v0.5
278    /// deployments are unaffected until they explicitly call
279    /// `with_mfa_delete(...)`.
280    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
281    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
282    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
283    /// or be matched against a configured server-managed keyring/KMS).
284    /// Set by `--compliance-mode strict` after the boot-time
285    /// prerequisite check passes.
286    compliance_strict: bool,
287    /// v0.7 #47: optional SigV4a (asymmetric ECDSA-P256-SHA256) verify
288    /// gate. When `Some(...)`, the listener-side middleware (see
289    /// [`crate::routing::try_sigv4a_verify`]) inspects every incoming
290    /// request and short-circuits SigV4a-signed ones — verifying the
291    /// signature against the credential store and returning 403
292    /// `SignatureDoesNotMatch` / `InvalidAccessKeyId` on failure. Plain
293    /// SigV4 (HMAC-SHA256) requests pass through to s3s untouched. When
294    /// `None`, the middleware is a no-op so the existing SigV4 path is
295    /// unaffected (operators opt in via `--sigv4a-credentials <DIR>`).
296    sigv4a_gate: Option<Arc<SigV4aGate>>,
297    /// v0.8 #54 BUG-5..10: per-`upload_id` side-table that ferries the
298    /// SSE / Tagging / Object-Lock context captured at
299    /// `CreateMultipartUpload` time through to `UploadPart` /
300    /// `CompleteMultipartUpload`. Always-on (no `with_*` flag) — the
301    /// store is gateway-internal and idle when no multipart is in
302    /// flight. See [`crate::multipart_state`] for rationale.
303    multipart_state: Arc<crate::multipart_state::MultipartStateStore>,
304    /// v0.8 #52: plaintext bytes per S4E5 chunk on the SSE-S4 PUT
305    /// path. `0` (default) → use the legacy buffered S4E2 path
306    /// (whole-body AES-GCM tag, GET buffers + verifies before
307    /// emitting). Non-zero → use the chunked S4E5 frame so GET can
308    /// stream-decrypt chunk-by-chunk. Wired by `--sse-chunk-size`
309    /// in `main.rs`. SSE-C and SSE-KMS are intentionally unaffected
310    /// (chunked variants tracked in a follow-up issue).
311    sse_chunk_size: usize,
312}
313
314impl<B: S3> S4Service<B> {
315    /// AWS S3 単発 PUT の API 上限 (5 GiB)
316    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
317
318    pub fn new(
319        backend: B,
320        registry: Arc<CodecRegistry>,
321        dispatcher: Arc<dyn CodecDispatcher>,
322    ) -> Self {
323        Self {
324            backend: Arc::new(backend),
325            registry,
326            dispatcher,
327            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
328            policy: None,
329            secure_transport: false,
330            rate_limits: None,
331            access_log: None,
332            sse_keyring: None,
333            versioning: None,
334            kms: None,
335            kms_default_key_id: None,
336            object_lock: None,
337            cors: None,
338            inventory: None,
339            notifications: None,
340            lifecycle: None,
341            tagging: None,
342            replication: None,
343            mfa_delete: None,
344            compliance_strict: false,
345            sigv4a_gate: None,
346            multipart_state: Arc::new(crate::multipart_state::MultipartStateStore::new()),
347            // v0.8 #52: chunked SSE-S4 disabled by default — opt
348            // in via `S4Service::with_sse_chunk_size(...)` /
349            // `--sse-chunk-size <BYTES>`. Default keeps the legacy
350            // S4E2 buffered path so existing deployments are
351            // bit-for-bit unchanged.
352            sse_chunk_size: 0,
353        }
354    }
355
356    /// v0.7 #47: attach the SigV4a verify gate. Once set, the
357    /// listener-side middleware (`crate::routing::try_sigv4a_verify`)
358    /// short-circuits any incoming `AWS4-ECDSA-P256-SHA256` request,
359    /// verifying it against the supplied credential store and
360    /// returning 403 on failure. Plain SigV4 (HMAC-SHA256) requests
361    /// are unaffected. When the gate is unset (default), the
362    /// middleware skips entirely so existing SigV4 deployments keep
363    /// working.
364    #[must_use]
365    pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
366        self.sigv4a_gate = Some(gate);
367        self
368    }
369
370    /// v0.7 #47: borrow the attached SigV4a gate. Used by `main.rs`
371    /// to snapshot the gate `Arc` before the s3s `ServiceBuilder`
372    /// consumes the `S4Service` (the listener-side middleware needs
373    /// the same `Arc` because s3s' SigV4 verifier rejects SigV4a
374    /// algorithm tokens with "unknown algorithm" — match has to
375    /// happen at the hyper layer instead).
376    #[must_use]
377    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
378        self.sigv4a_gate.as_ref()
379    }
380
381    /// v0.6 #39: attach the in-memory object + bucket Tagging manager.
382    /// Once set, `Put/Get/Delete` `Object/Bucket Tagging` route
383    /// through the manager (instead of forwarding to the backend),
384    /// and `put_object`'s `x-amz-tagging` parse path becomes the
385    /// source of `s3:RequestObjectTag/<key>` for the IAM policy
386    /// evaluator. The manager itself is shared via `Arc`.
387    #[must_use]
388    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
389        self.tagging = Some(mgr);
390        self
391    }
392
393    /// v0.6 #39: borrow the attached tagging manager (test /
394    /// introspection — the snapshotter in `main.rs`, when wired,
395    /// will keep its own `Arc` clone).
396    #[must_use]
397    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
398        self.tagging.as_ref()
399    }
400
401    /// v0.6 #36: attach the in-memory S3 Inventory manager. Once set,
402    /// `put_bucket_inventory_configuration` /
403    /// `get_bucket_inventory_configuration` /
404    /// `list_bucket_inventory_configurations` /
405    /// `delete_bucket_inventory_configuration` route through the
406    /// manager. The actual periodic CSV / manifest emission is
407    /// orchestrated by a tokio task started in `main.rs`; the manager
408    /// itself is shared between the handler and the scheduler via
409    /// `Arc`.
410    #[must_use]
411    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
412        self.inventory = Some(mgr);
413        self
414    }
415
416    /// v0.6 #36: borrow the attached inventory manager (test /
417    /// introspection — the background scheduler in `main.rs` keeps its
418    /// own `Arc` clone, so this accessor is for the test path that
419    /// invokes `run_once_for_test` directly).
420    #[must_use]
421    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
422        self.inventory.as_ref()
423    }
424
425    /// v0.6 #37: attach the in-memory S3 Lifecycle configuration
426    /// manager. Once set, `put_bucket_lifecycle_configuration` /
427    /// `get_bucket_lifecycle_configuration` / `delete_bucket_lifecycle`
428    /// route through the manager (replacing the previous backend-
429    /// passthrough behaviour). The actual periodic scanner that walks
430    /// the source bucket and invokes Expiration / Transition /
431    /// NoncurrentExpiration actions is a v0.7+ follow-up — see
432    /// [`Self::run_lifecycle_once_for_test`] for the in-memory test
433    /// path that exercises the evaluator end-to-end.
434    #[must_use]
435    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
436        self.lifecycle = Some(mgr);
437        self
438    }
439
440    /// v0.6 #37: borrow the attached lifecycle manager (test /
441    /// introspection — the background scheduler in `main.rs` keeps its
442    /// own `Arc` clone, so this accessor is for the test path that
443    /// invokes the evaluator directly).
444    #[must_use]
445    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
446        self.lifecycle.as_ref()
447    }
448
449    /// v0.6 #37: synchronous test entry that runs the lifecycle evaluator
450    /// against a caller-provided list of `(key, age, size, tags)` tuples
451    /// and returns the `(key, action)` pairs that should fire. The actual
452    /// backend invocation (S3.delete_object / metadata rewrite) is left
453    /// to the caller — the unit + E2E tests use this to verify the
454    /// evaluator without spawning the (deferred) background scanner.
455    /// Returns an empty `Vec` when no lifecycle manager is attached or
456    /// no rule matches.
457    #[must_use]
458    pub fn run_lifecycle_once_for_test(
459        &self,
460        bucket: &str,
461        objects: &[crate::lifecycle::EvaluateBatchEntry],
462    ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
463        let Some(mgr) = self.lifecycle.as_ref() else {
464            return Vec::new();
465        };
466        crate::lifecycle::evaluate_batch(mgr, bucket, objects)
467    }
468
469    /// v0.6 #35: attach the in-memory bucket-notification manager. Once
470    /// set, `put_bucket_notification_configuration` /
471    /// `get_bucket_notification_configuration` route through the manager
472    /// (replacing the previous backend-passthrough behaviour); successful
473    /// `put_object` / `delete_object` calls fire matching destinations
474    /// on a detached tokio task via
475    /// `crate::notifications::dispatch_event` (best-effort, fire-and-
476    /// forget — failures bump the manager's `dropped_total` counter and
477    /// log at warn but do NOT fail the originating S3 request).
478    #[must_use]
479    pub fn with_notifications(
480        mut self,
481        mgr: Arc<crate::notifications::NotificationManager>,
482    ) -> Self {
483        self.notifications = Some(mgr);
484        self
485    }
486
487    /// v0.6 #35: borrow the attached notifications manager (test /
488    /// introspection — used by the metrics layer to read
489    /// `dropped_total`).
490    #[must_use]
491    pub fn notifications_manager(
492        &self,
493    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
494        self.notifications.as_ref()
495    }
496
497    /// v0.6 #35: internal helper used by the DELETE handlers to fire a
498    /// matching notification on a detached tokio task. No-op when no
499    /// manager is attached or no rule on the bucket matches the given
500    /// (event, key) tuple.
501    fn fire_delete_notification(
502        &self,
503        bucket: &str,
504        key: &str,
505        event: crate::notifications::EventType,
506        version_id: Option<String>,
507    ) {
508        let Some(mgr) = self.notifications.as_ref() else {
509            return;
510        };
511        let dests = mgr.match_destinations(bucket, &event, key);
512        if dests.is_empty() {
513            return;
514        }
515        tokio::spawn(crate::notifications::dispatch_event(
516            Arc::clone(mgr),
517            bucket.to_owned(),
518            key.to_owned(),
519            event,
520            None,
521            None,
522            version_id,
523            format!("S4-{}", uuid::Uuid::new_v4()),
524        ));
525    }
526
527    /// v0.6 #40: attach the in-memory cross-bucket replication manager.
528    /// Once set, `put_bucket_replication` / `get_bucket_replication` /
529    /// `delete_bucket_replication` route through the manager (replacing
530    /// the previous backend-passthrough behaviour); a successful
531    /// `put_object` whose key matches an enabled rule fires a detached
532    /// tokio task that PUTs the same body + metadata to the rule's
533    /// destination bucket, stamping the replica with
534    /// `x-amz-replication-status: REPLICA`. Failures after the retry
535    /// budget bump the manager's `dropped_total` counter and are
536    /// surfaced in the `s4_replication_dropped_total` Prometheus
537    /// counter; successes bump `s4_replication_replicated_total`.
538    #[must_use]
539    pub fn with_replication(
540        mut self,
541        mgr: Arc<crate::replication::ReplicationManager>,
542    ) -> Self {
543        self.replication = Some(mgr);
544        self
545    }
546
547    /// v0.6 #40: borrow the attached replication manager (test /
548    /// introspection — used by the metrics layer to read
549    /// `dropped_total`).
550    #[must_use]
551    pub fn replication_manager(
552        &self,
553    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
554        self.replication.as_ref()
555    }
556
557    /// v0.6 #40: internal helper used by the PUT handlers to fire a
558    /// detached cross-bucket replication task. No-op when no manager
559    /// is attached, the source backend PUT failed, or no rule on the
560    /// source bucket matches the (key, tags) tuple. The `body` is the
561    /// post-compression / post-encryption `Bytes` that was sent to
562    /// the source backend (refcount-cloned), and `metadata` is the
563    /// metadata map that already includes the manifest /
564    /// `s4-encrypted` markers — the replica decodes through the same
565    /// path. The destination PUT runs through `Arc<B>::put_object`.
566    fn spawn_replication_if_matched(
567        &self,
568        source_bucket: &str,
569        source_key: &str,
570        request_tags: &Option<crate::tagging::TagSet>,
571        body: &bytes::Bytes,
572        metadata: &Option<std::collections::HashMap<String, String>>,
573        backend_ok: bool,
574    ) where
575        B: Send + Sync + 'static,
576    {
577        if !backend_ok {
578            return;
579        }
580        let Some(mgr) = self.replication.as_ref() else {
581            return;
582        };
583        // Pull the request's tags into the (k, v) shape the matcher
584        // expects. The tagging manager would have the canonical
585        // post-PUT view but at this point in the pipeline it's
586        // already been written above; for the rule-match decision
587        // the request's tags are sufficient (= the tags this PUT
588        // applies, S3 PutObject is full-replace on tags).
589        let object_tags: Vec<(String, String)> = request_tags
590            .as_ref()
591            .map(|ts| ts.iter().cloned().collect())
592            .unwrap_or_default();
593        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
594            return;
595        };
596        // Eagerly mark the source key as Pending so a HEAD between
597        // the source PUT returning and the spawned task completing
598        // surfaces the in-flight state.
599        mgr.record_status(
600            source_bucket,
601            source_key,
602            crate::replication::ReplicationStatus::Pending,
603        );
604        let mgr_cl = Arc::clone(mgr);
605        let backend = Arc::clone(&self.backend);
606        let body_cl = body.clone();
607        let metadata_cl = metadata.clone();
608        let source_bucket_cl = source_bucket.to_owned();
609        let source_key_cl = source_key.to_owned();
610        tokio::spawn(async move {
611            let do_put = move |dest_bucket: String,
612                               dest_key: String,
613                               dest_body: bytes::Bytes,
614                               dest_meta: Option<std::collections::HashMap<String, String>>| {
615                let backend = Arc::clone(&backend);
616                async move {
617                    let req = S3Request {
618                        input: PutObjectInput {
619                            bucket: dest_bucket,
620                            key: dest_key,
621                            body: Some(bytes_to_blob(dest_body)),
622                            metadata: dest_meta,
623                            ..Default::default()
624                        },
625                        method: http::Method::PUT,
626                        uri: "/".parse().unwrap(),
627                        headers: http::HeaderMap::new(),
628                        extensions: http::Extensions::new(),
629                        credentials: None,
630                        region: None,
631                        service: None,
632                        trailing_headers: None,
633                    };
634                    backend
635                        .put_object(req)
636                        .await
637                        .map(|_| ())
638                        .map_err(|e| format!("destination put_object: {e}"))
639                }
640            };
641            crate::replication::replicate_object(
642                rule,
643                source_bucket_cl,
644                source_key_cl,
645                body_cl,
646                metadata_cl,
647                do_put,
648                mgr_cl,
649            )
650            .await;
651        });
652    }
653
654    /// v0.6 #42: attach the in-memory MFA-Delete enforcement manager.
655    /// Once set, every DELETE / DELETE-version / delete-marker /
656    /// `PutBucketVersioning` request against a bucket whose MFA-Delete
657    /// state is `Enabled` requires a valid `x-amz-mfa: <serial> <code>`
658    /// header (RFC 6238 6-digit TOTP); the gate is a no-op for buckets
659    /// where MFA-Delete is `Disabled` (S3 default).
660    #[must_use]
661    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
662        self.mfa_delete = Some(mgr);
663        self
664    }
665
666    /// v0.6 #42: borrow the attached MFA-Delete manager (test /
667    /// introspection — used by the snapshot path in `main.rs` to call
668    /// `to_json` for restart-recoverable state).
669    #[must_use]
670    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
671        self.mfa_delete.as_ref()
672    }
673
674    /// v0.6 #38: attach the in-memory CORS configuration manager. Once
675    /// set, `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
676    /// route through the manager instead of forwarding to the backend,
677    /// and [`Self::handle_preflight`] becomes useful for the (future)
678    /// listener-side OPTIONS interceptor.
679    #[must_use]
680    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
681        self.cors = Some(mgr);
682        self
683    }
684
685    /// v0.6 #38: Borrow the attached CORS manager (test / introspection).
686    #[must_use]
687    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
688        self.cors.as_ref()
689    }
690
691    /// v0.6 #38: evaluate a CORS preflight request against the bucket's
692    /// configured rules and, if a rule matches, return the headers that
693    /// the (future) listener-side OPTIONS interceptor must put on the
694    /// 200 response: `Access-Control-Allow-Origin`, `Access-Control-
695    /// Allow-Methods`, `Access-Control-Allow-Headers`, optionally
696    /// `Access-Control-Max-Age` and `Access-Control-Expose-Headers`.
697    ///
698    /// Returns `None` when no manager is attached, no config is
699    /// registered for the bucket, or no rule matches the (origin,
700    /// method, headers) triple. The caller is responsible for turning
701    /// `None` into the appropriate 403 response.
702    ///
703    /// **Note:** the OPTIONS routing itself (i.e. wiring this method
704    /// into the hyper-util listener path) is a follow-up — s3s does not
705    /// surface OPTIONS as a typed S3 handler, so this method is
706    /// currently call-able only from inside other handlers and tests.
707    #[must_use]
708    pub fn handle_preflight(
709        &self,
710        bucket: &str,
711        origin: &str,
712        method: &str,
713        request_headers: &[String],
714    ) -> Option<std::collections::HashMap<String, String>> {
715        let mgr = self.cors.as_ref()?;
716        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
717        let mut h = std::collections::HashMap::new();
718        // Echo the matched origin back. If the rule used "*" we still
719        // echo "*" (S3 spec — the spec does not require us to echo the
720        // *requesting* origin when the wildcard matched).
721        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
722            "*".to_string()
723        } else {
724            origin.to_string()
725        };
726        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
727        h.insert(
728            "Access-Control-Allow-Methods".to_string(),
729            rule.allowed_methods.join(", "),
730        );
731        if !rule.allowed_headers.is_empty() {
732            // For the Allow-Headers response, echo back the rule's
733            // pattern list verbatim (S3 echoes the configured list,
734            // including "*" if present). Browsers honour exact-match
735            // rules.
736            h.insert(
737                "Access-Control-Allow-Headers".to_string(),
738                rule.allowed_headers.join(", "),
739            );
740        }
741        if let Some(secs) = rule.max_age_seconds {
742            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
743        }
744        if !rule.expose_headers.is_empty() {
745            h.insert(
746                "Access-Control-Expose-Headers".to_string(),
747                rule.expose_headers.join(", "),
748            );
749        }
750        Some(h)
751    }
752
753    /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
754    /// SSE indicator (server-side encryption header or SSE-C customer
755    /// key); requests without one are rejected with 400 InvalidRequest.
756    /// Boot-time prerequisite checking lives in the binary
757    /// (`validate_compliance_mode`) so this flag is purely the runtime
758    /// switch.
759    #[must_use]
760    pub fn with_compliance_strict(mut self, on: bool) -> Self {
761        self.compliance_strict = on;
762        self
763    }
764
765    /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
766    /// manager. Once set, `delete_object` and overwrite-path
767    /// `put_object` refuse operations on locked keys with HTTP 403
768    /// `AccessDenied`; new PUTs to a bucket with a default retention
769    /// policy auto-create per-object lock state.
770    #[must_use]
771    pub fn with_object_lock(
772        mut self,
773        mgr: Arc<crate::object_lock::ObjectLockManager>,
774    ) -> Self {
775        self.object_lock = Some(mgr);
776        self
777    }
778
779    /// v0.7 #45: borrow the attached Object Lock manager (read-only —
780    /// the lifecycle scanner uses this to skip currently-locked objects
781    /// before issuing `delete_object`, since an Object Lock always wins
782    /// over Lifecycle Expiration in AWS S3 semantics). Mirrors the
783    /// shape of [`Self::lifecycle_manager`] /
784    /// [`Self::tag_manager`] — purely additive accessor, no handler
785    /// behaviour change.
786    #[must_use]
787    pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
788        self.object_lock.as_ref()
789    }
790
791    /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
792    /// when a PUT requests SSE-KMS without naming a specific KMS key
793    /// (operators set this to mirror AWS S3's bucket-default key).
794    #[must_use]
795    pub fn with_kms_backend(
796        mut self,
797        kms: Arc<dyn crate::kms::KmsBackend>,
798        default_key_id: Option<String>,
799    ) -> Self {
800        self.kms = Some(kms);
801        self.kms_default_key_id = default_key_id;
802        self
803    }
804
805    /// v0.5 #34: attach the first-class versioning state machine. Once
806    /// set, this `S4Service` owns the per-bucket versioning state +
807    /// per-(bucket, key) version chain; `put_object` / `get_object` /
808    /// `delete_object` / `list_object_versions` /
809    /// `get_bucket_versioning` / `put_bucket_versioning` consult the
810    /// manager instead of passing through to the backend. The backend
811    /// is still used as the byte store: Suspended / Unversioned buckets
812    /// keep using `<key>` directly (legacy), Enabled buckets redirect
813    /// each version's bytes to a shadow key
814    /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
815    /// PUTs to the same logical key.
816    #[must_use]
817    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
818        self.versioning = Some(mgr);
819        self
820    }
821
822    /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
823    /// Internally wraps it in a 1-slot keyring with id=1 active, so
824    /// new objects ride the v0.5 S4E2 frame while previously-written
825    /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
826    /// fallback path. Operators wanting true rotation should call
827    /// [`Self::with_sse_keyring`] instead.
828    #[must_use]
829    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
830        let keyring = crate::sse::SseKeyring::new(1, key);
831        self.sse_keyring = Some(std::sync::Arc::new(keyring));
832        self
833    }
834
835    /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
836    /// the active key (S4E2 frame stamped with that key's id); GET
837    /// dispatches on the body's magic — S4E1 falls back to trying every
838    /// key in the ring (active first) so v0.4 objects survive a
839    /// migration; S4E2 looks up the explicit key_id from the header.
840    #[must_use]
841    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
842        self.sse_keyring = Some(keyring);
843        self
844    }
845
846    /// v0.8 #52: opt the SSE-S4 PUT path into the chunked S4E5 frame
847    /// (so the matching GET can stream-decrypt chunk-by-chunk
848    /// instead of buffering the entire body before tag verify).
849    /// `bytes` is the plaintext slice size — typically 1 MiB; 0
850    /// disables the path and reverts to the legacy S4E2 buffered
851    /// frame.
852    ///
853    /// SSE-C (S4E3) and SSE-KMS (S4E4) are intentionally untouched:
854    /// the chunked envelopes for those flows are a follow-up issue
855    /// (the customer-key wire surface needs separate version
856    /// negotiation).
857    ///
858    /// Has no effect when `with_sse_keyring` / `with_sse_key` is
859    /// not also set — the chunked path runs only on the SSE-S4
860    /// branch of `put_object`.
861    #[must_use]
862    pub fn with_sse_chunk_size(mut self, bytes: usize) -> Self {
863        self.sse_chunk_size = bytes;
864        self
865    }
866
867    /// v0.4 #20: attach an S3-style access-log emitter. Each completed
868    /// PUT / GET / DELETE / List handler emits one entry into the
869    /// emitter's buffer; a background flusher (started separately, see
870    /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
871    /// rotated `.log` files into the configured directory.
872    #[must_use]
873    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
874        self.access_log = Some(log);
875        self
876    }
877
878    /// Capture the per-request access-log preamble before the request is
879    /// consumed by the backend call. Returns `None` if no access logger
880    /// is configured (cheap early-out so the handler doesn't pay the
881    /// header-clone cost when access logging is off).
882    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
883        self.access_log.as_ref()?;
884        Some(AccessLogPreamble {
885            remote_ip: req
886                .headers
887                .get("x-forwarded-for")
888                .and_then(|v| v.to_str().ok())
889                .and_then(|raw| raw.split(',').next())
890                .map(|s| s.trim().to_owned()),
891            requester: Self::principal_of(req).map(str::to_owned),
892            request_uri: format!("{} {}", req.method, req.uri.path()),
893            user_agent: req
894                .headers
895                .get("user-agent")
896                .and_then(|v| v.to_str().ok())
897                .map(str::to_owned),
898        })
899    }
900
901    /// Internal — called by handlers at end-of-request with a captured
902    /// preamble. Best-effort: swallows the await fast (clones Arc +
903    /// pushes), no error propagation back to the request path.
904    #[allow(clippy::too_many_arguments)]
905    async fn record_access(
906        &self,
907        preamble: Option<AccessLogPreamble>,
908        operation: &'static str,
909        bucket: &str,
910        key: Option<&str>,
911        http_status: u16,
912        bytes_sent: u64,
913        object_size: u64,
914        total_time_ms: u64,
915        error_code: Option<&str>,
916    ) {
917        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
918            return;
919        };
920        log.record(crate::access_log::AccessLogEntry {
921            time: std::time::SystemTime::now(),
922            bucket: bucket.to_owned(),
923            remote_ip: p.remote_ip,
924            requester: p.requester,
925            operation,
926            key: key.map(str::to_owned),
927            request_uri: p.request_uri,
928            http_status,
929            error_code: error_code.map(str::to_owned),
930            bytes_sent,
931            object_size,
932            total_time_ms,
933            user_agent: p.user_agent,
934        })
935        .await;
936    }
937
938    /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
939    /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
940    /// throttle-checked before the policy gate; throttled requests return
941    /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
942    /// `s4_rate_limit_throttled_total{principal,bucket}`.
943    #[must_use]
944    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
945        self.rate_limits = Some(rl);
946        self
947    }
948
949    /// Helper used by request handlers to apply the rate limit. Returns
950    /// `Ok(())` when allowed (or no rate limiter is configured), or a
951    /// `SlowDown` S3Error otherwise.
952    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
953        let Some(rl) = self.rate_limits.as_ref() else {
954            return Ok(());
955        };
956        let principal_id = Self::principal_of(req);
957        if !rl.check(principal_id, bucket) {
958            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
959            return Err(S3Error::with_message(
960                S3ErrorCode::SlowDown,
961                format!("rate-limited: bucket={bucket}"),
962            ));
963        }
964        Ok(())
965    }
966
967    /// Tell the policy evaluator that the listener is reached over TLS
968    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
969    /// resolves to `true`. Defaults to `false`.
970    #[must_use]
971    pub fn with_secure_transport(mut self, on: bool) -> Self {
972        self.secure_transport = on;
973        self
974    }
975
976    #[must_use]
977    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
978        self.max_body_bytes = n;
979        self
980    }
981
982    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
983    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
984    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
985    /// When `None` (the default), no policy enforcement happens.
986    #[must_use]
987    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
988        self.policy = Some(policy);
989        self
990    }
991
992    /// Pull the SigV4 access key id off the request's credentials, if any.
993    /// Used as the `principal_id` for policy evaluation.
994    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
995        req.credentials.as_ref().map(|c| c.access_key.as_str())
996    }
997
998    /// v0.3 #13: build the per-request policy context from the incoming
999    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
1000    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
1001    /// production deployments are behind an LB / reverse proxy that sets
1002    /// this), `aws:CurrentTime` from the system clock, and
1003    /// `aws:SecureTransport` from the per-listener TLS flag.
1004    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
1005        let user_agent = req
1006            .headers
1007            .get("user-agent")
1008            .and_then(|v| v.to_str().ok())
1009            .map(str::to_owned);
1010        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
1011        // is the original client. Trim and parse leniently.
1012        let source_ip = req
1013            .headers
1014            .get("x-forwarded-for")
1015            .and_then(|v| v.to_str().ok())
1016            .and_then(|raw| raw.split(',').next())
1017            .and_then(|s| s.trim().parse().ok());
1018        crate::policy::RequestContext {
1019            source_ip,
1020            user_agent,
1021            request_time: Some(std::time::SystemTime::now()),
1022            secure_transport: self.secure_transport,
1023            existing_object_tags: None,
1024            request_object_tags: None,
1025            extra: Default::default(),
1026        }
1027    }
1028
1029    /// Helper used by request handlers to enforce the optional policy.
1030    /// Returns `Ok(())` when allowed (or no policy is configured), or an
1031    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
1032    /// counter on deny.
1033    fn enforce_policy<I>(
1034        &self,
1035        req: &S3Request<I>,
1036        action: &'static str,
1037        bucket: &str,
1038        key: Option<&str>,
1039    ) -> S3Result<()> {
1040        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
1041    }
1042
1043    /// v0.6 #39: variant of [`Self::enforce_policy`] that lets the
1044    /// caller plumb tag context (existing-on-object + on-request) into
1045    /// the policy evaluator. Both arguments default to `None`, in
1046    /// which case the resulting `RequestContext` is identical to
1047    /// [`Self::enforce_policy`]'s — so for handlers that don't deal
1048    /// with tags this is a transparent no-op.
1049    fn enforce_policy_with_extra<I>(
1050        &self,
1051        req: &S3Request<I>,
1052        action: &'static str,
1053        bucket: &str,
1054        key: Option<&str>,
1055        request_tags: Option<&crate::tagging::TagSet>,
1056        existing_tags: Option<&crate::tagging::TagSet>,
1057    ) -> S3Result<()> {
1058        let Some(policy) = self.policy.as_ref() else {
1059            return Ok(());
1060        };
1061        let principal_id = Self::principal_of(req);
1062        let mut ctx = self.request_context(req);
1063        if let Some(t) = request_tags {
1064            ctx.request_object_tags = Some(t.clone());
1065        }
1066        if let Some(t) = existing_tags {
1067            ctx.existing_object_tags = Some(t.clone());
1068        }
1069        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
1070        if decision.allow {
1071            Ok(())
1072        } else {
1073            crate::metrics::record_policy_denial(action, bucket);
1074            tracing::info!(
1075                action,
1076                bucket,
1077                key = ?key,
1078                principal = ?principal_id,
1079                source_ip = ?ctx.source_ip,
1080                user_agent = ?ctx.user_agent,
1081                secure_transport = ctx.secure_transport,
1082                matched_sid = ?decision.matched_sid,
1083                effect = ?decision.matched_effect,
1084                "S4 policy denied request"
1085            );
1086            Err(S3Error::with_message(
1087                S3ErrorCode::AccessDenied,
1088                format!("denied by S4 policy: {action} on bucket={bucket}"),
1089            ))
1090        }
1091    }
1092
1093    /// テスト用: backend を取り戻す (test helper、production では使わない).
1094    /// v0.6 #40 で `backend` が `Arc<B>` 化したので `Arc::try_unwrap` で
1095    /// 1-clone の場合のみ返す。共有されている (= replication dispatcher が
1096    /// 同じ Arc を持っていて未完了) 場合は `Err` を返さず panic させる
1097    /// (test 用途専用 helper の caller 契約を維持)。
1098    pub fn into_backend(self) -> B {
1099        Arc::try_unwrap(self.backend)
1100            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
1101    }
1102
1103    /// 必要 frame だけを backend に Range GET し、frame parse + decompress + slice
1104    /// した結果を返す sidecar fast path。Range request の **帯域節約版**。
1105    async fn partial_range_get(
1106        &self,
1107        req: &S3Request<GetObjectInput>,
1108        plan: s4_codec::index::RangePlan,
1109        client_start: u64,
1110        client_end_exclusive: u64,
1111        total_original: u64,
1112        get_start: Instant,
1113    ) -> S3Result<S3Response<GetObjectOutput>> {
1114        // 必要 byte 範囲だけを backend に partial GET
1115        let backend_range = s3s::dto::Range::Int {
1116            first: plan.byte_start,
1117            last: Some(plan.byte_end_exclusive - 1),
1118        };
1119        let backend_input = GetObjectInput {
1120            bucket: req.input.bucket.clone(),
1121            key: req.input.key.clone(),
1122            range: Some(backend_range),
1123            ..Default::default()
1124        };
1125        let backend_req = S3Request {
1126            input: backend_input,
1127            method: req.method.clone(),
1128            uri: req.uri.clone(),
1129            headers: req.headers.clone(),
1130            extensions: http::Extensions::new(),
1131            credentials: req.credentials.clone(),
1132            region: req.region.clone(),
1133            service: req.service.clone(),
1134            trailing_headers: None,
1135        };
1136        let mut backend_resp = self.backend.get_object(backend_req).await?;
1137        let blob = backend_resp.output.body.take().ok_or_else(|| {
1138            S3Error::with_message(
1139                S3ErrorCode::InternalError,
1140                "backend partial GET returned empty body",
1141            )
1142        })?;
1143        let bytes = collect_blob(blob, self.max_body_bytes)
1144            .await
1145            .map_err(internal("collect partial body"))?;
1146
1147        // frame parse + decompress
1148        let mut combined = BytesMut::new();
1149        for frame in FrameIter::new(bytes) {
1150            let (header, payload) = frame.map_err(|e| {
1151                S3Error::with_message(
1152                    S3ErrorCode::InternalError,
1153                    format!("partial-range frame parse: {e}"),
1154                )
1155            })?;
1156            let chunk_manifest = ChunkManifest {
1157                codec: header.codec,
1158                original_size: header.original_size,
1159                compressed_size: header.compressed_size,
1160                crc32c: header.crc32c,
1161            };
1162            let decompressed = self
1163                .registry
1164                .decompress(payload, &chunk_manifest)
1165                .await
1166                .map_err(internal("partial-range decompress"))?;
1167            combined.extend_from_slice(&decompressed);
1168        }
1169        let combined = combined.freeze();
1170        let sliced = combined
1171            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
1172
1173        // response 組立て
1174        let returned_size = sliced.len() as u64;
1175        backend_resp.output.content_length = Some(returned_size as i64);
1176        backend_resp.output.content_range = Some(format!(
1177            "bytes {client_start}-{}/{total_original}",
1178            client_end_exclusive - 1
1179        ));
1180        backend_resp.output.checksum_crc32 = None;
1181        backend_resp.output.checksum_crc32c = None;
1182        backend_resp.output.checksum_crc64nvme = None;
1183        backend_resp.output.checksum_sha1 = None;
1184        backend_resp.output.checksum_sha256 = None;
1185        backend_resp.output.e_tag = None;
1186        backend_resp.output.body = Some(bytes_to_blob(sliced));
1187        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
1188
1189        let elapsed = get_start.elapsed();
1190        crate::metrics::record_get(
1191            "partial",
1192            plan.byte_end_exclusive - plan.byte_start,
1193            returned_size,
1194            elapsed.as_secs_f64(),
1195            true,
1196        );
1197        info!(
1198            op = "get_object",
1199            bucket = %req.input.bucket,
1200            key = %req.input.key,
1201            bytes_in = plan.byte_end_exclusive - plan.byte_start,
1202            bytes_out = returned_size,
1203            total_object_size = total_original,
1204            range = true,
1205            path = "sidecar-partial",
1206            latency_ms = elapsed.as_millis() as u64,
1207            "S4 partial Range GET via sidecar index"
1208        );
1209        Ok(backend_resp)
1210    }
1211
1212    /// `<key>.s4index` sidecar object を backend に書く。失敗しても本体 PUT は
1213    /// 成功扱いにしたいので、err は warn ログのみ (Range GET の partial path が
1214    /// 使えなくなるが、full read fallback で意味的には正しい結果を返す)。
1215    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
1216        let bytes = encode_index(index);
1217        let len = bytes.len() as i64;
1218        let sidecar = sidecar_key(key);
1219        // v0.7 #49: synthetic re-entry URI must be percent-encoded; if
1220        // the (already legally-arbitrary) S3 key produces something we
1221        // cannot encode at all, drop the sidecar PUT (the GET path
1222        // falls back to a full read on a missing sidecar) instead of
1223        // panicking on `parse().unwrap()`.
1224        let uri = match safe_object_uri(bucket, &sidecar) {
1225            Ok(u) => u,
1226            Err(e) => {
1227                tracing::warn!(
1228                    bucket,
1229                    key,
1230                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
1231                );
1232                return;
1233            }
1234        };
1235        let put_input = PutObjectInput {
1236            bucket: bucket.into(),
1237            key: sidecar,
1238            body: Some(bytes_to_blob(bytes)),
1239            content_length: Some(len),
1240            content_type: Some("application/x-s4-index".into()),
1241            ..Default::default()
1242        };
1243        let put_req = S3Request {
1244            input: put_input,
1245            method: http::Method::PUT,
1246            uri,
1247            headers: http::HeaderMap::new(),
1248            extensions: http::Extensions::new(),
1249            credentials: None,
1250            region: None,
1251            service: None,
1252            trailing_headers: None,
1253        };
1254        if let Err(e) = self.backend.put_object(put_req).await {
1255            tracing::warn!(
1256                bucket,
1257                key,
1258                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
1259            );
1260        }
1261    }
1262
1263    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
1264    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
1265        let sidecar = sidecar_key(key);
1266        // v0.7 #49: same encode-or-bail treatment as write_sidecar.
1267        let uri = safe_object_uri(bucket, &sidecar).ok()?;
1268        let get_input = GetObjectInput {
1269            bucket: bucket.into(),
1270            key: sidecar,
1271            ..Default::default()
1272        };
1273        let get_req = S3Request {
1274            input: get_input,
1275            method: http::Method::GET,
1276            uri,
1277            headers: http::HeaderMap::new(),
1278            extensions: http::Extensions::new(),
1279            credentials: None,
1280            region: None,
1281            service: None,
1282            trailing_headers: None,
1283        };
1284        let resp = self.backend.get_object(get_req).await.ok()?;
1285        let blob = resp.output.body?;
1286        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
1287        decode_index(bytes).ok()
1288    }
1289
1290    /// Multipart object (frame 列) を解凍 → 元 bytes を再構築。
1291    ///
1292    /// **per-frame codec dispatch**: 各 frame header に codec_id が入っているので、
1293    /// frame ごとに registry が違う codec を呼ぶことができる。同一 object 内で
1294    /// 異なる codec が混在していても透過的に解凍可能 (parquet 風 mixed columns 等)。
1295    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
1296        let mut out = BytesMut::new();
1297        for frame in FrameIter::new(bytes) {
1298            let (header, payload) = frame.map_err(|e| {
1299                S3Error::with_message(
1300                    S3ErrorCode::InternalError,
1301                    format!("multipart frame parse: {e}"),
1302                )
1303            })?;
1304            let chunk_manifest = ChunkManifest {
1305                codec: header.codec,
1306                original_size: header.original_size,
1307                compressed_size: header.compressed_size,
1308                crc32c: header.crc32c,
1309            };
1310            let decompressed = self
1311                .registry
1312                .decompress(payload, &chunk_manifest)
1313                .await
1314                .map_err(internal("multipart frame decompress"))?;
1315            out.extend_from_slice(&decompressed);
1316        }
1317        Ok(out.freeze())
1318    }
1319}
1320
1321/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
1322/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
1323/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
1324/// reject the other variants for parity with AWS.
1325fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1326    let rest = s
1327        .strip_prefix("bytes=")
1328        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1329    let (a, b) = rest
1330        .split_once('-')
1331        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1332    let first: u64 = a
1333        .parse()
1334        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1335    let last: u64 = b
1336        .parse()
1337        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1338    if last < first {
1339        return Err(format!("CopySourceRange last < first: {s:?}"));
1340    }
1341    Ok(s3s::dto::Range::Int {
1342        first,
1343        last: Some(last),
1344    })
1345}
1346
1347/// v0.5 #34: synthesize the backend storage key for a given
1348/// (logical key, version-id) pair on an Enabled-versioning bucket.
1349///
1350/// Uses the `__s4ver__/` infix because:
1351/// - it's not a substring of `.s4index` / `.s4ver` natural keys (no false-positive
1352///   listing filter collisions)
1353/// - directory-style separator keeps S3 console "browse by prefix" UX intact
1354///   (versions roll up under one virtual folder per object)
1355/// - human-readable on debug logs / `aws s3 ls`
1356///
1357/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
1358/// keys containing `.__s4ver__/` from results so customers don't see internal
1359/// shadow objects.
1360pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
1361    format!("{key}.__s4ver__/{version_id}")
1362}
1363
1364/// Test for the marker substring used by [`versioned_shadow_key`]. Cheap str
1365/// scan; both list_objects filter and the GET passthrough check use this.
1366fn is_versioning_shadow_key(key: &str) -> bool {
1367    key.contains(".__s4ver__/")
1368}
1369
1370/// v0.6 #42: wall-clock seconds since the UNIX epoch — fed to
1371/// `mfa::check_mfa` so the TOTP verifier can match the client's
1372/// authenticator app's view of "now". Falls back to `0` on the
1373/// (impossible-in-practice) clock-before-1970 path so the verifier
1374/// rejects rather than panicking.
1375fn current_unix_secs() -> u64 {
1376    std::time::SystemTime::now()
1377        .duration_since(std::time::UNIX_EPOCH)
1378        .map(|d| d.as_secs())
1379        .unwrap_or(0)
1380}
1381
1382/// v0.6 #42: translate an `MfaError` into the matching S3 wire error.
1383///
1384/// - `Missing` / `SerialMismatch` / `InvalidCode` → `403 AccessDenied`
1385///   (S3 spec for MFA Delete: every gating failure surfaces as
1386///   `AccessDenied`, not a separate `MFA*` code).
1387/// - `Malformed` → `400 InvalidRequest` (the request itself is
1388///   syntactically broken, not a permission issue).
1389fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1390    match e {
1391        crate::mfa::MfaError::Missing => S3Error::with_message(
1392            S3ErrorCode::AccessDenied,
1393            "MFA token required for this operation",
1394        ),
1395        crate::mfa::MfaError::Malformed => S3Error::with_message(
1396            S3ErrorCode::InvalidRequest,
1397            "malformed x-amz-mfa header",
1398        ),
1399        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1400            S3ErrorCode::AccessDenied,
1401            "MFA serial does not match configured device",
1402        ),
1403        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1404            S3ErrorCode::AccessDenied,
1405            "invalid MFA code",
1406        ),
1407    }
1408}
1409
1410fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1411    metadata
1412        .as_ref()
1413        .and_then(|m| m.get(META_MULTIPART))
1414        .map(|v| v == "true")
1415        .unwrap_or(false)
1416}
1417
1418const META_CODEC: &str = "s4-codec";
1419const META_ORIGINAL_SIZE: &str = "s4-original-size";
1420const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
1421const META_CRC32C: &str = "s4-crc32c";
1422/// Multipart upload で per-part frame format を使ったオブジェクトであることを示す。
1423/// GET 時にこの flag を見て frame parser を起動する。
1424const META_MULTIPART: &str = "s4-multipart";
1425/// v0.2 #4: single-PUT でも S4F2 framed format で書かれていることを示す。
1426/// 旧 v0.1 single-PUT は raw 圧縮 bytes (この flag なし)。GET 時にこの flag を
1427/// 見て framed 経路 (= multipart と同じ FrameIter parse) に流す。
1428const META_FRAMED: &str = "s4-framed";
1429
1430fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1431    metadata
1432        .as_ref()
1433        .and_then(|m| m.get(META_FRAMED))
1434        .map(|v| v == "true")
1435        .unwrap_or(false)
1436}
1437
1438/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
1439fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1440    metadata
1441        .as_ref()
1442        .and_then(|m| m.get("s4-encrypted"))
1443        .map(|v| v == "aes-256-gcm")
1444        .unwrap_or(false)
1445}
1446
1447/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
1448/// contract is "all three or none" — partial sets are a 400.
1449///
1450/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
1451/// no encryption), `Ok(Some(material))` on validated client key, and
1452/// `Err` for malformed or partial inputs.
1453fn extract_sse_c_material(
1454    algorithm: &Option<String>,
1455    key: &Option<String>,
1456    md5: &Option<String>,
1457) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1458    match (algorithm, key, md5) {
1459        (None, None, None) => Ok(None),
1460        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1461            .map(Some)
1462            .map_err(sse_c_error_to_s3),
1463        _ => Err(S3Error::with_message(
1464            S3ErrorCode::InvalidRequest,
1465            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1466        )),
1467    }
1468}
1469
1470/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
1471/// Returns the key-id to wrap under, falling back to the gateway default.
1472fn extract_kms_key_id(
1473    sse: &Option<ServerSideEncryption>,
1474    sse_kms_key_id: &Option<String>,
1475    gateway_default: Option<&str>,
1476) -> Option<String> {
1477    let asks_for_kms = sse
1478        .as_ref()
1479        .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1480        .unwrap_or(false);
1481    if !asks_for_kms {
1482        return None;
1483    }
1484    sse_kms_key_id
1485        .clone()
1486        .or_else(|| gateway_default.map(str::to_owned))
1487}
1488
1489/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
1490/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
1491/// transient KMS outage (503). Other variants are 500 InternalError.
1492fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1493    use crate::kms::KmsError as K;
1494    match e {
1495        K::KeyNotFound { key_id } => S3Error::with_message(
1496            S3ErrorCode::InvalidArgument,
1497            format!("KMS key not found: {key_id}"),
1498        ),
1499        K::BackendUnavailable { message } => S3Error::with_message(
1500            S3ErrorCode::ServiceUnavailable,
1501            format!("KMS backend unavailable: {message}"),
1502        ),
1503        other => S3Error::with_message(
1504            S3ErrorCode::InternalError,
1505            format!("KMS error: {other}"),
1506        ),
1507    }
1508}
1509
1510/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
1511/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
1512/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
1513fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1514    use crate::sse::SseError as E;
1515    match e {
1516        E::WrongCustomerKey => S3Error::with_message(
1517            S3ErrorCode::AccessDenied,
1518            "SSE-C key does not match the key used at PUT time",
1519        ),
1520        E::InvalidCustomerKey { reason } => S3Error::with_message(
1521            S3ErrorCode::InvalidArgument,
1522            format!("SSE-C: {reason}"),
1523        ),
1524        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1525            S3ErrorCode::InvalidArgument,
1526            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1527        ),
1528        E::CustomerKeyRequired => S3Error::with_message(
1529            S3ErrorCode::InvalidRequest,
1530            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1531        ),
1532        E::CustomerKeyUnexpected => S3Error::with_message(
1533            S3ErrorCode::InvalidRequest,
1534            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1535        ),
1536        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1537    }
1538}
1539
1540fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1541    let m = metadata.as_ref()?;
1542    let codec = m
1543        .get(META_CODEC)
1544        .and_then(|s| s.parse::<CodecKind>().ok())?;
1545    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1546    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1547    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1548    Some(ChunkManifest {
1549        codec,
1550        original_size,
1551        compressed_size,
1552        crc32c,
1553    })
1554}
1555
1556fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1557    let meta = metadata.get_or_insert_with(Default::default);
1558    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1559    meta.insert(
1560        META_ORIGINAL_SIZE.into(),
1561        manifest.original_size.to_string(),
1562    );
1563    meta.insert(
1564        META_COMPRESSED_SIZE.into(),
1565        manifest.compressed_size.to_string(),
1566    );
1567    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1568}
1569
1570fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1571    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1572}
1573
1574/// v0.6 #41: map a `select::SelectError` to the S3 error surface. AWS
1575/// uses a domain-specific `InvalidSqlExpression` code for parse / unsupported
1576/// errors, but s3s 0.13 doesn't expose that as a typed variant — we
1577/// fall back to the well-known `InvalidRequest` 400 with a descriptive
1578/// message that includes the original error context.
1579fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1580    use crate::select::SelectError;
1581    match e {
1582        SelectError::Parse(msg) => S3Error::with_message(
1583            S3ErrorCode::InvalidRequest,
1584            format!("SQL parse error: {msg}"),
1585        ),
1586        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1587            S3ErrorCode::InvalidRequest,
1588            format!("unsupported SQL feature: {msg}"),
1589        ),
1590        SelectError::RowEval(msg) => S3Error::with_message(
1591            S3ErrorCode::InvalidRequest,
1592            format!("SQL row evaluation error: {msg}"),
1593        ),
1594        SelectError::InputFormat(msg) => S3Error::with_message(
1595            S3ErrorCode::InvalidRequest,
1596            format!("{fmt} input format error: {msg}"),
1597        ),
1598    }
1599}
1600
1601/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
1602/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
1603/// (including missing) is treated as `false`.
1604fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1605    headers
1606        .get("x-amz-bypass-governance-retention")
1607        .and_then(|v| v.to_str().ok())
1608        .map(|s| s.eq_ignore_ascii_case("true"))
1609        .unwrap_or(false)
1610}
1611
1612/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
1613/// as an RFC3339 string and re-parsing through `chrono`. The string format
1614/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
1615/// s4-server) into our direct deps. Returns `None` if the format/parse fails
1616/// or the value is outside `chrono`'s supported range.
1617fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1618    let mut buf = Vec::new();
1619    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1620    let s = std::str::from_utf8(&buf).ok()?;
1621    chrono::DateTime::parse_from_rfc3339(s)
1622        .ok()
1623        .map(|dt| dt.with_timezone(&chrono::Utc))
1624}
1625
1626/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
1627/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
1628fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1629    // chrono's RFC3339 output format matches s3s' parser ("...Z" with
1630    // optional sub-second precision). Fall back to UNIX_EPOCH if anything
1631    // unexpected happens — we never produce malformed strings, so this
1632    // branch is unreachable in practice.
1633    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1634    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1635}
1636
1637/// v0.6 #39: convert our internal [`crate::tagging::TagSet`] into the
1638/// s3s `Vec<Tag>` wire shape used on `GetObject/BucketTaggingOutput`.
1639/// Both halves of every pair land in the `Some(_)` slot — AWS marks
1640/// the field optional but always populates it on response.
1641fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1642    set.iter()
1643        .map(|(k, v)| Tag {
1644            key: Some(k.clone()),
1645            value: Some(v.clone()),
1646        })
1647        .collect()
1648}
1649
1650/// v0.6 #39: inverse of [`tagset_to_aws`] for input handlers. Missing
1651/// keys / values become empty strings (mirrors AWS, which rejects
1652/// `<Key/>` with InvalidTag at the parser layer; downstream
1653/// `TagSet::validate` then enforces our size limits).
1654fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1655    let pairs = tags
1656        .iter()
1657        .map(|t| {
1658            (
1659                t.key.clone().unwrap_or_default(),
1660                t.value.clone().unwrap_or_default(),
1661            )
1662        })
1663        .collect();
1664    crate::tagging::TagSet::from_pairs(pairs)
1665}
1666
1667/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
1668/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
1669/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
1670pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1671    if total == 0 {
1672        return Err("cannot range-get zero-length object".into());
1673    }
1674    match range {
1675        s3s::dto::Range::Int { first, last } => {
1676            let start = *first;
1677            let end_inclusive = match last {
1678                Some(l) => (*l).min(total - 1),
1679                None => total - 1,
1680            };
1681            if start > end_inclusive || start >= total {
1682                return Err(format!(
1683                    "range bytes={start}-{:?} out of object size {total}",
1684                    last
1685                ));
1686            }
1687            Ok((start, end_inclusive + 1))
1688        }
1689        s3s::dto::Range::Suffix { length } => {
1690            let len = (*length).min(total);
1691            Ok((total - len, total))
1692        }
1693    }
1694}
1695
1696#[async_trait::async_trait]
1697impl<B: S3> S3 for S4Service<B> {
1698    // === 圧縮を挟む path (PUT) ===
1699    #[tracing::instrument(
1700        name = "s4.put_object",
1701        skip(self, req),
1702        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1703    )]
1704    async fn put_object(
1705        &self,
1706        mut req: S3Request<PutObjectInput>,
1707    ) -> S3Result<S3Response<PutObjectOutput>> {
1708        let put_start = Instant::now();
1709        let put_bucket = req.input.bucket.clone();
1710        let put_key = req.input.key.clone();
1711        let access_preamble = self.access_log_preamble(&req);
1712        self.enforce_rate_limit(&req, &put_bucket)?;
1713        // v0.6 #39: parse `x-amz-tagging` (URL-encoded query string) so
1714        // the IAM policy gate sees the request's tags via
1715        // `s3:RequestObjectTag/<key>`. `existing_object_tags` is also
1716        // resolved from the Tagging manager (when wired) so
1717        // `s3:ExistingObjectTag/<key>` works on overwrite.
1718        let request_tags: Option<crate::tagging::TagSet> = req
1719            .input
1720            .tagging
1721            .as_deref()
1722            .map(crate::tagging::parse_tagging_header)
1723            .transpose()
1724            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1725        let existing_tags: Option<crate::tagging::TagSet> = self
1726            .tagging
1727            .as_ref()
1728            .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1729        self.enforce_policy_with_extra(
1730            &req,
1731            "s3:PutObject",
1732            &put_bucket,
1733            Some(&put_key),
1734            request_tags.as_ref(),
1735            existing_tags.as_ref(),
1736        )?;
1737        // v0.5 #30: an Object Lock-protected key cannot be overwritten by
1738        // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
1739        // bucket PUTs are exempt because they materialise a fresh
1740        // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
1741        // locked version's bytes are untouched. The check mirrors the
1742        // delete path (Compliance never bypassable, Governance via the
1743        // bypass header, legal hold never).
1744        if let Some(mgr) = self.object_lock.as_ref()
1745            && let Some(state) = mgr.get(&put_bucket, &put_key)
1746        {
1747            let bucket_versioned_enabled = self
1748                .versioning
1749                .as_ref()
1750                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1751                .unwrap_or(false);
1752            if !bucket_versioned_enabled {
1753                let bypass = parse_bypass_governance_header(&req.headers);
1754                let now = chrono::Utc::now();
1755                if !state.can_delete(now, bypass) {
1756                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1757                    return Err(S3Error::with_message(
1758                        S3ErrorCode::AccessDenied,
1759                        "Access Denied because object protected by object lock",
1760                    ));
1761                }
1762            }
1763        }
1764        // v0.5 #30: per-PUT explicit retention / legal hold (S3
1765        // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
1766        // `x-amz-object-lock-legal-hold`). Captured before the body
1767        // moves into the backend; persisted into the manager only on
1768        // backend success below.
1769        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1770            .input
1771            .object_lock_mode
1772            .as_ref()
1773            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1774        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1775            .input
1776            .object_lock_retain_until_date
1777            .as_ref()
1778            .and_then(timestamp_to_chrono_utc);
1779        let explicit_legal_hold_on: Option<bool> = req
1780            .input
1781            .object_lock_legal_hold_status
1782            .as_ref()
1783            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1784        if let Some(blob) = req.input.body.take() {
1785            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
1786            // compress fast path、そうでなければ従来の collect-then-compress。
1787            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1788                .await
1789                .map_err(internal("peek put sample"))?;
1790            let sample_len = sample.len().min(SAMPLE_BYTES);
1791            // v0.8 #56: pass the request's Content-Length (when present) so
1792            // the sampling dispatcher can promote large objects to a GPU
1793            // codec. Chunked transfers (no Content-Length) keep CPU.
1794            let total_size_hint = req.input.content_length.and_then(|n| u64::try_from(n).ok());
1795            let kind = self
1796                .dispatcher
1797                .pick_with_size_hint(&sample[..sample_len], total_size_hint)
1798                .await;
1799
1800            // Passthrough buys nothing from S4F2 wrapping (no compression =
1801            // no per-chunk frame to skip past) and the +28-byte header
1802            // overhead breaks size-sensitive callers that expect a true
1803            // pass-through. So passthrough always uses the legacy raw-blob
1804            // path; only compressing codecs go through the framed path.
1805            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1806            let (compressed, manifest, is_framed) = if use_framed {
1807                // streaming fast path: input は memory に collect しない
1808                let chained = chain_sample_with_rest(sample, rest_stream);
1809                debug!(
1810                    bucket = ?req.input.bucket,
1811                    key = ?req.input.key,
1812                    codec = kind.as_str(),
1813                    path = "streaming-framed",
1814                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1815                );
1816                // v0.4 #16: pick the chunk size based on the request's
1817                // Content-Length when known, falling back to the 4 MiB
1818                // default for chunked transfers.
1819                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1820                let (body, manifest) = streaming_compress_to_frames(
1821                    chained,
1822                    Arc::clone(&self.registry),
1823                    kind,
1824                    chunk_size,
1825                )
1826                .await
1827                .map_err(internal("streaming framed compress"))?;
1828                (body, manifest, true)
1829            } else {
1830                // GPU codec 等で streaming-aware でないものは bytes-buffered path
1831                // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1832                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1833                    .await
1834                    .map_err(internal("collect put body (buffered path)"))?;
1835                debug!(
1836                    bucket = ?req.input.bucket,
1837                    key = ?req.input.key,
1838                    bytes = bytes.len(),
1839                    codec = kind.as_str(),
1840                    path = "buffered",
1841                    "S4 put_object: compressing (buffered, raw blob)"
1842                );
1843                // v0.8 #55: telemetry-returning compress so we can stamp
1844                // GPU-pipeline Prometheus metrics (`s4_gpu_compress_seconds`,
1845                // throughput gauge, OOM counter) for nvcomp / dietgpu codecs.
1846                // CPU codecs come back with `gpu_seconds = None` and the
1847                // stamp helper short-circuits — no extra cost on CPU path.
1848                let (compress_res, tel) =
1849                    self.registry.compress_with_telemetry(bytes, kind).await;
1850                stamp_gpu_compress_telemetry(&tel);
1851                let (body, m) = compress_res.map_err(internal("registry compress"))?;
1852                (body, m, false)
1853            };
1854
1855            write_manifest(&mut req.input.metadata, &manifest);
1856            if is_framed {
1857                // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1858                req.input
1859                    .metadata
1860                    .get_or_insert_with(Default::default)
1861                    .insert(META_FRAMED.into(), "true".into());
1862            }
1863            // 重要: content_length を圧縮後サイズで更新する。
1864            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1865            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1866            req.input.content_length = Some(compressed.len() as i64);
1867            // body を書き換えたので、客側が送ってきた original body 用の
1868            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1869            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1870            // ChunkManifest.crc32c で担保している。
1871            req.input.checksum_algorithm = None;
1872            req.input.checksum_crc32 = None;
1873            req.input.checksum_crc32c = None;
1874            req.input.checksum_crc64nvme = None;
1875            req.input.checksum_sha1 = None;
1876            req.input.checksum_sha256 = None;
1877            req.input.content_md5 = None;
1878            let original_size = manifest.original_size;
1879            let compressed_size = manifest.compressed_size;
1880            let codec_label = manifest.codec.as_str();
1881            // framed body は GET 側で sidecar partial-fetch を効かせるため
1882            // build_index_from_body で sidecar を組み立てて backend に PUT する。
1883            let sidecar_index = if is_framed {
1884                s4_codec::index::build_index_from_body(&compressed).ok()
1885            } else {
1886                None
1887            };
1888            // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
1889            // Precedence:
1890            //   - SSE-C headers present → per-request customer key (S4E3)
1891            //   - server-managed keyring configured → active key (S4E2)
1892            //   - neither → no encryption (raw compressed body)
1893            // The `s4-encrypted: aes-256-gcm` metadata flag is set in
1894            // both encrypted modes; the on-disk frame magic distinguishes
1895            // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
1896            // v0.7 #48 BUG-2/3 fix: take() the SSE fields off req.input
1897            // so the encryption headers are NOT forwarded to the
1898            // backend. S4 owns the encrypt-then-store contract; if we
1899            // leave the headers in place, real S3-compat backends
1900            // (MinIO / AWS) try to apply their own SSE on top and
1901            // either reject (MinIO requires HTTPS for SSE-C) or fail
1902            // (MinIO has no KMS configured). MemoryBackend ignored
1903            // these so mock tests passed.
1904            let sse_c_alg = req.input.sse_customer_algorithm.take();
1905            let sse_c_key = req.input.sse_customer_key.take();
1906            let sse_c_md5 = req.input.sse_customer_key_md5.take();
1907            let sse_header = req.input.server_side_encryption.take();
1908            let sse_kms_key = req.input.ssekms_key_id.take();
1909            let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1910            // v0.5 #28: SSE-KMS request? Resolves to None unless the
1911            // request asks for `aws:kms` AND a key id is available
1912            // (explicit header or gateway default). When set, we'll
1913            // generate a per-object DEK below.
1914            let kms_key_id = extract_kms_key_id(
1915                &sse_header,
1916                &sse_kms_key,
1917                self.kms_default_key_id.as_deref(),
1918            );
1919            // v0.5 #32: in compliance-strict mode, every PUT must
1920            // declare SSE — either client-supplied (SSE-C), KMS, or by
1921            // virtue of a server-side keyring being configured (which
1922            // applies SSE-S4 to every PUT automatically). Requests that
1923            // would otherwise land as plain compressed bytes are
1924            // rejected with 400 InvalidRequest.
1925            if self.compliance_strict
1926                && sse_c_material.is_none()
1927                && kms_key_id.is_none()
1928                && self.sse_keyring.is_none()
1929                && sse_header.as_ref().map(|s| s.as_str())
1930                    != Some(ServerSideEncryption::AES256)
1931            {
1932                return Err(S3Error::with_message(
1933                    S3ErrorCode::InvalidRequest,
1934                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1935                     (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1936                ));
1937            }
1938            // SSE-C and SSE-KMS are mutually exclusive on a single PUT
1939            // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
1940            if sse_c_material.is_some() && kms_key_id.is_some() {
1941                return Err(S3Error::with_message(
1942                    S3ErrorCode::InvalidArgument,
1943                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
1944                ));
1945            }
1946            // KMS path needs to call generate_dek().await before the
1947            // body_to_send branch; capture the result here.
1948            //
1949            // v0.8.1 #58: the plaintext DEK lives in three places
1950            // during one PUT:
1951            //
1952            //   1. The `Zeroizing<Vec<u8>>` returned by `generate_dek`
1953            //      — wiped when the binding `dek` falls out of scope at
1954            //      the end of this `if`-arm.
1955            //   2. The stack `[u8; 32]` we copy into for `SseSource::Kms`
1956            //      — wrapped in `Zeroizing<[u8; 32]>` so it's wiped when
1957            //      the outer `kms_wrap` `Option` is dropped at the end
1958            //      of `put_object`.
1959            //   3. AES-GCM internal key state inside the `aes-gcm`
1960            //      crate during `encrypt_with_source` — out of scope
1961            //      for this fix; tracked separately in v0.8.2.
1962            let kms_wrap: Option<(zeroize::Zeroizing<[u8; 32]>, crate::kms::WrappedDek)> =
1963                if let Some(ref key_id) = kms_key_id {
1964                let kms = self.kms.as_ref().ok_or_else(|| {
1965                    S3Error::with_message(
1966                        S3ErrorCode::InvalidRequest,
1967                        "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1968                    )
1969                })?;
1970                // `dek` is `Zeroizing<Vec<u8>>`; deref + slice access
1971                // works unchanged via `Deref<Target=Vec<u8>>`.
1972                let (dek, wrapped) = kms
1973                    .generate_dek(key_id)
1974                    .await
1975                    .map_err(kms_error_to_s3)?;
1976                if dek.len() != 32 {
1977                    return Err(S3Error::with_message(
1978                        S3ErrorCode::InternalError,
1979                        format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1980                    ));
1981                }
1982                let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
1983                    zeroize::Zeroizing::new([0u8; 32]);
1984                dek_arr.copy_from_slice(&dek);
1985                // `dek` (the `Zeroizing<Vec<u8>>`) is dropped at the
1986                // end of this scope, wiping the heap allocation.
1987                Some((dek_arr, wrapped))
1988            } else {
1989                None
1990            };
1991            // v0.7 #48 BUG-4 fix: stamp the SSE *type* into metadata
1992            // alongside `s4-encrypted` so HEAD (which doesn't fetch the
1993            // body) can echo the correct `x-amz-server-side-encryption`
1994            // value. Without this, HEAD on an SSE-KMS object would not
1995            // echo `aws:kms` because the frame magic is only available
1996            // on the body (which HEAD doesn't read).
1997            let body_to_send = if let Some(ref m) = sse_c_material {
1998                let meta = req.input.metadata.get_or_insert_with(Default::default);
1999                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2000                meta.insert("s4-sse-type".into(), "AES256".into());
2001                meta.insert("s4-sse-c-key-md5".into(),
2002                    base64::engine::general_purpose::STANDARD.encode(m.key_md5));
2003                crate::sse::encrypt_with_source(
2004                    &compressed,
2005                    crate::sse::SseSource::CustomerKey {
2006                        key: &m.key,
2007                        key_md5: &m.key_md5,
2008                    },
2009                )
2010            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
2011                let meta = req.input.metadata.get_or_insert_with(Default::default);
2012                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2013                meta.insert("s4-sse-type".into(), "aws:kms".into());
2014                meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
2015                // v0.8.1 #58: `dek` is `&Zeroizing<[u8; 32]>`; `SseSource::Kms`
2016                // wants `&[u8; 32]`. Rust auto-derefs `&Zeroizing<T>` to
2017                // `&T` here via `Deref<Target=T>`, so the binding picks
2018                // up the inner array reference without copying. The array
2019                // stays in the `Zeroizing` wrapper that owns it and gets
2020                // wiped when `kms_wrap` drops at the end of `put_object`.
2021                let dek_ref: &[u8; 32] = dek;
2022                crate::sse::encrypt_with_source(
2023                    &compressed,
2024                    crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
2025                )
2026            } else if let Some(keyring) = self.sse_keyring.as_ref() {
2027                // SSE-S4 is server-driven transparent encryption; the
2028                // client didn't ask for SSE. We stamp `s4-encrypted`
2029                // (internal flag the GET path needs) but deliberately
2030                // do NOT stamp `s4-sse-type` — that lights up the HEAD
2031                // echo of `x-amz-server-side-encryption: AES256`,
2032                // which would falsely advertise AWS-style SSE-S3
2033                // semantics the operator didn't request.
2034                let meta = req.input.metadata.get_or_insert_with(Default::default);
2035                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
2036                // v0.8 #52: when `--sse-chunk-size > 0` is configured,
2037                // emit the chunked S4E5 frame so the matching GET can
2038                // stream-decrypt instead of buffering 5 GiB before
2039                // emitting a byte. Falls back to the buffered S4E2
2040                // frame at chunk_size=0 (default) so existing
2041                // deployments are bit-for-bit unchanged.
2042                if self.sse_chunk_size > 0 {
2043                    crate::sse::encrypt_v2_chunked(
2044                        &compressed,
2045                        keyring,
2046                        self.sse_chunk_size,
2047                    )
2048                    .map_err(|e| {
2049                        S3Error::with_message(
2050                            S3ErrorCode::InternalError,
2051                            format!("SSE-S4 chunked encrypt failed: {e}"),
2052                        )
2053                    })?
2054                } else {
2055                    crate::sse::encrypt_v2(&compressed, keyring)
2056                }
2057            } else {
2058                compressed.clone()
2059            };
2060            // v0.6 #40: capture the about-to-be-sent body + metadata so
2061            // the replication dispatcher (run after the source PUT
2062            // succeeds) can hand the same backend bytes to the
2063            // destination bucket. `Bytes` clone is cheap (refcounted).
2064            let replication_body = body_to_send.clone();
2065            let replication_metadata = req.input.metadata.clone();
2066            // v0.7 #48 BUG-1 fix: SSE encryption (S4E1/E2/E3/E4 frames)
2067            // makes the body longer than the post-compression bytes
2068            // (header + nonce + tag overhead). The earlier
2069            // content_length stamp at compressed.len() is now stale, so
2070            // re-stamp from the actual bytes about to be sent or the
2071            // backend (real S3 / MinIO) rejects with
2072            // `StreamLengthMismatch`. MemoryBackend never validated
2073            // this, which is why mock-only tests passed.
2074            req.input.content_length = Some(body_to_send.len() as i64);
2075            req.input.body = Some(bytes_to_blob(body_to_send));
2076            // v0.5 #34: pre-allocate a version-id when the bucket is
2077            // Enabled, then redirect the backend storage key to the
2078            // shadow path so older versions survive newer PUTs.
2079            // Suspended / Unversioned buckets keep using the plain
2080            // `<key>` (S3 spec: Suspended overwrites the same backend
2081            // object). Pre-allocation (instead of recording after PUT)
2082            // ensures the shadow key + the response's
2083            // `x-amz-version-id` use the same vid.
2084            let pending_version: Option<crate::versioning::PutOutcome> = self
2085                .versioning
2086                .as_ref()
2087                .map(|mgr| mgr.state(&put_bucket))
2088                .map(|state| match state {
2089                    crate::versioning::VersioningState::Enabled => {
2090                        crate::versioning::PutOutcome {
2091                            version_id: crate::versioning::VersioningManager::new_version_id(),
2092                            versioned_response: true,
2093                        }
2094                    }
2095                    crate::versioning::VersioningState::Suspended
2096                    | crate::versioning::VersioningState::Unversioned => {
2097                        crate::versioning::PutOutcome {
2098                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2099                            versioned_response: false,
2100                        }
2101                    }
2102                });
2103            if let Some(ref pv) = pending_version
2104                && pv.versioned_response
2105            {
2106                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2107            }
2108            let mut backend_resp = self.backend.put_object(req).await;
2109            if let Some(idx) = sidecar_index
2110                && backend_resp.is_ok()
2111                && idx.entries.len() > 1
2112            {
2113                // 1 chunk しかない (small object) なら sidecar は意味がない (=
2114                // partial fetch しても full body と同じ範囲) ので省略。
2115                // Sidecar は user-visible key で書く (latest version の
2116                // partial fetch path 用)。Old versions の Range GET は今 task
2117                // の scope 外 (full read fallback でも意味的には正しい)。
2118                self.write_sidecar(&put_bucket, &put_key, &idx).await;
2119            }
2120            // v0.5 #34: commit the new version into the manager only on
2121            // backend success. Use the pre-allocated vid so the response
2122            // header and the chain entry agree.
2123            if let (Some(mgr), Some(pv), Ok(resp)) = (
2124                self.versioning.as_ref(),
2125                pending_version.as_ref(),
2126                backend_resp.as_mut(),
2127            ) {
2128                let etag = resp
2129                    .output
2130                    .e_tag
2131                    .clone()
2132                    .map(ETag::into_value)
2133                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
2134                let now = chrono::Utc::now();
2135                mgr.commit_put_with_version(
2136                    &put_bucket,
2137                    &put_key,
2138                    crate::versioning::VersionEntry {
2139                        version_id: pv.version_id.clone(),
2140                        etag,
2141                        size: original_size,
2142                        is_delete_marker: false,
2143                        created_at: now,
2144                    },
2145                );
2146                if pv.versioned_response {
2147                    resp.output.version_id = Some(pv.version_id.clone());
2148                }
2149            }
2150            // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
2151            // so the client knows the server actually applied the
2152            // requested algorithm and which key fingerprint matched.
2153            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
2154                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2155                resp.output.sse_customer_key_md5 = Some(
2156                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2157                );
2158            }
2159            // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
2160            // the backend returned (AWS KMS returns the ARN even when
2161            // the request used an alias).
2162            if let (Some((_, wrapped)), Ok(resp)) =
2163                (kms_wrap.as_ref(), backend_resp.as_mut())
2164            {
2165                resp.output.server_side_encryption =
2166                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
2167                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
2168            }
2169            // v0.5 #30: persist any per-PUT explicit retention / legal
2170            // hold the client supplied, then auto-apply the bucket
2171            // default (no-op when state is already populated). The
2172            // explicit fields take precedence — the bucket-default
2173            // helper bails out as soon as it sees any retention.
2174            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2175                if explicit_lock_mode.is_some()
2176                    || explicit_retain_until.is_some()
2177                    || explicit_legal_hold_on.is_some()
2178                {
2179                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2180                    if let Some(m) = explicit_lock_mode {
2181                        state.mode = Some(m);
2182                    }
2183                    if let Some(u) = explicit_retain_until {
2184                        state.retain_until = Some(u);
2185                    }
2186                    if let Some(lh) = explicit_legal_hold_on {
2187                        state.legal_hold_on = lh;
2188                    }
2189                    mgr.set(&put_bucket, &put_key, state);
2190                }
2191                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2192            }
2193            let _ = (original_size, compressed_size); // mute unused warnings
2194            let elapsed = put_start.elapsed();
2195            crate::metrics::record_put(
2196                codec_label,
2197                original_size,
2198                compressed_size,
2199                elapsed.as_secs_f64(),
2200                backend_resp.is_ok(),
2201            );
2202            // v0.4 #20: structured access-log entry (best-effort).
2203            self.record_access(
2204                access_preamble,
2205                "REST.PUT.OBJECT",
2206                &put_bucket,
2207                Some(&put_key),
2208                if backend_resp.is_ok() { 200 } else { 500 },
2209                compressed_size,
2210                original_size,
2211                elapsed.as_millis() as u64,
2212                backend_resp.as_ref().err().map(|e| e.code().as_str()),
2213            )
2214            .await;
2215            info!(
2216                op = "put_object",
2217                bucket = %put_bucket,
2218                key = %put_key,
2219                codec = codec_label,
2220                bytes_in = original_size,
2221                bytes_out = compressed_size,
2222                ratio = format!(
2223                    "{:.3}",
2224                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
2225                ),
2226                latency_ms = elapsed.as_millis() as u64,
2227                ok = backend_resp.is_ok(),
2228                "S4 put completed"
2229            );
2230            // v0.6 #35: fire bucket-notification destinations (best-effort,
2231            // detached). Skipped when no manager is attached or when the
2232            // bucket has no rule matching `s3:ObjectCreated:Put` for this
2233            // key.
2234            if backend_resp.is_ok()
2235                && let Some(mgr) = self.notifications.as_ref()
2236            {
2237                let dests = mgr.match_destinations(
2238                    &put_bucket,
2239                    &crate::notifications::EventType::ObjectCreatedPut,
2240                    &put_key,
2241                );
2242                if !dests.is_empty() {
2243                    let etag = backend_resp
2244                        .as_ref()
2245                        .ok()
2246                        .and_then(|r| r.output.e_tag.clone())
2247                        .map(ETag::into_value);
2248                    let version_id = pending_version
2249                        .as_ref()
2250                        .filter(|pv| pv.versioned_response)
2251                        .map(|pv| pv.version_id.clone());
2252                    tokio::spawn(crate::notifications::dispatch_event(
2253                        Arc::clone(mgr),
2254                        put_bucket.clone(),
2255                        put_key.clone(),
2256                        crate::notifications::EventType::ObjectCreatedPut,
2257                        Some(original_size),
2258                        etag,
2259                        version_id,
2260                        format!("S4-{}", uuid::Uuid::new_v4()),
2261                    ));
2262                }
2263            }
2264            // v0.6 #39: persist parsed `x-amz-tagging` tags into the
2265            // tagging manager on a successful PUT. AWS PutObject's
2266            // tagging is a full-replace operation (not a merge), so
2267            // any pre-existing entry for `(bucket, key)` is overwritten.
2268            if backend_resp.is_ok()
2269                && let (Some(mgr), Some(tags)) =
2270                    (self.tagging.as_ref(), request_tags.clone())
2271            {
2272                mgr.put_object_tags(&put_bucket, &put_key, tags);
2273            }
2274            // v0.6 #40: cross-bucket replication fire-point. On
2275            // successful source PUT, consult the replication manager;
2276            // when an enabled rule matches, mark the source key
2277            // `Pending` and spawn a detached task that PUTs the same
2278            // backend bytes + metadata to the rule's destination
2279            // bucket. The dispatcher itself records `Completed` /
2280            // `Failed` and bumps the drop counter on retry-budget
2281            // exhaustion.
2282            self.spawn_replication_if_matched(
2283                &put_bucket,
2284                &put_key,
2285                &request_tags,
2286                &replication_body,
2287                &replication_metadata,
2288                backend_resp.is_ok(),
2289            );
2290            return backend_resp;
2291        }
2292        // Body-less PUT (rare: zero-length object). Mirror the body-full
2293        // versioning hooks so list_object_versions / GET-by-version still see
2294        // empty-body objects in the chain.
2295        let pending_version: Option<crate::versioning::PutOutcome> = self
2296            .versioning
2297            .as_ref()
2298            .map(|mgr| mgr.state(&put_bucket))
2299            .map(|state| match state {
2300                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2301                    version_id: crate::versioning::VersioningManager::new_version_id(),
2302                    versioned_response: true,
2303                },
2304                _ => crate::versioning::PutOutcome {
2305                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2306                    versioned_response: false,
2307                },
2308            });
2309        if let Some(ref pv) = pending_version
2310            && pv.versioned_response
2311        {
2312            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2313        }
2314        let mut backend_resp = self.backend.put_object(req).await;
2315        if let (Some(mgr), Some(pv), Ok(resp)) = (
2316            self.versioning.as_ref(),
2317            pending_version.as_ref(),
2318            backend_resp.as_mut(),
2319        ) {
2320            let etag = resp
2321                .output
2322                .e_tag
2323                .clone()
2324                .map(ETag::into_value)
2325                .unwrap_or_default();
2326            let now = chrono::Utc::now();
2327            mgr.commit_put_with_version(
2328                &put_bucket,
2329                &put_key,
2330                crate::versioning::VersionEntry {
2331                    version_id: pv.version_id.clone(),
2332                    etag,
2333                    size: 0,
2334                    is_delete_marker: false,
2335                    created_at: now,
2336                },
2337            );
2338            if pv.versioned_response {
2339                resp.output.version_id = Some(pv.version_id.clone());
2340            }
2341        }
2342        // v0.5 #30: same explicit-then-default lock-state commit as the
2343        // body-bearing branch above, so a zero-length PUT also picks up
2344        // bucket-default retention.
2345        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2346            if explicit_lock_mode.is_some()
2347                || explicit_retain_until.is_some()
2348                || explicit_legal_hold_on.is_some()
2349            {
2350                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2351                if let Some(m) = explicit_lock_mode {
2352                    state.mode = Some(m);
2353                }
2354                if let Some(u) = explicit_retain_until {
2355                    state.retain_until = Some(u);
2356                }
2357                if let Some(lh) = explicit_legal_hold_on {
2358                    state.legal_hold_on = lh;
2359                }
2360                mgr.set(&put_bucket, &put_key, state);
2361            }
2362            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2363        }
2364        // v0.6 #35: same notification fire-point as the body-bearing PUT
2365        // branch above (zero-length objects still match `ObjectCreated:Put`
2366        // rules per the AWS event taxonomy).
2367        if backend_resp.is_ok()
2368            && let Some(mgr) = self.notifications.as_ref()
2369        {
2370            let dests = mgr.match_destinations(
2371                &put_bucket,
2372                &crate::notifications::EventType::ObjectCreatedPut,
2373                &put_key,
2374            );
2375            if !dests.is_empty() {
2376                let etag = backend_resp
2377                    .as_ref()
2378                    .ok()
2379                    .and_then(|r| r.output.e_tag.clone())
2380                    .map(ETag::into_value);
2381                let version_id = pending_version
2382                    .as_ref()
2383                    .filter(|pv| pv.versioned_response)
2384                    .map(|pv| pv.version_id.clone());
2385                tokio::spawn(crate::notifications::dispatch_event(
2386                    Arc::clone(mgr),
2387                    put_bucket.clone(),
2388                    put_key.clone(),
2389                    crate::notifications::EventType::ObjectCreatedPut,
2390                    Some(0),
2391                    etag,
2392                    version_id,
2393                    format!("S4-{}", uuid::Uuid::new_v4()),
2394                ));
2395            }
2396        }
2397        // v0.6 #39: persist parsed `x-amz-tagging` for the body-less
2398        // (zero-length) PUT branch too — same shape as the body-bearing
2399        // branch above.
2400        if backend_resp.is_ok()
2401            && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2402        {
2403            mgr.put_object_tags(&put_bucket, &put_key, tags);
2404        }
2405        // v0.6 #40: cross-bucket replication for the zero-length PUT
2406        // branch — same shape as the body-bearing branch above.
2407        self.spawn_replication_if_matched(
2408            &put_bucket,
2409            &put_key,
2410            &request_tags,
2411            &bytes::Bytes::new(),
2412            &None,
2413            backend_resp.is_ok(),
2414        );
2415        backend_resp
2416    }
2417
2418    // === 圧縮を解く path (GET) ===
2419    #[tracing::instrument(
2420        name = "s4.get_object",
2421        skip(self, req),
2422        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
2423    )]
2424    async fn get_object(
2425        &self,
2426        mut req: S3Request<GetObjectInput>,
2427    ) -> S3Result<S3Response<GetObjectOutput>> {
2428        let get_start = Instant::now();
2429        let get_bucket = req.input.bucket.clone();
2430        let get_key = req.input.key.clone();
2431        self.enforce_rate_limit(&req, &get_bucket)?;
2432        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
2433        // Range request の事前検出 (decompress 後 slice する path に使う)。
2434        let range_request = req.input.range.take();
2435        // v0.5 #27: pull SSE-C material from the input headers before
2436        // the request is moved into the backend. A header parse error
2437        // fails fast (no body fetch). The material is consumed below
2438        // when decrypting an S4E3-framed body; the SSE-C headers on
2439        // `req.input` are cleared so the backend doesn't see them.
2440        let sse_c_alg = req.input.sse_customer_algorithm.take();
2441        let sse_c_key = req.input.sse_customer_key.take();
2442        let sse_c_md5 = req.input.sse_customer_key_md5.take();
2443        let get_sse_c_material =
2444            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
2445
2446        // v0.5 #34: route the GET through the VersioningManager when
2447        // attached AND the bucket is in a versioning-aware state.
2448        // Resolves which version to fetch (explicit `?versionId=` query
2449        // param vs. chain latest), translates a delete-marker into 404
2450        // NoSuchKey, and rewrites the backend storage key to the shadow
2451        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
2452        // versions. `resolved_version_id` is stamped onto the response
2453        // so clients see a coherent `x-amz-version-id` header.
2454        //
2455        // When the bucket is Unversioned (or no manager attached), the
2456        // chain-resolution step is skipped and the request flows
2457        // through the existing single-key path unchanged.
2458        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
2459            Some(mgr)
2460                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
2461            {
2462                let req_vid = req.input.version_id.take();
2463                let entry = match req_vid.as_deref() {
2464                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
2465                        || S3Error::with_message(
2466                            S3ErrorCode::NoSuchVersion,
2467                            format!("no such version: {vid}"),
2468                        ),
2469                    )?,
2470                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
2471                        S3Error::with_message(
2472                            S3ErrorCode::NoSuchKey,
2473                            format!("no such key: {get_key}"),
2474                        )
2475                    })?,
2476                };
2477                if entry.is_delete_marker {
2478                    // S3 spec: GET without versionId on a
2479                    // delete-marker latest → 404 NoSuchKey + the
2480                    // response carries `x-amz-delete-marker: true`.
2481                    // GET with explicit versionId pointing at a delete
2482                    // marker → 405 MethodNotAllowed; we surface
2483                    // NoSuchKey here for both since s3s collapses them
2484                    // into the same not-found error path.
2485                    return Err(S3Error::with_message(
2486                        S3ErrorCode::NoSuchKey,
2487                        format!("delete marker is the current version of {get_key}"),
2488                    ));
2489                }
2490                if entry.version_id != crate::versioning::NULL_VERSION_ID {
2491                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
2492                }
2493                Some(entry.version_id)
2494            }
2495            _ => None,
2496        };
2497
2498        // ====== Range GET の partial-fetch fast path (sidecar index 利用) ======
2499        // sidecar `<key>.s4index` が存在し、multipart-framed object であれば
2500        // 必要 frame だけを backend に Range GET し帯域節約する。
2501        if let Some(ref r) = range_request
2502            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
2503        {
2504            let total = index.total_original_size();
2505            let (start, end_exclusive) = match resolve_range(r, total) {
2506                Ok(v) => v,
2507                Err(e) => {
2508                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
2509                }
2510            };
2511            if let Some(plan) = index.lookup_range(start, end_exclusive) {
2512                return self
2513                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
2514                    .await;
2515            }
2516        }
2517        let mut resp = self.backend.get_object(req).await?;
2518        // v0.5 #34: stamp the resolved version-id so the client sees a
2519        // coherent `x-amz-version-id` header (only for chains owned by
2520        // the manager — Unversioned buckets / no-manager paths never
2521        // set this).
2522        if let Some(ref vid) = resolved_version_id {
2523            resp.output.version_id = Some(vid.clone());
2524        }
2525        let is_multipart = is_multipart_object(&resp.output.metadata);
2526        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
2527        // v0.2 #4: framed-v2 single-PUT は多 frame parse が必要なので
2528        // multipart と同じ path に流す。
2529        let needs_frame_parse = is_multipart || is_framed_v2;
2530        let manifest_opt = extract_manifest(&resp.output.metadata);
2531
2532        if !needs_frame_parse && manifest_opt.is_none() {
2533            // S4 が書いていないオブジェクトは透過 (raw bucket pre-existing object 等)
2534            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
2535            return Ok(resp);
2536        }
2537
2538        if let Some(blob) = resp.output.body.take() {
2539            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
2540            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
2541            // before any frame parse / streaming decompress. Encrypted
2542            // bodies are opaque to the codec; this also forces the
2543            // buffered path because AES-GCM needs the full body for tag
2544            // verify. SSE-C uses the per-request customer key, SSE-S4
2545            // falls back to the configured keyring.
2546            let blob = if is_sse_encrypted(&resp.output.metadata) {
2547                let body = collect_blob(blob, self.max_body_bytes)
2548                    .await
2549                    .map_err(internal("collect SSE-encrypted body"))?;
2550                // v0.5 #28: peek the frame magic to route the right
2551                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
2552                // through the KMS backend (async). S4E1/E2/E3 take
2553                // the sync path (keyring or customer key).
2554                //
2555                // v0.8 #52 (S4E5) / v0.8.1 #57 (S4E6): the chunked
2556                // SSE-S4 frames take the *streaming* path — we hand
2557                // the response body a per-chunk verify-and-emit
2558                // Stream so the client sees chunk 0 plaintext after
2559                // one chunk-worth of AES-GCM verify (vs. waiting
2560                // for the whole body's tag), and the gateway no
2561                // longer needs to materialize the full plaintext
2562                // in memory before responding. SSE-C is out of
2563                // scope for the chunked path (chunked S4E3 is a
2564                // follow-up), so this branch requires the SSE-S4
2565                // keyring to be wired and `get_sse_c_material` to
2566                // be absent — otherwise we surface a clear
2567                // misconfiguration error instead of silently
2568                // falling through to the buffered chunked path.
2569                if matches!(
2570                    crate::sse::peek_magic(&body),
2571                    Some("S4E5") | Some("S4E6")
2572                ) && get_sse_c_material.is_none()
2573                {
2574                    let keyring_arc = self.sse_keyring.clone().ok_or_else(|| {
2575                        S3Error::with_message(
2576                            S3ErrorCode::InvalidRequest,
2577                            "object is SSE-S4 encrypted (S4E5/S4E6) but no --sse-s4-key is configured on this gateway",
2578                        )
2579                    })?;
2580                    let body_len = body.len() as u64;
2581                    let stream =
2582                        crate::sse::decrypt_chunked_stream(body, keyring_arc.as_ref());
2583                    // Stream is `'static` (the keyring borrow is
2584                    // consumed up front; the cipher lives inside
2585                    // the stream state — see decrypt_chunked_stream
2586                    // doc), so we can move it straight into a
2587                    // StreamingBlob without lifetime gymnastics.
2588                    use futures::StreamExt;
2589                    let mapped = stream.map(|r| {
2590                        r.map_err(|e| std::io::Error::other(format!(
2591                            "SSE-S4 chunked decrypt: {e}"
2592                        )))
2593                    });
2594                    use s3s::dto::StreamingBlob;
2595                    resp.output.body = Some(StreamingBlob::wrap(mapped));
2596                    // Plaintext content_length is unknown until all
2597                    // chunks have been verified; null it out so the
2598                    // ByteStream wrapper reports `unknown` to the
2599                    // HTTP layer (which then emits chunked transfer-
2600                    // encoding) rather than lying about the size.
2601                    resp.output.content_length = None;
2602                    // The backend's checksums + ETag describe the
2603                    // encrypted body (S4E5/S4E6 wire format), not
2604                    // the plaintext we're about to stream — clear them
2605                    // so the AWS SDK doesn't fail the GET with a
2606                    // ChecksumMismatch on a successful round-trip.
2607                    // Mirrors the streaming-zstd path at L1180-1185.
2608                    resp.output.checksum_crc32 = None;
2609                    resp.output.checksum_crc32c = None;
2610                    resp.output.checksum_crc64nvme = None;
2611                    resp.output.checksum_sha1 = None;
2612                    resp.output.checksum_sha256 = None;
2613                    resp.output.e_tag = None;
2614                    let elapsed = get_start.elapsed();
2615                    crate::metrics::record_get(
2616                        "sse-s4-chunked",
2617                        body_len,
2618                        body_len,
2619                        elapsed.as_secs_f64(),
2620                        true,
2621                    );
2622                    return Ok(resp);
2623                }
2624                let plain = match crate::sse::peek_magic(&body) {
2625                    Some("S4E4") => {
2626                        let kms = self.kms.as_ref().ok_or_else(|| {
2627                            S3Error::with_message(
2628                                S3ErrorCode::InvalidRequest,
2629                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
2630                            )
2631                        })?;
2632                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
2633                        crate::sse::decrypt_with_kms(&body, kms_ref)
2634                            .await
2635                            .map_err(|e| match e {
2636                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
2637                                other => S3Error::with_message(
2638                                    S3ErrorCode::InternalError,
2639                                    format!("SSE-KMS decrypt failed: {other}"),
2640                                ),
2641                            })?
2642                    }
2643                    _ => {
2644                        if let Some(ref m) = get_sse_c_material {
2645                            crate::sse::decrypt(
2646                                &body,
2647                                crate::sse::SseSource::CustomerKey {
2648                                    key: &m.key,
2649                                    key_md5: &m.key_md5,
2650                                },
2651                            )
2652                            .map_err(sse_c_error_to_s3)?
2653                        } else {
2654                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
2655                                S3Error::with_message(
2656                                    S3ErrorCode::InvalidRequest,
2657                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
2658                                )
2659                            })?;
2660                            crate::sse::decrypt(&body, keyring).map_err(|e| {
2661                                S3Error::with_message(
2662                                    S3ErrorCode::InternalError,
2663                                    format!("SSE-S4 decrypt failed: {e}"),
2664                                )
2665                            })?
2666                        }
2667                    }
2668                };
2669                // v0.5 #28: parse out the on-disk wrapped DEK's key id
2670                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
2671                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
2672                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
2673                {
2674                    resp.output.server_side_encryption = Some(
2675                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2676                    );
2677                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
2678                }
2679                bytes_to_blob(plain)
2680            } else if let Some(ref m) = get_sse_c_material {
2681                // Client sent SSE-C headers for an unencrypted object —
2682                // mirror AWS S3's 400 InvalidRequest.
2683                let _ = m;
2684                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
2685            } else {
2686                blob
2687            };
2688            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
2689            // tell the client that the supplied key was the one used.
2690            if let Some(ref m) = get_sse_c_material {
2691                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2692                resp.output.sse_customer_key_md5 = Some(
2693                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2694                );
2695            }
2696            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
2697            // 大規模 object (e.g. 5 GB) を memory に collect すると OOM するので、
2698            // codec が streaming-aware なら body を chunk-by-chunk で decompress して
2699            // 即座に client に流す。
2700            //
2701            // ただし Range request 時は streaming できない (slice するため total bytes
2702            // が必要) → buffered path に fall through。
2703            if range_request.is_none()
2704                && !needs_frame_parse
2705                && let Some(ref m) = manifest_opt
2706                && supports_streaming_decompress(m.codec)
2707                && m.codec == CodecKind::CpuZstd
2708            {
2709                let decompressed_blob = cpu_zstd_decompress_stream(blob);
2710                resp.output.content_length = Some(m.original_size as i64);
2711                resp.output.checksum_crc32 = None;
2712                resp.output.checksum_crc32c = None;
2713                resp.output.checksum_crc64nvme = None;
2714                resp.output.checksum_sha1 = None;
2715                resp.output.checksum_sha256 = None;
2716                resp.output.e_tag = None;
2717                resp.output.body = Some(decompressed_blob);
2718                let elapsed = get_start.elapsed();
2719                crate::metrics::record_get(
2720                    m.codec.as_str(),
2721                    m.compressed_size,
2722                    m.original_size,
2723                    elapsed.as_secs_f64(),
2724                    true,
2725                );
2726                info!(
2727                    op = "get_object",
2728                    bucket = %get_bucket,
2729                    key = %get_key,
2730                    codec = m.codec.as_str(),
2731                    bytes_in = m.compressed_size,
2732                    bytes_out = m.original_size,
2733                    path = "streaming",
2734                    setup_latency_ms = elapsed.as_millis() as u64,
2735                    "S4 get started (streaming)"
2736                );
2737                return Ok(resp);
2738            }
2739            // Passthrough: そのまま流す (Range なしの場合のみ streaming)
2740            if range_request.is_none()
2741                && !needs_frame_parse
2742                && let Some(ref m) = manifest_opt
2743                && m.codec == CodecKind::Passthrough
2744            {
2745                resp.output.content_length = Some(m.original_size as i64);
2746                resp.output.checksum_crc32 = None;
2747                resp.output.checksum_crc32c = None;
2748                resp.output.checksum_crc64nvme = None;
2749                resp.output.checksum_sha1 = None;
2750                resp.output.checksum_sha256 = None;
2751                resp.output.e_tag = None;
2752                resp.output.body = Some(blob);
2753                debug!("S4 get_object: passthrough streaming");
2754                return Ok(resp);
2755            }
2756
2757            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
2758            let bytes = collect_blob(blob, self.max_body_bytes)
2759                .await
2760                .map_err(internal("collect get body"))?;
2761
2762            let decompressed = if needs_frame_parse {
2763                // multipart objects と framed-v2 single-PUT objects は同じ
2764                // S4F2 frame 列なので decompress_multipart で統一処理
2765                self.decompress_multipart(bytes).await?
2766            } else {
2767                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
2768                self.registry
2769                    .decompress(bytes, manifest)
2770                    .await
2771                    .map_err(internal("registry decompress"))?
2772            };
2773
2774            // Range request があれば slice。なければ full body を返す。
2775            let total_size = decompressed.len() as u64;
2776            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
2777                let (start, end) = resolve_range(r, total_size)
2778                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
2779                let sliced = decompressed.slice(start as usize..end as usize);
2780                resp.output.content_range = Some(format!(
2781                    "bytes {start}-{}/{total_size}",
2782                    end.saturating_sub(1)
2783                ));
2784                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
2785            } else {
2786                (decompressed, None)
2787            };
2788            // 解凍後の真のサイズを返す (S3 client は content_length を信頼するので
2789            // 圧縮 size のままだと downstream が body を途中で切ってしまう)
2790            resp.output.content_length = Some(final_bytes.len() as i64);
2791            // 圧縮済 bytes の checksum を返すと AWS SDK 側で StreamingError
2792            // (ChecksumMismatch) になる。ETag も backend が返した「圧縮済 bytes の
2793            // MD5/checksum」なので意味的にズレる — クリアして S4 自身の crc32c
2794            // (manifest 内 / frame 内) で integrity を保証する設計にする。
2795            resp.output.checksum_crc32 = None;
2796            resp.output.checksum_crc32c = None;
2797            resp.output.checksum_crc64nvme = None;
2798            resp.output.checksum_sha1 = None;
2799            resp.output.checksum_sha256 = None;
2800            resp.output.e_tag = None;
2801            let returned_size = final_bytes.len() as u64;
2802            let codec_label = manifest_opt
2803                .as_ref()
2804                .map(|m| m.codec.as_str())
2805                .unwrap_or("multipart");
2806            resp.output.body = Some(bytes_to_blob(final_bytes));
2807            if let Some(status) = status_override {
2808                resp.status = Some(status);
2809            }
2810            let elapsed = get_start.elapsed();
2811            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
2812            info!(
2813                op = "get_object",
2814                bucket = %get_bucket,
2815                key = %get_key,
2816                codec = codec_label,
2817                bytes_out = returned_size,
2818                total_object_size = total_size,
2819                range = range_request.is_some(),
2820                path = "buffered",
2821                latency_ms = elapsed.as_millis() as u64,
2822                "S4 get completed (buffered)"
2823            );
2824        }
2825        // v0.6 #40: echo the recorded `x-amz-replication-status` so
2826        // consumers can poll progress (PENDING / COMPLETED / FAILED).
2827        if let Some(mgr) = self.replication.as_ref()
2828            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
2829        {
2830            resp.output.replication_status =
2831                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2832        }
2833        Ok(resp)
2834    }
2835
2836    // === passthrough delegations ===
2837    async fn head_bucket(
2838        &self,
2839        req: S3Request<HeadBucketInput>,
2840    ) -> S3Result<S3Response<HeadBucketOutput>> {
2841        self.backend.head_bucket(req).await
2842    }
2843    async fn list_buckets(
2844        &self,
2845        req: S3Request<ListBucketsInput>,
2846    ) -> S3Result<S3Response<ListBucketsOutput>> {
2847        self.backend.list_buckets(req).await
2848    }
2849    async fn create_bucket(
2850        &self,
2851        req: S3Request<CreateBucketInput>,
2852    ) -> S3Result<S3Response<CreateBucketOutput>> {
2853        self.backend.create_bucket(req).await
2854    }
2855    async fn delete_bucket(
2856        &self,
2857        req: S3Request<DeleteBucketInput>,
2858    ) -> S3Result<S3Response<DeleteBucketOutput>> {
2859        self.backend.delete_bucket(req).await
2860    }
2861    async fn head_object(
2862        &self,
2863        req: S3Request<HeadObjectInput>,
2864    ) -> S3Result<S3Response<HeadObjectOutput>> {
2865        // v0.6 #40: capture bucket/key before req is consumed so the
2866        // replication-status echo can look the entry up.
2867        let head_bucket = req.input.bucket.clone();
2868        let head_key = req.input.key.clone();
2869        let mut resp = self.backend.head_object(req).await?;
2870        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2871            // 客側には decompress 後の意味のある content_length / checksum を返す。
2872            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
2873            // (S4 は manifest 内の crc32c で integrity を担保する)。
2874            resp.output.content_length = Some(manifest.original_size as i64);
2875            resp.output.checksum_crc32 = None;
2876            resp.output.checksum_crc32c = None;
2877            resp.output.checksum_crc64nvme = None;
2878            resp.output.checksum_sha1 = None;
2879            resp.output.checksum_sha256 = None;
2880            resp.output.e_tag = None;
2881        }
2882        // v0.6 #40: echo `x-amz-replication-status` (PENDING / COMPLETED
2883        // / FAILED) so consumers can poll progress without a GET.
2884        if let Some(mgr) = self.replication.as_ref()
2885            && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2886        {
2887            resp.output.replication_status =
2888                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2889        }
2890        // v0.7 #48 BUG-4 fix: HEAD must echo SSE indicators so SDKs
2891        // and pipelines see the same posture they got on PUT. The PUT
2892        // path stamps `s4-sse-type` metadata for exactly this — HEAD
2893        // doesn't fetch the body, so it can't peek frame magic.
2894        if let Some(meta) = resp.output.metadata.as_ref()
2895            && let Some(sse_type) = meta.get("s4-sse-type")
2896        {
2897            {
2898                match sse_type.as_str() {
2899                    "aws:kms" => {
2900                        resp.output.server_side_encryption = Some(
2901                            ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2902                        );
2903                        if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2904                            resp.output.ssekms_key_id = Some(key_id.clone());
2905                        }
2906                    }
2907                    _ => {
2908                        resp.output.server_side_encryption = Some(
2909                            ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2910                        );
2911                        if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2912                            resp.output.sse_customer_algorithm =
2913                                Some(crate::sse::SSE_C_ALGORITHM.into());
2914                            resp.output.sse_customer_key_md5 = Some(md5.clone());
2915                        }
2916                    }
2917                }
2918            }
2919        }
2920        Ok(resp)
2921    }
2922    async fn delete_object(
2923        &self,
2924        mut req: S3Request<DeleteObjectInput>,
2925    ) -> S3Result<S3Response<DeleteObjectOutput>> {
2926        let bucket = req.input.bucket.clone();
2927        let key = req.input.key.clone();
2928        self.enforce_rate_limit(&req, &bucket)?;
2929        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
2930        // v0.6 #42: MFA Delete enforcement. When the bucket has
2931        // MFA-Delete = Enabled, every DELETE / DELETE-version /
2932        // delete-marker form needs `x-amz-mfa: <serial> <code>` (RFC 6238
2933        // 6-digit TOTP). Runs *before* the WORM / versioning routers so
2934        // a missing token is denied for free regardless of which delete
2935        // path the request would otherwise take.
2936        if let Some(mgr) = self.mfa_delete.as_ref()
2937            && mgr.is_enabled(&bucket)
2938        {
2939            let header = req.input.mfa.as_deref();
2940            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
2941                crate::metrics::record_mfa_delete_denial(&bucket);
2942                return Err(mfa_error_to_s3(e));
2943            }
2944        }
2945        // v0.5 #30: refuse the delete while a WORM lock is in effect.
2946        // Compliance can never be bypassed; Governance can be overridden
2947        // via `x-amz-bypass-governance-retention: true`; legal hold
2948        // never. The check happens before the versioning router so a
2949        // locked object can't be soft-deleted (delete-marker push) on an
2950        // Enabled bucket either — S3 spec says lock applies to all
2951        // delete forms.
2952        if let Some(mgr) = self.object_lock.as_ref()
2953            && let Some(state) = mgr.get(&bucket, &key)
2954        {
2955            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
2956            let now = chrono::Utc::now();
2957            if !state.can_delete(now, bypass) {
2958                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
2959                return Err(S3Error::with_message(
2960                    S3ErrorCode::AccessDenied,
2961                    "Access Denied because object protected by object lock",
2962                ));
2963            }
2964        }
2965        // v0.5 #34: route DELETE through the VersioningManager when the
2966        // bucket is in a versioning-aware state.
2967        //
2968        // - Enabled bucket, no version_id → push a delete marker into
2969        //   the chain. NO backend object is touched (older versions
2970        //   stay reachable via specific-version GET).
2971        // - Enabled / Suspended bucket, with version_id → physical
2972        //   delete. Backend bytes at the shadow key (or `<key>` for
2973        //   `null`) are removed; chain entry is dropped. If the deleted
2974        //   entry was a delete marker, no backend bytes exist for it
2975        //   (record-only).
2976        // - Suspended bucket, no version_id → push a "null" delete
2977        //   marker (S3 spec); backend bytes at `<key>` are physically
2978        //   removed (same as legacy).
2979        // - Unversioned bucket → fall through to legacy passthrough.
2980        if let Some(mgr) = self.versioning.as_ref() {
2981            let state = mgr.state(&bucket);
2982            if state != crate::versioning::VersioningState::Unversioned {
2983                let req_vid = req.input.version_id.take();
2984                if let Some(vid) = req_vid {
2985                    // Specific-version DELETE: touch backend bytes only
2986                    // when the entry was a real version (not a delete
2987                    // marker, which has no backend bytes).
2988                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
2989                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
2990                        key.clone()
2991                    } else {
2992                        versioned_shadow_key(&key, &vid)
2993                    };
2994                    let was_real_version = outcome
2995                        .as_ref()
2996                        .map(|o| !o.is_delete_marker)
2997                        .unwrap_or(false);
2998                    if was_real_version {
2999                        // Best-effort backend cleanup; missing bytes
3000                        // are not an error (e.g. shadow key already
3001                        // GC'd).
3002                        let backend_input = DeleteObjectInput {
3003                            bucket: bucket.clone(),
3004                            key: backend_target,
3005                            ..Default::default()
3006                        };
3007                        let backend_req = S3Request {
3008                            input: backend_input,
3009                            method: http::Method::DELETE,
3010                            uri: req.uri.clone(),
3011                            headers: req.headers.clone(),
3012                            extensions: http::Extensions::new(),
3013                            credentials: req.credentials.clone(),
3014                            region: req.region.clone(),
3015                            service: req.service.clone(),
3016                            trailing_headers: None,
3017                        };
3018                        let _ = self.backend.delete_object(backend_req).await;
3019                    }
3020                    let mut output = DeleteObjectOutput {
3021                        version_id: Some(vid.clone()),
3022                        ..Default::default()
3023                    };
3024                    if let Some(o) = outcome.as_ref()
3025                        && o.is_delete_marker
3026                    {
3027                        output.delete_marker = Some(true);
3028                    }
3029                    // v0.6 #35: specific-version DELETE always counts as
3030                    // a hard `ObjectRemoved:Delete` event (the chain
3031                    // entry, marker or not, is gone after this call).
3032                    self.fire_delete_notification(
3033                        &bucket,
3034                        &key,
3035                        crate::notifications::EventType::ObjectRemovedDelete,
3036                        Some(vid.clone()),
3037                    );
3038                    return Ok(S3Response::new(output));
3039                }
3040                // No version_id: record a delete marker (state-aware).
3041                let outcome = mgr.record_delete(&bucket, &key);
3042                if state == crate::versioning::VersioningState::Suspended {
3043                    // Suspended buckets also evict the prior `<key>`
3044                    // bytes (the previous null version is gone too).
3045                    let backend_input = DeleteObjectInput {
3046                        bucket: bucket.clone(),
3047                        key: key.clone(),
3048                        ..Default::default()
3049                    };
3050                    let backend_req = S3Request {
3051                        input: backend_input,
3052                        method: http::Method::DELETE,
3053                        uri: req.uri.clone(),
3054                        headers: req.headers.clone(),
3055                        extensions: http::Extensions::new(),
3056                        credentials: req.credentials.clone(),
3057                        region: req.region.clone(),
3058                        service: req.service.clone(),
3059                        trailing_headers: None,
3060                    };
3061                    let _ = self.backend.delete_object(backend_req).await;
3062                }
3063                let output = DeleteObjectOutput {
3064                    delete_marker: Some(true),
3065                    version_id: outcome.version_id.clone(),
3066                    ..Default::default()
3067                };
3068                // v0.6 #35: versioned bucket DELETE without a version-id
3069                // creates a delete marker — the dedicated AWS event
3070                // taxonomy entry. Suspended-state buckets also push a
3071                // (null) marker, so the same event fires there.
3072                self.fire_delete_notification(
3073                    &bucket,
3074                    &key,
3075                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
3076                    outcome.version_id,
3077                );
3078                return Ok(S3Response::new(output));
3079            }
3080        }
3081        // Legacy / Unversioned path: physical delete on the backend +
3082        // best-effort sidecar cleanup (mirrors v0.4 behaviour).
3083        let resp = self.backend.delete_object(req).await?;
3084        // v0.5 #30: drop any per-object lock state once the delete has
3085        // succeeded so the freed key can be re-armed by a future PUT
3086        // under the bucket default. Reaching here implies the lock had
3087        // already passed `can_delete` above, so this is purely cleanup.
3088        if let Some(mgr) = self.object_lock.as_ref() {
3089            mgr.clear(&bucket, &key);
3090        }
3091        // v0.6 #39: drop any object-level tag set on physical delete —
3092        // the freed key starts a fresh tag history if a future PUT
3093        // re-creates it. (Versioned-delete branches above return early
3094        // and do NOT touch tags, mirroring AWS where tag state is
3095        // attached to the logical key, not the version chain.)
3096        if let Some(mgr) = self.tagging.as_ref() {
3097            mgr.delete_object_tags(&bucket, &key);
3098        }
3099        let sidecar = sidecar_key(&key);
3100        // v0.7 #49: skip the sidecar DELETE if the key + sidecar suffix
3101        // can't be encoded into a request URI — the primary delete
3102        // already succeeded and a stale sidecar is harmless (Range GET
3103        // re-validates the underlying object on next read).
3104        if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
3105            let sidecar_input = DeleteObjectInput {
3106                bucket: bucket.clone(),
3107                key: sidecar,
3108                ..Default::default()
3109            };
3110            let sidecar_req = S3Request {
3111                input: sidecar_input,
3112                method: http::Method::DELETE,
3113                uri,
3114                headers: http::HeaderMap::new(),
3115                extensions: http::Extensions::new(),
3116                credentials: None,
3117                region: None,
3118                service: None,
3119                trailing_headers: None,
3120            };
3121            let _ = self.backend.delete_object(sidecar_req).await;
3122        }
3123        // v0.6 #35: legacy unversioned-bucket hard delete fires the
3124        // canonical `ObjectRemoved:Delete` event.
3125        self.fire_delete_notification(
3126            &bucket,
3127            &key,
3128            crate::notifications::EventType::ObjectRemovedDelete,
3129            None,
3130        );
3131        Ok(resp)
3132    }
3133    async fn delete_objects(
3134        &self,
3135        req: S3Request<DeleteObjectsInput>,
3136    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
3137        // v0.6 #42: MFA Delete applies once to the whole batch (S3 spec:
3138        // when MFA-Delete is on the bucket, a missing / invalid token
3139        // fails the entire DeleteObjects request, not per-object).
3140        if let Some(mgr) = self.mfa_delete.as_ref()
3141            && mgr.is_enabled(&req.input.bucket)
3142        {
3143            let header = req.input.mfa.as_deref();
3144            if let Err(e) =
3145                crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
3146            {
3147                crate::metrics::record_mfa_delete_denial(&req.input.bucket);
3148                return Err(mfa_error_to_s3(e));
3149            }
3150        }
3151        self.backend.delete_objects(req).await
3152    }
3153    async fn copy_object(
3154        &self,
3155        mut req: S3Request<CopyObjectInput>,
3156    ) -> S3Result<S3Response<CopyObjectOutput>> {
3157        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
3158        let dst_bucket = req.input.bucket.clone();
3159        let dst_key = req.input.key.clone();
3160        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
3161        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3162            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
3163        }
3164        // S4-aware copy: source object に s4-* metadata がある場合、それを
3165        // destination に確実に preserve する。
3166        //
3167        // - MetadataDirective::COPY (default): backend が source metadata を
3168        //   そのまま copy するので S4 metadata も自動で渡る。介入不要
3169        // - MetadataDirective::REPLACE: 客が指定した metadata で source を
3170        //   上書き → s4-* metadata が消えると destination は decompress 不能に
3171        //   なる (silent corruption)。S4 が source metadata を HEAD で取得し、
3172        //   s4-* fields を input.metadata に強制 merge する
3173        let needs_merge = req
3174            .input
3175            .metadata_directive
3176            .as_ref()
3177            .map(|d| d.as_str() == MetadataDirective::REPLACE)
3178            .unwrap_or(false);
3179        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
3180            let head_input = HeadObjectInput {
3181                bucket: bucket.to_string(),
3182                key: key.to_string(),
3183                ..Default::default()
3184            };
3185            let head_req = S3Request {
3186                input: head_input,
3187                method: req.method.clone(),
3188                uri: req.uri.clone(),
3189                headers: req.headers.clone(),
3190                extensions: http::Extensions::new(),
3191                credentials: req.credentials.clone(),
3192                region: req.region.clone(),
3193                service: req.service.clone(),
3194                trailing_headers: None,
3195            };
3196            if let Ok(head) = self.backend.head_object(head_req).await
3197                && let Some(src_meta) = head.output.metadata.as_ref()
3198            {
3199                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
3200                for key in [
3201                    META_CODEC,
3202                    META_ORIGINAL_SIZE,
3203                    META_COMPRESSED_SIZE,
3204                    META_CRC32C,
3205                    META_MULTIPART,
3206                    META_FRAMED,
3207                ] {
3208                    if let Some(v) = src_meta.get(key) {
3209                        // 客が同じ key を指定していたら preserve しない (= 上書き許可)
3210                        // していたら何もしない。指定していなければ insert
3211                        dest_meta
3212                            .entry(key.to_string())
3213                            .or_insert_with(|| v.clone());
3214                    }
3215                }
3216                debug!(
3217                    src_bucket = %bucket,
3218                    src_key = %key,
3219                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
3220                );
3221            }
3222        }
3223        self.backend.copy_object(req).await
3224    }
3225    async fn list_objects(
3226        &self,
3227        req: S3Request<ListObjectsInput>,
3228    ) -> S3Result<S3Response<ListObjectsOutput>> {
3229        self.enforce_rate_limit(&req, &req.input.bucket)?;
3230        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3231        let mut resp = self.backend.list_objects(req).await?;
3232        // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
3233        // — v0.5 #34) を顧客から隠す。
3234        if let Some(contents) = resp.output.contents.as_mut() {
3235            contents.retain(|o| {
3236                o.key
3237                    .as_ref()
3238                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3239                    .unwrap_or(true)
3240            });
3241        }
3242        Ok(resp)
3243    }
3244    async fn list_objects_v2(
3245        &self,
3246        req: S3Request<ListObjectsV2Input>,
3247    ) -> S3Result<S3Response<ListObjectsV2Output>> {
3248        self.enforce_rate_limit(&req, &req.input.bucket)?;
3249        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3250        let mut resp = self.backend.list_objects_v2(req).await?;
3251        if let Some(contents) = resp.output.contents.as_mut() {
3252            let before = contents.len();
3253            contents.retain(|o| {
3254                o.key
3255                    .as_ref()
3256                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3257                    .unwrap_or(true)
3258            });
3259            // key_count も補正 (S3 spec compliance)
3260            if let Some(kc) = resp.output.key_count.as_mut() {
3261                *kc -= (before - contents.len()) as i32;
3262            }
3263        }
3264        Ok(resp)
3265    }
3266    /// v0.4 #17: filter S4-internal sidecars from versioned listings.
3267    /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
3268    /// attached AND the bucket is in a versioning-aware state, build
3269    /// the `Versions` / `DeleteMarkers` arrays directly from the
3270    /// in-memory chain (paginated + ordered the S3 way: key asc,
3271    /// version newest-first inside each key). Otherwise fall back to
3272    /// passthrough + sidecar-filter (legacy v0.4 behaviour).
3273    async fn list_object_versions(
3274        &self,
3275        req: S3Request<ListObjectVersionsInput>,
3276    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3277        self.enforce_rate_limit(&req, &req.input.bucket)?;
3278        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3279        // v0.5 #34: VersioningManager-owned path.
3280        if let Some(mgr) = self.versioning.as_ref()
3281            && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3282        {
3283            let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3284            let page = mgr.list_versions(
3285                &req.input.bucket,
3286                req.input.prefix.as_deref(),
3287                req.input.key_marker.as_deref(),
3288                req.input.version_id_marker.as_deref(),
3289                max_keys,
3290            );
3291            let versions: Vec<ObjectVersion> = page
3292                .versions
3293                .into_iter()
3294                .map(|e| ObjectVersion {
3295                    key: Some(e.key),
3296                    version_id: Some(e.version_id),
3297                    is_latest: Some(e.is_latest),
3298                    e_tag: Some(ETag::Strong(e.etag)),
3299                    size: Some(e.size as i64),
3300                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3301                    ..Default::default()
3302                })
3303                .collect();
3304            let delete_markers: Vec<DeleteMarkerEntry> = page
3305                .delete_markers
3306                .into_iter()
3307                .map(|e| DeleteMarkerEntry {
3308                    key: Some(e.key),
3309                    version_id: Some(e.version_id),
3310                    is_latest: Some(e.is_latest),
3311                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3312                    ..Default::default()
3313                })
3314                .collect();
3315            let output = ListObjectVersionsOutput {
3316                name: Some(req.input.bucket.clone()),
3317                prefix: req.input.prefix.clone(),
3318                key_marker: req.input.key_marker.clone(),
3319                version_id_marker: req.input.version_id_marker.clone(),
3320                max_keys: req.input.max_keys,
3321                versions: if versions.is_empty() {
3322                    None
3323                } else {
3324                    Some(versions)
3325                },
3326                delete_markers: if delete_markers.is_empty() {
3327                    None
3328                } else {
3329                    Some(delete_markers)
3330                },
3331                is_truncated: Some(page.is_truncated),
3332                next_key_marker: page.next_key_marker,
3333                next_version_id_marker: page.next_version_id_marker,
3334                ..Default::default()
3335            };
3336            return Ok(S3Response::new(output));
3337        }
3338        // Legacy passthrough path (v0.4 #17 sidecar filter retained).
3339        let mut resp = self.backend.list_object_versions(req).await?;
3340        if let Some(versions) = resp.output.versions.as_mut() {
3341            versions.retain(|v| {
3342                v.key
3343                    .as_ref()
3344                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3345                    .unwrap_or(true)
3346            });
3347        }
3348        if let Some(markers) = resp.output.delete_markers.as_mut() {
3349            markers.retain(|m| {
3350                m.key
3351                    .as_ref()
3352                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3353                    .unwrap_or(true)
3354            });
3355        }
3356        Ok(resp)
3357    }
3358
3359    async fn create_multipart_upload(
3360        &self,
3361        mut req: S3Request<CreateMultipartUploadInput>,
3362    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
3363        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
3364        // frame parse を起動するため、object metadata に flag を立てる。
3365        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
3366        let codec_kind = self.registry.default_kind();
3367        let meta = req.input.metadata.get_or_insert_with(Default::default);
3368        meta.insert(META_MULTIPART.into(), "true".into());
3369        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
3370        // v0.8 #54 BUG-10 fix: take() the SSE request fields off
3371        // `req.input` so they are NOT forwarded to the backend on
3372        // CreateMultipartUpload. Same root cause as v0.7 #48 BUG-2/3 on
3373        // single-PUT — MinIO rejects SSE-C with "HTTPS required" and
3374        // SSE-KMS with "KMS not configured" when the headers reach it.
3375        // S4 owns the encrypt-then-store contract; we capture the
3376        // recipe in `multipart_state` here and apply it on Complete.
3377        let sse_c_alg = req.input.sse_customer_algorithm.take();
3378        let sse_c_key = req.input.sse_customer_key.take();
3379        let sse_c_md5 = req.input.sse_customer_key_md5.take();
3380        let sse_header = req.input.server_side_encryption.take();
3381        let sse_kms_key = req.input.ssekms_key_id.take();
3382        // Strip the encryption-context too — leaving it would make
3383        // MinIO try to validate it against a non-existent KMS key.
3384        let _ = req.input.ssekms_encryption_context.take();
3385        let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
3386        let kms_key_id = extract_kms_key_id(
3387            &sse_header,
3388            &sse_kms_key,
3389            self.kms_default_key_id.as_deref(),
3390        );
3391        // SSE-C / SSE-KMS exclusivity (mirrors put_object L1870).
3392        if sse_c_material.is_some() && kms_key_id.is_some() {
3393            return Err(S3Error::with_message(
3394                S3ErrorCode::InvalidArgument,
3395                "SSE-C and SSE-KMS cannot be used together on the same multipart upload",
3396            ));
3397        }
3398        let sse_mode = if let Some(ref m) = sse_c_material {
3399            crate::multipart_state::MultipartSseMode::SseC {
3400                key: m.key,
3401                key_md5: m.key_md5,
3402            }
3403        } else if let Some(ref kid) = kms_key_id {
3404            // KMS pre-flight: fail at Create rather than at Complete if
3405            // the gateway has no KMS backend wired (mirrors the
3406            // put_object L1879 check).
3407            if self.kms.is_none() {
3408                return Err(S3Error::with_message(
3409                    S3ErrorCode::InvalidRequest,
3410                    "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
3411                ));
3412            }
3413            crate::multipart_state::MultipartSseMode::SseKms { key_id: kid.clone() }
3414        } else if self.sse_keyring.is_some() {
3415            // SSE-S4: server-driven transparent encryption. Activates
3416            // whenever the gateway has a keyring configured AND the
3417            // client didn't pick a different SSE mode.
3418            crate::multipart_state::MultipartSseMode::SseS4
3419        } else {
3420            crate::multipart_state::MultipartSseMode::None
3421        };
3422        // v0.8 #54 BUG-9 fix: parse the Tagging header on Create. The
3423        // single-PUT path does this on PutObject; the multipart path
3424        // captures it now and commits via TagManager on Complete.
3425        let request_tags: Option<crate::tagging::TagSet> = req
3426            .input
3427            .tagging
3428            .as_deref()
3429            .map(crate::tagging::parse_tagging_header)
3430            .transpose()
3431            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
3432        // Strip the `Tagging` field off the input so the backend
3433        // doesn't try to apply it (no-op on MinIO but keeps the wire
3434        // clean).
3435        let _ = req.input.tagging.take();
3436        // Object Lock recipe (BUG-7 — captured here, applied on Complete).
3437        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
3438            .input
3439            .object_lock_mode
3440            .as_ref()
3441            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
3442        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
3443            .input
3444            .object_lock_retain_until_date
3445            .as_ref()
3446            .and_then(timestamp_to_chrono_utc);
3447        let explicit_legal_hold_on: bool = req
3448            .input
3449            .object_lock_legal_hold_status
3450            .as_ref()
3451            .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
3452            .unwrap_or(false);
3453        let bucket = req.input.bucket.clone();
3454        let key = req.input.key.clone();
3455        debug!(
3456            bucket = %bucket,
3457            key = %key,
3458            codec = codec_kind.as_str(),
3459            sse = ?sse_mode,
3460            "S4 create_multipart_upload: marking object for per-part compression"
3461        );
3462        let mut resp = self.backend.create_multipart_upload(req).await?;
3463        // Stash the per-upload context only after the backend handed
3464        // us an upload_id (failed Creates leave nothing in the store).
3465        if let Some(upload_id) = resp.output.upload_id.as_ref() {
3466            self.multipart_state.put(
3467                upload_id,
3468                crate::multipart_state::MultipartUploadContext {
3469                    bucket,
3470                    key,
3471                    sse: sse_mode.clone(),
3472                    tags: request_tags,
3473                    object_lock_mode: explicit_lock_mode,
3474                    object_lock_retain_until: explicit_retain_until,
3475                    object_lock_legal_hold: explicit_legal_hold_on,
3476                },
3477            );
3478        }
3479        // SSE-C / SSE-KMS response echo (mirrors put_object L2036-L2050).
3480        match &sse_mode {
3481            crate::multipart_state::MultipartSseMode::SseC { key_md5, .. } => {
3482                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
3483                resp.output.sse_customer_key_md5 = Some(
3484                    base64::engine::general_purpose::STANDARD.encode(key_md5),
3485                );
3486            }
3487            crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
3488                resp.output.server_side_encryption = Some(
3489                    ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
3490                );
3491                resp.output.ssekms_key_id = Some(key_id.clone());
3492            }
3493            _ => {}
3494        }
3495        Ok(resp)
3496    }
3497
3498    async fn upload_part(
3499        &self,
3500        mut req: S3Request<UploadPartInput>,
3501    ) -> S3Result<S3Response<UploadPartOutput>> {
3502        // 各 part を圧縮して frame header 付きで forward。GET 時に
3503        // `decompress_multipart` が frame iter で順に解凍する。
3504        // **per-part codec dispatch**: dispatcher が body 先頭 sample から
3505        // codec を選ぶので、parquet 風の mixed-content multipart で part ごとに
3506        // 最適 codec を使える (整数列 part → Bitcomp、text 列 part → zstd 等)。
3507        //
3508        // v0.8 #54 BUG-5/BUG-10 fix: lookup the per-upload SSE
3509        // context captured by `create_multipart_upload` and (a) strip
3510        // any SSE-C request headers off `req.input` so the backend
3511        // doesn't see them — same root cause as v0.7 #48 BUG-2/3 on
3512        // single-PUT; MinIO refuses SSE-C parts over HTTP — and (b)
3513        // observe that an upload context exists for `upload_id`. The
3514        // actual encrypt happens once at `complete_multipart_upload`
3515        // time on the assembled body (the per-part-encrypt approach
3516        // would require a matching multi-segment decrypt path on GET;
3517        // encrypting the whole assembled body keeps the GET path's
3518        // `is_sse_encrypted` branch in get_object L2429 working
3519        // unchanged).
3520        let _sse_ctx = self
3521            .multipart_state
3522            .get(req.input.upload_id.as_str());
3523        // Strip the SSE-C client headers that AWS clients echo on
3524        // every UploadPart per the multipart-with-SSE-C spec. We
3525        // accept them here as authentication that the caller is the
3526        // same one that initiated the upload (real-S3 enforces match)
3527        // but do NOT forward to the backend; the `_sse_ctx` capture
3528        // above already has the canonical key bytes from Create.
3529        // Always-strip is safe even when no context is registered
3530        // (legacy abandoned upload) — the backend gets a clean part
3531        // request either way.
3532        let _ = req.input.sse_customer_algorithm.take();
3533        let _ = req.input.sse_customer_key.take();
3534        let _ = req.input.sse_customer_key_md5.take();
3535        if let Some(blob) = req.input.body.take() {
3536            let bytes = collect_blob(blob, self.max_body_bytes)
3537                .await
3538                .map_err(internal("collect upload_part body"))?;
3539            let sample_len = bytes.len().min(SAMPLE_BYTES);
3540            // v0.8 #56: full part body is already in memory here; use its
3541            // length as the size hint so the dispatcher can promote to GPU
3542            // if it's big enough.
3543            let codec_kind = self
3544                .dispatcher
3545                .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
3546                .await;
3547            let original_size = bytes.len() as u64;
3548            // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
3549            let (compress_res, tel) = self
3550                .registry
3551                .compress_with_telemetry(bytes, codec_kind)
3552                .await;
3553            stamp_gpu_compress_telemetry(&tel);
3554            let (compressed, manifest) = compress_res.map_err(internal("registry compress part"))?;
3555            let header = FrameHeader {
3556                codec: codec_kind,
3557                original_size,
3558                compressed_size: compressed.len() as u64,
3559                crc32c: manifest.crc32c,
3560            };
3561            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3562            write_frame(&mut framed, header, &compressed);
3563            // v0.2 #5: heuristic-based padding skip for likely-final parts.
3564            //
3565            // AWS SDK / aws-cli / boto3 always send the final (and only the
3566            // final) part below the configured part_size. So if the raw user
3567            // part is already smaller than S3's 5 MiB multipart minimum, this
3568            // is overwhelmingly likely to be the final part — and the final
3569            // part is exempt from S3's size constraint. Skipping padding here
3570            // saves up to ~5 MiB per object on highly compressible workloads.
3571            //
3572            // If a misbehaving client sends a tiny **non-final** part, S3
3573            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
3574            // identical outcome to a vanilla S3 PUT, just earlier than
3575            // padding-then-complete would catch it.
3576            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3577            if !likely_final {
3578                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3579            }
3580            let framed_bytes = framed.freeze();
3581            let new_len = framed_bytes.len() as i64;
3582            // 同じ wire 互換問題が multipart にもある (content-length / checksum)
3583            req.input.content_length = Some(new_len);
3584            req.input.checksum_algorithm = None;
3585            req.input.checksum_crc32 = None;
3586            req.input.checksum_crc32c = None;
3587            req.input.checksum_crc64nvme = None;
3588            req.input.checksum_sha1 = None;
3589            req.input.checksum_sha256 = None;
3590            req.input.content_md5 = None;
3591            req.input.body = Some(bytes_to_blob(framed_bytes));
3592            debug!(
3593                part_number = ?req.input.part_number,
3594                upload_id = ?req.input.upload_id,
3595                original_size,
3596                framed_size = new_len,
3597                "S4 upload_part: framed compressed payload"
3598            );
3599        }
3600        self.backend.upload_part(req).await
3601    }
3602    async fn complete_multipart_upload(
3603        &self,
3604        mut req: S3Request<CompleteMultipartUploadInput>,
3605    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3606        let bucket = req.input.bucket.clone();
3607        let key = req.input.key.clone();
3608        let upload_id = req.input.upload_id.clone();
3609        // v0.8.1 #59: serialise concurrent Complete invocations on the
3610        // same `(bucket, key)`. The race window the lock closes is the
3611        // GET-assembled-body → encrypt → PUT-encrypted-body triple
3612        // below (BUG-5 fix); without serialisation, two Completes for
3613        // different `upload_id` but the same logical key could each
3614        // read the other's plaintext assembled body and overwrite the
3615        // peer's encrypted result. The guard is held to function exit
3616        // (drop on `Ok` / `Err`), covering version-id mint, object-
3617        // lock apply, tagging persist, and replication enqueue too.
3618        let completion_lock = self.multipart_state.completion_lock(&bucket, &key);
3619        let _completion_guard = completion_lock.lock().await;
3620        // v0.8 #54 — fetch the per-upload context captured on Create.
3621        // `None` means an abandoned / unknown upload_id (gateway
3622        // crashed between Create and Complete, or pre-v0.8 state
3623        // restore); we still let the backend do its thing for
3624        // transparency, but we can't apply any SSE / version / lock /
3625        // tag / replication post-processing because we never captured
3626        // the recipe.
3627        let ctx = self.multipart_state.get(upload_id.as_str());
3628        // v0.8 #54 BUG-10 fix: same SSE-C header strip as upload_part
3629        // — some clients (boto3 / aws-sdk-cpp older versions) replay
3630        // the SSE-C triple on Complete too, and MinIO will choke if
3631        // they reach the backend.
3632        let _ = req.input.sse_customer_algorithm.take();
3633        let _ = req.input.sse_customer_key.take();
3634        let _ = req.input.sse_customer_key_md5.take();
3635        let mut resp = self.backend.complete_multipart_upload(req).await?;
3636        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
3637        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
3638        // partial fetch path が利用可能になる (Range request の帯域節約)。
3639        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
3640        // できれば爆速になるので 1 回の cost は payback される
3641        //
3642        // v0.8 #54 BUG-5..9: this same fetch is the choke-point for
3643        // the SSE encrypt re-PUT + versioning shadow-key rewrite +
3644        // replication source-bytes capture, so we GET once and reuse
3645        // the bytes for every post-processing step.
3646        let assembled_body: Option<bytes::Bytes> =
3647            if let Ok(uri) = safe_object_uri(&bucket, &key) {
3648                let get_input = GetObjectInput {
3649                    bucket: bucket.clone(),
3650                    key: key.clone(),
3651                    ..Default::default()
3652                };
3653                let get_req = S3Request {
3654                    input: get_input,
3655                    method: http::Method::GET,
3656                    uri,
3657                    headers: http::HeaderMap::new(),
3658                    extensions: http::Extensions::new(),
3659                    credentials: None,
3660                    region: None,
3661                    service: None,
3662                    trailing_headers: None,
3663                };
3664                match self.backend.get_object(get_req).await {
3665                    Ok(get_resp) => match get_resp.output.body {
3666                        Some(blob) => collect_blob(blob, self.max_body_bytes).await.ok(),
3667                        None => None,
3668                    },
3669                    Err(_) => None,
3670                }
3671            } else {
3672                None
3673            };
3674        // Sidecar build (existing behaviour, gated on assembled body).
3675        if let Some(ref body) = assembled_body
3676            && let Ok(index) = build_index_from_body(body)
3677        {
3678            self.write_sidecar(&bucket, &key, &index).await;
3679        }
3680        // From here on, post-processing depends on the context —
3681        // short-circuit when the upload had no captured recipe
3682        // (legacy / crashed-Create / pre-v0.8 state restore).
3683        if let Some(ctx) = ctx {
3684            // v0.8 #54 BUG-6 fix: mint a version-id when the bucket
3685            // is versioning-Enabled. The single-PUT path does this in
3686            // `put_object` ~L1968; multipart was the missing branch.
3687            // We mint here (post-Complete, before any re-PUT) so the
3688            // same vid threads into both the shadow-key rewrite and
3689            // the VersionEntry the manager records.
3690            let pending_version: Option<crate::versioning::PutOutcome> = self
3691                .versioning
3692                .as_ref()
3693                .map(|mgr| mgr.state(&bucket))
3694                .map(|state| match state {
3695                    crate::versioning::VersioningState::Enabled => {
3696                        crate::versioning::PutOutcome {
3697                            version_id: crate::versioning::VersioningManager::new_version_id(),
3698                            versioned_response: true,
3699                        }
3700                    }
3701                    crate::versioning::VersioningState::Suspended
3702                    | crate::versioning::VersioningState::Unversioned => {
3703                        crate::versioning::PutOutcome {
3704                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
3705                            versioned_response: false,
3706                        }
3707                    }
3708                });
3709            // v0.8 #54 BUG-5 fix: encrypt the assembled framed body
3710            // and re-PUT it to the backend so the on-disk bytes are
3711            // SSE-encrypted. The single-PUT path does this body-by-
3712            // body inside `put_object` (L1907-L1942); for multipart,
3713            // encrypt-per-part would require a multi-segment decrypt
3714            // path on GET — we instead do a single encrypt over the
3715            // assembled framed body so the existing GET decrypt
3716            // branch (`is_sse_encrypted` → `decrypt(body, source)` →
3717            // FrameIter) handles it unchanged.
3718            //
3719            // The cost is one extra round-trip per Complete for SSE-
3720            // enabled multipart (already-paid for the sidecar build).
3721            // For single-instance gateways pointing at a co-located
3722            // backend this is negligible; cross-region operators
3723            // would benefit from per-part encrypt + multi-segment
3724            // decrypt as a follow-up.
3725            let needs_re_put = matches!(
3726                ctx.sse,
3727                crate::multipart_state::MultipartSseMode::SseS4
3728                    | crate::multipart_state::MultipartSseMode::SseC { .. }
3729                    | crate::multipart_state::MultipartSseMode::SseKms { .. }
3730            ) || pending_version
3731                .as_ref()
3732                .map(|pv| pv.versioned_response)
3733                .unwrap_or(false);
3734            // Snapshot replication body in advance so we can pass it
3735            // to the spawn helper after the (possibly absent) re-PUT.
3736            let replication_body = assembled_body.clone();
3737            let mut applied_metadata: Option<std::collections::HashMap<String, String>> = None;
3738            if needs_re_put && let Some(body) = assembled_body {
3739                // v0.8.1 #58: same Zeroizing pattern as put_object's
3740                // single-PUT KMS branch — DEK plaintext lives in
3741                // `Zeroizing<[u8; 32]>` for the lifetime of this
3742                // Complete handler, then is wiped on drop.
3743                let kms_wrap: Option<(
3744                    zeroize::Zeroizing<[u8; 32]>,
3745                    crate::kms::WrappedDek,
3746                )> = if let crate::multipart_state::MultipartSseMode::SseKms {
3747                    ref key_id,
3748                } = ctx.sse
3749                {
3750                    let kms = self.kms.as_ref().ok_or_else(|| {
3751                        S3Error::with_message(
3752                            S3ErrorCode::InvalidRequest,
3753                            "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
3754                        )
3755                    })?;
3756                    let (dek, wrapped) = kms
3757                        .generate_dek(key_id)
3758                        .await
3759                        .map_err(kms_error_to_s3)?;
3760                    if dek.len() != 32 {
3761                        return Err(S3Error::with_message(
3762                            S3ErrorCode::InternalError,
3763                            format!(
3764                                "KMS backend returned a DEK of {} bytes (expected 32)",
3765                                dek.len()
3766                            ),
3767                        ));
3768                    }
3769                    let mut dek_arr: zeroize::Zeroizing<[u8; 32]> =
3770                        zeroize::Zeroizing::new([0u8; 32]);
3771                    dek_arr.copy_from_slice(&dek);
3772                    // `dek` (Zeroizing<Vec<u8>>) is dropped at scope end.
3773                    Some((dek_arr, wrapped))
3774                } else {
3775                    None
3776                };
3777                // Build the new metadata map: re-fetch via HEAD so
3778                // the multipart / codec markers the backend stamped
3779                // on Create flow through unchanged, then layer the
3780                // SSE markers on top.
3781                let head_req = S3Request {
3782                    input: HeadObjectInput {
3783                        bucket: bucket.clone(),
3784                        key: key.clone(),
3785                        ..Default::default()
3786                    },
3787                    method: http::Method::HEAD,
3788                    uri: safe_object_uri(&bucket, &key)?,
3789                    headers: http::HeaderMap::new(),
3790                    extensions: http::Extensions::new(),
3791                    credentials: None,
3792                    region: None,
3793                    service: None,
3794                    trailing_headers: None,
3795                };
3796                let mut new_metadata: std::collections::HashMap<String, String> =
3797                    match self.backend.head_object(head_req).await {
3798                        Ok(h) => h.output.metadata.unwrap_or_default(),
3799                        Err(_) => std::collections::HashMap::new(),
3800                    };
3801                let new_body = match &ctx.sse {
3802                    crate::multipart_state::MultipartSseMode::SseC { key, key_md5 } => {
3803                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3804                        new_metadata.insert("s4-sse-type".into(), "AES256".into());
3805                        new_metadata.insert(
3806                            "s4-sse-c-key-md5".into(),
3807                            base64::engine::general_purpose::STANDARD.encode(key_md5),
3808                        );
3809                        crate::sse::encrypt_with_source(
3810                            &body,
3811                            crate::sse::SseSource::CustomerKey { key, key_md5 },
3812                        )
3813                    }
3814                    crate::multipart_state::MultipartSseMode::SseKms { .. } => {
3815                        let (dek, wrapped) = kms_wrap
3816                            .as_ref()
3817                            .expect("SseKms branch implies kms_wrap is Some");
3818                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3819                        new_metadata.insert("s4-sse-type".into(), "aws:kms".into());
3820                        new_metadata
3821                            .insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
3822                        // v0.8.1 #58: auto-deref from `&Zeroizing<[u8; 32]>`
3823                        // to `&[u8; 32]` (same shape as the put_object
3824                        // single-PUT branch).
3825                        let dek_ref: &[u8; 32] = dek;
3826                        crate::sse::encrypt_with_source(
3827                            &body,
3828                            crate::sse::SseSource::Kms { dek: dek_ref, wrapped },
3829                        )
3830                    }
3831                    crate::multipart_state::MultipartSseMode::SseS4 => {
3832                        let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
3833                            S3Error::with_message(
3834                                S3ErrorCode::InternalError,
3835                                "SSE-S4 captured at Create but keyring missing at Complete",
3836                            )
3837                        })?;
3838                        new_metadata.insert("s4-encrypted".into(), "aes-256-gcm".into());
3839                        // SSE-S4 deliberately omits `s4-sse-type` so
3840                        // HEAD doesn't falsely advertise AWS-style
3841                        // SSE-S3 (matches the put_object L1929-L1939
3842                        // comment).
3843                        // v0.8 #52: same chunk_size dispatch as the
3844                        // single-PUT branch — multipart Complete
3845                        // re-encrypts the assembled body, so honoring
3846                        // the chunked path here is required to keep
3847                        // GET streaming on multipart-uploaded objects.
3848                        if self.sse_chunk_size > 0 {
3849                            crate::sse::encrypt_v2_chunked(
3850                                &body,
3851                                keyring,
3852                                self.sse_chunk_size,
3853                            )
3854                            .map_err(|e| {
3855                                S3Error::with_message(
3856                                    S3ErrorCode::InternalError,
3857                                    format!(
3858                                        "SSE-S4 chunked encrypt failed at Complete: {e}"
3859                                    ),
3860                                )
3861                            })?
3862                        } else {
3863                            crate::sse::encrypt_v2(&body, keyring)
3864                        }
3865                    }
3866                    crate::multipart_state::MultipartSseMode::None => body.clone(),
3867                };
3868                // v0.8 #54 BUG-6 fix: write the re-PUT under the
3869                // shadow key so the version chain doesn't overwrite
3870                // the previous version on a versioned bucket. The
3871                // original (unshadowed) key was assembled by the
3872                // backend on Complete; we delete it after the shadow
3873                // PUT lands.
3874                let put_target_key = if let Some(pv) = pending_version.as_ref() {
3875                    if pv.versioned_response {
3876                        versioned_shadow_key(&key, &pv.version_id)
3877                    } else {
3878                        key.clone()
3879                    }
3880                } else {
3881                    key.clone()
3882                };
3883                let new_body_len = new_body.len() as i64;
3884                let put_req = S3Request {
3885                    input: PutObjectInput {
3886                        bucket: bucket.clone(),
3887                        key: put_target_key.clone(),
3888                        body: Some(bytes_to_blob(new_body.clone())),
3889                        metadata: Some(new_metadata.clone()),
3890                        content_length: Some(new_body_len),
3891                        ..Default::default()
3892                    },
3893                    method: http::Method::PUT,
3894                    uri: safe_object_uri(&bucket, &put_target_key)?,
3895                    headers: http::HeaderMap::new(),
3896                    extensions: http::Extensions::new(),
3897                    credentials: None,
3898                    region: None,
3899                    service: None,
3900                    trailing_headers: None,
3901                };
3902                self.backend.put_object(put_req).await?;
3903                // If we rewrote the storage key (versioning shadow),
3904                // we must drop the original (unshadowed) Complete-
3905                // assembled bytes so subsequent listings don't see a
3906                // duplicate.
3907                if put_target_key != key {
3908                    let del_req = S3Request {
3909                        input: DeleteObjectInput {
3910                            bucket: bucket.clone(),
3911                            key: key.clone(),
3912                            ..Default::default()
3913                        },
3914                        method: http::Method::DELETE,
3915                        uri: safe_object_uri(&bucket, &key)?,
3916                        headers: http::HeaderMap::new(),
3917                        extensions: http::Extensions::new(),
3918                        credentials: None,
3919                        region: None,
3920                        service: None,
3921                        trailing_headers: None,
3922                    };
3923                    let _ = self.backend.delete_object(del_req).await;
3924                }
3925                applied_metadata = Some(new_metadata);
3926            }
3927            // v0.8 #54 BUG-6 commit: register the new version with
3928            // the VersioningManager so list_object_versions /
3929            // GET ?versionId= see it.
3930            if let (Some(mgr), Some(pv)) = (self.versioning.as_ref(), pending_version.as_ref()) {
3931                let etag = resp
3932                    .output
3933                    .e_tag
3934                    .clone()
3935                    .map(ETag::into_value)
3936                    .unwrap_or_default();
3937                let now = chrono::Utc::now();
3938                mgr.commit_put_with_version(
3939                    &bucket,
3940                    &key,
3941                    crate::versioning::VersionEntry {
3942                        version_id: pv.version_id.clone(),
3943                        etag,
3944                        size: replication_body
3945                            .as_ref()
3946                            .map(|b| b.len() as u64)
3947                            .unwrap_or(0),
3948                        is_delete_marker: false,
3949                        created_at: now,
3950                    },
3951                );
3952                if pv.versioned_response {
3953                    resp.output.version_id = Some(pv.version_id.clone());
3954                }
3955            }
3956            // v0.8 #54 BUG-7 fix: persist any per-upload Object Lock
3957            // recipe + auto-apply the bucket default. Mirrors the
3958            // put_object L2057-L2074 block.
3959            if let Some(mgr) = self.object_lock.as_ref() {
3960                if ctx.object_lock_mode.is_some()
3961                    || ctx.object_lock_retain_until.is_some()
3962                    || ctx.object_lock_legal_hold
3963                {
3964                    let mut state = mgr.get(&bucket, &key).unwrap_or_default();
3965                    if let Some(m) = ctx.object_lock_mode {
3966                        state.mode = Some(m);
3967                    }
3968                    if let Some(u) = ctx.object_lock_retain_until {
3969                        state.retain_until = Some(u);
3970                    }
3971                    if ctx.object_lock_legal_hold {
3972                        state.legal_hold_on = true;
3973                    }
3974                    mgr.set(&bucket, &key, state);
3975                }
3976                mgr.apply_default_on_put(&bucket, &key, chrono::Utc::now());
3977            }
3978            // v0.8 #54 BUG-9 fix: persist the captured tags via the
3979            // TagManager so GetObjectTagging returns them.
3980            if let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), ctx.tags.as_ref()) {
3981                mgr.put_object_tags(&bucket, &key, tags.clone());
3982            }
3983            // SSE-C / SSE-KMS response echo. The
3984            // CompleteMultipartUploadOutput only exposes
3985            // `server_side_encryption` + `ssekms_key_id` (no
3986            // sse_customer_* — those round-tripped on Create / parts).
3987            match &ctx.sse {
3988                crate::multipart_state::MultipartSseMode::SseC { .. } => {
3989                    resp.output.server_side_encryption = Some(
3990                        ServerSideEncryption::from_static(ServerSideEncryption::AES256),
3991                    );
3992                }
3993                crate::multipart_state::MultipartSseMode::SseKms { key_id } => {
3994                    resp.output.server_side_encryption = Some(
3995                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
3996                    );
3997                    resp.output.ssekms_key_id = Some(key_id.clone());
3998                }
3999                _ => {}
4000            }
4001            // v0.8 #54 BUG-8 fix: fire cross-bucket replication just
4002            // like put_object L2165 does. We hand the dispatcher the
4003            // assembled body bytes (post-encrypt where applicable, so
4004            // the destination ends up byte-identical to the source's
4005            // on-disk shape) plus the metadata that was actually
4006            // committed.
4007            let replication_body_bytes = replication_body.unwrap_or_default();
4008            self.spawn_replication_if_matched(
4009                &bucket,
4010                &key,
4011                &ctx.tags,
4012                &replication_body_bytes,
4013                &applied_metadata,
4014                true,
4015            );
4016            self.multipart_state.remove(upload_id.as_str());
4017        }
4018        // v0.8.1 #59 janitor: best-effort sweep of stale completion
4019        // locks while we are still on the critical path of a single
4020        // Complete (so steady-state workloads of unique keys don't
4021        // accumulate `DashMap` entries). The sweep only retires
4022        // entries whose `Arc::strong_count == 1`, so any other in-
4023        // flight Complete on a different key keeps its lock alive.
4024        // Our own `_completion_guard` keeps `bucket`/`key`'s entry
4025        // alive across this call; it's reaped on the next Complete or
4026        // the next caller-driven prune.
4027        self.multipart_state.prune_completion_locks();
4028        Ok(resp)
4029    }
4030    async fn abort_multipart_upload(
4031        &self,
4032        req: S3Request<AbortMultipartUploadInput>,
4033    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
4034        // v0.8 #54: drop the per-upload state (SSE-C key bytes / tag
4035        // set) promptly so an aborted upload doesn't leak the
4036        // customer's key into a long-running gateway's RSS.
4037        self.multipart_state.remove(req.input.upload_id.as_str());
4038        self.backend.abort_multipart_upload(req).await
4039    }
4040    async fn list_multipart_uploads(
4041        &self,
4042        req: S3Request<ListMultipartUploadsInput>,
4043    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
4044        self.backend.list_multipart_uploads(req).await
4045    }
4046    async fn list_parts(
4047        &self,
4048        req: S3Request<ListPartsInput>,
4049    ) -> S3Result<S3Response<ListPartsOutput>> {
4050        self.backend.list_parts(req).await
4051    }
4052
4053    // =========================================================================
4054    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
4055    // 持たないので、backend (= AWS S3) の動作と完全に同一。
4056    //
4057    // 既知の制限事項:
4058    // - copy_object / upload_part_copy: source object が S4-compressed の場合、
4059    //   backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
4060    //   coppied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
4061    //   経由で正しく decompress できる。MetadataDirective REPLACE で上書き
4062    //   されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
4063    // - list_object_versions: versioning enabled bucket では各 version も S4
4064    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
4065    // =========================================================================
4066
4067    // ---- Object ACL / tagging / attributes ----
4068    async fn get_object_acl(
4069        &self,
4070        req: S3Request<GetObjectAclInput>,
4071    ) -> S3Result<S3Response<GetObjectAclOutput>> {
4072        self.backend.get_object_acl(req).await
4073    }
4074    async fn put_object_acl(
4075        &self,
4076        req: S3Request<PutObjectAclInput>,
4077    ) -> S3Result<S3Response<PutObjectAclOutput>> {
4078        self.backend.put_object_acl(req).await
4079    }
4080    // v0.6 #39: object tagging — when a `TagManager` is attached the
4081    // configuration / per-(bucket, key) state lives in the manager and
4082    // these handlers serve directly from it; when no manager is
4083    // attached they fall back to the backend (legacy passthrough so
4084    // v0.5 deployments are unaffected).
4085    async fn get_object_tagging(
4086        &self,
4087        req: S3Request<GetObjectTaggingInput>,
4088    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
4089        let Some(mgr) = self.tagging.as_ref() else {
4090            return self.backend.get_object_tagging(req).await;
4091        };
4092        let tags = mgr
4093            .get_object_tags(&req.input.bucket, &req.input.key)
4094            .unwrap_or_default();
4095        Ok(S3Response::new(GetObjectTaggingOutput {
4096            tag_set: tagset_to_aws(&tags),
4097            ..Default::default()
4098        }))
4099    }
4100    async fn put_object_tagging(
4101        &self,
4102        req: S3Request<PutObjectTaggingInput>,
4103    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
4104        let Some(mgr) = self.tagging.as_ref() else {
4105            return self.backend.put_object_tagging(req).await;
4106        };
4107        let bucket = req.input.bucket.clone();
4108        let key = req.input.key.clone();
4109        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4110            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4111        })?;
4112        // v0.6 #39: gate via IAM policy with both the request tags
4113        // (`s3:RequestObjectTag/<key>`) and any existing tags on the
4114        // target object (`s3:ExistingObjectTag/<key>`).
4115        let existing = mgr.get_object_tags(&bucket, &key);
4116        self.enforce_policy_with_extra(
4117            &req,
4118            "s3:PutObjectTagging",
4119            &bucket,
4120            Some(&key),
4121            Some(&parsed),
4122            existing.as_ref(),
4123        )?;
4124        mgr.put_object_tags(&bucket, &key, parsed);
4125        Ok(S3Response::new(PutObjectTaggingOutput::default()))
4126    }
4127    async fn delete_object_tagging(
4128        &self,
4129        req: S3Request<DeleteObjectTaggingInput>,
4130    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
4131        let Some(mgr) = self.tagging.as_ref() else {
4132            return self.backend.delete_object_tagging(req).await;
4133        };
4134        let bucket = req.input.bucket.clone();
4135        let key = req.input.key.clone();
4136        let existing = mgr.get_object_tags(&bucket, &key);
4137        self.enforce_policy_with_extra(
4138            &req,
4139            "s3:DeleteObjectTagging",
4140            &bucket,
4141            Some(&key),
4142            None,
4143            existing.as_ref(),
4144        )?;
4145        mgr.delete_object_tags(&bucket, &key);
4146        Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
4147    }
4148    async fn get_object_attributes(
4149        &self,
4150        req: S3Request<GetObjectAttributesInput>,
4151    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
4152        self.backend.get_object_attributes(req).await
4153    }
4154    async fn restore_object(
4155        &self,
4156        req: S3Request<RestoreObjectInput>,
4157    ) -> S3Result<S3Response<RestoreObjectOutput>> {
4158        self.backend.restore_object(req).await
4159    }
4160    async fn upload_part_copy(
4161        &self,
4162        req: S3Request<UploadPartCopyInput>,
4163    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
4164        // v0.2 #6: byte-range aware copy when the source is S4-framed.
4165        //
4166        // For a framed source (multipart upload OR single-PUT framed-v2),
4167        // a naive byte-range passthrough would copy compressed bytes that
4168        // don't align with S4 frame boundaries — silently corrupting the
4169        // result. Instead we GET the source through S4 (which handles
4170        // decompression + Range), re-compress + re-frame as a new part,
4171        // and forward as upload_part. For non-framed sources (S4-untouched
4172        // raw objects), passthrough is correct and we keep the original
4173        // (cheaper) code path.
4174        let CopySource::Bucket {
4175            bucket: src_bucket,
4176            key: src_key,
4177            ..
4178        } = &req.input.copy_source
4179        else {
4180            return self.backend.upload_part_copy(req).await;
4181        };
4182        let src_bucket = src_bucket.to_string();
4183        let src_key = src_key.to_string();
4184
4185        // Probe metadata to decide whether the source needs S4-aware copy.
4186        let head_input = HeadObjectInput {
4187            bucket: src_bucket.clone(),
4188            key: src_key.clone(),
4189            ..Default::default()
4190        };
4191        let head_req = S3Request {
4192            input: head_input,
4193            method: http::Method::HEAD,
4194            uri: req.uri.clone(),
4195            headers: req.headers.clone(),
4196            extensions: http::Extensions::new(),
4197            credentials: req.credentials.clone(),
4198            region: req.region.clone(),
4199            service: req.service.clone(),
4200            trailing_headers: None,
4201        };
4202        let needs_s4_copy = match self.backend.head_object(head_req).await {
4203            Ok(h) => {
4204                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
4205            }
4206            Err(_) => false,
4207        };
4208        if !needs_s4_copy {
4209            return self.backend.upload_part_copy(req).await;
4210        }
4211
4212        // Resolve the optional source byte range to pass to GET.
4213        let source_range = req
4214            .input
4215            .copy_source_range
4216            .as_ref()
4217            .map(|r| parse_copy_source_range(r))
4218            .transpose()
4219            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
4220
4221        // GET source via S4 (handles decompression + sidecar partial fetch
4222        // when range is present). The result is the requested user-visible
4223        // byte range, fully decompressed.
4224        let mut get_input = GetObjectInput {
4225            bucket: src_bucket.clone(),
4226            key: src_key.clone(),
4227            ..Default::default()
4228        };
4229        get_input.range = source_range;
4230        let get_req = S3Request {
4231            input: get_input,
4232            method: http::Method::GET,
4233            uri: req.uri.clone(),
4234            headers: req.headers.clone(),
4235            extensions: http::Extensions::new(),
4236            credentials: req.credentials.clone(),
4237            region: req.region.clone(),
4238            service: req.service.clone(),
4239            trailing_headers: None,
4240        };
4241        let get_resp = self.get_object(get_req).await?;
4242        let blob = get_resp.output.body.ok_or_else(|| {
4243            S3Error::with_message(
4244                S3ErrorCode::InternalError,
4245                "upload_part_copy: empty body from source GET",
4246            )
4247        })?;
4248        let bytes = collect_blob(blob, self.max_body_bytes)
4249            .await
4250            .map_err(internal("collect upload_part_copy source body"))?;
4251
4252        // Compress + frame as a fresh part (mirrors upload_part path).
4253        let sample_len = bytes.len().min(SAMPLE_BYTES);
4254        // v0.8 #56: same size-hint promotion as the upload_part path.
4255        let codec_kind = self
4256            .dispatcher
4257            .pick_with_size_hint(&bytes[..sample_len], Some(bytes.len() as u64))
4258            .await;
4259        let original_size = bytes.len() as u64;
4260        // v0.8 #55: telemetry-returning compress (GPU metrics stamp).
4261        let (compress_res, tel) = self
4262            .registry
4263            .compress_with_telemetry(bytes, codec_kind)
4264            .await;
4265        stamp_gpu_compress_telemetry(&tel);
4266        let (compressed, manifest) =
4267            compress_res.map_err(internal("registry compress upload_part_copy"))?;
4268        let header = FrameHeader {
4269            codec: codec_kind,
4270            original_size,
4271            compressed_size: compressed.len() as u64,
4272            crc32c: manifest.crc32c,
4273        };
4274        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
4275        write_frame(&mut framed, header, &compressed);
4276        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
4277        if !likely_final {
4278            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
4279        }
4280        let framed_bytes = framed.freeze();
4281        let framed_len = framed_bytes.len() as i64;
4282
4283        // Forward as upload_part to the destination multipart upload.
4284        let part_input = UploadPartInput {
4285            bucket: req.input.bucket.clone(),
4286            key: req.input.key.clone(),
4287            part_number: req.input.part_number,
4288            upload_id: req.input.upload_id.clone(),
4289            body: Some(bytes_to_blob(framed_bytes)),
4290            content_length: Some(framed_len),
4291            ..Default::default()
4292        };
4293        let part_req = S3Request {
4294            input: part_input,
4295            method: http::Method::PUT,
4296            uri: req.uri.clone(),
4297            headers: req.headers.clone(),
4298            extensions: http::Extensions::new(),
4299            credentials: req.credentials.clone(),
4300            region: req.region.clone(),
4301            service: req.service.clone(),
4302            trailing_headers: None,
4303        };
4304        let upload_resp = self.backend.upload_part(part_req).await?;
4305
4306        let copy_output = UploadPartCopyOutput {
4307            copy_part_result: Some(CopyPartResult {
4308                e_tag: upload_resp.output.e_tag.clone(),
4309                ..Default::default()
4310            }),
4311            ..Default::default()
4312        };
4313        Ok(S3Response::new(copy_output))
4314    }
4315
4316    // ---- Object lock / retention / legal hold (v0.5 #30) ----
4317    //
4318    // When an `ObjectLockManager` is attached the configuration / per-object
4319    // state lives in the manager and these handlers serve directly from it;
4320    // when no manager is attached they fall back to the backend (legacy
4321    // passthrough so v0.4 deployments are unaffected).
4322    async fn get_object_lock_configuration(
4323        &self,
4324        req: S3Request<GetObjectLockConfigurationInput>,
4325    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
4326        if let Some(mgr) = self.object_lock.as_ref() {
4327            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
4328                ObjectLockConfiguration {
4329                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
4330                        ObjectLockEnabled::ENABLED,
4331                    )),
4332                    rule: Some(ObjectLockRule {
4333                        default_retention: Some(DefaultRetention {
4334                            days: Some(d.retention_days as i32),
4335                            mode: Some(ObjectLockRetentionMode::from_static(
4336                                match d.mode {
4337                                    crate::object_lock::LockMode::Governance => {
4338                                        ObjectLockRetentionMode::GOVERNANCE
4339                                    }
4340                                    crate::object_lock::LockMode::Compliance => {
4341                                        ObjectLockRetentionMode::COMPLIANCE
4342                                    }
4343                                },
4344                            )),
4345                            years: None,
4346                        }),
4347                    }),
4348                }
4349            });
4350            let output = GetObjectLockConfigurationOutput {
4351                object_lock_configuration: cfg,
4352            };
4353            return Ok(S3Response::new(output));
4354        }
4355        self.backend.get_object_lock_configuration(req).await
4356    }
4357    async fn put_object_lock_configuration(
4358        &self,
4359        req: S3Request<PutObjectLockConfigurationInput>,
4360    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
4361        if let Some(mgr) = self.object_lock.as_ref() {
4362            let bucket = req.input.bucket.clone();
4363            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
4364                && let Some(rule) = cfg.rule.as_ref()
4365                && let Some(d) = rule.default_retention.as_ref()
4366            {
4367                let mode = d
4368                    .mode
4369                    .as_ref()
4370                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
4371                    .ok_or_else(|| {
4372                        S3Error::with_message(
4373                            S3ErrorCode::InvalidRequest,
4374                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
4375                        )
4376                    })?;
4377                // S3 spec: exactly one of Days / Years (we accept Days
4378                // outright and convert Years → Days for storage; Years
4379                // is just a UX shorthand on the wire).
4380                let days: u32 = match (d.days, d.years) {
4381                    (Some(d), None) if d > 0 => d as u32,
4382                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
4383                    _ => {
4384                        return Err(S3Error::with_message(
4385                            S3ErrorCode::InvalidRequest,
4386                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
4387                        ));
4388                    }
4389                };
4390                mgr.set_bucket_default(
4391                    &bucket,
4392                    crate::object_lock::BucketObjectLockDefault {
4393                        mode,
4394                        retention_days: days,
4395                    },
4396                );
4397            }
4398            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
4399        }
4400        self.backend.put_object_lock_configuration(req).await
4401    }
4402    async fn get_object_legal_hold(
4403        &self,
4404        req: S3Request<GetObjectLegalHoldInput>,
4405    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
4406        if let Some(mgr) = self.object_lock.as_ref() {
4407            let on = mgr
4408                .get(&req.input.bucket, &req.input.key)
4409                .map(|s| s.legal_hold_on)
4410                .unwrap_or(false);
4411            let status = ObjectLockLegalHoldStatus::from_static(if on {
4412                ObjectLockLegalHoldStatus::ON
4413            } else {
4414                ObjectLockLegalHoldStatus::OFF
4415            });
4416            let output = GetObjectLegalHoldOutput {
4417                legal_hold: Some(ObjectLockLegalHold {
4418                    status: Some(status),
4419                }),
4420            };
4421            return Ok(S3Response::new(output));
4422        }
4423        self.backend.get_object_legal_hold(req).await
4424    }
4425    async fn put_object_legal_hold(
4426        &self,
4427        req: S3Request<PutObjectLegalHoldInput>,
4428    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
4429        if let Some(mgr) = self.object_lock.as_ref() {
4430            let on = req
4431                .input
4432                .legal_hold
4433                .as_ref()
4434                .and_then(|h| h.status.as_ref())
4435                .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
4436                .unwrap_or(false);
4437            mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
4438            return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
4439        }
4440        self.backend.put_object_legal_hold(req).await
4441    }
4442    async fn get_object_retention(
4443        &self,
4444        req: S3Request<GetObjectRetentionInput>,
4445    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
4446        if let Some(mgr) = self.object_lock.as_ref() {
4447            let retention = mgr
4448                .get(&req.input.bucket, &req.input.key)
4449                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
4450                .map(|s| {
4451                    let mode = s.mode.map(|m| {
4452                        ObjectLockRetentionMode::from_static(match m {
4453                            crate::object_lock::LockMode::Governance => {
4454                                ObjectLockRetentionMode::GOVERNANCE
4455                            }
4456                            crate::object_lock::LockMode::Compliance => {
4457                                ObjectLockRetentionMode::COMPLIANCE
4458                            }
4459                        })
4460                    });
4461                    let until = s.retain_until.map(chrono_utc_to_timestamp);
4462                    ObjectLockRetention {
4463                        mode,
4464                        retain_until_date: until,
4465                    }
4466                });
4467            let output = GetObjectRetentionOutput { retention };
4468            return Ok(S3Response::new(output));
4469        }
4470        self.backend.get_object_retention(req).await
4471    }
4472    async fn put_object_retention(
4473        &self,
4474        req: S3Request<PutObjectRetentionInput>,
4475    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
4476        if let Some(mgr) = self.object_lock.as_ref() {
4477            let bucket = req.input.bucket.clone();
4478            let key = req.input.key.clone();
4479            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
4480            let retention = req.input.retention.as_ref().ok_or_else(|| {
4481                S3Error::with_message(
4482                    S3ErrorCode::InvalidRequest,
4483                    "PutObjectRetention requires a Retention element",
4484                )
4485            })?;
4486            let new_mode = retention
4487                .mode
4488                .as_ref()
4489                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
4490            let new_until = retention
4491                .retain_until_date
4492                .as_ref()
4493                .map(timestamp_to_chrono_utc)
4494                .unwrap_or(None);
4495            let now = chrono::Utc::now();
4496            let existing = mgr.get(&bucket, &key).unwrap_or_default();
4497            // S3 immutability rules:
4498            //   - Compliance is one-way: once set, mode cannot move to
4499            //     Governance, and retain-until cannot be shortened.
4500            //   - Governance can be lengthened freely; shortened only
4501            //     with bypass=true.
4502            if let Some(existing_mode) = existing.mode
4503                && existing_mode == crate::object_lock::LockMode::Compliance
4504                && existing.is_locked(now)
4505            {
4506                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
4507                    return Err(S3Error::with_message(
4508                        S3ErrorCode::AccessDenied,
4509                        "Cannot downgrade Compliance retention to Governance while lock is active",
4510                    ));
4511                }
4512                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
4513                    && next < prev
4514                {
4515                    return Err(S3Error::with_message(
4516                        S3ErrorCode::AccessDenied,
4517                        "Cannot shorten Compliance retention while lock is active",
4518                    ));
4519                }
4520            }
4521            if let Some(existing_mode) = existing.mode
4522                && existing_mode == crate::object_lock::LockMode::Governance
4523                && existing.is_locked(now)
4524                && !bypass
4525                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
4526                && next < prev
4527            {
4528                return Err(S3Error::with_message(
4529                    S3ErrorCode::AccessDenied,
4530                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
4531                ));
4532            }
4533            let mut state = existing;
4534            if new_mode.is_some() {
4535                state.mode = new_mode;
4536            }
4537            if new_until.is_some() {
4538                state.retain_until = new_until;
4539            }
4540            mgr.set(&bucket, &key, state);
4541            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
4542        }
4543        self.backend.put_object_retention(req).await
4544    }
4545
4546    // ---- Versioning ----
4547    // list_object_versions is implemented above in the compression-hook
4548    // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
4549    // VersioningManager is attached (v0.5 #34), serves chains directly
4550    // from the in-memory index.
4551    async fn get_bucket_versioning(
4552        &self,
4553        req: S3Request<GetBucketVersioningInput>,
4554    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
4555        // v0.5 #34: when a VersioningManager is attached, the bucket's
4556        // versioning state lives in the manager (= S4-server's
4557        // authoritative source). Pass-through hits the backend only
4558        // when no manager is configured (legacy v0.4 behaviour).
4559        if let Some(mgr) = self.versioning.as_ref() {
4560            let output = match mgr.state(&req.input.bucket).as_aws_status() {
4561                Some(s) => GetBucketVersioningOutput {
4562                    status: Some(BucketVersioningStatus::from(s.to_owned())),
4563                    ..Default::default()
4564                },
4565                None => GetBucketVersioningOutput::default(),
4566            };
4567            return Ok(S3Response::new(output));
4568        }
4569        self.backend.get_bucket_versioning(req).await
4570    }
4571    async fn put_bucket_versioning(
4572        &self,
4573        req: S3Request<PutBucketVersioningInput>,
4574    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
4575        // v0.6 #42: MFA gating on the `PutBucketVersioning` request
4576        // itself. S3 spec: when the request body carries an
4577        // `MfaDelete` element (either `Enabled` or `Disabled`), the
4578        // request must include a valid `x-amz-mfa` token — both for
4579        // the *first* enable (so the operator can't quietly side-step
4580        // the gate by never enabling it) and for any subsequent
4581        // change (so a leaked credential alone can't disable MFA
4582        // Delete to bypass it on subsequent DELETEs). Requests that
4583        // omit the `MfaDelete` element entirely (i.e. they flip only
4584        // `Status`) skip this gate, matching AWS.
4585        if let Some(mgr) = self.mfa_delete.as_ref()
4586            && let Some(target_enabled) = req
4587                .input
4588                .versioning_configuration
4589                .mfa_delete
4590                .as_ref()
4591                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
4592        {
4593            let bucket = req.input.bucket.clone();
4594            let header = req.input.mfa.as_deref();
4595            let secret = mgr.lookup_secret(&bucket);
4596            let verified = match (header, secret.as_ref()) {
4597                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
4598                    Ok((serial, code)) => {
4599                        serial == s.serial
4600                            && crate::mfa::verify_totp(
4601                                &s.secret_base32,
4602                                &code,
4603                                current_unix_secs(),
4604                            )
4605                    }
4606                    Err(_) => false,
4607                },
4608                _ => false,
4609            };
4610            if !verified {
4611                crate::metrics::record_mfa_delete_denial(&bucket);
4612                let err = if header.is_none() {
4613                    crate::mfa::MfaError::Missing
4614                } else {
4615                    crate::mfa::MfaError::InvalidCode
4616                };
4617                return Err(mfa_error_to_s3(err));
4618            }
4619            mgr.set_bucket_state(&bucket, target_enabled);
4620        }
4621        // v0.5 #34: stash the new state in the manager, then forward to
4622        // the backend so any downstream that *also* tracks state
4623        // (e.g. a real S3 backend) stays in sync. Manager-attached but
4624        // backend rejection is treated as a soft-fail (state is still
4625        // owned by the manager).
4626        if let Some(mgr) = self.versioning.as_ref() {
4627            let new_state = match req
4628                .input
4629                .versioning_configuration
4630                .status
4631                .as_ref()
4632                .map(|s| s.as_str())
4633            {
4634                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
4635                    crate::versioning::VersioningState::Enabled
4636                }
4637                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
4638                    crate::versioning::VersioningState::Suspended
4639                }
4640                _ => crate::versioning::VersioningState::Unversioned,
4641            };
4642            mgr.set_state(&req.input.bucket, new_state);
4643            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
4644        }
4645        self.backend.put_bucket_versioning(req).await
4646    }
4647
4648    // ---- Bucket location ----
4649    async fn get_bucket_location(
4650        &self,
4651        req: S3Request<GetBucketLocationInput>,
4652    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
4653        self.backend.get_bucket_location(req).await
4654    }
4655
4656    // ---- Bucket policy ----
4657    async fn get_bucket_policy(
4658        &self,
4659        req: S3Request<GetBucketPolicyInput>,
4660    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
4661        self.backend.get_bucket_policy(req).await
4662    }
4663    async fn put_bucket_policy(
4664        &self,
4665        req: S3Request<PutBucketPolicyInput>,
4666    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
4667        self.backend.put_bucket_policy(req).await
4668    }
4669    async fn delete_bucket_policy(
4670        &self,
4671        req: S3Request<DeleteBucketPolicyInput>,
4672    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
4673        self.backend.delete_bucket_policy(req).await
4674    }
4675    async fn get_bucket_policy_status(
4676        &self,
4677        req: S3Request<GetBucketPolicyStatusInput>,
4678    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
4679        self.backend.get_bucket_policy_status(req).await
4680    }
4681
4682    // ---- Bucket ACL ----
4683    async fn get_bucket_acl(
4684        &self,
4685        req: S3Request<GetBucketAclInput>,
4686    ) -> S3Result<S3Response<GetBucketAclOutput>> {
4687        self.backend.get_bucket_acl(req).await
4688    }
4689    async fn put_bucket_acl(
4690        &self,
4691        req: S3Request<PutBucketAclInput>,
4692    ) -> S3Result<S3Response<PutBucketAclOutput>> {
4693        self.backend.put_bucket_acl(req).await
4694    }
4695
4696    // ---- Bucket CORS (v0.6 #38) ----
4697    async fn get_bucket_cors(
4698        &self,
4699        req: S3Request<GetBucketCorsInput>,
4700    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
4701        if let Some(mgr) = self.cors.as_ref() {
4702            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4703                S3Error::with_message(
4704                    S3ErrorCode::NoSuchCORSConfiguration,
4705                    "The CORS configuration does not exist".to_string(),
4706                )
4707            })?;
4708            let rules: Vec<CORSRule> = cfg
4709                .rules
4710                .into_iter()
4711                .map(|r| CORSRule {
4712                    allowed_headers: if r.allowed_headers.is_empty() {
4713                        None
4714                    } else {
4715                        Some(r.allowed_headers)
4716                    },
4717                    allowed_methods: r.allowed_methods,
4718                    allowed_origins: r.allowed_origins,
4719                    expose_headers: if r.expose_headers.is_empty() {
4720                        None
4721                    } else {
4722                        Some(r.expose_headers)
4723                    },
4724                    id: r.id,
4725                    max_age_seconds: r.max_age_seconds.map(|s| s as i32),
4726                })
4727                .collect();
4728            return Ok(S3Response::new(GetBucketCorsOutput {
4729                cors_rules: Some(rules),
4730            }));
4731        }
4732        self.backend.get_bucket_cors(req).await
4733    }
4734    async fn put_bucket_cors(
4735        &self,
4736        req: S3Request<PutBucketCorsInput>,
4737    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4738        if let Some(mgr) = self.cors.as_ref() {
4739            let cfg = crate::cors::CorsConfig {
4740                rules: req
4741                    .input
4742                    .cors_configuration
4743                    .cors_rules
4744                    .into_iter()
4745                    .map(|r| crate::cors::CorsRule {
4746                        allowed_origins: r.allowed_origins,
4747                        allowed_methods: r.allowed_methods,
4748                        allowed_headers: r.allowed_headers.unwrap_or_default(),
4749                        expose_headers: r.expose_headers.unwrap_or_default(),
4750                        max_age_seconds: r.max_age_seconds.and_then(|s| {
4751                            if s < 0 { None } else { Some(s as u32) }
4752                        }),
4753                        id: r.id,
4754                    })
4755                    .collect(),
4756            };
4757            mgr.put(&req.input.bucket, cfg);
4758            return Ok(S3Response::new(PutBucketCorsOutput::default()));
4759        }
4760        self.backend.put_bucket_cors(req).await
4761    }
4762    async fn delete_bucket_cors(
4763        &self,
4764        req: S3Request<DeleteBucketCorsInput>,
4765    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4766        if let Some(mgr) = self.cors.as_ref() {
4767            mgr.delete(&req.input.bucket);
4768            return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4769        }
4770        self.backend.delete_bucket_cors(req).await
4771    }
4772
4773    // ---- Bucket lifecycle (v0.6 #37) ----
4774    async fn get_bucket_lifecycle_configuration(
4775        &self,
4776        req: S3Request<GetBucketLifecycleConfigurationInput>,
4777    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4778        if let Some(mgr) = self.lifecycle.as_ref() {
4779            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4780                S3Error::with_message(
4781                    S3ErrorCode::NoSuchLifecycleConfiguration,
4782                    "The lifecycle configuration does not exist".to_string(),
4783                )
4784            })?;
4785            let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4786            return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4787                rules: Some(rules),
4788                transition_default_minimum_object_size: None,
4789            }));
4790        }
4791        self.backend.get_bucket_lifecycle_configuration(req).await
4792    }
4793    async fn put_bucket_lifecycle_configuration(
4794        &self,
4795        req: S3Request<PutBucketLifecycleConfigurationInput>,
4796    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4797        if let Some(mgr) = self.lifecycle.as_ref() {
4798            let bucket = req.input.bucket.clone();
4799            let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4800            let cfg = dto_lifecycle_to_internal(&dto_cfg);
4801            mgr.put(&bucket, cfg);
4802            return Ok(S3Response::new(
4803                PutBucketLifecycleConfigurationOutput::default(),
4804            ));
4805        }
4806        self.backend.put_bucket_lifecycle_configuration(req).await
4807    }
4808    async fn delete_bucket_lifecycle(
4809        &self,
4810        req: S3Request<DeleteBucketLifecycleInput>,
4811    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4812        if let Some(mgr) = self.lifecycle.as_ref() {
4813            mgr.delete(&req.input.bucket);
4814            return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4815        }
4816        self.backend.delete_bucket_lifecycle(req).await
4817    }
4818
4819    // ---- Bucket tagging (v0.6 #39) ----
4820    async fn get_bucket_tagging(
4821        &self,
4822        req: S3Request<GetBucketTaggingInput>,
4823    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
4824        let Some(mgr) = self.tagging.as_ref() else {
4825            return self.backend.get_bucket_tagging(req).await;
4826        };
4827        let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
4828        Ok(S3Response::new(GetBucketTaggingOutput {
4829            tag_set: tagset_to_aws(&tags),
4830        }))
4831    }
4832    async fn put_bucket_tagging(
4833        &self,
4834        req: S3Request<PutBucketTaggingInput>,
4835    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
4836        let Some(mgr) = self.tagging.as_ref() else {
4837            return self.backend.put_bucket_tagging(req).await;
4838        };
4839        let bucket = req.input.bucket.clone();
4840        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4841            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4842        })?;
4843        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4844        mgr.put_bucket_tags(&bucket, parsed);
4845        Ok(S3Response::new(PutBucketTaggingOutput::default()))
4846    }
4847    async fn delete_bucket_tagging(
4848        &self,
4849        req: S3Request<DeleteBucketTaggingInput>,
4850    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
4851        let Some(mgr) = self.tagging.as_ref() else {
4852            return self.backend.delete_bucket_tagging(req).await;
4853        };
4854        let bucket = req.input.bucket.clone();
4855        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4856        mgr.delete_bucket_tags(&bucket);
4857        Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
4858    }
4859
4860    // ---- Bucket encryption ----
4861    async fn get_bucket_encryption(
4862        &self,
4863        req: S3Request<GetBucketEncryptionInput>,
4864    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
4865        self.backend.get_bucket_encryption(req).await
4866    }
4867    async fn put_bucket_encryption(
4868        &self,
4869        req: S3Request<PutBucketEncryptionInput>,
4870    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
4871        self.backend.put_bucket_encryption(req).await
4872    }
4873    async fn delete_bucket_encryption(
4874        &self,
4875        req: S3Request<DeleteBucketEncryptionInput>,
4876    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
4877        self.backend.delete_bucket_encryption(req).await
4878    }
4879
4880    // ---- Bucket logging ----
4881    async fn get_bucket_logging(
4882        &self,
4883        req: S3Request<GetBucketLoggingInput>,
4884    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
4885        self.backend.get_bucket_logging(req).await
4886    }
4887    async fn put_bucket_logging(
4888        &self,
4889        req: S3Request<PutBucketLoggingInput>,
4890    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
4891        self.backend.put_bucket_logging(req).await
4892    }
4893
4894    // ---- Bucket notification (v0.6 #35) ----
4895    //
4896    // When a `NotificationManager` is attached, S4 itself owns per-bucket
4897    // notification configurations and the PUT / GET handlers route through
4898    // the manager. The wire DTO's queue / topic configurations map onto
4899    // S4's `Destination::Sqs` / `Destination::Sns`; LambdaFunction and
4900    // EventBridge configurations are accepted on PUT but silently dropped
4901    // (out of scope for v0.6 #35). When no manager is attached the legacy
4902    // backend-passthrough behaviour applies.
4903    async fn get_bucket_notification_configuration(
4904        &self,
4905        req: S3Request<GetBucketNotificationConfigurationInput>,
4906    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
4907        if let Some(mgr) = self.notifications.as_ref() {
4908            let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
4909            let dto = notif_to_dto(&cfg);
4910            return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
4911                event_bridge_configuration: dto.event_bridge_configuration,
4912                lambda_function_configurations: dto.lambda_function_configurations,
4913                queue_configurations: dto.queue_configurations,
4914                topic_configurations: dto.topic_configurations,
4915            }));
4916        }
4917        self.backend
4918            .get_bucket_notification_configuration(req)
4919            .await
4920    }
4921    async fn put_bucket_notification_configuration(
4922        &self,
4923        req: S3Request<PutBucketNotificationConfigurationInput>,
4924    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
4925        if let Some(mgr) = self.notifications.as_ref() {
4926            let cfg = notif_from_dto(&req.input.notification_configuration);
4927            mgr.put(&req.input.bucket, cfg);
4928            return Ok(S3Response::new(
4929                PutBucketNotificationConfigurationOutput::default(),
4930            ));
4931        }
4932        self.backend
4933            .put_bucket_notification_configuration(req)
4934            .await
4935    }
4936
4937    // ---- Bucket request payment ----
4938    async fn get_bucket_request_payment(
4939        &self,
4940        req: S3Request<GetBucketRequestPaymentInput>,
4941    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
4942        self.backend.get_bucket_request_payment(req).await
4943    }
4944    async fn put_bucket_request_payment(
4945        &self,
4946        req: S3Request<PutBucketRequestPaymentInput>,
4947    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
4948        self.backend.put_bucket_request_payment(req).await
4949    }
4950
4951    // ---- Bucket website ----
4952    async fn get_bucket_website(
4953        &self,
4954        req: S3Request<GetBucketWebsiteInput>,
4955    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
4956        self.backend.get_bucket_website(req).await
4957    }
4958    async fn put_bucket_website(
4959        &self,
4960        req: S3Request<PutBucketWebsiteInput>,
4961    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
4962        self.backend.put_bucket_website(req).await
4963    }
4964    async fn delete_bucket_website(
4965        &self,
4966        req: S3Request<DeleteBucketWebsiteInput>,
4967    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
4968        self.backend.delete_bucket_website(req).await
4969    }
4970
4971    // ---- Bucket replication (v0.6 #40) ----
4972    async fn get_bucket_replication(
4973        &self,
4974        req: S3Request<GetBucketReplicationInput>,
4975    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4976        if let Some(mgr) = self.replication.as_ref() {
4977            return match mgr.get(&req.input.bucket) {
4978                Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4979                    replication_configuration: Some(replication_to_dto(&cfg)),
4980                })),
4981                None => Err(S3Error::with_message(
4982                    S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4983                    format!("no replication configuration on bucket {}", req.input.bucket),
4984                )),
4985            };
4986        }
4987        self.backend.get_bucket_replication(req).await
4988    }
4989    async fn put_bucket_replication(
4990        &self,
4991        req: S3Request<PutBucketReplicationInput>,
4992    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4993        if let Some(mgr) = self.replication.as_ref() {
4994            let cfg = replication_from_dto(&req.input.replication_configuration);
4995            mgr.put(&req.input.bucket, cfg);
4996            return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4997        }
4998        self.backend.put_bucket_replication(req).await
4999    }
5000    async fn delete_bucket_replication(
5001        &self,
5002        req: S3Request<DeleteBucketReplicationInput>,
5003    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
5004        if let Some(mgr) = self.replication.as_ref() {
5005            mgr.delete(&req.input.bucket);
5006            return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
5007        }
5008        self.backend.delete_bucket_replication(req).await
5009    }
5010
5011    // ---- Bucket accelerate ----
5012    async fn get_bucket_accelerate_configuration(
5013        &self,
5014        req: S3Request<GetBucketAccelerateConfigurationInput>,
5015    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
5016        self.backend.get_bucket_accelerate_configuration(req).await
5017    }
5018    async fn put_bucket_accelerate_configuration(
5019        &self,
5020        req: S3Request<PutBucketAccelerateConfigurationInput>,
5021    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
5022        self.backend.put_bucket_accelerate_configuration(req).await
5023    }
5024
5025    // ---- Bucket ownership controls ----
5026    async fn get_bucket_ownership_controls(
5027        &self,
5028        req: S3Request<GetBucketOwnershipControlsInput>,
5029    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
5030        self.backend.get_bucket_ownership_controls(req).await
5031    }
5032    async fn put_bucket_ownership_controls(
5033        &self,
5034        req: S3Request<PutBucketOwnershipControlsInput>,
5035    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
5036        self.backend.put_bucket_ownership_controls(req).await
5037    }
5038    async fn delete_bucket_ownership_controls(
5039        &self,
5040        req: S3Request<DeleteBucketOwnershipControlsInput>,
5041    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
5042        self.backend.delete_bucket_ownership_controls(req).await
5043    }
5044
5045    // ---- Public access block ----
5046    async fn get_public_access_block(
5047        &self,
5048        req: S3Request<GetPublicAccessBlockInput>,
5049    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
5050        self.backend.get_public_access_block(req).await
5051    }
5052    async fn put_public_access_block(
5053        &self,
5054        req: S3Request<PutPublicAccessBlockInput>,
5055    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
5056        self.backend.put_public_access_block(req).await
5057    }
5058    async fn delete_public_access_block(
5059        &self,
5060        req: S3Request<DeletePublicAccessBlockInput>,
5061    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
5062        self.backend.delete_public_access_block(req).await
5063    }
5064
5065    // ====================================================================
5066    // v0.6 #41: S3 Select — server-side SQL filter on object body.
5067    //
5068    // Fetch the object via the regular `get_object` path (so SSE-C /
5069    // SSE-S4 / SSE-KMS / S4 codec all decompress + decrypt transparently),
5070    // run a small SQL subset (CSV + JSON Lines, equality / inequality /
5071    // LIKE / AND / OR / NOT) over the in-memory body, and stream the
5072    // matched rows back as AWS event-stream `Records` + `Stats` + `End`
5073    // frames.
5074    //
5075    // Limitations (deliberate, documented):
5076    //   - Parquet input is rejected with NotImplemented.
5077    //   - Aggregates / GROUP BY / JOIN / ORDER BY / LIMIT are rejected at
5078    //     parse time as InvalidRequest (s3s 0.13 doesn't expose AWS's
5079    //     domain-specific `InvalidSqlExpression` code).
5080    //   - The body is fully buffered before SQL evaluation (S3 Select
5081    //     streaming-during-evaluation is v0.7 scope).
5082    //   - GPU-accelerated WHERE evaluation is stubbed out (always None).
5083    async fn select_object_content(
5084        &self,
5085        req: S3Request<SelectObjectContentInput>,
5086    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
5087        use crate::select::{
5088            EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
5089            run_select_jsonlines,
5090        };
5091
5092        let select_bucket = req.input.bucket.clone();
5093        let select_key = req.input.key.clone();
5094        self.enforce_rate_limit(&req, &select_bucket)?;
5095        self.enforce_policy(
5096            &req,
5097            "s3:GetObject",
5098            &select_bucket,
5099            Some(&select_key),
5100        )?;
5101
5102        let request = req.input.request;
5103        let sql = request.expression.clone();
5104        if request.expression_type.as_str() != "SQL" {
5105            return Err(S3Error::with_message(
5106                S3ErrorCode::InvalidExpressionType,
5107                format!(
5108                    "ExpressionType must be SQL, got: {}",
5109                    request.expression_type.as_str()
5110                ),
5111            ));
5112        }
5113
5114        let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
5115            SelectInputFormat::JsonLines
5116        } else if let Some(csv) = request.input_serialization.csv.as_ref() {
5117            let has_header = csv
5118                .file_header_info
5119                .as_ref()
5120                .map(|h| {
5121                    let s = h.as_str();
5122                    s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
5123                })
5124                .unwrap_or(false);
5125            let delim = csv
5126                .field_delimiter
5127                .as_deref()
5128                .and_then(|s| s.chars().next())
5129                .unwrap_or(',');
5130            SelectInputFormat::Csv {
5131                has_header,
5132                delimiter: delim,
5133            }
5134        } else if request.input_serialization.parquet.is_some() {
5135            return Err(S3Error::with_message(
5136                S3ErrorCode::NotImplemented,
5137                "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
5138            ));
5139        } else {
5140            return Err(S3Error::with_message(
5141                S3ErrorCode::InvalidRequest,
5142                "InputSerialization requires exactly one of CSV / JSON / Parquet",
5143            ));
5144        };
5145        if let Some(ct) = request.input_serialization.compression_type.as_ref()
5146            && !ct.as_str().eq_ignore_ascii_case("NONE")
5147        {
5148            return Err(S3Error::with_message(
5149                S3ErrorCode::NotImplemented,
5150                format!(
5151                    "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
5152                    ct.as_str()
5153                ),
5154            ));
5155        }
5156
5157        let output_format = if request.output_serialization.json.is_some() {
5158            SelectOutputFormat::Json
5159        } else if request.output_serialization.csv.is_some() {
5160            SelectOutputFormat::Csv
5161        } else {
5162            return Err(S3Error::with_message(
5163                S3ErrorCode::InvalidRequest,
5164                "OutputSerialization requires exactly one of CSV / JSON",
5165            ));
5166        };
5167
5168        let get_input = GetObjectInput {
5169            bucket: select_bucket.clone(),
5170            key: select_key.clone(),
5171            sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
5172            sse_customer_key: req.input.sse_customer_key.clone(),
5173            sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
5174            ..Default::default()
5175        };
5176        let get_req = S3Request {
5177            input: get_input,
5178            method: http::Method::GET,
5179            uri: format!("/{}/{}", select_bucket, select_key)
5180                .parse()
5181                .map_err(|e| {
5182                    S3Error::with_message(
5183                        S3ErrorCode::InternalError,
5184                        format!("constructing inner GET URI: {e}"),
5185                    )
5186                })?,
5187            headers: http::HeaderMap::new(),
5188            extensions: http::Extensions::new(),
5189            credentials: req.credentials.clone(),
5190            region: req.region.clone(),
5191            service: req.service.clone(),
5192            trailing_headers: None,
5193        };
5194        let mut get_resp = self.get_object(get_req).await?;
5195        let blob = get_resp.output.body.take().ok_or_else(|| {
5196            S3Error::with_message(
5197                S3ErrorCode::InternalError,
5198                "Select: object body was empty after GET",
5199            )
5200        })?;
5201        let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
5202            .await
5203            .map_err(internal("collect Select body"))?;
5204        let scanned = body_bytes.len() as u64;
5205
5206        let matched_payload = match input_format {
5207            SelectInputFormat::JsonLines => {
5208                run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
5209                    |e| select_error_to_s3(e, "JSON Lines"),
5210                )?
5211            }
5212            SelectInputFormat::Csv { .. } => {
5213                run_select_csv(&sql, &body_bytes, input_format, output_format)
5214                    .map_err(|e| select_error_to_s3(e, "CSV"))?
5215            }
5216        };
5217
5218        let returned = matched_payload.len() as u64;
5219        let processed = scanned;
5220        let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
5221        if !matched_payload.is_empty() {
5222            events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
5223                payload: Some(bytes::Bytes::from(matched_payload)),
5224            })));
5225        }
5226        events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
5227            details: Some(Stats {
5228                bytes_scanned: Some(scanned as i64),
5229                bytes_processed: Some(processed as i64),
5230                bytes_returned: Some(returned as i64),
5231            }),
5232        })));
5233        events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
5234        // Touch EventStreamWriter so the public API stays linked into the
5235        // build (the actual wire framing is delegated to s3s).
5236        let _writer = EventStreamWriter::new();
5237
5238        let stream =
5239            SelectObjectContentEventStream::new(futures::stream::iter(events));
5240        let output = SelectObjectContentOutput {
5241            payload: Some(stream),
5242        };
5243        Ok(S3Response::new(output))
5244    }
5245
5246    // ---- Bucket Inventory configuration (v0.6 #36) ----
5247    //
5248    // When an `InventoryManager` is attached, S4-server owns the
5249    // configuration store and these handlers no longer pass through to
5250    // the backend. The mapping between the s3s-typed
5251    // `InventoryConfiguration` and the inventory module's internal
5252    // `InventoryConfig` is intentionally lossy: only the fields S4
5253    // actually uses for periodic CSV emission survive the round trip
5254    // (id, source bucket, destination bucket / prefix, format, included
5255    // versions, schedule frequency). Optional fields, encryption, and
5256    // filter prefixes are accepted on PUT and re-surfaced on GET via
5257    // a best-effort default-shape `InventoryConfiguration` so the
5258    // client sees a roundtrip-clean response.
5259    async fn put_bucket_inventory_configuration(
5260        &self,
5261        req: S3Request<PutBucketInventoryConfigurationInput>,
5262    ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
5263        if let Some(mgr) = self.inventory.as_ref() {
5264            let cfg = inv_from_dto(
5265                &req.input.bucket,
5266                &req.input.id,
5267                &req.input.inventory_configuration,
5268            );
5269            mgr.put(cfg);
5270            return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
5271        }
5272        self.backend.put_bucket_inventory_configuration(req).await
5273    }
5274
5275    async fn get_bucket_inventory_configuration(
5276        &self,
5277        req: S3Request<GetBucketInventoryConfigurationInput>,
5278    ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
5279        if let Some(mgr) = self.inventory.as_ref() {
5280            let cfg = mgr.get(&req.input.bucket, &req.input.id);
5281            if let Some(cfg) = cfg {
5282                let out = GetBucketInventoryConfigurationOutput {
5283                    inventory_configuration: Some(inv_to_dto(&cfg)),
5284                };
5285                return Ok(S3Response::new(out));
5286            }
5287            // AWS returns `NoSuchConfiguration` (404) when the id has no
5288            // matching inventory configuration on the bucket. The
5289            // generated `S3ErrorCode` enum doesn't expose a typed variant
5290            // for this code, so we round-trip through `from_bytes` which
5291            // wraps unknown codes as `Custom(...)` (= the AWS-canonical
5292            // error-code string survives into the XML response envelope).
5293            let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
5294                .unwrap_or(S3ErrorCode::NoSuchKey);
5295            return Err(S3Error::with_message(
5296                code,
5297                format!(
5298                    "no inventory configuration with id={} on bucket={}",
5299                    req.input.id, req.input.bucket
5300                ),
5301            ));
5302        }
5303        self.backend.get_bucket_inventory_configuration(req).await
5304    }
5305
5306    async fn list_bucket_inventory_configurations(
5307        &self,
5308        req: S3Request<ListBucketInventoryConfigurationsInput>,
5309    ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
5310        if let Some(mgr) = self.inventory.as_ref() {
5311            let list = mgr.list_for_bucket(&req.input.bucket);
5312            let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
5313            let out = ListBucketInventoryConfigurationsOutput {
5314                continuation_token: req.input.continuation_token.clone(),
5315                inventory_configuration_list: if dto_list.is_empty() {
5316                    None
5317                } else {
5318                    Some(dto_list)
5319                },
5320                is_truncated: Some(false),
5321                next_continuation_token: None,
5322            };
5323            return Ok(S3Response::new(out));
5324        }
5325        self.backend.list_bucket_inventory_configurations(req).await
5326    }
5327
5328    async fn delete_bucket_inventory_configuration(
5329        &self,
5330        req: S3Request<DeleteBucketInventoryConfigurationInput>,
5331    ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
5332        if let Some(mgr) = self.inventory.as_ref() {
5333            mgr.delete(&req.input.bucket, &req.input.id);
5334            return Ok(S3Response::new(
5335                DeleteBucketInventoryConfigurationOutput::default(),
5336            ));
5337        }
5338        self.backend.delete_bucket_inventory_configuration(req).await
5339    }
5340}
5341
5342// ---------------------------------------------------------------------------
5343// v0.6 #36: Convert between the s3s-typed `InventoryConfiguration` (the wire
5344// surface) and our internal `crate::inventory::InventoryConfig`. Only the
5345// fields S4 actually uses for CSV emission survive the round trip; the
5346// missing fields (filter prefix, optional fields, encryption) are dropped on
5347// PUT and re-rendered as the AWS-default shape on GET so the client sees a
5348// well-formed `InventoryConfiguration`.
5349// ---------------------------------------------------------------------------
5350
5351fn inv_from_dto(
5352    bucket: &str,
5353    id: &str,
5354    dto: &InventoryConfiguration,
5355) -> crate::inventory::InventoryConfig {
5356    let frequency_hours = match dto.schedule.frequency.as_str() {
5357        "Weekly" => 24 * 7,
5358        // Daily is the default; anything S4 doesn't recognise (incl.
5359        // empty, which is the s3s-default) maps to Daily so the
5360        // operator's PUT doesn't silently turn into a no-op cadence.
5361        _ => 24,
5362    };
5363    // Parquet/ORC are not supported (issue #36 scope); we still accept
5364    // the PUT so callers don't fail-loud, but we record CSV and rely on
5365    // the operator catching the discrepancy on GET.
5366    let format = crate::inventory::InventoryFormat::Csv;
5367    crate::inventory::InventoryConfig {
5368        id: id.to_owned(),
5369        bucket: bucket.to_owned(),
5370        destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
5371        destination_prefix: dto
5372            .destination
5373            .s3_bucket_destination
5374            .prefix
5375            .clone()
5376            .unwrap_or_default(),
5377        frequency_hours,
5378        format,
5379        included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
5380            dto.included_object_versions.as_str(),
5381        ),
5382    }
5383}
5384
5385fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
5386    InventoryConfiguration {
5387        id: cfg.id.clone(),
5388        is_enabled: true,
5389        included_object_versions: InventoryIncludedObjectVersions::from(
5390            cfg.included_object_versions.as_aws_str().to_owned(),
5391        ),
5392        destination: InventoryDestination {
5393            s3_bucket_destination: InventoryS3BucketDestination {
5394                account_id: None,
5395                bucket: cfg.destination_bucket.clone(),
5396                encryption: None,
5397                format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
5398                prefix: if cfg.destination_prefix.is_empty() {
5399                    None
5400                } else {
5401                    Some(cfg.destination_prefix.clone())
5402                },
5403            },
5404        },
5405        schedule: InventorySchedule {
5406            // `frequency_hours == 168` -> Weekly; everything else maps to
5407            // Daily for the wire response (the manager keeps the precise
5408            // hour count internally for due-checking).
5409            frequency: InventoryFrequency::from(
5410                if cfg.frequency_hours == 24 * 7 {
5411                    "Weekly"
5412                } else {
5413                    "Daily"
5414                }
5415                .to_owned(),
5416            ),
5417        },
5418        filter: None,
5419        optional_fields: None,
5420    }
5421}
5422
5423// ---------------------------------------------------------------------------
5424// v0.6 #35: Convert between the s3s-typed `NotificationConfiguration` (the
5425// wire surface) and our internal `crate::notifications::NotificationConfig`.
5426//
5427// We support TopicConfiguration (-> Destination::Sns) and QueueConfiguration
5428// (-> Destination::Sqs). LambdaFunction and EventBridge configurations are
5429// silently dropped on PUT (out of scope for v0.6 #35); the GET response only
5430// surfaces topic / queue rules.
5431//
5432// The webhook destination has no AWS-native wire form: operators configure
5433// webhooks via the JSON snapshot file (`--notifications-state-file`) or by
5434// poking `NotificationManager::put` directly from a custom binary. This
5435// keeps the wire surface AWS-compatible while still letting the always-
5436// available `Webhook` destination be reachable.
5437// ---------------------------------------------------------------------------
5438
5439fn notif_from_dto(
5440    dto: &NotificationConfiguration,
5441) -> crate::notifications::NotificationConfig {
5442    let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
5443    if let Some(topics) = dto.topic_configurations.as_ref() {
5444        for (idx, t) in topics.iter().enumerate() {
5445            let events = events_from_dto(&t.events);
5446            let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
5447            rules.push(crate::notifications::NotificationRule {
5448                id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
5449                events,
5450                destination: crate::notifications::Destination::Sns {
5451                    topic_arn: t.topic_arn.clone(),
5452                },
5453                filter_prefix: prefix,
5454                filter_suffix: suffix,
5455            });
5456        }
5457    }
5458    if let Some(queues) = dto.queue_configurations.as_ref() {
5459        for (idx, q) in queues.iter().enumerate() {
5460            let events = events_from_dto(&q.events);
5461            let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
5462            rules.push(crate::notifications::NotificationRule {
5463                id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
5464                events,
5465                destination: crate::notifications::Destination::Sqs {
5466                    queue_arn: q.queue_arn.clone(),
5467                },
5468                filter_prefix: prefix,
5469                filter_suffix: suffix,
5470            });
5471        }
5472    }
5473    crate::notifications::NotificationConfig { rules }
5474}
5475
5476fn notif_to_dto(
5477    cfg: &crate::notifications::NotificationConfig,
5478) -> NotificationConfiguration {
5479    let mut topics: Vec<TopicConfiguration> = Vec::new();
5480    let mut queues: Vec<QueueConfiguration> = Vec::new();
5481    for rule in &cfg.rules {
5482        let events: Vec<Event> = rule
5483            .events
5484            .iter()
5485            .map(|e| Event::from(e.as_aws_str().to_owned()))
5486            .collect();
5487        let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
5488        match &rule.destination {
5489            crate::notifications::Destination::Sns { topic_arn } => {
5490                topics.push(TopicConfiguration {
5491                    events,
5492                    filter,
5493                    id: Some(rule.id.clone()),
5494                    topic_arn: topic_arn.clone(),
5495                });
5496            }
5497            crate::notifications::Destination::Sqs { queue_arn } => {
5498                queues.push(QueueConfiguration {
5499                    events,
5500                    filter,
5501                    id: Some(rule.id.clone()),
5502                    queue_arn: queue_arn.clone(),
5503                });
5504            }
5505            // Webhook destinations have no AWS wire equivalent — they
5506            // round-trip through the JSON snapshot only. Skip them on the
5507            // GET surface (an SDK consumer wouldn't know what to do with
5508            // them anyway).
5509            crate::notifications::Destination::Webhook { .. } => {}
5510        }
5511    }
5512    NotificationConfiguration {
5513        event_bridge_configuration: None,
5514        lambda_function_configurations: None,
5515        queue_configurations: if queues.is_empty() { None } else { Some(queues) },
5516        topic_configurations: if topics.is_empty() { None } else { Some(topics) },
5517    }
5518}
5519
5520fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
5521    events
5522        .iter()
5523        .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
5524        .collect()
5525}
5526
5527fn filter_from_dto(
5528    f: Option<&NotificationConfigurationFilter>,
5529) -> (Option<String>, Option<String>) {
5530    let Some(f) = f else {
5531        return (None, None);
5532    };
5533    let Some(key) = f.key.as_ref() else {
5534        return (None, None);
5535    };
5536    let Some(rules) = key.filter_rules.as_ref() else {
5537        return (None, None);
5538    };
5539    let mut prefix = None;
5540    let mut suffix = None;
5541    for r in rules {
5542        let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
5543        let value = r.value.clone();
5544        match name.as_deref() {
5545            Some("prefix") => prefix = value,
5546            Some("suffix") => suffix = value,
5547            _ => {}
5548        }
5549    }
5550    (prefix, suffix)
5551}
5552
5553fn filter_to_dto(
5554    prefix: Option<&str>,
5555    suffix: Option<&str>,
5556) -> Option<NotificationConfigurationFilter> {
5557    if prefix.is_none() && suffix.is_none() {
5558        return None;
5559    }
5560    let mut rules: Vec<FilterRule> = Vec::new();
5561    if let Some(p) = prefix {
5562        rules.push(FilterRule {
5563            name: Some(FilterRuleName::from("prefix".to_owned())),
5564            value: Some(p.to_owned()),
5565        });
5566    }
5567    if let Some(s) = suffix {
5568        rules.push(FilterRule {
5569            name: Some(FilterRuleName::from("suffix".to_owned())),
5570            value: Some(s.to_owned()),
5571        });
5572    }
5573    Some(NotificationConfigurationFilter {
5574        key: Some(S3KeyFilter {
5575            filter_rules: Some(rules),
5576        }),
5577    })
5578}
5579
5580// ---------------------------------------------------------------------------
5581// v0.6 #40: Convert between the s3s-typed `ReplicationConfiguration` (the
5582// wire surface) and our internal `crate::replication::ReplicationConfig`.
5583// AWS's `ReplicationRuleFilter` is a sum type — `Prefix | Tag | And { Prefix,
5584// Tags }`; we flatten it into the single `(prefix, tag-vec)` representation
5585// the matcher needs. Sub-blocks v0.6 #40 does not implement
5586// (DeleteMarkerReplication / SourceSelectionCriteria / ReplicationTime /
5587// Metrics / EncryptionConfiguration) round-trip as `None` on GET — operators
5588// who set them on PUT see them silently dropped, mirroring "feature not
5589// supported in this release" semantics.
5590// ---------------------------------------------------------------------------
5591
5592fn replication_from_dto(
5593    dto: &ReplicationConfiguration,
5594) -> crate::replication::ReplicationConfig {
5595    let rules = dto
5596        .rules
5597        .iter()
5598        .enumerate()
5599        .map(|(idx, r)| {
5600            let id = r
5601                .id
5602                .as_ref()
5603                .map(|s| s.as_str().to_owned())
5604                .unwrap_or_else(|| format!("rule-{idx}"));
5605            let priority = r.priority.unwrap_or(0).max(0) as u32;
5606            let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
5607            let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
5608            let destination_bucket = r.destination.bucket.clone();
5609            let destination_storage_class = r
5610                .destination
5611                .storage_class
5612                .as_ref()
5613                .map(|s| s.as_str().to_owned());
5614            crate::replication::ReplicationRule {
5615                id,
5616                priority,
5617                status_enabled,
5618                filter,
5619                destination_bucket,
5620                destination_storage_class,
5621            }
5622        })
5623        .collect();
5624    crate::replication::ReplicationConfig {
5625        role: dto.role.clone(),
5626        rules,
5627    }
5628}
5629
5630fn replication_to_dto(
5631    cfg: &crate::replication::ReplicationConfig,
5632) -> ReplicationConfiguration {
5633    let rules = cfg
5634        .rules
5635        .iter()
5636        .map(|r| {
5637            let status = if r.status_enabled {
5638                ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
5639            } else {
5640                ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
5641            };
5642            let destination = Destination {
5643                access_control_translation: None,
5644                account: None,
5645                bucket: r.destination_bucket.clone(),
5646                encryption_configuration: None,
5647                metrics: None,
5648                replication_time: None,
5649                storage_class: r
5650                    .destination_storage_class
5651                    .as_ref()
5652                    .map(|s| StorageClass::from(s.clone())),
5653            };
5654            let filter = Some(replication_filter_to_dto(&r.filter));
5655            ReplicationRule {
5656                delete_marker_replication: None,
5657                destination,
5658                existing_object_replication: None,
5659                filter,
5660                id: Some(r.id.clone()),
5661                prefix: None,
5662                priority: Some(r.priority as i32),
5663                source_selection_criteria: None,
5664                status,
5665            }
5666        })
5667        .collect();
5668    ReplicationConfiguration {
5669        role: cfg.role.clone(),
5670        rules,
5671    }
5672}
5673
5674fn replication_filter_from_dto(
5675    f: Option<&ReplicationRuleFilter>,
5676    rule_level_prefix: Option<&str>,
5677) -> crate::replication::ReplicationFilter {
5678    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
5679    let mut tags: Vec<(String, String)> = Vec::new();
5680    if let Some(f) = f {
5681        if let Some(p) = f.prefix.as_ref()
5682            && prefix.is_none()
5683        {
5684            prefix = Some(p.clone());
5685        }
5686        if let Some(t) = f.tag.as_ref()
5687            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5688        {
5689            tags.push((k.clone(), v.clone()));
5690        }
5691        if let Some(and) = f.and.as_ref() {
5692            if let Some(p) = and.prefix.as_ref()
5693                && prefix.is_none()
5694            {
5695                prefix = Some(p.clone());
5696            }
5697            if let Some(ts) = and.tags.as_ref() {
5698                for t in ts {
5699                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5700                        tags.push((k.clone(), v.clone()));
5701                    }
5702                }
5703            }
5704        }
5705    }
5706    crate::replication::ReplicationFilter { prefix, tags }
5707}
5708
5709fn replication_filter_to_dto(
5710    f: &crate::replication::ReplicationFilter,
5711) -> ReplicationRuleFilter {
5712    if f.tags.is_empty() {
5713        ReplicationRuleFilter {
5714            and: None,
5715            prefix: f.prefix.clone(),
5716            tag: None,
5717        }
5718    } else if f.tags.len() == 1 && f.prefix.is_none() {
5719        let (k, v) = &f.tags[0];
5720        ReplicationRuleFilter {
5721            and: None,
5722            prefix: None,
5723            tag: Some(Tag {
5724                key: Some(k.clone()),
5725                value: Some(v.clone()),
5726            }),
5727        }
5728    } else {
5729        let tags: Vec<Tag> = f
5730            .tags
5731            .iter()
5732            .map(|(k, v)| Tag {
5733                key: Some(k.clone()),
5734                value: Some(v.clone()),
5735            })
5736            .collect();
5737        ReplicationRuleFilter {
5738            and: Some(ReplicationRuleAndOperator {
5739                prefix: f.prefix.clone(),
5740                tags: Some(tags),
5741            }),
5742            prefix: None,
5743            tag: None,
5744        }
5745    }
5746}
5747
5748// ---------------------------------------------------------------------------
5749// v0.6 #37: Convert between the s3s-typed `BucketLifecycleConfiguration`
5750// (the wire surface) and our internal `crate::lifecycle::LifecycleConfig`.
5751// The internal representation flattens AWS's "Filter | And" disjunction
5752// into a single `LifecycleFilter` struct of optional fields plus a tag
5753// vector. Fields S4's evaluator does not consume
5754// (`expired_object_delete_marker`, `noncurrent_version_transitions`,
5755// `transition_default_minimum_object_size`, the storage class on the
5756// noncurrent expiration) are dropped on PUT and re-rendered as their
5757// AWS-default shape on GET so the client always sees a well-formed
5758// configuration.
5759// ---------------------------------------------------------------------------
5760
5761fn dto_lifecycle_to_internal(
5762    dto: &BucketLifecycleConfiguration,
5763) -> crate::lifecycle::LifecycleConfig {
5764    crate::lifecycle::LifecycleConfig {
5765        rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5766    }
5767}
5768
5769fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
5770    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
5771    let filter = rule
5772        .filter
5773        .as_ref()
5774        .map(dto_filter_to_internal)
5775        .unwrap_or_default();
5776    let expiration_days = rule
5777        .expiration
5778        .as_ref()
5779        .and_then(|e| e.days)
5780        .and_then(|d| u32::try_from(d).ok());
5781    let expiration_date = rule
5782        .expiration
5783        .as_ref()
5784        .and_then(|e| e.date.as_ref())
5785        .and_then(timestamp_to_chrono_utc);
5786    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
5787        .transitions
5788        .as_ref()
5789        .map(|ts| {
5790            ts.iter()
5791                .filter_map(|t| {
5792                    let days = u32::try_from(t.days?).ok()?;
5793                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
5794                    Some(crate::lifecycle::TransitionRule {
5795                        days,
5796                        storage_class,
5797                    })
5798                })
5799                .collect()
5800        })
5801        .unwrap_or_default();
5802    let noncurrent_version_expiration_days = rule
5803        .noncurrent_version_expiration
5804        .as_ref()
5805        .and_then(|n| n.noncurrent_days)
5806        .and_then(|d| u32::try_from(d).ok());
5807    let abort_incomplete_multipart_upload_days = rule
5808        .abort_incomplete_multipart_upload
5809        .as_ref()
5810        .and_then(|a| a.days_after_initiation)
5811        .and_then(|d| u32::try_from(d).ok());
5812    crate::lifecycle::LifecycleRule {
5813        id: rule.id.clone().unwrap_or_default(),
5814        status,
5815        filter,
5816        expiration_days,
5817        expiration_date,
5818        transitions,
5819        noncurrent_version_expiration_days,
5820        abort_incomplete_multipart_upload_days,
5821    }
5822}
5823
5824fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
5825    let mut prefix = filter.prefix.clone();
5826    let mut tags: Vec<(String, String)> = Vec::new();
5827    let mut size_gt: Option<u64> = filter
5828        .object_size_greater_than
5829        .and_then(|n| u64::try_from(n).ok());
5830    let mut size_lt: Option<u64> = filter
5831        .object_size_less_than
5832        .and_then(|n| u64::try_from(n).ok());
5833    if let Some(t) = &filter.tag
5834        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5835    {
5836        tags.push((k.clone(), v.clone()));
5837    }
5838    if let Some(and) = &filter.and {
5839        if prefix.is_none() {
5840            prefix = and.prefix.clone();
5841        }
5842        if size_gt.is_none() {
5843            size_gt = and
5844                .object_size_greater_than
5845                .and_then(|n| u64::try_from(n).ok());
5846        }
5847        if size_lt.is_none() {
5848            size_lt = and
5849                .object_size_less_than
5850                .and_then(|n| u64::try_from(n).ok());
5851        }
5852        if let Some(ts) = &and.tags {
5853            for t in ts {
5854                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5855                    tags.push((k.clone(), v.clone()));
5856                }
5857            }
5858        }
5859    }
5860    crate::lifecycle::LifecycleFilter {
5861        prefix,
5862        tags,
5863        object_size_greater_than: size_gt,
5864        object_size_less_than: size_lt,
5865    }
5866}
5867
5868fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
5869    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
5870        Some(LifecycleExpiration {
5871            date: rule.expiration_date.map(chrono_utc_to_timestamp),
5872            days: rule.expiration_days.map(|d| d as i32),
5873            expired_object_delete_marker: None,
5874        })
5875    } else {
5876        None
5877    };
5878    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
5879        None
5880    } else {
5881        Some(
5882            rule.transitions
5883                .iter()
5884                .map(|t| Transition {
5885                    date: None,
5886                    days: Some(t.days as i32),
5887                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
5888                })
5889                .collect(),
5890        )
5891    };
5892    let noncurrent_version_expiration =
5893        rule.noncurrent_version_expiration_days
5894            .map(|d| NoncurrentVersionExpiration {
5895                newer_noncurrent_versions: None,
5896                noncurrent_days: Some(d as i32),
5897            });
5898    let abort_incomplete_multipart_upload =
5899        rule.abort_incomplete_multipart_upload_days
5900            .map(|d| AbortIncompleteMultipartUpload {
5901                days_after_initiation: Some(d as i32),
5902            });
5903    let filter = if rule.filter.tags.is_empty()
5904        && rule.filter.object_size_greater_than.is_none()
5905        && rule.filter.object_size_less_than.is_none()
5906    {
5907        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
5908            and: None,
5909            object_size_greater_than: None,
5910            object_size_less_than: None,
5911            prefix: Some(p.clone()),
5912            tag: None,
5913        })
5914    } else if rule.filter.tags.len() == 1
5915        && rule.filter.prefix.is_none()
5916        && rule.filter.object_size_greater_than.is_none()
5917        && rule.filter.object_size_less_than.is_none()
5918    {
5919        let (k, v) = rule.filter.tags[0].clone();
5920        Some(LifecycleRuleFilter {
5921            and: None,
5922            object_size_greater_than: None,
5923            object_size_less_than: None,
5924            prefix: None,
5925            tag: Some(Tag {
5926                key: Some(k),
5927                value: Some(v),
5928            }),
5929        })
5930    } else {
5931        let tags = if rule.filter.tags.is_empty() {
5932            None
5933        } else {
5934            Some(
5935                rule.filter
5936                    .tags
5937                    .iter()
5938                    .map(|(k, v)| Tag {
5939                        key: Some(k.clone()),
5940                        value: Some(v.clone()),
5941                    })
5942                    .collect(),
5943            )
5944        };
5945        Some(LifecycleRuleFilter {
5946            and: Some(LifecycleRuleAndOperator {
5947                object_size_greater_than: rule
5948                    .filter
5949                    .object_size_greater_than
5950                    .and_then(|n| i64::try_from(n).ok()),
5951                object_size_less_than: rule
5952                    .filter
5953                    .object_size_less_than
5954                    .and_then(|n| i64::try_from(n).ok()),
5955                prefix: rule.filter.prefix.clone(),
5956                tags,
5957            }),
5958            object_size_greater_than: None,
5959            object_size_less_than: None,
5960            prefix: None,
5961            tag: None,
5962        })
5963    };
5964    LifecycleRule {
5965        abort_incomplete_multipart_upload,
5966        expiration,
5967        filter,
5968        id: if rule.id.is_empty() {
5969            None
5970        } else {
5971            Some(rule.id.clone())
5972        },
5973        noncurrent_version_expiration,
5974        noncurrent_version_transitions: None,
5975        prefix: None,
5976        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
5977        transitions,
5978    }
5979}
5980
5981// (timestamp <-> chrono helpers `timestamp_to_chrono_utc` /
5982// `chrono_utc_to_timestamp` are defined earlier in this file for the
5983// tagging/notifications work; the lifecycle DTO converters reuse them.)
5984
5985// ---------------------------------------------------------------------------
5986// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
5987//
5988// Kept as a self-contained block at the bottom of the file so it doesn't
5989// touch the existing `S4Service` struct, `new()`, or any of the per-op
5990// handlers above. The hook is wired in by the binary at server-build time
5991// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
5992//
5993// Lifecycle:
5994//   1. `SigV4aGate::new(store)` is constructed once at boot from the
5995//      operator-supplied credential directory.
5996//   2. For each incoming request, `SigV4aGate::pre_route(&req,
5997//      &requested_region, &canonical_request_bytes)` is invoked BEFORE
5998//      the request hits the S3 framework. If the request claims SigV4a
5999//      and verifies, control returns to the framework. Otherwise a 403
6000//      `SignatureDoesNotMatch` is produced.
6001//   3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
6002// ---------------------------------------------------------------------------
6003
6004/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
6005///
6006/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
6007/// `pre_route` entry point that returns `Ok(())` for both
6008/// "request is plain SigV4 — pass through" and "request is SigV4a and
6009/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
6010/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
6011#[derive(Debug, Clone)]
6012pub struct SigV4aGate {
6013    store: crate::sigv4a::SharedSigV4aCredentialStore,
6014}
6015
6016impl SigV4aGate {
6017    #[must_use]
6018    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
6019        Self { store }
6020    }
6021
6022    /// Inspect an incoming HTTP request. Behaviour:
6023    ///
6024    /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
6025    ///   prefix) → returns `Ok(())`; the framework's existing SigV4
6026    ///   path handles the request.
6027    /// - SigV4a + valid signature + region match → `Ok(())`.
6028    /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
6029    /// - SigV4a + bad signature / region mismatch → `Err` with
6030    ///   `SignatureDoesNotMatch`.
6031    ///
6032    /// `canonical_request_bytes` is the SigV4a string-to-sign (or
6033    /// canonical-request bytes; the caller decides) that the framework
6034    /// has already produced for this request. Keeping it as a parameter
6035    /// instead of rebuilding it inside the hook avoids duplicating the
6036    /// canonicalisation logic.
6037    pub fn pre_route<B>(
6038        &self,
6039        req: &http::Request<B>,
6040        requested_region: &str,
6041        canonical_request_bytes: &[u8],
6042    ) -> Result<(), SigV4aGateError> {
6043        if !crate::sigv4a::detect(req) {
6044            return Ok(());
6045        }
6046        let auth_hdr = req
6047            .headers()
6048            .get(http::header::AUTHORIZATION)
6049            .and_then(|v| v.to_str().ok())
6050            .ok_or(SigV4aGateError::MissingAuthorization)?;
6051        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
6052            .ok_or(SigV4aGateError::MalformedAuthorization)?;
6053        let region_set = req
6054            .headers()
6055            .get(crate::sigv4a::REGION_SET_HEADER)
6056            .and_then(|v| v.to_str().ok())
6057            .unwrap_or("*");
6058        let key = self
6059            .store
6060            .get(&parsed.access_key_id)
6061            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
6062        crate::sigv4a::verify(
6063            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
6064            &parsed.signature_der,
6065            key,
6066            region_set,
6067            requested_region,
6068        )
6069        .map_err(SigV4aGateError::Verify)?;
6070        Ok(())
6071    }
6072}
6073
6074/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
6075/// HTTP 403 with one of the two AWS-standard error codes
6076/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`).
6077#[derive(Debug, thiserror::Error)]
6078pub enum SigV4aGateError {
6079    #[error("missing Authorization header")]
6080    MissingAuthorization,
6081    #[error("malformed SigV4a Authorization header")]
6082    MalformedAuthorization,
6083    #[error("unknown SigV4a access-key-id: {0}")]
6084    UnknownAccessKey(String),
6085    #[error("SigV4a verification failed: {0}")]
6086    Verify(#[source] crate::sigv4a::SigV4aError),
6087}
6088
6089impl SigV4aGateError {
6090    /// AWS S3 error code that should accompany a 403 response.
6091    #[must_use]
6092    pub fn s3_error_code(&self) -> &'static str {
6093        match self {
6094            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
6095            _ => "SignatureDoesNotMatch",
6096        }
6097    }
6098}
6099
6100#[cfg(test)]
6101mod tests {
6102    use super::*;
6103
6104    #[test]
6105    fn manifest_roundtrip_via_metadata() {
6106        let original = ChunkManifest {
6107            codec: CodecKind::CpuZstd,
6108            original_size: 1234,
6109            compressed_size: 567,
6110            crc32c: 0xdead_beef,
6111        };
6112        let mut meta: Option<Metadata> = None;
6113        write_manifest(&mut meta, &original);
6114        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
6115        assert_eq!(extracted.codec, original.codec);
6116        assert_eq!(extracted.original_size, original.original_size);
6117        assert_eq!(extracted.compressed_size, original.compressed_size);
6118        assert_eq!(extracted.crc32c, original.crc32c);
6119    }
6120
6121    #[test]
6122    fn missing_metadata_yields_none() {
6123        let meta: Option<Metadata> = None;
6124        assert!(extract_manifest(&meta).is_none());
6125    }
6126
6127    #[test]
6128    fn partial_metadata_yields_none() {
6129        let mut meta = Metadata::new();
6130        meta.insert(META_CODEC.into(), "cpu-zstd".into());
6131        let opt = Some(meta);
6132        assert!(extract_manifest(&opt).is_none());
6133    }
6134
6135    #[test]
6136    fn parse_copy_source_range_basic() {
6137        let r = parse_copy_source_range("bytes=10-20").unwrap();
6138        match r {
6139            s3s::dto::Range::Int { first, last } => {
6140                assert_eq!(first, 10);
6141                assert_eq!(last, Some(20));
6142            }
6143            _ => panic!("expected Int range"),
6144        }
6145    }
6146
6147    #[test]
6148    fn parse_copy_source_range_rejects_inverted() {
6149        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
6150        assert!(err.contains("last < first"));
6151    }
6152
6153    #[test]
6154    fn parse_copy_source_range_rejects_missing_prefix() {
6155        let err = parse_copy_source_range("10-20").unwrap_err();
6156        assert!(err.contains("must start with 'bytes='"));
6157    }
6158
6159    #[test]
6160    fn parse_copy_source_range_rejects_open_ended() {
6161        // S3 upload_part_copy spec requires N-M (closed); suffix and
6162        // open-ended forms are not allowed for this header.
6163        assert!(parse_copy_source_range("bytes=10-").is_err());
6164        assert!(parse_copy_source_range("bytes=-10").is_err());
6165    }
6166
6167    // v0.7 #49: safe_object_uri must round-trip every legal S3 key
6168    // (which includes spaces, slashes, control chars, raw UTF-8) into
6169    // a parseable `http::Uri` instead of panicking like the previous
6170    // `format!(...).parse().unwrap()` call sites did.
6171
6172    #[test]
6173    fn safe_object_uri_basic_ascii() {
6174        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
6175        assert_eq!(uri.path(), "/bucket/key");
6176    }
6177
6178    #[test]
6179    fn safe_object_uri_encodes_spaces() {
6180        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
6181        // RFC 3986 path-segment encoding turns ' ' into %20.
6182        assert!(
6183            uri.path().contains("%20"),
6184            "expected percent-encoded space, got {}",
6185            uri.path()
6186        );
6187        assert!(uri.path().starts_with("/bucket/"));
6188    }
6189
6190    #[test]
6191    fn safe_object_uri_preserves_slashes() {
6192        // S3 keys legally contain '/' as a logical path separator —
6193        // the helper must NOT escape it (otherwise the synthetic URI
6194        // changes the perceived hierarchy).
6195        let uri =
6196            safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
6197        assert_eq!(uri.path(), "/bucket/key/with/slashes");
6198    }
6199
6200    #[test]
6201    fn safe_object_uri_handles_newline_without_panic() {
6202        // Newlines are control chars in URIs; whether the result is
6203        // Ok (encoded as %0A) or Err (parse rejects), the helper
6204        // MUST NOT panic. Either outcome is acceptable.
6205        let _ = safe_object_uri("bucket", "key\n");
6206    }
6207
6208    #[test]
6209    fn safe_object_uri_handles_null_byte_without_panic() {
6210        let _ = safe_object_uri("bucket", "key\0bad");
6211    }
6212
6213    #[test]
6214    fn safe_object_uri_handles_unicode_without_panic() {
6215        // RTL override, BOM, plain Japanese — none should panic.
6216        let _ = safe_object_uri("bucket", "rtl\u{202E}override");
6217        let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
6218        let _ = safe_object_uri("bucket", "日本語キー");
6219    }
6220
6221    #[test]
6222    fn safe_object_uri_no_panic_for_every_byte() {
6223        // Exhaustive byte coverage: 0x00..=0xFF as a 1-byte key.
6224        // None of these may panic. (0x80..=0xFF are not valid UTF-8
6225        // by themselves; we go through `String::from_utf8_lossy` so
6226        // the helper sees a real `&str` regardless of the raw byte.)
6227        for b in 0u8..=255 {
6228            let s = String::from_utf8_lossy(&[b]).into_owned();
6229            let _ = safe_object_uri("bucket", &s);
6230        }
6231    }
6232
6233    /// v0.8.1 #58: smoke test for the DEK-handling shape used by the
6234    /// SSE-KMS branches of `put_object` and `complete_multipart_upload`.
6235    /// Mirrors the call pattern (generate_dek → length check → copy
6236    /// into stack `[u8; 32]` → reborrow as `&[u8; 32]` for `SseSource`)
6237    /// without spinning up a full `S4Service`.
6238    ///
6239    /// The real assertion this guards against is a regression where
6240    /// the `Zeroizing` wrapper is accidentally dropped before the
6241    /// stack copy lands (e.g. someone refactors to use
6242    /// `let dek = kms.generate_dek(...).await?.0; drop(dek); ...`)
6243    /// or where `&**dek` is rewritten in a way that doesn't compile.
6244    #[tokio::test]
6245    async fn kms_dek_lifetime_within_function_scope() {
6246        use crate::kms::{KmsBackend, LocalKms};
6247        use std::collections::HashMap;
6248        use std::path::PathBuf;
6249        use zeroize::Zeroizing;
6250
6251        let mut keks = HashMap::new();
6252        keks.insert("scope".to_string(), [33u8; 32]);
6253        let kms = LocalKms::from_keks(PathBuf::from("/tmp/kms-scope-test"), keks);
6254
6255        // Mirror the put_object KMS branch shape exactly.
6256        let (dek, wrapped) = kms.generate_dek("scope").await.unwrap();
6257        assert_eq!(dek.len(), 32);
6258        let mut dek_arr: Zeroizing<[u8; 32]> = Zeroizing::new([0u8; 32]);
6259        dek_arr.copy_from_slice(&dek);
6260
6261        // The reborrow used at the SseSource construction site —
6262        // mirrors the call-site pattern where `let dek_ref: &[u8; 32]`
6263        // auto-derefs from a `Zeroizing<[u8; 32]>` reference.
6264        let dek_ref: &[u8; 32] = &dek_arr;
6265        // Sanity: the reborrow points at the same bytes.
6266        assert_eq!(dek_ref, &*dek_arr);
6267        // Wrapped key id flows through unchanged.
6268        assert_eq!(wrapped.key_id, "scope");
6269
6270        // At end of scope, both `dek` (Zeroizing<Vec<u8>>) and
6271        // `dek_arr` (Zeroizing<[u8; 32]>) are dropped, wiping the
6272        // backing memory. Cannot directly assert the wipe (would be
6273        // UB to read freed memory), so this test instead enforces
6274        // that the call shape compiles and executes; the wipe itself
6275        // is exercised by the `zeroize` crate's own test suite.
6276    }
6277}