// s4_server/service.rs
//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
//!
//! ## カバー範囲 (Phase 1 月 2)
//!
//! - 圧縮 hook あり: `put_object`, `get_object`
//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
//!   `list_parts`
//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
//!
//! ## アーキテクチャ
//!
//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
//!   複数 codec で書かれた object を透過的に GET できる
//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
//!   manifest を S3 metadata に書いて backend に forward
//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
//!   manifest 指定の codec で解凍 → 元の bytes を return
//!
//! ## 既知の制限事項
//!
//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
//!   Streaming-aware 圧縮は Phase 2。

31use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40    write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51    supports_streaming_compress, supports_streaming_decompress,
52};
53
/// Maximum number of leading bytes of a PUT body handed to the codec
/// dispatcher as a sniffing sample (see module docs: the dispatcher
/// chooses a codec from this head sample before the registry compresses).
const SAMPLE_BYTES: usize = 4096;
56
/// v0.7 #49: percent-encoding set covering everything that is **not** an
/// `unreserved` character per RFC 3986 §2.3, **plus** we additionally
/// encode the path-reserved sub-delims that `http::Uri` rejects in a
/// path segment (`?`, `#`, `%`, control bytes, space, etc.). We
/// deliberately keep `/` un-encoded because S3 keys legally use `/` as
/// a logical separator and the rest of the synthetic URI relies on the
/// path layout `/{bucket}/{key}` round-tripping byte-for-byte.
///
/// The base set is `percent_encoding::CONTROLS` (all ASCII control
/// bytes); each `.add(...)` below extends it with one printable byte.
const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
    .add(b' ')
    .add(b'"')
    .add(b'#')
    .add(b'<')
    .add(b'>')
    .add(b'?')
    .add(b'`')
    .add(b'{')
    .add(b'}')
    .add(b'|')
    .add(b'\\')
    .add(b'^')
    .add(b'[')
    .add(b']')
    .add(b'%');
80
81/// v0.7 #49: build the synthetic `/{bucket}/{key}` request URI used by
82/// the sidecar / replication helpers when they re-enter the backend
83/// trait without going through the HTTP layer. S3 object keys can
84/// contain spaces, control bytes, and arbitrary Unicode that would
85/// make `format!(...).parse::<http::Uri>()` panic; we percent-encode
86/// the key bytes (RFC 3986 path segment) and the bucket name (defensive
87/// — bucket names are normally DNS-safe, but the helper is the single
88/// choke-point) before splicing them in. If the encoded form *still*
89/// fails to parse (extremely unlikely once everything outside the
90/// unreserved set is escaped) we surface a typed `400 InvalidObjectName`
91/// instead of crashing the worker.
92pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
93    use percent_encoding::utf8_percent_encode;
94    let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
95    let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
96    let raw = format!("/{bucket_enc}/{key_enc}");
97    raw.parse::<http::Uri>().map_err(|e| {
98        // S3 spec uses `InvalidObjectName` (HTTP 400) for keys that
99        // can't be represented in a request URI. The generated
100        // `S3ErrorCode` enum doesn't expose a typed variant for it,
101        // so we round-trip through `from_bytes` which preserves the
102        // canonical wire string while falling back to InvalidArgument
103        // if even that lookup fails (cannot happen at runtime — kept
104        // as a belt-and-suspenders branch so this helper never
105        // panics).
106        let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
107            .unwrap_or(S3ErrorCode::InvalidArgument);
108        S3Error::with_message(
109            code,
110            format!("object key cannot be encoded as a request URI: {e}"),
111        )
112    })
113}
114
/// v0.4 #20: captured at the start of a handler, before the request is
/// consumed by the backend call, so the matching `record_access` at
/// end-of-request can fill in the structured access log entry.
struct AccessLogPreamble {
    /// Client address, when the listener could determine one.
    remote_ip: Option<String>,
    /// Requesting principal, when the request carried credentials.
    requester: Option<String>,
    /// Request URI as received (recorded verbatim for the log line).
    request_uri: String,
    /// `User-Agent` header value, when present.
    user_agent: Option<String>,
}
124
/// Core S4 service: wraps a backend `B: S3` behind the compression
/// codec layer described in the module docs, plus a stack of optional,
/// independently attachable feature managers (policy, rate limiting,
/// access logging, SSE, versioning, KMS, Object Lock, CORS, inventory,
/// notifications, lifecycle, tagging, replication, MFA-Delete,
/// SigV4a). Every optional field starts as `None` / `false` in
/// [`S4Service::new`], so a bare service behaves as a plain
/// compress-on-PUT / decompress-on-GET proxy; features are opted into
/// via the `with_*` builder methods.
pub struct S4Service<B: S3> {
    /// Wrapped in `Arc` so the v0.6 #40 cross-bucket replication
    /// dispatcher can clone it into a detached `tokio::spawn` task
    /// (Arc::clone is cheap; backend trait methods take `&self` so no
    /// other handler is affected by the indirection).
    backend: Arc<B>,
    /// Codec registry used on both the PUT (compress) and GET
    /// (decompress) paths; holding multiple codecs lets one instance
    /// transparently read objects written with different codecs (see
    /// module docs).
    registry: Arc<CodecRegistry>,
    /// Picks the codec for a PUT from a sample of the body's leading
    /// bytes (see module docs and `SAMPLE_BYTES`).
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Cap on how many bytes a single PUT body may occupy in memory
    /// (default [`S4Service::DEFAULT_MAX_BODY_BYTES`], the S3
    /// single-PUT limit).
    max_body_bytes: usize,
    /// Optional shared access-policy store; its Condition evaluation
    /// also reads `secure_transport` below (see v0.3 #13).
    policy: Option<crate::policy::SharedPolicy>,
    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
    /// gating "deny if not over TLS" can do their job. Defaults to `false`
    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
    secure_transport: bool,
    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// v0.4 #20: optional S3-style access log emitter.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
    /// (with the keyring's active key id) after the compress + framing
    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
    /// key in a 1-slot keyring so single-key (v0.4) operators get the
    /// same behaviour they had before, just on the v2 frame.
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    /// v0.5 #34: optional first-class versioning state machine. When
    /// `Some(...)`, S4-server itself owns the per-bucket versioning
    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
    /// list_object_versions / get_bucket_versioning /
    /// put_bucket_versioning handlers consult the manager instead of
    /// passing through. When `None` (default), the legacy
    /// backend-passthrough behaviour applies so existing v0.4
    /// deployments are unaffected until they explicitly call
    /// `with_versioning(...)`.
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
    /// generate a fresh DEK via the backend, encrypt the body with it
    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
    /// S4E4 unwrap the DEK through the same backend before decrypt.
    /// `kms_default_key_id` is used when the request omits an explicit
    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
    /// bucket-default behaviour).
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    /// Default KMS key id applied when a request omits one (see the
    /// `kms` field above).
    kms_default_key_id: Option<String>,
    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
    /// consult the manager and refuse the operation with HTTP 403
    /// `AccessDenied` while the object is locked (Compliance until
    /// expiry, Governance unless the bypass header is set, or any time
    /// a legal hold is on). PUT also auto-applies the bucket-default
    /// retention to brand-new objects when configured. When `None`
    /// (default), the legacy backend-passthrough behaviour applies, so
    /// existing v0.4 deployments are unaffected until they explicitly
    /// call `with_object_lock(...)`.
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    /// v0.6 #38: optional first-class CORS bucket configuration manager.
    /// When `Some(...)`, S4-server itself owns per-bucket CORS rules and
    /// `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
    /// consult the manager instead of passing through to the backend.
    /// `handle_preflight` (public method on `S4Service`) routes OPTIONS-
    /// style preflight matching through the same store; the actual HTTP
    /// OPTIONS routing wire-up at the listener level is a follow-up
    /// (s3s framework does not surface OPTIONS as a typed handler).
    cors: Option<Arc<crate::cors::CorsManager>>,
    /// v0.6 #36: optional first-class S3 Inventory manager. When
    /// `Some(...)`, S4-server itself owns per-(bucket, id) inventory
    /// configurations and `put_bucket_inventory_configuration` /
    /// `get_bucket_inventory_configuration` /
    /// `list_bucket_inventory_configurations` /
    /// `delete_bucket_inventory_configuration` consult the manager
    /// instead of passing through to the backend. The actual periodic
    /// CSV emission is driven by a tokio task in `main.rs` that calls
    /// `InventoryManager::run_once_for_test` on a fixed cadence; the
    /// service handlers below only deal with config-level CRUD.
    inventory: Option<Arc<crate::inventory::InventoryManager>>,
    /// v0.6 #35: optional first-class S3 bucket-notification manager.
    /// When `Some(...)`, S4-server itself owns per-bucket notification
    /// configurations and `put_bucket_notification_configuration` /
    /// `get_bucket_notification_configuration` consult the manager
    /// instead of passing through to the backend. Successful PUT /
    /// DELETE handlers fire matching destinations on a detached tokio
    /// task (best-effort; see `crate::notifications::dispatch_event`).
    notifications: Option<Arc<crate::notifications::NotificationManager>>,
    /// v0.6 #37: optional first-class S3 Lifecycle configuration
    /// manager. When `Some(...)`, S4-server itself owns per-bucket
    /// lifecycle rules and `put_bucket_lifecycle_configuration` /
    /// `get_bucket_lifecycle_configuration` /
    /// `delete_bucket_lifecycle` consult the manager instead of
    /// passing through to the backend. The actual background scanner
    /// (list_objects_v2 -> evaluate -> delete / metadata-rewrite per
    /// rule) is a v0.7+ follow-up; the test path
    /// `S4Service::run_lifecycle_once_for_test` exercises the
    /// evaluator end-to-end so this v0.6 #37 wiring is enough to ship
    /// the configuration-management half without putting a
    /// half-wired bucket-walk in front of users.
    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
    /// v0.6 #39: optional first-class object + bucket Tagging manager.
    /// When `Some(...)`, S4-server itself owns per-(bucket, key) and
    /// per-bucket tag state — `PutObjectTagging` /
    /// `GetObjectTagging` / `DeleteObjectTagging` /
    /// `PutBucketTagging` / `GetBucketTagging` /
    /// `DeleteBucketTagging` route through the manager (replacing the
    /// previous backend-passthrough behaviour). `put_object` also
    /// pre-parses the `x-amz-tagging` header / `Tagging` input field
    /// so the IAM policy evaluator can gate on
    /// `s3:RequestObjectTag/<key>` and `s3:ExistingObjectTag/<key>`.
    /// On a successful PUT the parsed tags are persisted; on a
    /// successful DELETE the matching tag entry is dropped.
    tagging: Option<Arc<crate::tagging::TagManager>>,
    /// v0.6 #40: optional first-class cross-bucket replication manager.
    /// When `Some(...)`, S4-server itself owns per-bucket replication
    /// rules; `PutBucketReplication` / `GetBucketReplication` /
    /// `DeleteBucketReplication` route through the manager (replacing
    /// the previous backend-passthrough behaviour). On every successful
    /// `put_object` the manager's rule list is consulted; the
    /// highest-priority matching enabled rule wins, the per-key status
    /// is recorded as `Pending`, and the source body and metadata are
    /// handed to a detached tokio task that PUTs to the destination
    /// bucket through the same backend. The replica is stamped with
    /// `x-amz-replication-status: REPLICA` in its metadata; the
    /// source-side status is updated to `Completed` on success or
    /// `Failed` after the 3-attempt retry budget is exhausted (drop
    /// counter bumps in either-side case so dashboards see the loss).
    /// `head_object` / `get_object` echo the recorded status back as
    /// `x-amz-replication-status` so consumers can poll progress.
    /// Limited to single-instance (same `S4Service`) replication; true
    /// cross-region (multi-instance) is a v0.7+ follow-up.
    replication: Option<Arc<crate::replication::ReplicationManager>>,
    /// v0.6 #42: optional MFA-Delete enforcement layer. When `Some(...)`,
    /// every DELETE / DELETE-version / delete-marker / `PutBucketVersioning`
    /// request against a bucket whose MFA-Delete state is `Enabled`
    /// must carry `x-amz-mfa: <serial> <code>` (RFC 6238 6-digit TOTP);
    /// missing or invalid tokens return HTTP 403 `AccessDenied`. When
    /// `None` (default), the gate is a no-op so existing v0.4 / v0.5
    /// deployments are unaffected until they explicitly call
    /// `with_mfa_delete(...)`.
    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
    /// or be matched against a configured server-managed keyring/KMS).
    /// Set by `--compliance-mode strict` after the boot-time
    /// prerequisite check passes.
    compliance_strict: bool,
    /// v0.7 #47: optional SigV4a (asymmetric ECDSA-P256-SHA256) verify
    /// gate. When `Some(...)`, the listener-side middleware (see
    /// [`crate::routing::try_sigv4a_verify`]) inspects every incoming
    /// request and short-circuits SigV4a-signed ones — verifying the
    /// signature against the credential store and returning 403
    /// `SignatureDoesNotMatch` / `InvalidAccessKeyId` on failure. Plain
    /// SigV4 (HMAC-SHA256) requests pass through to s3s untouched. When
    /// `None`, the middleware is a no-op so the existing SigV4 path is
    /// unaffected (operators opt in via `--sigv4a-credentials <DIR>`).
    sigv4a_gate: Option<Arc<SigV4aGate>>,
}
282
283impl<B: S3> S4Service<B> {
    /// AWS S3 single-request PUT API upper bound (5 GiB); used as the
    /// default `max_body_bytes` cap when collecting a PUT body into
    /// memory.
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
286
287    pub fn new(
288        backend: B,
289        registry: Arc<CodecRegistry>,
290        dispatcher: Arc<dyn CodecDispatcher>,
291    ) -> Self {
292        Self {
293            backend: Arc::new(backend),
294            registry,
295            dispatcher,
296            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
297            policy: None,
298            secure_transport: false,
299            rate_limits: None,
300            access_log: None,
301            sse_keyring: None,
302            versioning: None,
303            kms: None,
304            kms_default_key_id: None,
305            object_lock: None,
306            cors: None,
307            inventory: None,
308            notifications: None,
309            lifecycle: None,
310            tagging: None,
311            replication: None,
312            mfa_delete: None,
313            compliance_strict: false,
314            sigv4a_gate: None,
315        }
316    }
317
318    /// v0.7 #47: attach the SigV4a verify gate. Once set, the
319    /// listener-side middleware (`crate::routing::try_sigv4a_verify`)
320    /// short-circuits any incoming `AWS4-ECDSA-P256-SHA256` request,
321    /// verifying it against the supplied credential store and
322    /// returning 403 on failure. Plain SigV4 (HMAC-SHA256) requests
323    /// are unaffected. When the gate is unset (default), the
324    /// middleware skips entirely so existing SigV4 deployments keep
325    /// working.
326    #[must_use]
327    pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
328        self.sigv4a_gate = Some(gate);
329        self
330    }
331
332    /// v0.7 #47: borrow the attached SigV4a gate. Used by `main.rs`
333    /// to snapshot the gate `Arc` before the s3s `ServiceBuilder`
334    /// consumes the `S4Service` (the listener-side middleware needs
335    /// the same `Arc` because s3s' SigV4 verifier rejects SigV4a
336    /// algorithm tokens with "unknown algorithm" — match has to
337    /// happen at the hyper layer instead).
338    #[must_use]
339    pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
340        self.sigv4a_gate.as_ref()
341    }
342
343    /// v0.6 #39: attach the in-memory object + bucket Tagging manager.
344    /// Once set, `Put/Get/Delete` `Object/Bucket Tagging` route
345    /// through the manager (instead of forwarding to the backend),
346    /// and `put_object`'s `x-amz-tagging` parse path becomes the
347    /// source of `s3:RequestObjectTag/<key>` for the IAM policy
348    /// evaluator. The manager itself is shared via `Arc`.
349    #[must_use]
350    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
351        self.tagging = Some(mgr);
352        self
353    }
354
355    /// v0.6 #39: borrow the attached tagging manager (test /
356    /// introspection — the snapshotter in `main.rs`, when wired,
357    /// will keep its own `Arc` clone).
358    #[must_use]
359    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
360        self.tagging.as_ref()
361    }
362
363    /// v0.6 #36: attach the in-memory S3 Inventory manager. Once set,
364    /// `put_bucket_inventory_configuration` /
365    /// `get_bucket_inventory_configuration` /
366    /// `list_bucket_inventory_configurations` /
367    /// `delete_bucket_inventory_configuration` route through the
368    /// manager. The actual periodic CSV / manifest emission is
369    /// orchestrated by a tokio task started in `main.rs`; the manager
370    /// itself is shared between the handler and the scheduler via
371    /// `Arc`.
372    #[must_use]
373    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
374        self.inventory = Some(mgr);
375        self
376    }
377
378    /// v0.6 #36: borrow the attached inventory manager (test /
379    /// introspection — the background scheduler in `main.rs` keeps its
380    /// own `Arc` clone, so this accessor is for the test path that
381    /// invokes `run_once_for_test` directly).
382    #[must_use]
383    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
384        self.inventory.as_ref()
385    }
386
387    /// v0.6 #37: attach the in-memory S3 Lifecycle configuration
388    /// manager. Once set, `put_bucket_lifecycle_configuration` /
389    /// `get_bucket_lifecycle_configuration` / `delete_bucket_lifecycle`
390    /// route through the manager (replacing the previous backend-
391    /// passthrough behaviour). The actual periodic scanner that walks
392    /// the source bucket and invokes Expiration / Transition /
393    /// NoncurrentExpiration actions is a v0.7+ follow-up — see
394    /// [`Self::run_lifecycle_once_for_test`] for the in-memory test
395    /// path that exercises the evaluator end-to-end.
396    #[must_use]
397    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
398        self.lifecycle = Some(mgr);
399        self
400    }
401
402    /// v0.6 #37: borrow the attached lifecycle manager (test /
403    /// introspection — the background scheduler in `main.rs` keeps its
404    /// own `Arc` clone, so this accessor is for the test path that
405    /// invokes the evaluator directly).
406    #[must_use]
407    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
408        self.lifecycle.as_ref()
409    }
410
411    /// v0.6 #37: synchronous test entry that runs the lifecycle evaluator
412    /// against a caller-provided list of `(key, age, size, tags)` tuples
413    /// and returns the `(key, action)` pairs that should fire. The actual
414    /// backend invocation (S3.delete_object / metadata rewrite) is left
415    /// to the caller — the unit + E2E tests use this to verify the
416    /// evaluator without spawning the (deferred) background scanner.
417    /// Returns an empty `Vec` when no lifecycle manager is attached or
418    /// no rule matches.
419    #[must_use]
420    pub fn run_lifecycle_once_for_test(
421        &self,
422        bucket: &str,
423        objects: &[crate::lifecycle::EvaluateBatchEntry],
424    ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
425        let Some(mgr) = self.lifecycle.as_ref() else {
426            return Vec::new();
427        };
428        crate::lifecycle::evaluate_batch(mgr, bucket, objects)
429    }
430
431    /// v0.6 #35: attach the in-memory bucket-notification manager. Once
432    /// set, `put_bucket_notification_configuration` /
433    /// `get_bucket_notification_configuration` route through the manager
434    /// (replacing the previous backend-passthrough behaviour); successful
435    /// `put_object` / `delete_object` calls fire matching destinations
436    /// on a detached tokio task via
437    /// `crate::notifications::dispatch_event` (best-effort, fire-and-
438    /// forget — failures bump the manager's `dropped_total` counter and
439    /// log at warn but do NOT fail the originating S3 request).
440    #[must_use]
441    pub fn with_notifications(
442        mut self,
443        mgr: Arc<crate::notifications::NotificationManager>,
444    ) -> Self {
445        self.notifications = Some(mgr);
446        self
447    }
448
449    /// v0.6 #35: borrow the attached notifications manager (test /
450    /// introspection — used by the metrics layer to read
451    /// `dropped_total`).
452    #[must_use]
453    pub fn notifications_manager(
454        &self,
455    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
456        self.notifications.as_ref()
457    }
458
459    /// v0.6 #35: internal helper used by the DELETE handlers to fire a
460    /// matching notification on a detached tokio task. No-op when no
461    /// manager is attached or no rule on the bucket matches the given
462    /// (event, key) tuple.
463    fn fire_delete_notification(
464        &self,
465        bucket: &str,
466        key: &str,
467        event: crate::notifications::EventType,
468        version_id: Option<String>,
469    ) {
470        let Some(mgr) = self.notifications.as_ref() else {
471            return;
472        };
473        let dests = mgr.match_destinations(bucket, &event, key);
474        if dests.is_empty() {
475            return;
476        }
477        tokio::spawn(crate::notifications::dispatch_event(
478            Arc::clone(mgr),
479            bucket.to_owned(),
480            key.to_owned(),
481            event,
482            None,
483            None,
484            version_id,
485            format!("S4-{}", uuid::Uuid::new_v4()),
486        ));
487    }
488
489    /// v0.6 #40: attach the in-memory cross-bucket replication manager.
490    /// Once set, `put_bucket_replication` / `get_bucket_replication` /
491    /// `delete_bucket_replication` route through the manager (replacing
492    /// the previous backend-passthrough behaviour); a successful
493    /// `put_object` whose key matches an enabled rule fires a detached
494    /// tokio task that PUTs the same body + metadata to the rule's
495    /// destination bucket, stamping the replica with
496    /// `x-amz-replication-status: REPLICA`. Failures after the retry
497    /// budget bump the manager's `dropped_total` counter and are
498    /// surfaced in the `s4_replication_dropped_total` Prometheus
499    /// counter; successes bump `s4_replication_replicated_total`.
500    #[must_use]
501    pub fn with_replication(
502        mut self,
503        mgr: Arc<crate::replication::ReplicationManager>,
504    ) -> Self {
505        self.replication = Some(mgr);
506        self
507    }
508
509    /// v0.6 #40: borrow the attached replication manager (test /
510    /// introspection — used by the metrics layer to read
511    /// `dropped_total`).
512    #[must_use]
513    pub fn replication_manager(
514        &self,
515    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
516        self.replication.as_ref()
517    }
518
    /// v0.6 #40: internal helper used by the PUT handlers to fire a
    /// detached cross-bucket replication task. No-op when no manager
    /// is attached, the source backend PUT failed, or no rule on the
    /// source bucket matches the (key, tags) tuple. The `body` is the
    /// post-compression / post-encryption `Bytes` that was sent to
    /// the source backend (refcount-cloned), and `metadata` is the
    /// metadata map that already includes the manifest /
    /// `s4-encrypted` markers — the replica decodes through the same
    /// path. The destination PUT runs through `Arc<B>::put_object`.
    fn spawn_replication_if_matched(
        &self,
        source_bucket: &str,
        source_key: &str,
        request_tags: &Option<crate::tagging::TagSet>,
        body: &bytes::Bytes,
        metadata: &Option<std::collections::HashMap<String, String>>,
        backend_ok: bool,
    ) where
        // The spawned task captures `Arc<B>` across `.await` points.
        B: Send + Sync + 'static,
    {
        // Nothing to replicate when the source-side write itself failed.
        if !backend_ok {
            return;
        }
        let Some(mgr) = self.replication.as_ref() else {
            return;
        };
        // Pull the request's tags into the (k, v) shape the matcher
        // expects. The tagging manager would have the canonical
        // post-PUT view but at this point in the pipeline it's
        // already been written above; for the rule-match decision
        // the request's tags are sufficient (= the tags this PUT
        // applies, S3 PutObject is full-replace on tags).
        let object_tags: Vec<(String, String)> = request_tags
            .as_ref()
            .map(|ts| ts.iter().cloned().collect())
            .unwrap_or_default();
        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
            return;
        };
        // Eagerly mark the source key as Pending so a HEAD between
        // the source PUT returning and the spawned task completing
        // surfaces the in-flight state.
        mgr.record_status(
            source_bucket,
            source_key,
            crate::replication::ReplicationStatus::Pending,
        );
        // Clone everything the detached task needs: `Bytes`/`Arc`
        // clones are refcount bumps, the strings are owned copies.
        let mgr_cl = Arc::clone(mgr);
        let backend = Arc::clone(&self.backend);
        let body_cl = body.clone();
        let metadata_cl = metadata.clone();
        let source_bucket_cl = source_bucket.to_owned();
        let source_key_cl = source_key.to_owned();
        tokio::spawn(async move {
            // One destination-PUT attempt; `replicate_object` below
            // drives the retry/status bookkeeping around it.
            let do_put = move |dest_bucket: String,
                               dest_key: String,
                               dest_body: bytes::Bytes,
                               dest_meta: Option<std::collections::HashMap<String, String>>| {
                let backend = Arc::clone(&backend);
                async move {
                    // Synthetic request: only `input` matters — the "/"
                    // URI is a placeholder because this call re-enters
                    // the backend trait without touching the HTTP layer.
                    let req = S3Request {
                        input: PutObjectInput {
                            bucket: dest_bucket,
                            key: dest_key,
                            body: Some(bytes_to_blob(dest_body)),
                            metadata: dest_meta,
                            ..Default::default()
                        },
                        method: http::Method::PUT,
                        uri: "/".parse().unwrap(),
                        headers: http::HeaderMap::new(),
                        extensions: http::Extensions::new(),
                        credentials: None,
                        region: None,
                        service: None,
                        trailing_headers: None,
                    };
                    backend
                        .put_object(req)
                        .await
                        .map(|_| ())
                        .map_err(|e| format!("destination put_object: {e}"))
                }
            };
            crate::replication::replicate_object(
                rule,
                source_bucket_cl,
                source_key_cl,
                body_cl,
                metadata_cl,
                do_put,
                mgr_cl,
            )
            .await;
        });
    }
615
616    /// v0.6 #42: attach the in-memory MFA-Delete enforcement manager.
617    /// Once set, every DELETE / DELETE-version / delete-marker /
618    /// `PutBucketVersioning` request against a bucket whose MFA-Delete
619    /// state is `Enabled` requires a valid `x-amz-mfa: <serial> <code>`
620    /// header (RFC 6238 6-digit TOTP); the gate is a no-op for buckets
621    /// where MFA-Delete is `Disabled` (S3 default).
622    #[must_use]
623    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
624        self.mfa_delete = Some(mgr);
625        self
626    }
627
628    /// v0.6 #42: borrow the attached MFA-Delete manager (test /
629    /// introspection — used by the snapshot path in `main.rs` to call
630    /// `to_json` for restart-recoverable state).
631    #[must_use]
632    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
633        self.mfa_delete.as_ref()
634    }
635
636    /// v0.6 #38: attach the in-memory CORS configuration manager. Once
637    /// set, `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
638    /// route through the manager instead of forwarding to the backend,
639    /// and [`Self::handle_preflight`] becomes useful for the (future)
640    /// listener-side OPTIONS interceptor.
641    #[must_use]
642    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
643        self.cors = Some(mgr);
644        self
645    }
646
647    /// v0.6 #38: Borrow the attached CORS manager (test / introspection).
648    #[must_use]
649    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
650        self.cors.as_ref()
651    }
652
653    /// v0.6 #38: evaluate a CORS preflight request against the bucket's
654    /// configured rules and, if a rule matches, return the headers that
655    /// the (future) listener-side OPTIONS interceptor must put on the
656    /// 200 response: `Access-Control-Allow-Origin`, `Access-Control-
657    /// Allow-Methods`, `Access-Control-Allow-Headers`, optionally
658    /// `Access-Control-Max-Age` and `Access-Control-Expose-Headers`.
659    ///
660    /// Returns `None` when no manager is attached, no config is
661    /// registered for the bucket, or no rule matches the (origin,
662    /// method, headers) triple. The caller is responsible for turning
663    /// `None` into the appropriate 403 response.
664    ///
665    /// **Note:** the OPTIONS routing itself (i.e. wiring this method
666    /// into the hyper-util listener path) is a follow-up — s3s does not
667    /// surface OPTIONS as a typed S3 handler, so this method is
668    /// currently call-able only from inside other handlers and tests.
669    #[must_use]
670    pub fn handle_preflight(
671        &self,
672        bucket: &str,
673        origin: &str,
674        method: &str,
675        request_headers: &[String],
676    ) -> Option<std::collections::HashMap<String, String>> {
677        let mgr = self.cors.as_ref()?;
678        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
679        let mut h = std::collections::HashMap::new();
680        // Echo the matched origin back. If the rule used "*" we still
681        // echo "*" (S3 spec — the spec does not require us to echo the
682        // *requesting* origin when the wildcard matched).
683        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
684            "*".to_string()
685        } else {
686            origin.to_string()
687        };
688        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
689        h.insert(
690            "Access-Control-Allow-Methods".to_string(),
691            rule.allowed_methods.join(", "),
692        );
693        if !rule.allowed_headers.is_empty() {
694            // For the Allow-Headers response, echo back the rule's
695            // pattern list verbatim (S3 echoes the configured list,
696            // including "*" if present). Browsers honour exact-match
697            // rules.
698            h.insert(
699                "Access-Control-Allow-Headers".to_string(),
700                rule.allowed_headers.join(", "),
701            );
702        }
703        if let Some(secs) = rule.max_age_seconds {
704            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
705        }
706        if !rule.expose_headers.is_empty() {
707            h.insert(
708                "Access-Control-Expose-Headers".to_string(),
709                rule.expose_headers.join(", "),
710            );
711        }
712        Some(h)
713    }
714
715    /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
716    /// SSE indicator (server-side encryption header or SSE-C customer
717    /// key); requests without one are rejected with 400 InvalidRequest.
718    /// Boot-time prerequisite checking lives in the binary
719    /// (`validate_compliance_mode`) so this flag is purely the runtime
720    /// switch.
721    #[must_use]
722    pub fn with_compliance_strict(mut self, on: bool) -> Self {
723        self.compliance_strict = on;
724        self
725    }
726
727    /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
728    /// manager. Once set, `delete_object` and overwrite-path
729    /// `put_object` refuse operations on locked keys with HTTP 403
730    /// `AccessDenied`; new PUTs to a bucket with a default retention
731    /// policy auto-create per-object lock state.
732    #[must_use]
733    pub fn with_object_lock(
734        mut self,
735        mgr: Arc<crate::object_lock::ObjectLockManager>,
736    ) -> Self {
737        self.object_lock = Some(mgr);
738        self
739    }
740
741    /// v0.7 #45: borrow the attached Object Lock manager (read-only —
742    /// the lifecycle scanner uses this to skip currently-locked objects
743    /// before issuing `delete_object`, since an Object Lock always wins
744    /// over Lifecycle Expiration in AWS S3 semantics). Mirrors the
745    /// shape of [`Self::lifecycle_manager`] /
746    /// [`Self::tag_manager`] — purely additive accessor, no handler
747    /// behaviour change.
748    #[must_use]
749    pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
750        self.object_lock.as_ref()
751    }
752
753    /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
754    /// when a PUT requests SSE-KMS without naming a specific KMS key
755    /// (operators set this to mirror AWS S3's bucket-default key).
756    #[must_use]
757    pub fn with_kms_backend(
758        mut self,
759        kms: Arc<dyn crate::kms::KmsBackend>,
760        default_key_id: Option<String>,
761    ) -> Self {
762        self.kms = Some(kms);
763        self.kms_default_key_id = default_key_id;
764        self
765    }
766
767    /// v0.5 #34: attach the first-class versioning state machine. Once
768    /// set, this `S4Service` owns the per-bucket versioning state +
769    /// per-(bucket, key) version chain; `put_object` / `get_object` /
770    /// `delete_object` / `list_object_versions` /
771    /// `get_bucket_versioning` / `put_bucket_versioning` consult the
772    /// manager instead of passing through to the backend. The backend
773    /// is still used as the byte store: Suspended / Unversioned buckets
774    /// keep using `<key>` directly (legacy), Enabled buckets redirect
775    /// each version's bytes to a shadow key
776    /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
777    /// PUTs to the same logical key.
778    #[must_use]
779    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
780        self.versioning = Some(mgr);
781        self
782    }
783
784    /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
785    /// Internally wraps it in a 1-slot keyring with id=1 active, so
786    /// new objects ride the v0.5 S4E2 frame while previously-written
787    /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
788    /// fallback path. Operators wanting true rotation should call
789    /// [`Self::with_sse_keyring`] instead.
790    #[must_use]
791    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
792        let keyring = crate::sse::SseKeyring::new(1, key);
793        self.sse_keyring = Some(std::sync::Arc::new(keyring));
794        self
795    }
796
797    /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
798    /// the active key (S4E2 frame stamped with that key's id); GET
799    /// dispatches on the body's magic — S4E1 falls back to trying every
800    /// key in the ring (active first) so v0.4 objects survive a
801    /// migration; S4E2 looks up the explicit key_id from the header.
802    #[must_use]
803    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
804        self.sse_keyring = Some(keyring);
805        self
806    }
807
808    /// v0.4 #20: attach an S3-style access-log emitter. Each completed
809    /// PUT / GET / DELETE / List handler emits one entry into the
810    /// emitter's buffer; a background flusher (started separately, see
811    /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
812    /// rotated `.log` files into the configured directory.
813    #[must_use]
814    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
815        self.access_log = Some(log);
816        self
817    }
818
819    /// Capture the per-request access-log preamble before the request is
820    /// consumed by the backend call. Returns `None` if no access logger
821    /// is configured (cheap early-out so the handler doesn't pay the
822    /// header-clone cost when access logging is off).
823    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
824        self.access_log.as_ref()?;
825        Some(AccessLogPreamble {
826            remote_ip: req
827                .headers
828                .get("x-forwarded-for")
829                .and_then(|v| v.to_str().ok())
830                .and_then(|raw| raw.split(',').next())
831                .map(|s| s.trim().to_owned()),
832            requester: Self::principal_of(req).map(str::to_owned),
833            request_uri: format!("{} {}", req.method, req.uri.path()),
834            user_agent: req
835                .headers
836                .get("user-agent")
837                .and_then(|v| v.to_str().ok())
838                .map(str::to_owned),
839        })
840    }
841
842    /// Internal — called by handlers at end-of-request with a captured
843    /// preamble. Best-effort: swallows the await fast (clones Arc +
844    /// pushes), no error propagation back to the request path.
845    #[allow(clippy::too_many_arguments)]
846    async fn record_access(
847        &self,
848        preamble: Option<AccessLogPreamble>,
849        operation: &'static str,
850        bucket: &str,
851        key: Option<&str>,
852        http_status: u16,
853        bytes_sent: u64,
854        object_size: u64,
855        total_time_ms: u64,
856        error_code: Option<&str>,
857    ) {
858        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
859            return;
860        };
861        log.record(crate::access_log::AccessLogEntry {
862            time: std::time::SystemTime::now(),
863            bucket: bucket.to_owned(),
864            remote_ip: p.remote_ip,
865            requester: p.requester,
866            operation,
867            key: key.map(str::to_owned),
868            request_uri: p.request_uri,
869            http_status,
870            error_code: error_code.map(str::to_owned),
871            bytes_sent,
872            object_size,
873            total_time_ms,
874            user_agent: p.user_agent,
875        })
876        .await;
877    }
878
879    /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
880    /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
881    /// throttle-checked before the policy gate; throttled requests return
882    /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
883    /// `s4_rate_limit_throttled_total{principal,bucket}`.
884    #[must_use]
885    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
886        self.rate_limits = Some(rl);
887        self
888    }
889
890    /// Helper used by request handlers to apply the rate limit. Returns
891    /// `Ok(())` when allowed (or no rate limiter is configured), or a
892    /// `SlowDown` S3Error otherwise.
893    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
894        let Some(rl) = self.rate_limits.as_ref() else {
895            return Ok(());
896        };
897        let principal_id = Self::principal_of(req);
898        if !rl.check(principal_id, bucket) {
899            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
900            return Err(S3Error::with_message(
901                S3ErrorCode::SlowDown,
902                format!("rate-limited: bucket={bucket}"),
903            ));
904        }
905        Ok(())
906    }
907
908    /// Tell the policy evaluator that the listener is reached over TLS
909    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
910    /// resolves to `true`. Defaults to `false`.
911    #[must_use]
912    pub fn with_secure_transport(mut self, on: bool) -> Self {
913        self.secure_transport = on;
914        self
915    }
916
917    #[must_use]
918    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
919        self.max_body_bytes = n;
920        self
921    }
922
923    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
924    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
925    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
926    /// When `None` (the default), no policy enforcement happens.
927    #[must_use]
928    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
929        self.policy = Some(policy);
930        self
931    }
932
933    /// Pull the SigV4 access key id off the request's credentials, if any.
934    /// Used as the `principal_id` for policy evaluation.
935    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
936        req.credentials.as_ref().map(|c| c.access_key.as_str())
937    }
938
939    /// v0.3 #13: build the per-request policy context from the incoming
940    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
941    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
942    /// production deployments are behind an LB / reverse proxy that sets
943    /// this), `aws:CurrentTime` from the system clock, and
944    /// `aws:SecureTransport` from the per-listener TLS flag.
945    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
946        let user_agent = req
947            .headers
948            .get("user-agent")
949            .and_then(|v| v.to_str().ok())
950            .map(str::to_owned);
951        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
952        // is the original client. Trim and parse leniently.
953        let source_ip = req
954            .headers
955            .get("x-forwarded-for")
956            .and_then(|v| v.to_str().ok())
957            .and_then(|raw| raw.split(',').next())
958            .and_then(|s| s.trim().parse().ok());
959        crate::policy::RequestContext {
960            source_ip,
961            user_agent,
962            request_time: Some(std::time::SystemTime::now()),
963            secure_transport: self.secure_transport,
964            existing_object_tags: None,
965            request_object_tags: None,
966            extra: Default::default(),
967        }
968    }
969
970    /// Helper used by request handlers to enforce the optional policy.
971    /// Returns `Ok(())` when allowed (or no policy is configured), or an
972    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
973    /// counter on deny.
974    fn enforce_policy<I>(
975        &self,
976        req: &S3Request<I>,
977        action: &'static str,
978        bucket: &str,
979        key: Option<&str>,
980    ) -> S3Result<()> {
981        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
982    }
983
984    /// v0.6 #39: variant of [`Self::enforce_policy`] that lets the
985    /// caller plumb tag context (existing-on-object + on-request) into
986    /// the policy evaluator. Both arguments default to `None`, in
987    /// which case the resulting `RequestContext` is identical to
988    /// [`Self::enforce_policy`]'s — so for handlers that don't deal
989    /// with tags this is a transparent no-op.
990    fn enforce_policy_with_extra<I>(
991        &self,
992        req: &S3Request<I>,
993        action: &'static str,
994        bucket: &str,
995        key: Option<&str>,
996        request_tags: Option<&crate::tagging::TagSet>,
997        existing_tags: Option<&crate::tagging::TagSet>,
998    ) -> S3Result<()> {
999        let Some(policy) = self.policy.as_ref() else {
1000            return Ok(());
1001        };
1002        let principal_id = Self::principal_of(req);
1003        let mut ctx = self.request_context(req);
1004        if let Some(t) = request_tags {
1005            ctx.request_object_tags = Some(t.clone());
1006        }
1007        if let Some(t) = existing_tags {
1008            ctx.existing_object_tags = Some(t.clone());
1009        }
1010        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
1011        if decision.allow {
1012            Ok(())
1013        } else {
1014            crate::metrics::record_policy_denial(action, bucket);
1015            tracing::info!(
1016                action,
1017                bucket,
1018                key = ?key,
1019                principal = ?principal_id,
1020                source_ip = ?ctx.source_ip,
1021                user_agent = ?ctx.user_agent,
1022                secure_transport = ctx.secure_transport,
1023                matched_sid = ?decision.matched_sid,
1024                effect = ?decision.matched_effect,
1025                "S4 policy denied request"
1026            );
1027            Err(S3Error::with_message(
1028                S3ErrorCode::AccessDenied,
1029                format!("denied by S4 policy: {action} on bucket={bucket}"),
1030            ))
1031        }
1032    }
1033
    /// Test helper: take the backend back out of the service (not used
    /// in production). Since v0.6 #40 `backend` is an `Arc<B>`, so this
    /// goes through `Arc::try_unwrap` and only succeeds when this
    /// service holds the sole clone. When the Arc is still shared
    /// (e.g. the replication dispatcher holds a clone with work in
    /// flight) this panics instead of returning an `Err`, preserving
    /// the caller contract of a test-only helper.
    pub fn into_backend(self) -> B {
        Arc::try_unwrap(self.backend)
            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
    }
1043
    /// Sidecar fast path for Range requests: Range-GET only the frames
    /// the plan requires from the backend, then frame-parse +
    /// decompress + slice the result. This is the bandwidth-saving
    /// variant of a Range GET — instead of fetching and decompressing
    /// the whole object, only `plan.byte_start..plan.byte_end_exclusive`
    /// travels over the wire.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial-GET only the byte span the plan identified on the
        // backend object (Range::Int's `last` is inclusive, hence -1).
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame-parse the fetched span and decompress each frame via
        // the registry (per-frame codec dispatch from the frame header).
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // The fetched frames cover more than the client asked for;
        // slice out exactly the requested window.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Assemble the client-facing 206 response. The stored
        // checksums / ETag describe the backend's compressed bytes,
        // not the decompressed slice we return, so clear them.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
1152
    /// Write the `<key>.s4index` sidecar object to the backend. The
    /// main-body PUT should still count as a success even if this
    /// fails, so errors are only logged at `warn` level — the Range GET
    /// partial path becomes unavailable for this object, but the
    /// full-read fallback still returns semantically correct results.
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let sidecar = sidecar_key(key);
        // v0.7 #49: synthetic re-entry URI must be percent-encoded; if
        // the (already legally-arbitrary) S3 key produces something we
        // cannot encode at all, drop the sidecar PUT (the GET path
        // falls back to a full read on a missing sidecar) instead of
        // panicking on `parse().unwrap()`.
        let uri = match safe_object_uri(bucket, &sidecar) {
            Ok(u) => u,
            Err(e) => {
                tracing::warn!(
                    bucket,
                    key,
                    "S4 write_sidecar skipped (key not URI-encodable): {e}"
                );
                return;
            }
        };
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar,
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        // Synthetic internal request: no credentials/region — this PUT
        // never passes through the public auth path.
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }
1203
1204    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
1205    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
1206        let sidecar = sidecar_key(key);
1207        // v0.7 #49: same encode-or-bail treatment as write_sidecar.
1208        let uri = safe_object_uri(bucket, &sidecar).ok()?;
1209        let get_input = GetObjectInput {
1210            bucket: bucket.into(),
1211            key: sidecar,
1212            ..Default::default()
1213        };
1214        let get_req = S3Request {
1215            input: get_input,
1216            method: http::Method::GET,
1217            uri,
1218            headers: http::HeaderMap::new(),
1219            extensions: http::Extensions::new(),
1220            credentials: None,
1221            region: None,
1222            service: None,
1223            trailing_headers: None,
1224        };
1225        let resp = self.backend.get_object(get_req).await.ok()?;
1226        let blob = resp.output.body?;
1227        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
1228        decode_index(bytes).ok()
1229    }
1230
1231    /// Multipart object (frame 列) を解凍 → 元 bytes を再構築。
1232    ///
1233    /// **per-frame codec dispatch**: 各 frame header に codec_id が入っているので、
1234    /// frame ごとに registry が違う codec を呼ぶことができる。同一 object 内で
1235    /// 異なる codec が混在していても透過的に解凍可能 (parquet 風 mixed columns 等)。
1236    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
1237        let mut out = BytesMut::new();
1238        for frame in FrameIter::new(bytes) {
1239            let (header, payload) = frame.map_err(|e| {
1240                S3Error::with_message(
1241                    S3ErrorCode::InternalError,
1242                    format!("multipart frame parse: {e}"),
1243                )
1244            })?;
1245            let chunk_manifest = ChunkManifest {
1246                codec: header.codec,
1247                original_size: header.original_size,
1248                compressed_size: header.compressed_size,
1249                crc32c: header.crc32c,
1250            };
1251            let decompressed = self
1252                .registry
1253                .decompress(payload, &chunk_manifest)
1254                .await
1255                .map_err(internal("multipart frame decompress"))?;
1256            out.extend_from_slice(&decompressed);
1257        }
1258        Ok(out.freeze())
1259    }
1260}
1261
1262/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
1263/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
1264/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
1265/// reject the other variants for parity with AWS.
1266fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1267    let rest = s
1268        .strip_prefix("bytes=")
1269        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1270    let (a, b) = rest
1271        .split_once('-')
1272        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1273    let first: u64 = a
1274        .parse()
1275        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1276    let last: u64 = b
1277        .parse()
1278        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1279    if last < first {
1280        return Err(format!("CopySourceRange last < first: {s:?}"));
1281    }
1282    Ok(s3s::dto::Range::Int {
1283        first,
1284        last: Some(last),
1285    })
1286}
1287
/// v0.5 #34: synthesize the backend storage key for a given
/// (logical key, version-id) pair on an Enabled-versioning bucket.
///
/// The `__s4ver__/` infix was chosen because:
/// - it is not a substring of `.s4index` / `.s4ver` natural keys (no
///   false-positive listing filter collisions)
/// - the directory-style separator keeps the S3 console "browse by prefix"
///   UX intact (versions roll up under one virtual folder per object)
/// - it stays human-readable in debug logs / `aws s3 ls`
///
/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
/// keys containing `.__s4ver__/` from results so customers never see these
/// internal shadow objects.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    const INFIX: &str = ".__s4ver__/";
    let mut shadow = String::with_capacity(key.len() + INFIX.len() + version_id.len());
    shadow.push_str(key);
    shadow.push_str(INFIX);
    shadow.push_str(version_id);
    shadow
}
1304
/// Check for the marker substring produced by [`versioned_shadow_key`].
/// Cheap substring scan; both the list_objects filter and the GET
/// passthrough check rely on it.
fn is_versioning_shadow_key(key: &str) -> bool {
    key.find(".__s4ver__/").is_some()
}
1310
/// v0.6 #42: wall-clock seconds since the UNIX epoch — fed to
/// `mfa::check_mfa` so the TOTP verifier matches the client
/// authenticator app's view of "now". On the impossible-in-practice
/// clock-before-1970 path this yields `0`, making the verifier reject
/// instead of panicking.
fn current_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
1322
1323/// v0.6 #42: translate an `MfaError` into the matching S3 wire error.
1324///
1325/// - `Missing` / `SerialMismatch` / `InvalidCode` → `403 AccessDenied`
1326///   (S3 spec for MFA Delete: every gating failure surfaces as
1327///   `AccessDenied`, not a separate `MFA*` code).
1328/// - `Malformed` → `400 InvalidRequest` (the request itself is
1329///   syntactically broken, not a permission issue).
1330fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1331    match e {
1332        crate::mfa::MfaError::Missing => S3Error::with_message(
1333            S3ErrorCode::AccessDenied,
1334            "MFA token required for this operation",
1335        ),
1336        crate::mfa::MfaError::Malformed => S3Error::with_message(
1337            S3ErrorCode::InvalidRequest,
1338            "malformed x-amz-mfa header",
1339        ),
1340        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1341            S3ErrorCode::AccessDenied,
1342            "MFA serial does not match configured device",
1343        ),
1344        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1345            S3ErrorCode::AccessDenied,
1346            "invalid MFA code",
1347        ),
1348    }
1349}
1350
1351fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1352    metadata
1353        .as_ref()
1354        .and_then(|m| m.get(META_MULTIPART))
1355        .map(|v| v == "true")
1356        .unwrap_or(false)
1357}
1358
// S3 user-metadata keys under which the S4 manifest fields are persisted.
const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object written via multipart upload in the per-part frame
/// format. GET checks this flag to start the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object written in the S4F2 framed format.
/// Legacy v0.1 single-PUTs are raw compressed bytes (no flag). GET checks
/// this flag to route through the framed path (the same FrameIter parse
/// as multipart).
const META_FRAMED: &str = "s4-framed";
1370
1371fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1372    metadata
1373        .as_ref()
1374        .and_then(|m| m.get(META_FRAMED))
1375        .map(|v| v == "true")
1376        .unwrap_or(false)
1377}
1378
1379/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
1380fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1381    metadata
1382        .as_ref()
1383        .and_then(|m| m.get("s4-encrypted"))
1384        .map(|v| v == "aes-256-gcm")
1385        .unwrap_or(false)
1386}
1387
1388/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
1389/// contract is "all three or none" — partial sets are a 400.
1390///
1391/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
1392/// no encryption), `Ok(Some(material))` on validated client key, and
1393/// `Err` for malformed or partial inputs.
1394fn extract_sse_c_material(
1395    algorithm: &Option<String>,
1396    key: &Option<String>,
1397    md5: &Option<String>,
1398) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1399    match (algorithm, key, md5) {
1400        (None, None, None) => Ok(None),
1401        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1402            .map(Some)
1403            .map_err(sse_c_error_to_s3),
1404        _ => Err(S3Error::with_message(
1405            S3ErrorCode::InvalidRequest,
1406            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1407        )),
1408    }
1409}
1410
1411/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
1412/// Returns the key-id to wrap under, falling back to the gateway default.
1413fn extract_kms_key_id(
1414    sse: &Option<ServerSideEncryption>,
1415    sse_kms_key_id: &Option<String>,
1416    gateway_default: Option<&str>,
1417) -> Option<String> {
1418    let asks_for_kms = sse
1419        .as_ref()
1420        .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1421        .unwrap_or(false);
1422    if !asks_for_kms {
1423        return None;
1424    }
1425    sse_kms_key_id
1426        .clone()
1427        .or_else(|| gateway_default.map(str::to_owned))
1428}
1429
1430/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
1431/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
1432/// transient KMS outage (503). Other variants are 500 InternalError.
1433fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1434    use crate::kms::KmsError as K;
1435    match e {
1436        K::KeyNotFound { key_id } => S3Error::with_message(
1437            S3ErrorCode::InvalidArgument,
1438            format!("KMS key not found: {key_id}"),
1439        ),
1440        K::BackendUnavailable { message } => S3Error::with_message(
1441            S3ErrorCode::ServiceUnavailable,
1442            format!("KMS backend unavailable: {message}"),
1443        ),
1444        other => S3Error::with_message(
1445            S3ErrorCode::InternalError,
1446            format!("KMS error: {other}"),
1447        ),
1448    }
1449}
1450
1451/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
1452/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
1453/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
1454fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1455    use crate::sse::SseError as E;
1456    match e {
1457        E::WrongCustomerKey => S3Error::with_message(
1458            S3ErrorCode::AccessDenied,
1459            "SSE-C key does not match the key used at PUT time",
1460        ),
1461        E::InvalidCustomerKey { reason } => S3Error::with_message(
1462            S3ErrorCode::InvalidArgument,
1463            format!("SSE-C: {reason}"),
1464        ),
1465        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1466            S3ErrorCode::InvalidArgument,
1467            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1468        ),
1469        E::CustomerKeyRequired => S3Error::with_message(
1470            S3ErrorCode::InvalidRequest,
1471            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1472        ),
1473        E::CustomerKeyUnexpected => S3Error::with_message(
1474            S3ErrorCode::InvalidRequest,
1475            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1476        ),
1477        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1478    }
1479}
1480
1481fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1482    let m = metadata.as_ref()?;
1483    let codec = m
1484        .get(META_CODEC)
1485        .and_then(|s| s.parse::<CodecKind>().ok())?;
1486    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1487    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1488    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1489    Some(ChunkManifest {
1490        codec,
1491        original_size,
1492        compressed_size,
1493        crc32c,
1494    })
1495}
1496
1497fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1498    let meta = metadata.get_or_insert_with(Default::default);
1499    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1500    meta.insert(
1501        META_ORIGINAL_SIZE.into(),
1502        manifest.original_size.to_string(),
1503    );
1504    meta.insert(
1505        META_COMPRESSED_SIZE.into(),
1506        manifest.compressed_size.to_string(),
1507    );
1508    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1509}
1510
1511fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1512    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1513}
1514
1515/// v0.6 #41: map a `select::SelectError` to the S3 error surface. AWS
1516/// uses a domain-specific `InvalidSqlExpression` code for parse / unsupported
1517/// errors, but s3s 0.13 doesn't expose that as a typed variant — we
1518/// fall back to the well-known `InvalidRequest` 400 with a descriptive
1519/// message that includes the original error context.
1520fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1521    use crate::select::SelectError;
1522    match e {
1523        SelectError::Parse(msg) => S3Error::with_message(
1524            S3ErrorCode::InvalidRequest,
1525            format!("SQL parse error: {msg}"),
1526        ),
1527        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1528            S3ErrorCode::InvalidRequest,
1529            format!("unsupported SQL feature: {msg}"),
1530        ),
1531        SelectError::RowEval(msg) => S3Error::with_message(
1532            S3ErrorCode::InvalidRequest,
1533            format!("SQL row evaluation error: {msg}"),
1534        ),
1535        SelectError::InputFormat(msg) => S3Error::with_message(
1536            S3ErrorCode::InvalidRequest,
1537            format!("{fmt} input format error: {msg}"),
1538        ),
1539    }
1540}
1541
1542/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
1543/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
1544/// (including missing) is treated as `false`.
1545fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1546    headers
1547        .get("x-amz-bypass-governance-retention")
1548        .and_then(|v| v.to_str().ok())
1549        .map(|s| s.eq_ignore_ascii_case("true"))
1550        .unwrap_or(false)
1551}
1552
1553/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
1554/// as an RFC3339 string and re-parsing through `chrono`. The string format
1555/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
1556/// s4-server) into our direct deps. Returns `None` if the format/parse fails
1557/// or the value is outside `chrono`'s supported range.
1558fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1559    let mut buf = Vec::new();
1560    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1561    let s = std::str::from_utf8(&buf).ok()?;
1562    chrono::DateTime::parse_from_rfc3339(s)
1563        .ok()
1564        .map(|dt| dt.with_timezone(&chrono::Utc))
1565}
1566
1567/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
1568/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
1569fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1570    // chrono's RFC3339 output format matches s3s' parser ("...Z" with
1571    // optional sub-second precision). Fall back to UNIX_EPOCH if anything
1572    // unexpected happens — we never produce malformed strings, so this
1573    // branch is unreachable in practice.
1574    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1575    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1576}
1577
1578/// v0.6 #39: convert our internal [`crate::tagging::TagSet`] into the
1579/// s3s `Vec<Tag>` wire shape used on `GetObject/BucketTaggingOutput`.
1580/// Both halves of every pair land in the `Some(_)` slot — AWS marks
1581/// the field optional but always populates it on response.
1582fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1583    set.iter()
1584        .map(|(k, v)| Tag {
1585            key: Some(k.clone()),
1586            value: Some(v.clone()),
1587        })
1588        .collect()
1589}
1590
1591/// v0.6 #39: inverse of [`tagset_to_aws`] for input handlers. Missing
1592/// keys / values become empty strings (mirrors AWS, which rejects
1593/// `<Key/>` with InvalidTag at the parser layer; downstream
1594/// `TagSet::validate` then enforces our size limits).
1595fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1596    let pairs = tags
1597        .iter()
1598        .map(|t| {
1599            (
1600                t.key.clone().unwrap_or_default(),
1601                t.value.clone().unwrap_or_default(),
1602            )
1603        })
1604        .collect();
1605    crate::tagging::TagSet::from_pairs(pairs)
1606}
1607
1608/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
1609/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
1610/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
1611pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1612    if total == 0 {
1613        return Err("cannot range-get zero-length object".into());
1614    }
1615    match range {
1616        s3s::dto::Range::Int { first, last } => {
1617            let start = *first;
1618            let end_inclusive = match last {
1619                Some(l) => (*l).min(total - 1),
1620                None => total - 1,
1621            };
1622            if start > end_inclusive || start >= total {
1623                return Err(format!(
1624                    "range bytes={start}-{:?} out of object size {total}",
1625                    last
1626                ));
1627            }
1628            Ok((start, end_inclusive + 1))
1629        }
1630        s3s::dto::Range::Suffix { length } => {
1631            let len = (*length).min(total);
1632            Ok((total - len, total))
1633        }
1634    }
1635}
1636
1637#[async_trait::async_trait]
1638impl<B: S3> S3 for S4Service<B> {
1639    // === 圧縮を挟む path (PUT) ===
1640    #[tracing::instrument(
1641        name = "s4.put_object",
1642        skip(self, req),
1643        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1644    )]
1645    async fn put_object(
1646        &self,
1647        mut req: S3Request<PutObjectInput>,
1648    ) -> S3Result<S3Response<PutObjectOutput>> {
1649        let put_start = Instant::now();
1650        let put_bucket = req.input.bucket.clone();
1651        let put_key = req.input.key.clone();
1652        let access_preamble = self.access_log_preamble(&req);
1653        self.enforce_rate_limit(&req, &put_bucket)?;
1654        // v0.6 #39: parse `x-amz-tagging` (URL-encoded query string) so
1655        // the IAM policy gate sees the request's tags via
1656        // `s3:RequestObjectTag/<key>`. `existing_object_tags` is also
1657        // resolved from the Tagging manager (when wired) so
1658        // `s3:ExistingObjectTag/<key>` works on overwrite.
1659        let request_tags: Option<crate::tagging::TagSet> = req
1660            .input
1661            .tagging
1662            .as_deref()
1663            .map(crate::tagging::parse_tagging_header)
1664            .transpose()
1665            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1666        let existing_tags: Option<crate::tagging::TagSet> = self
1667            .tagging
1668            .as_ref()
1669            .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1670        self.enforce_policy_with_extra(
1671            &req,
1672            "s3:PutObject",
1673            &put_bucket,
1674            Some(&put_key),
1675            request_tags.as_ref(),
1676            existing_tags.as_ref(),
1677        )?;
1678        // v0.5 #30: an Object Lock-protected key cannot be overwritten by
1679        // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
1680        // bucket PUTs are exempt because they materialise a fresh
1681        // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
1682        // locked version's bytes are untouched. The check mirrors the
1683        // delete path (Compliance never bypassable, Governance via the
1684        // bypass header, legal hold never).
1685        if let Some(mgr) = self.object_lock.as_ref()
1686            && let Some(state) = mgr.get(&put_bucket, &put_key)
1687        {
1688            let bucket_versioned_enabled = self
1689                .versioning
1690                .as_ref()
1691                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1692                .unwrap_or(false);
1693            if !bucket_versioned_enabled {
1694                let bypass = parse_bypass_governance_header(&req.headers);
1695                let now = chrono::Utc::now();
1696                if !state.can_delete(now, bypass) {
1697                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1698                    return Err(S3Error::with_message(
1699                        S3ErrorCode::AccessDenied,
1700                        "Access Denied because object protected by object lock",
1701                    ));
1702                }
1703            }
1704        }
1705        // v0.5 #30: per-PUT explicit retention / legal hold (S3
1706        // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
1707        // `x-amz-object-lock-legal-hold`). Captured before the body
1708        // moves into the backend; persisted into the manager only on
1709        // backend success below.
1710        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1711            .input
1712            .object_lock_mode
1713            .as_ref()
1714            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1715        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1716            .input
1717            .object_lock_retain_until_date
1718            .as_ref()
1719            .and_then(timestamp_to_chrono_utc);
1720        let explicit_legal_hold_on: Option<bool> = req
1721            .input
1722            .object_lock_legal_hold_status
1723            .as_ref()
1724            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1725        if let Some(blob) = req.input.body.take() {
1726            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
1727            // compress fast path、そうでなければ従来の collect-then-compress。
1728            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1729                .await
1730                .map_err(internal("peek put sample"))?;
1731            let sample_len = sample.len().min(SAMPLE_BYTES);
1732            let kind = self.dispatcher.pick(&sample[..sample_len]).await;
1733
1734            // Passthrough buys nothing from S4F2 wrapping (no compression =
1735            // no per-chunk frame to skip past) and the +28-byte header
1736            // overhead breaks size-sensitive callers that expect a true
1737            // pass-through. So passthrough always uses the legacy raw-blob
1738            // path; only compressing codecs go through the framed path.
1739            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1740            let (compressed, manifest, is_framed) = if use_framed {
1741                // streaming fast path: input は memory に collect しない
1742                let chained = chain_sample_with_rest(sample, rest_stream);
1743                debug!(
1744                    bucket = ?req.input.bucket,
1745                    key = ?req.input.key,
1746                    codec = kind.as_str(),
1747                    path = "streaming-framed",
1748                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1749                );
1750                // v0.4 #16: pick the chunk size based on the request's
1751                // Content-Length when known, falling back to the 4 MiB
1752                // default for chunked transfers.
1753                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1754                let (body, manifest) = streaming_compress_to_frames(
1755                    chained,
1756                    Arc::clone(&self.registry),
1757                    kind,
1758                    chunk_size,
1759                )
1760                .await
1761                .map_err(internal("streaming framed compress"))?;
1762                (body, manifest, true)
1763            } else {
1764                // GPU codec 等で streaming-aware でないものは bytes-buffered path
1765                // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1766                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1767                    .await
1768                    .map_err(internal("collect put body (buffered path)"))?;
1769                debug!(
1770                    bucket = ?req.input.bucket,
1771                    key = ?req.input.key,
1772                    bytes = bytes.len(),
1773                    codec = kind.as_str(),
1774                    path = "buffered",
1775                    "S4 put_object: compressing (buffered, raw blob)"
1776                );
1777                let (body, m) = self
1778                    .registry
1779                    .compress(bytes, kind)
1780                    .await
1781                    .map_err(internal("registry compress"))?;
1782                (body, m, false)
1783            };
1784
1785            write_manifest(&mut req.input.metadata, &manifest);
1786            if is_framed {
1787                // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1788                req.input
1789                    .metadata
1790                    .get_or_insert_with(Default::default)
1791                    .insert(META_FRAMED.into(), "true".into());
1792            }
1793            // 重要: content_length を圧縮後サイズで更新する。
1794            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1795            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1796            req.input.content_length = Some(compressed.len() as i64);
1797            // body を書き換えたので、客側が送ってきた original body 用の
1798            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1799            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1800            // ChunkManifest.crc32c で担保している。
1801            req.input.checksum_algorithm = None;
1802            req.input.checksum_crc32 = None;
1803            req.input.checksum_crc32c = None;
1804            req.input.checksum_crc64nvme = None;
1805            req.input.checksum_sha1 = None;
1806            req.input.checksum_sha256 = None;
1807            req.input.content_md5 = None;
1808            let original_size = manifest.original_size;
1809            let compressed_size = manifest.compressed_size;
1810            let codec_label = manifest.codec.as_str();
1811            // framed body は GET 側で sidecar partial-fetch を効かせるため
1812            // build_index_from_body で sidecar を組み立てて backend に PUT する。
1813            let sidecar_index = if is_framed {
1814                s4_codec::index::build_index_from_body(&compressed).ok()
1815            } else {
1816                None
1817            };
1818            // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
1819            // Precedence:
1820            //   - SSE-C headers present → per-request customer key (S4E3)
1821            //   - server-managed keyring configured → active key (S4E2)
1822            //   - neither → no encryption (raw compressed body)
1823            // The `s4-encrypted: aes-256-gcm` metadata flag is set in
1824            // both encrypted modes; the on-disk frame magic distinguishes
1825            // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
1826            // v0.7 #48 BUG-2/3 fix: take() the SSE fields off req.input
1827            // so the encryption headers are NOT forwarded to the
1828            // backend. S4 owns the encrypt-then-store contract; if we
1829            // leave the headers in place, real S3-compat backends
1830            // (MinIO / AWS) try to apply their own SSE on top and
1831            // either reject (MinIO requires HTTPS for SSE-C) or fail
1832            // (MinIO has no KMS configured). MemoryBackend ignored
1833            // these so mock tests passed.
1834            let sse_c_alg = req.input.sse_customer_algorithm.take();
1835            let sse_c_key = req.input.sse_customer_key.take();
1836            let sse_c_md5 = req.input.sse_customer_key_md5.take();
1837            let sse_header = req.input.server_side_encryption.take();
1838            let sse_kms_key = req.input.ssekms_key_id.take();
1839            let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1840            // v0.5 #28: SSE-KMS request? Resolves to None unless the
1841            // request asks for `aws:kms` AND a key id is available
1842            // (explicit header or gateway default). When set, we'll
1843            // generate a per-object DEK below.
1844            let kms_key_id = extract_kms_key_id(
1845                &sse_header,
1846                &sse_kms_key,
1847                self.kms_default_key_id.as_deref(),
1848            );
1849            // v0.5 #32: in compliance-strict mode, every PUT must
1850            // declare SSE — either client-supplied (SSE-C), KMS, or by
1851            // virtue of a server-side keyring being configured (which
1852            // applies SSE-S4 to every PUT automatically). Requests that
1853            // would otherwise land as plain compressed bytes are
1854            // rejected with 400 InvalidRequest.
1855            if self.compliance_strict
1856                && sse_c_material.is_none()
1857                && kms_key_id.is_none()
1858                && self.sse_keyring.is_none()
1859                && sse_header.as_ref().map(|s| s.as_str())
1860                    != Some(ServerSideEncryption::AES256)
1861            {
1862                return Err(S3Error::with_message(
1863                    S3ErrorCode::InvalidRequest,
1864                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1865                     (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1866                ));
1867            }
1868            // SSE-C and SSE-KMS are mutually exclusive on a single PUT
1869            // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
1870            if sse_c_material.is_some() && kms_key_id.is_some() {
1871                return Err(S3Error::with_message(
1872                    S3ErrorCode::InvalidArgument,
1873                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
1874                ));
1875            }
1876            // KMS path needs to call generate_dek().await before the
1877            // body_to_send branch; capture the result here.
1878            let kms_wrap = if let Some(ref key_id) = kms_key_id {
1879                let kms = self.kms.as_ref().ok_or_else(|| {
1880                    S3Error::with_message(
1881                        S3ErrorCode::InvalidRequest,
1882                        "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1883                    )
1884                })?;
1885                let (dek, wrapped) = kms
1886                    .generate_dek(key_id)
1887                    .await
1888                    .map_err(kms_error_to_s3)?;
1889                if dek.len() != 32 {
1890                    return Err(S3Error::with_message(
1891                        S3ErrorCode::InternalError,
1892                        format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1893                    ));
1894                }
1895                let mut dek_arr = [0u8; 32];
1896                dek_arr.copy_from_slice(&dek);
1897                Some((dek_arr, wrapped))
1898            } else {
1899                None
1900            };
1901            // v0.7 #48 BUG-4 fix: stamp the SSE *type* into metadata
1902            // alongside `s4-encrypted` so HEAD (which doesn't fetch the
1903            // body) can echo the correct `x-amz-server-side-encryption`
1904            // value. Without this, HEAD on an SSE-KMS object would not
1905            // echo `aws:kms` because the frame magic is only available
1906            // on the body (which HEAD doesn't read).
1907            let body_to_send = if let Some(ref m) = sse_c_material {
1908                let meta = req.input.metadata.get_or_insert_with(Default::default);
1909                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
1910                meta.insert("s4-sse-type".into(), "AES256".into());
1911                meta.insert("s4-sse-c-key-md5".into(),
1912                    base64::engine::general_purpose::STANDARD.encode(m.key_md5));
1913                crate::sse::encrypt_with_source(
1914                    &compressed,
1915                    crate::sse::SseSource::CustomerKey {
1916                        key: &m.key,
1917                        key_md5: &m.key_md5,
1918                    },
1919                )
1920            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
1921                let meta = req.input.metadata.get_or_insert_with(Default::default);
1922                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
1923                meta.insert("s4-sse-type".into(), "aws:kms".into());
1924                meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
1925                crate::sse::encrypt_with_source(
1926                    &compressed,
1927                    crate::sse::SseSource::Kms { dek, wrapped },
1928                )
1929            } else if let Some(keyring) = self.sse_keyring.as_ref() {
1930                // SSE-S4 is server-driven transparent encryption; the
1931                // client didn't ask for SSE. We stamp `s4-encrypted`
1932                // (internal flag the GET path needs) but deliberately
1933                // do NOT stamp `s4-sse-type` — that lights up the HEAD
1934                // echo of `x-amz-server-side-encryption: AES256`,
1935                // which would falsely advertise AWS-style SSE-S3
1936                // semantics the operator didn't request.
1937                let meta = req.input.metadata.get_or_insert_with(Default::default);
1938                meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
1939                crate::sse::encrypt_v2(&compressed, keyring)
1940            } else {
1941                compressed.clone()
1942            };
1943            // v0.6 #40: capture the about-to-be-sent body + metadata so
1944            // the replication dispatcher (run after the source PUT
1945            // succeeds) can hand the same backend bytes to the
1946            // destination bucket. `Bytes` clone is cheap (refcounted).
1947            let replication_body = body_to_send.clone();
1948            let replication_metadata = req.input.metadata.clone();
1949            // v0.7 #48 BUG-1 fix: SSE encryption (S4E1/E2/E3/E4 frames)
1950            // makes the body longer than the post-compression bytes
1951            // (header + nonce + tag overhead). The earlier
1952            // content_length stamp at compressed.len() is now stale, so
1953            // re-stamp from the actual bytes about to be sent or the
1954            // backend (real S3 / MinIO) rejects with
1955            // `StreamLengthMismatch`. MemoryBackend never validated
1956            // this, which is why mock-only tests passed.
1957            req.input.content_length = Some(body_to_send.len() as i64);
1958            req.input.body = Some(bytes_to_blob(body_to_send));
1959            // v0.5 #34: pre-allocate a version-id when the bucket is
1960            // Enabled, then redirect the backend storage key to the
1961            // shadow path so older versions survive newer PUTs.
1962            // Suspended / Unversioned buckets keep using the plain
1963            // `<key>` (S3 spec: Suspended overwrites the same backend
1964            // object). Pre-allocation (instead of recording after PUT)
1965            // ensures the shadow key + the response's
1966            // `x-amz-version-id` use the same vid.
1967            let pending_version: Option<crate::versioning::PutOutcome> = self
1968                .versioning
1969                .as_ref()
1970                .map(|mgr| mgr.state(&put_bucket))
1971                .map(|state| match state {
1972                    crate::versioning::VersioningState::Enabled => {
1973                        crate::versioning::PutOutcome {
1974                            version_id: crate::versioning::VersioningManager::new_version_id(),
1975                            versioned_response: true,
1976                        }
1977                    }
1978                    crate::versioning::VersioningState::Suspended
1979                    | crate::versioning::VersioningState::Unversioned => {
1980                        crate::versioning::PutOutcome {
1981                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1982                            versioned_response: false,
1983                        }
1984                    }
1985                });
1986            if let Some(ref pv) = pending_version
1987                && pv.versioned_response
1988            {
1989                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1990            }
1991            let mut backend_resp = self.backend.put_object(req).await;
1992            if let Some(idx) = sidecar_index
1993                && backend_resp.is_ok()
1994                && idx.entries.len() > 1
1995            {
1996                // 1 chunk しかない (small object) なら sidecar は意味がない (=
1997                // partial fetch しても full body と同じ範囲) ので省略。
1998                // Sidecar は user-visible key で書く (latest version の
1999                // partial fetch path 用)。Old versions の Range GET は今 task
2000                // の scope 外 (full read fallback でも意味的には正しい)。
2001                self.write_sidecar(&put_bucket, &put_key, &idx).await;
2002            }
2003            // v0.5 #34: commit the new version into the manager only on
2004            // backend success. Use the pre-allocated vid so the response
2005            // header and the chain entry agree.
2006            if let (Some(mgr), Some(pv), Ok(resp)) = (
2007                self.versioning.as_ref(),
2008                pending_version.as_ref(),
2009                backend_resp.as_mut(),
2010            ) {
2011                let etag = resp
2012                    .output
2013                    .e_tag
2014                    .clone()
2015                    .map(ETag::into_value)
2016                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
2017                let now = chrono::Utc::now();
2018                mgr.commit_put_with_version(
2019                    &put_bucket,
2020                    &put_key,
2021                    crate::versioning::VersionEntry {
2022                        version_id: pv.version_id.clone(),
2023                        etag,
2024                        size: original_size,
2025                        is_delete_marker: false,
2026                        created_at: now,
2027                    },
2028                );
2029                if pv.versioned_response {
2030                    resp.output.version_id = Some(pv.version_id.clone());
2031                }
2032            }
2033            // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
2034            // so the client knows the server actually applied the
2035            // requested algorithm and which key fingerprint matched.
2036            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
2037                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
2038                resp.output.sse_customer_key_md5 = Some(
2039                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
2040                );
2041            }
2042            // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
2043            // the backend returned (AWS KMS returns the ARN even when
2044            // the request used an alias).
2045            if let (Some((_, wrapped)), Ok(resp)) =
2046                (kms_wrap.as_ref(), backend_resp.as_mut())
2047            {
2048                resp.output.server_side_encryption =
2049                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
2050                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
2051            }
2052            // v0.5 #30: persist any per-PUT explicit retention / legal
2053            // hold the client supplied, then auto-apply the bucket
2054            // default (no-op when state is already populated). The
2055            // explicit fields take precedence — the bucket-default
2056            // helper bails out as soon as it sees any retention.
2057            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2058                if explicit_lock_mode.is_some()
2059                    || explicit_retain_until.is_some()
2060                    || explicit_legal_hold_on.is_some()
2061                {
2062                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2063                    if let Some(m) = explicit_lock_mode {
2064                        state.mode = Some(m);
2065                    }
2066                    if let Some(u) = explicit_retain_until {
2067                        state.retain_until = Some(u);
2068                    }
2069                    if let Some(lh) = explicit_legal_hold_on {
2070                        state.legal_hold_on = lh;
2071                    }
2072                    mgr.set(&put_bucket, &put_key, state);
2073                }
2074                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2075            }
2076            let _ = (original_size, compressed_size); // mute unused warnings
2077            let elapsed = put_start.elapsed();
2078            crate::metrics::record_put(
2079                codec_label,
2080                original_size,
2081                compressed_size,
2082                elapsed.as_secs_f64(),
2083                backend_resp.is_ok(),
2084            );
2085            // v0.4 #20: structured access-log entry (best-effort).
2086            self.record_access(
2087                access_preamble,
2088                "REST.PUT.OBJECT",
2089                &put_bucket,
2090                Some(&put_key),
2091                if backend_resp.is_ok() { 200 } else { 500 },
2092                compressed_size,
2093                original_size,
2094                elapsed.as_millis() as u64,
2095                backend_resp.as_ref().err().map(|e| e.code().as_str()),
2096            )
2097            .await;
2098            info!(
2099                op = "put_object",
2100                bucket = %put_bucket,
2101                key = %put_key,
2102                codec = codec_label,
2103                bytes_in = original_size,
2104                bytes_out = compressed_size,
2105                ratio = format!(
2106                    "{:.3}",
2107                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
2108                ),
2109                latency_ms = elapsed.as_millis() as u64,
2110                ok = backend_resp.is_ok(),
2111                "S4 put completed"
2112            );
2113            // v0.6 #35: fire bucket-notification destinations (best-effort,
2114            // detached). Skipped when no manager is attached or when the
2115            // bucket has no rule matching `s3:ObjectCreated:Put` for this
2116            // key.
2117            if backend_resp.is_ok()
2118                && let Some(mgr) = self.notifications.as_ref()
2119            {
2120                let dests = mgr.match_destinations(
2121                    &put_bucket,
2122                    &crate::notifications::EventType::ObjectCreatedPut,
2123                    &put_key,
2124                );
2125                if !dests.is_empty() {
2126                    let etag = backend_resp
2127                        .as_ref()
2128                        .ok()
2129                        .and_then(|r| r.output.e_tag.clone())
2130                        .map(ETag::into_value);
2131                    let version_id = pending_version
2132                        .as_ref()
2133                        .filter(|pv| pv.versioned_response)
2134                        .map(|pv| pv.version_id.clone());
2135                    tokio::spawn(crate::notifications::dispatch_event(
2136                        Arc::clone(mgr),
2137                        put_bucket.clone(),
2138                        put_key.clone(),
2139                        crate::notifications::EventType::ObjectCreatedPut,
2140                        Some(original_size),
2141                        etag,
2142                        version_id,
2143                        format!("S4-{}", uuid::Uuid::new_v4()),
2144                    ));
2145                }
2146            }
2147            // v0.6 #39: persist parsed `x-amz-tagging` tags into the
2148            // tagging manager on a successful PUT. AWS PutObject's
2149            // tagging is a full-replace operation (not a merge), so
2150            // any pre-existing entry for `(bucket, key)` is overwritten.
2151            if backend_resp.is_ok()
2152                && let (Some(mgr), Some(tags)) =
2153                    (self.tagging.as_ref(), request_tags.clone())
2154            {
2155                mgr.put_object_tags(&put_bucket, &put_key, tags);
2156            }
2157            // v0.6 #40: cross-bucket replication fire-point. On
2158            // successful source PUT, consult the replication manager;
2159            // when an enabled rule matches, mark the source key
2160            // `Pending` and spawn a detached task that PUTs the same
2161            // backend bytes + metadata to the rule's destination
2162            // bucket. The dispatcher itself records `Completed` /
2163            // `Failed` and bumps the drop counter on retry-budget
2164            // exhaustion.
2165            self.spawn_replication_if_matched(
2166                &put_bucket,
2167                &put_key,
2168                &request_tags,
2169                &replication_body,
2170                &replication_metadata,
2171                backend_resp.is_ok(),
2172            );
2173            return backend_resp;
2174        }
2175        // Body-less PUT (rare: zero-length object). Mirror the body-full
2176        // versioning hooks so list_object_versions / GET-by-version still see
2177        // empty-body objects in the chain.
2178        let pending_version: Option<crate::versioning::PutOutcome> = self
2179            .versioning
2180            .as_ref()
2181            .map(|mgr| mgr.state(&put_bucket))
2182            .map(|state| match state {
2183                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2184                    version_id: crate::versioning::VersioningManager::new_version_id(),
2185                    versioned_response: true,
2186                },
2187                _ => crate::versioning::PutOutcome {
2188                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2189                    versioned_response: false,
2190                },
2191            });
2192        if let Some(ref pv) = pending_version
2193            && pv.versioned_response
2194        {
2195            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2196        }
2197        let mut backend_resp = self.backend.put_object(req).await;
2198        if let (Some(mgr), Some(pv), Ok(resp)) = (
2199            self.versioning.as_ref(),
2200            pending_version.as_ref(),
2201            backend_resp.as_mut(),
2202        ) {
2203            let etag = resp
2204                .output
2205                .e_tag
2206                .clone()
2207                .map(ETag::into_value)
2208                .unwrap_or_default();
2209            let now = chrono::Utc::now();
2210            mgr.commit_put_with_version(
2211                &put_bucket,
2212                &put_key,
2213                crate::versioning::VersionEntry {
2214                    version_id: pv.version_id.clone(),
2215                    etag,
2216                    size: 0,
2217                    is_delete_marker: false,
2218                    created_at: now,
2219                },
2220            );
2221            if pv.versioned_response {
2222                resp.output.version_id = Some(pv.version_id.clone());
2223            }
2224        }
2225        // v0.5 #30: same explicit-then-default lock-state commit as the
2226        // body-bearing branch above, so a zero-length PUT also picks up
2227        // bucket-default retention.
2228        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2229            if explicit_lock_mode.is_some()
2230                || explicit_retain_until.is_some()
2231                || explicit_legal_hold_on.is_some()
2232            {
2233                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2234                if let Some(m) = explicit_lock_mode {
2235                    state.mode = Some(m);
2236                }
2237                if let Some(u) = explicit_retain_until {
2238                    state.retain_until = Some(u);
2239                }
2240                if let Some(lh) = explicit_legal_hold_on {
2241                    state.legal_hold_on = lh;
2242                }
2243                mgr.set(&put_bucket, &put_key, state);
2244            }
2245            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2246        }
2247        // v0.6 #35: same notification fire-point as the body-bearing PUT
2248        // branch above (zero-length objects still match `ObjectCreated:Put`
2249        // rules per the AWS event taxonomy).
2250        if backend_resp.is_ok()
2251            && let Some(mgr) = self.notifications.as_ref()
2252        {
2253            let dests = mgr.match_destinations(
2254                &put_bucket,
2255                &crate::notifications::EventType::ObjectCreatedPut,
2256                &put_key,
2257            );
2258            if !dests.is_empty() {
2259                let etag = backend_resp
2260                    .as_ref()
2261                    .ok()
2262                    .and_then(|r| r.output.e_tag.clone())
2263                    .map(ETag::into_value);
2264                let version_id = pending_version
2265                    .as_ref()
2266                    .filter(|pv| pv.versioned_response)
2267                    .map(|pv| pv.version_id.clone());
2268                tokio::spawn(crate::notifications::dispatch_event(
2269                    Arc::clone(mgr),
2270                    put_bucket.clone(),
2271                    put_key.clone(),
2272                    crate::notifications::EventType::ObjectCreatedPut,
2273                    Some(0),
2274                    etag,
2275                    version_id,
2276                    format!("S4-{}", uuid::Uuid::new_v4()),
2277                ));
2278            }
2279        }
2280        // v0.6 #39: persist parsed `x-amz-tagging` for the body-less
2281        // (zero-length) PUT branch too — same shape as the body-bearing
2282        // branch above.
2283        if backend_resp.is_ok()
2284            && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2285        {
2286            mgr.put_object_tags(&put_bucket, &put_key, tags);
2287        }
2288        // v0.6 #40: cross-bucket replication for the zero-length PUT
2289        // branch — same shape as the body-bearing branch above.
2290        self.spawn_replication_if_matched(
2291            &put_bucket,
2292            &put_key,
2293            &request_tags,
2294            &bytes::Bytes::new(),
2295            &None,
2296            backend_resp.is_ok(),
2297        );
2298        backend_resp
2299    }
2300
    // === Decompression path (GET) ===
    /// GET path: enforce rate-limit/policy, optionally resolve a version via
    /// the `VersioningManager`, fetch from the backend, then — when the object
    /// carries S4 metadata — decrypt / frame-parse / decompress back to the
    /// original client bytes. Objects without S4 metadata pass through as-is.
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // Detect a Range request up front (consumed by the
        // slice-after-decompress path below). Taking it here also hides the
        // Range header from the backend so the backend returns the full
        // compressed body.
        let range_request = req.input.range.take();
        // v0.5 #27: pull SSE-C material from the input headers before
        // the request is moved into the backend. A header parse error
        // fails fast (no body fetch). The material is consumed below
        // when decrypting an S4E3-framed body; the SSE-C headers on
        // `req.input` are cleared so the backend doesn't see them.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        // v0.5 #34: route the GET through the VersioningManager when
        // attached AND the bucket is in a versioning-aware state.
        // Resolves which version to fetch (explicit `?versionId=` query
        // param vs. chain latest), translates a delete-marker into 404
        // NoSuchKey, and rewrites the backend storage key to the shadow
        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
        // versions. `resolved_version_id` is stamped onto the response
        // so clients see a coherent `x-amz-version-id` header.
        //
        // When the bucket is Unversioned (or no manager attached), the
        // chain-resolution step is skipped and the request flows
        // through the existing single-key path unchanged.
        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
                        || S3Error::with_message(
                            S3ErrorCode::NoSuchVersion,
                            format!("no such version: {vid}"),
                        ),
                    )?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    // S3 spec: GET without versionId on a
                    // delete-marker latest → 404 NoSuchKey + the
                    // response carries `x-amz-delete-marker: true`.
                    // GET with explicit versionId pointing at a delete
                    // marker → 405 MethodNotAllowed; we surface
                    // NoSuchKey here for both since s3s collapses them
                    // into the same not-found error path.
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        // ====== Partial-fetch fast path for Range GETs (via the sidecar index) ======
        // When a sidecar `<key>.s4index` exists and the object is
        // multipart-framed, only the required frames are Range-GET from the
        // backend, saving bandwidth over fetching the full body.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        // v0.5 #34: stamp the resolved version-id so the client sees a
        // coherent `x-amz-version-id` header (only for chains owned by
        // the manager — Unversioned buckets / no-manager paths never
        // set this).
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        // v0.2 #4: framed-v2 single-PUT objects need multi-frame parsing,
        // so route them down the same path as multipart objects.
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            // Objects S4 did not write are passed through untouched
            // (e.g. pre-existing objects in a raw bucket).
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
            // before any frame parse / streaming decompress. Encrypted
            // bodies are opaque to the codec; this also forces the
            // buffered path because AES-GCM needs the full body for tag
            // verify. SSE-C uses the per-request customer key, SSE-S4
            // falls back to the configured keyring.
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                // v0.5 #28: peek the frame magic to route the right
                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
                // through the KMS backend (async). S4E1/E2/E3 take the
                // sync path (keyring or customer key).
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                // v0.5 #28: parse out the on-disk wrapped DEK's key id
                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if let Some(ref m) = get_sse_c_material {
                // Client sent SSE-C headers for an unencrypted object —
                // mirror AWS S3's 400 InvalidRequest.
                let _ = m;
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
            // tell the client that the supplied key was the one used.
            if let Some(ref m) = get_sse_c_material {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
            // Collecting a large object (e.g. 5 GB) into memory would OOM, so
            // when the codec is streaming-aware the body is decompressed
            // chunk-by-chunk and streamed to the client immediately.
            //
            // Range requests cannot stream (slicing needs the total byte
            // count), so they fall through to the buffered path.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough codec: forward the body as-is (streaming only when
            // there is no Range request).
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                // Multipart objects and framed-v2 single-PUT objects share
                // the same S4F2 frame sequence, so decompress_multipart
                // handles both uniformly.
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            // Slice if there was a Range request; otherwise return the full body.
            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            // Report the true post-decompression size (S3 clients trust
            // content_length; leaving the compressed size would make
            // downstream consumers truncate the body).
            resp.output.content_length = Some(final_bytes.len() as i64);
            // Returning checksums of the compressed bytes makes the AWS SDK
            // raise StreamingError (ChecksumMismatch). The ETag is likewise
            // the backend's MD5/checksum of the *compressed* bytes, so it is
            // semantically wrong here — clear them all and rely on S4's own
            // crc32c (in the manifest / frames) to guarantee integrity.
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        // v0.6 #40: echo the recorded `x-amz-replication-status` so
        // consumers can poll progress (PENDING / COMPLETED / FAILED).
        if let Some(mgr) = self.replication.as_ref()
            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
        {
            resp.output.replication_status =
                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
        }
        Ok(resp)
    }
2648
2649    // === passthrough delegations ===
    /// Pure passthrough: bucket existence checks carry no body and no
    /// codec-sensitive metadata, so the request is forwarded to the
    /// backend untouched.
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    /// Pure passthrough: bucket listings contain no object keys, so no
    /// sidecar/shadow-key filtering is needed here.
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    /// Pure passthrough: bucket creation does not interact with the
    /// compression pipeline.
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    /// Pure passthrough: bucket deletion is delegated as-is. Any
    /// per-object S4 state (locks, tags, sidecars) is keyed per object
    /// and cleaned up on object deletes, not here.
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
2674    async fn head_object(
2675        &self,
2676        req: S3Request<HeadObjectInput>,
2677    ) -> S3Result<S3Response<HeadObjectOutput>> {
2678        // v0.6 #40: capture bucket/key before req is consumed so the
2679        // replication-status echo can look the entry up.
2680        let head_bucket = req.input.bucket.clone();
2681        let head_key = req.input.key.clone();
2682        let mut resp = self.backend.head_object(req).await?;
2683        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2684            // 客側には decompress 後の意味のある content_length / checksum を返す。
2685            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
2686            // (S4 は manifest 内の crc32c で integrity を担保する)。
2687            resp.output.content_length = Some(manifest.original_size as i64);
2688            resp.output.checksum_crc32 = None;
2689            resp.output.checksum_crc32c = None;
2690            resp.output.checksum_crc64nvme = None;
2691            resp.output.checksum_sha1 = None;
2692            resp.output.checksum_sha256 = None;
2693            resp.output.e_tag = None;
2694        }
2695        // v0.6 #40: echo `x-amz-replication-status` (PENDING / COMPLETED
2696        // / FAILED) so consumers can poll progress without a GET.
2697        if let Some(mgr) = self.replication.as_ref()
2698            && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2699        {
2700            resp.output.replication_status =
2701                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2702        }
2703        // v0.7 #48 BUG-4 fix: HEAD must echo SSE indicators so SDKs
2704        // and pipelines see the same posture they got on PUT. The PUT
2705        // path stamps `s4-sse-type` metadata for exactly this — HEAD
2706        // doesn't fetch the body, so it can't peek frame magic.
2707        if let Some(meta) = resp.output.metadata.as_ref()
2708            && let Some(sse_type) = meta.get("s4-sse-type")
2709        {
2710            {
2711                match sse_type.as_str() {
2712                    "aws:kms" => {
2713                        resp.output.server_side_encryption = Some(
2714                            ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
2715                        );
2716                        if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
2717                            resp.output.ssekms_key_id = Some(key_id.clone());
2718                        }
2719                    }
2720                    _ => {
2721                        resp.output.server_side_encryption = Some(
2722                            ServerSideEncryption::from_static(ServerSideEncryption::AES256),
2723                        );
2724                        if let Some(md5) = meta.get("s4-sse-c-key-md5") {
2725                            resp.output.sse_customer_algorithm =
2726                                Some(crate::sse::SSE_C_ALGORITHM.into());
2727                            resp.output.sse_customer_key_md5 = Some(md5.clone());
2728                        }
2729                    }
2730                }
2731            }
2732        }
2733        Ok(resp)
2734    }
    /// DELETE with the full S4 middleware stack applied in order:
    /// rate-limit → bucket policy → MFA-Delete → Object Lock (WORM) →
    /// versioning router (delete markers / specific-version delete) →
    /// legacy physical delete plus sidecar / lock / tag cleanup and
    /// notification fan-out.
    async fn delete_object(
        &self,
        mut req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        // Capture bucket/key up front: `req` is consumed by the legacy
        // passthrough at the bottom, but cleanup/notification still needs
        // the identifiers afterwards.
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // v0.6 #42: MFA Delete enforcement. When the bucket has
        // MFA-Delete = Enabled, every DELETE / DELETE-version /
        // delete-marker form needs `x-amz-mfa: <serial> <code>` (RFC 6238
        // 6-digit TOTP). Runs *before* the WORM / versioning routers so
        // a missing token is denied for free regardless of which delete
        // path the request would otherwise take.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && mgr.is_enabled(&bucket)
        {
            let header = req.input.mfa.as_deref();
            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
                crate::metrics::record_mfa_delete_denial(&bucket);
                return Err(mfa_error_to_s3(e));
            }
        }
        // v0.5 #30: refuse the delete while a WORM lock is in effect.
        // Compliance can never be bypassed; Governance can be overridden
        // via `x-amz-bypass-governance-retention: true`; legal hold
        // never. The check happens before the versioning router so a
        // locked object can't be soft-deleted (delete-marker push) on an
        // Enabled bucket either — S3 spec says lock applies to all
        // delete forms.
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&bucket, &key)
        {
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let now = chrono::Utc::now();
            if !state.can_delete(now, bypass) {
                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Access Denied because object protected by object lock",
                ));
            }
        }
        // v0.5 #34: route DELETE through the VersioningManager when the
        // bucket is in a versioning-aware state.
        //
        // - Enabled bucket, no version_id → push a delete marker into
        //   the chain. NO backend object is touched (older versions
        //   stay reachable via specific-version GET).
        // - Enabled / Suspended bucket, with version_id → physical
        //   delete. Backend bytes at the shadow key (or `<key>` for
        //   `null`) are removed; chain entry is dropped. If the deleted
        //   entry was a delete marker, no backend bytes exist for it
        //   (record-only).
        // - Suspended bucket, no version_id → push a "null" delete
        //   marker (S3 spec); backend bytes at `<key>` are physically
        //   removed (same as legacy).
        // - Unversioned bucket → fall through to legacy passthrough.
        if let Some(mgr) = self.versioning.as_ref() {
            let state = mgr.state(&bucket);
            if state != crate::versioning::VersioningState::Unversioned {
                let req_vid = req.input.version_id.take();
                if let Some(vid) = req_vid {
                    // Specific-version DELETE: touch backend bytes only
                    // when the entry was a real version (not a delete
                    // marker, which has no backend bytes).
                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
                    // The "null" version lives at the plain key; every
                    // other version lives at its shadow key.
                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
                        key.clone()
                    } else {
                        versioned_shadow_key(&key, &vid)
                    };
                    let was_real_version = outcome
                        .as_ref()
                        .map(|o| !o.is_delete_marker)
                        .unwrap_or(false);
                    if was_real_version {
                        // Best-effort backend cleanup; missing bytes
                        // are not an error (e.g. shadow key already
                        // GC'd).
                        let backend_input = DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: backend_target,
                            ..Default::default()
                        };
                        let backend_req = S3Request {
                            input: backend_input,
                            method: http::Method::DELETE,
                            uri: req.uri.clone(),
                            headers: req.headers.clone(),
                            extensions: http::Extensions::new(),
                            credentials: req.credentials.clone(),
                            region: req.region.clone(),
                            service: req.service.clone(),
                            trailing_headers: None,
                        };
                        let _ = self.backend.delete_object(backend_req).await;
                    }
                    let mut output = DeleteObjectOutput {
                        version_id: Some(vid.clone()),
                        ..Default::default()
                    };
                    if let Some(o) = outcome.as_ref()
                        && o.is_delete_marker
                    {
                        output.delete_marker = Some(true);
                    }
                    // v0.6 #35: specific-version DELETE always counts as
                    // a hard `ObjectRemoved:Delete` event (the chain
                    // entry, marker or not, is gone after this call).
                    self.fire_delete_notification(
                        &bucket,
                        &key,
                        crate::notifications::EventType::ObjectRemovedDelete,
                        Some(vid.clone()),
                    );
                    return Ok(S3Response::new(output));
                }
                // No version_id: record a delete marker (state-aware).
                let outcome = mgr.record_delete(&bucket, &key);
                if state == crate::versioning::VersioningState::Suspended {
                    // Suspended buckets also evict the prior `<key>`
                    // bytes (the previous null version is gone too).
                    let backend_input = DeleteObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    };
                    let backend_req = S3Request {
                        input: backend_input,
                        method: http::Method::DELETE,
                        uri: req.uri.clone(),
                        headers: req.headers.clone(),
                        extensions: http::Extensions::new(),
                        credentials: req.credentials.clone(),
                        region: req.region.clone(),
                        service: req.service.clone(),
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(backend_req).await;
                }
                let output = DeleteObjectOutput {
                    delete_marker: Some(true),
                    version_id: outcome.version_id.clone(),
                    ..Default::default()
                };
                // v0.6 #35: versioned bucket DELETE without a version-id
                // creates a delete marker — the dedicated AWS event
                // taxonomy entry. Suspended-state buckets also push a
                // (null) marker, so the same event fires there.
                self.fire_delete_notification(
                    &bucket,
                    &key,
                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
                    outcome.version_id,
                );
                return Ok(S3Response::new(output));
            }
        }
        // Legacy / Unversioned path: physical delete on the backend +
        // best-effort sidecar cleanup (mirrors v0.4 behaviour).
        let resp = self.backend.delete_object(req).await?;
        // v0.5 #30: drop any per-object lock state once the delete has
        // succeeded so the freed key can be re-armed by a future PUT
        // under the bucket default. Reaching here implies the lock had
        // already passed `can_delete` above, so this is purely cleanup.
        if let Some(mgr) = self.object_lock.as_ref() {
            mgr.clear(&bucket, &key);
        }
        // v0.6 #39: drop any object-level tag set on physical delete —
        // the freed key starts a fresh tag history if a future PUT
        // re-creates it. (Versioned-delete branches above return early
        // and do NOT touch tags, mirroring AWS where tag state is
        // attached to the logical key, not the version chain.)
        if let Some(mgr) = self.tagging.as_ref() {
            mgr.delete_object_tags(&bucket, &key);
        }
        let sidecar = sidecar_key(&key);
        // v0.7 #49: skip the sidecar DELETE if the key + sidecar suffix
        // can't be encoded into a request URI — the primary delete
        // already succeeded and a stale sidecar is harmless (Range GET
        // re-validates the underlying object on next read).
        if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
            let sidecar_input = DeleteObjectInput {
                bucket: bucket.clone(),
                key: sidecar,
                ..Default::default()
            };
            let sidecar_req = S3Request {
                input: sidecar_input,
                method: http::Method::DELETE,
                uri,
                headers: http::HeaderMap::new(),
                extensions: http::Extensions::new(),
                credentials: None,
                region: None,
                service: None,
                trailing_headers: None,
            };
            let _ = self.backend.delete_object(sidecar_req).await;
        }
        // v0.6 #35: legacy unversioned-bucket hard delete fires the
        // canonical `ObjectRemoved:Delete` event.
        self.fire_delete_notification(
            &bucket,
            &key,
            crate::notifications::EventType::ObjectRemovedDelete,
            None,
        );
        Ok(resp)
    }
2946    async fn delete_objects(
2947        &self,
2948        req: S3Request<DeleteObjectsInput>,
2949    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
2950        // v0.6 #42: MFA Delete applies once to the whole batch (S3 spec:
2951        // when MFA-Delete is on the bucket, a missing / invalid token
2952        // fails the entire DeleteObjects request, not per-object).
2953        if let Some(mgr) = self.mfa_delete.as_ref()
2954            && mgr.is_enabled(&req.input.bucket)
2955        {
2956            let header = req.input.mfa.as_deref();
2957            if let Err(e) =
2958                crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
2959            {
2960                crate::metrics::record_mfa_delete_denial(&req.input.bucket);
2961                return Err(mfa_error_to_s3(e));
2962            }
2963        }
2964        self.backend.delete_objects(req).await
2965    }
    /// COPY with dual policy enforcement and S4 metadata preservation
    /// across `MetadataDirective::REPLACE`.
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // S4-aware copy: when the source object carries s4-* metadata,
        // make sure it is preserved on the destination.
        //
        // - MetadataDirective::COPY (default): the backend copies the
        //   source metadata verbatim, so the s4-* entries travel
        //   automatically — no intervention needed.
        // - MetadataDirective::REPLACE: the caller-supplied metadata
        //   overwrites the source's → losing the s4-* fields would leave
        //   the destination undecompressable (silent corruption). S4
        //   fetches the source metadata via HEAD and force-merges the
        //   s4-* fields into input.metadata.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // If the caller supplied the same key themselves,
                        // do NOT preserve (explicit override wins);
                        // otherwise insert the source value.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
3038    async fn list_objects(
3039        &self,
3040        req: S3Request<ListObjectsInput>,
3041    ) -> S3Result<S3Response<ListObjectsOutput>> {
3042        self.enforce_rate_limit(&req, &req.input.bucket)?;
3043        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3044        let mut resp = self.backend.list_objects(req).await?;
3045        // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
3046        // — v0.5 #34) を顧客から隠す。
3047        if let Some(contents) = resp.output.contents.as_mut() {
3048            contents.retain(|o| {
3049                o.key
3050                    .as_ref()
3051                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3052                    .unwrap_or(true)
3053            });
3054        }
3055        Ok(resp)
3056    }
3057    async fn list_objects_v2(
3058        &self,
3059        req: S3Request<ListObjectsV2Input>,
3060    ) -> S3Result<S3Response<ListObjectsV2Output>> {
3061        self.enforce_rate_limit(&req, &req.input.bucket)?;
3062        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3063        let mut resp = self.backend.list_objects_v2(req).await?;
3064        if let Some(contents) = resp.output.contents.as_mut() {
3065            let before = contents.len();
3066            contents.retain(|o| {
3067                o.key
3068                    .as_ref()
3069                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3070                    .unwrap_or(true)
3071            });
3072            // key_count も補正 (S3 spec compliance)
3073            if let Some(kc) = resp.output.key_count.as_mut() {
3074                *kc -= (before - contents.len()) as i32;
3075            }
3076        }
3077        Ok(resp)
3078    }
3079    /// v0.4 #17: filter S4-internal sidecars from versioned listings.
3080    /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
3081    /// attached AND the bucket is in a versioning-aware state, build
3082    /// the `Versions` / `DeleteMarkers` arrays directly from the
3083    /// in-memory chain (paginated + ordered the S3 way: key asc,
3084    /// version newest-first inside each key). Otherwise fall back to
3085    /// passthrough + sidecar-filter (legacy v0.4 behaviour).
3086    async fn list_object_versions(
3087        &self,
3088        req: S3Request<ListObjectVersionsInput>,
3089    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
3090        self.enforce_rate_limit(&req, &req.input.bucket)?;
3091        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
3092        // v0.5 #34: VersioningManager-owned path.
3093        if let Some(mgr) = self.versioning.as_ref()
3094            && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
3095        {
3096            let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
3097            let page = mgr.list_versions(
3098                &req.input.bucket,
3099                req.input.prefix.as_deref(),
3100                req.input.key_marker.as_deref(),
3101                req.input.version_id_marker.as_deref(),
3102                max_keys,
3103            );
3104            let versions: Vec<ObjectVersion> = page
3105                .versions
3106                .into_iter()
3107                .map(|e| ObjectVersion {
3108                    key: Some(e.key),
3109                    version_id: Some(e.version_id),
3110                    is_latest: Some(e.is_latest),
3111                    e_tag: Some(ETag::Strong(e.etag)),
3112                    size: Some(e.size as i64),
3113                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3114                    ..Default::default()
3115                })
3116                .collect();
3117            let delete_markers: Vec<DeleteMarkerEntry> = page
3118                .delete_markers
3119                .into_iter()
3120                .map(|e| DeleteMarkerEntry {
3121                    key: Some(e.key),
3122                    version_id: Some(e.version_id),
3123                    is_latest: Some(e.is_latest),
3124                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
3125                    ..Default::default()
3126                })
3127                .collect();
3128            let output = ListObjectVersionsOutput {
3129                name: Some(req.input.bucket.clone()),
3130                prefix: req.input.prefix.clone(),
3131                key_marker: req.input.key_marker.clone(),
3132                version_id_marker: req.input.version_id_marker.clone(),
3133                max_keys: req.input.max_keys,
3134                versions: if versions.is_empty() {
3135                    None
3136                } else {
3137                    Some(versions)
3138                },
3139                delete_markers: if delete_markers.is_empty() {
3140                    None
3141                } else {
3142                    Some(delete_markers)
3143                },
3144                is_truncated: Some(page.is_truncated),
3145                next_key_marker: page.next_key_marker,
3146                next_version_id_marker: page.next_version_id_marker,
3147                ..Default::default()
3148            };
3149            return Ok(S3Response::new(output));
3150        }
3151        // Legacy passthrough path (v0.4 #17 sidecar filter retained).
3152        let mut resp = self.backend.list_object_versions(req).await?;
3153        if let Some(versions) = resp.output.versions.as_mut() {
3154            versions.retain(|v| {
3155                v.key
3156                    .as_ref()
3157                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3158                    .unwrap_or(true)
3159            });
3160        }
3161        if let Some(markers) = resp.output.delete_markers.as_mut() {
3162            markers.retain(|m| {
3163                m.key
3164                    .as_ref()
3165                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
3166                    .unwrap_or(true)
3167            });
3168        }
3169        Ok(resp)
3170    }
3171
3172    async fn create_multipart_upload(
3173        &self,
3174        mut req: S3Request<CreateMultipartUploadInput>,
3175    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
3176        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
3177        // frame parse を起動するため、object metadata に flag を立てる。
3178        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
3179        let codec_kind = self.registry.default_kind();
3180        let meta = req.input.metadata.get_or_insert_with(Default::default);
3181        meta.insert(META_MULTIPART.into(), "true".into());
3182        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
3183        debug!(
3184            bucket = ?req.input.bucket,
3185            key = ?req.input.key,
3186            codec = codec_kind.as_str(),
3187            "S4 create_multipart_upload: marking object for per-part compression"
3188        );
3189        self.backend.create_multipart_upload(req).await
3190    }
3191
3192    async fn upload_part(
3193        &self,
3194        mut req: S3Request<UploadPartInput>,
3195    ) -> S3Result<S3Response<UploadPartOutput>> {
3196        // 各 part を圧縮して frame header 付きで forward。GET 時に
3197        // `decompress_multipart` が frame iter で順に解凍する。
3198        // **per-part codec dispatch**: dispatcher が body 先頭 sample から
3199        // codec を選ぶので、parquet 風の mixed-content multipart で part ごとに
3200        // 最適 codec を使える (整数列 part → Bitcomp、text 列 part → zstd 等)。
3201        if let Some(blob) = req.input.body.take() {
3202            let bytes = collect_blob(blob, self.max_body_bytes)
3203                .await
3204                .map_err(internal("collect upload_part body"))?;
3205            let sample_len = bytes.len().min(SAMPLE_BYTES);
3206            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
3207            let original_size = bytes.len() as u64;
3208            let (compressed, manifest) = self
3209                .registry
3210                .compress(bytes, codec_kind)
3211                .await
3212                .map_err(internal("registry compress part"))?;
3213            let header = FrameHeader {
3214                codec: codec_kind,
3215                original_size,
3216                compressed_size: compressed.len() as u64,
3217                crc32c: manifest.crc32c,
3218            };
3219            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3220            write_frame(&mut framed, header, &compressed);
3221            // v0.2 #5: heuristic-based padding skip for likely-final parts.
3222            //
3223            // AWS SDK / aws-cli / boto3 always send the final (and only the
3224            // final) part below the configured part_size. So if the raw user
3225            // part is already smaller than S3's 5 MiB multipart minimum, this
3226            // is overwhelmingly likely to be the final part — and the final
3227            // part is exempt from S3's size constraint. Skipping padding here
3228            // saves up to ~5 MiB per object on highly compressible workloads.
3229            //
3230            // If a misbehaving client sends a tiny **non-final** part, S3
3231            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
3232            // identical outcome to a vanilla S3 PUT, just earlier than
3233            // padding-then-complete would catch it.
3234            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3235            if !likely_final {
3236                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3237            }
3238            let framed_bytes = framed.freeze();
3239            let new_len = framed_bytes.len() as i64;
3240            // 同じ wire 互換問題が multipart にもある (content-length / checksum)
3241            req.input.content_length = Some(new_len);
3242            req.input.checksum_algorithm = None;
3243            req.input.checksum_crc32 = None;
3244            req.input.checksum_crc32c = None;
3245            req.input.checksum_crc64nvme = None;
3246            req.input.checksum_sha1 = None;
3247            req.input.checksum_sha256 = None;
3248            req.input.content_md5 = None;
3249            req.input.body = Some(bytes_to_blob(framed_bytes));
3250            debug!(
3251                part_number = ?req.input.part_number,
3252                upload_id = ?req.input.upload_id,
3253                original_size,
3254                framed_size = new_len,
3255                "S4 upload_part: framed compressed payload"
3256            );
3257        }
3258        self.backend.upload_part(req).await
3259    }
    /// Complete the multipart upload, then (best-effort) build the
    /// `<key>.s4index` frame-index sidecar for the finished object.
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        let resp = self.backend.complete_multipart_upload(req).await?;
        // After a successful CompleteMultipartUpload, fetch the finished
        // object in full, build the frame index, and save it as the
        // `<key>.s4index` sidecar. That unlocks the partial-fetch path
        // for Range GETs (bandwidth savings on range requests).
        // Note: for huge objects this pass is heavy, but once the sidecar
        // exists Range queries become very fast, so the one-time cost is
        // paid back.
        let bucket_clone = bucket.clone();
        let key_clone = key.clone();
        // v0.7 #49: percent-encode the synthetic GET URI before
        // re-entering the backend; if the key is genuinely
        // un-encodable we just skip the sidecar build (Complete returns
        // the original success response — Range GETs degrade to a full
        // read on the missing sidecar, which is the existing behaviour
        // for any object < the streaming threshold).
        if let Ok(uri) = safe_object_uri(&bucket_clone, &key_clone) {
            let get_input = GetObjectInput {
                bucket: bucket_clone.clone(),
                key: key_clone.clone(),
                ..Default::default()
            };
            let get_req = S3Request {
                input: get_input,
                method: http::Method::GET,
                uri,
                headers: http::HeaderMap::new(),
                extensions: http::Extensions::new(),
                credentials: None,
                region: None,
                service: None,
                trailing_headers: None,
            };
            // Every step is best-effort: any failure silently skips the
            // sidecar (Range GETs then fall back to full reads).
            if let Ok(get_resp) = self.backend.get_object(get_req).await
                && let Some(blob) = get_resp.output.body
                && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
                && let Ok(index) = build_index_from_body(&body)
            {
                self.write_sidecar(&bucket, &key, &index).await;
            }
        }
        Ok(resp)
    }
    /// Pure delegation: forward `AbortMultipartUpload` to the backend.
    /// S4 frames parts on the way in (see `upload_part`) but keeps no
    /// server-side per-upload state, so aborting needs no extra cleanup here.
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    /// Pure delegation: list in-progress multipart uploads from the backend.
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    /// Pure delegation: list uploaded parts from the backend.
    /// NOTE(review): part sizes reported here are the stored (framed =
    /// compressed + padded) sizes, not the original client payload sizes —
    /// confirm whether any caller relies on them.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
3325
3326    // =========================================================================
3327    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
3328    // 持たないので、backend (= AWS S3) の動作と完全に同一。
3329    //
3330    // 既知の制限事項:
3331    // - copy_object / upload_part_copy: source object が S4-compressed の場合、
3332    //   backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
    //   copied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
3334    //   経由で正しく decompress できる。MetadataDirective REPLACE で上書き
3335    //   されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
3336    // - list_object_versions: versioning enabled bucket では各 version も S4
3337    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
3338    // =========================================================================
3339
3340    // ---- Object ACL / tagging / attributes ----
    /// Pure delegation: object ACL reads go straight to the backend; S4
    /// stores no ACL state of its own.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    /// Pure delegation: object ACL writes go straight to the backend.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
3353    // v0.6 #39: object tagging — when a `TagManager` is attached the
3354    // configuration / per-(bucket, key) state lives in the manager and
3355    // these handlers serve directly from it; when no manager is
3356    // attached they fall back to the backend (legacy passthrough so
3357    // v0.5 deployments are unaffected).
3358    async fn get_object_tagging(
3359        &self,
3360        req: S3Request<GetObjectTaggingInput>,
3361    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
3362        let Some(mgr) = self.tagging.as_ref() else {
3363            return self.backend.get_object_tagging(req).await;
3364        };
3365        let tags = mgr
3366            .get_object_tags(&req.input.bucket, &req.input.key)
3367            .unwrap_or_default();
3368        Ok(S3Response::new(GetObjectTaggingOutput {
3369            tag_set: tagset_to_aws(&tags),
3370            ..Default::default()
3371        }))
3372    }
3373    async fn put_object_tagging(
3374        &self,
3375        req: S3Request<PutObjectTaggingInput>,
3376    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
3377        let Some(mgr) = self.tagging.as_ref() else {
3378            return self.backend.put_object_tagging(req).await;
3379        };
3380        let bucket = req.input.bucket.clone();
3381        let key = req.input.key.clone();
3382        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
3383            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
3384        })?;
3385        // v0.6 #39: gate via IAM policy with both the request tags
3386        // (`s3:RequestObjectTag/<key>`) and any existing tags on the
3387        // target object (`s3:ExistingObjectTag/<key>`).
3388        let existing = mgr.get_object_tags(&bucket, &key);
3389        self.enforce_policy_with_extra(
3390            &req,
3391            "s3:PutObjectTagging",
3392            &bucket,
3393            Some(&key),
3394            Some(&parsed),
3395            existing.as_ref(),
3396        )?;
3397        mgr.put_object_tags(&bucket, &key, parsed);
3398        Ok(S3Response::new(PutObjectTaggingOutput::default()))
3399    }
3400    async fn delete_object_tagging(
3401        &self,
3402        req: S3Request<DeleteObjectTaggingInput>,
3403    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
3404        let Some(mgr) = self.tagging.as_ref() else {
3405            return self.backend.delete_object_tagging(req).await;
3406        };
3407        let bucket = req.input.bucket.clone();
3408        let key = req.input.key.clone();
3409        let existing = mgr.get_object_tags(&bucket, &key);
3410        self.enforce_policy_with_extra(
3411            &req,
3412            "s3:DeleteObjectTagging",
3413            &bucket,
3414            Some(&key),
3415            None,
3416            existing.as_ref(),
3417        )?;
3418        mgr.delete_object_tags(&bucket, &key);
3419        Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
3420    }
    /// Pure delegation: `GetObjectAttributes` is answered by the backend.
    /// NOTE(review): for S4-compressed objects the backend reports attributes
    /// of the stored (compressed) representation, not the original payload —
    /// confirm whether callers depend on original-size semantics here.
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    /// Pure delegation: `RestoreObject` (archive restore) passes through
    /// untouched; the compression state of the object is irrelevant to the
    /// restore request itself.
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // v0.2 #6: byte-range aware copy when the source is S4-framed.
        //
        // For a framed source (multipart upload OR single-PUT framed-v2),
        // a naive byte-range passthrough would copy compressed bytes that
        // don't align with S4 frame boundaries — silently corrupting the
        // result. Instead we GET the source through S4 (which handles
        // decompression + Range), re-compress + re-frame as a new part,
        // and forward as upload_part. For non-framed sources (S4-untouched
        // raw objects), passthrough is correct and we keep the original
        // (cheaper) code path.
        //
        // Copy sources that are not the plain `Bucket { … }` form fall
        // through to the backend unchanged.
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        // Probe metadata to decide whether the source needs S4-aware copy.
        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        // A failed HEAD is treated as "not framed": we fall back to plain
        // passthrough and let the backend surface the real error
        // (NoSuchKey, AccessDenied, …) to the caller.
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        // Resolve the optional source byte range to pass to GET.
        // A malformed `x-amz-copy-source-range` is rejected up-front as
        // InvalidRange; an absent range means "copy the whole object".
        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        // GET source via S4 (handles decompression + sidecar partial fetch
        // when range is present). The result is the requested user-visible
        // byte range, fully decompressed.
        let mut get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        get_input.range = source_range;
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        // The decompressed source range is collected fully in memory,
        // bounded by max_body_bytes (same policy as the PUT path).
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        // Compress + frame as a fresh part (mirrors upload_part path).
        let sample_len = bytes.len().min(SAMPLE_BYTES);
        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
        let original_size = bytes.len() as u64;
        let (compressed, manifest) = self
            .registry
            .compress(bytes, codec_kind)
            .await
            .map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        // Same heuristic as upload_part: a part below the S3 minimum part
        // size is assumed to be the final part and left unpadded; anything
        // larger is padded up so Complete won't fail with EntityTooSmall.
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        // Forward as upload_part to the destination multipart upload.
        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        // Only the ETag of the freshly uploaded part is propagated; the
        // other CopyPartResult fields (last-modified, checksums) are left
        // unset.
        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }
3581
3582    // ---- Object lock / retention / legal hold (v0.5 #30) ----
3583    //
3584    // When an `ObjectLockManager` is attached the configuration / per-object
3585    // state lives in the manager and these handlers serve directly from it;
3586    // when no manager is attached they fall back to the backend (legacy
3587    // passthrough so v0.4 deployments are unaffected).
3588    async fn get_object_lock_configuration(
3589        &self,
3590        req: S3Request<GetObjectLockConfigurationInput>,
3591    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
3592        if let Some(mgr) = self.object_lock.as_ref() {
3593            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
3594                ObjectLockConfiguration {
3595                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
3596                        ObjectLockEnabled::ENABLED,
3597                    )),
3598                    rule: Some(ObjectLockRule {
3599                        default_retention: Some(DefaultRetention {
3600                            days: Some(d.retention_days as i32),
3601                            mode: Some(ObjectLockRetentionMode::from_static(
3602                                match d.mode {
3603                                    crate::object_lock::LockMode::Governance => {
3604                                        ObjectLockRetentionMode::GOVERNANCE
3605                                    }
3606                                    crate::object_lock::LockMode::Compliance => {
3607                                        ObjectLockRetentionMode::COMPLIANCE
3608                                    }
3609                                },
3610                            )),
3611                            years: None,
3612                        }),
3613                    }),
3614                }
3615            });
3616            let output = GetObjectLockConfigurationOutput {
3617                object_lock_configuration: cfg,
3618            };
3619            return Ok(S3Response::new(output));
3620        }
3621        self.backend.get_object_lock_configuration(req).await
3622    }
    /// PutObjectLockConfiguration: when an `ObjectLockManager` is attached,
    /// persist the bucket-level default retention into the manager and
    /// answer locally; otherwise pass through to the backend (legacy v0.4
    /// behaviour).
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            // Only requests that carry a full Rule/DefaultRetention update
            // the stored default. NOTE(review): a configuration without a
            // rule succeeds without clearing any previously stored default —
            // confirm that matches the intended semantics.
            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
                && let Some(rule) = cfg.rule.as_ref()
                && let Some(d) = rule.default_retention.as_ref()
            {
                // An absent or unrecognized Mode is a hard error here
                // (unlike PutObjectRetention, which silently ignores it).
                let mode = d
                    .mode
                    .as_ref()
                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
                    .ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
                        )
                    })?;
                // S3 spec: exactly one of Days / Years (we accept Days
                // outright and convert Years → Days for storage; Years
                // is just a UX shorthand on the wire).
                let days: u32 = match (d.days, d.years) {
                    (Some(d), None) if d > 0 => d as u32,
                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
                    _ => {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
                        ));
                    }
                };
                mgr.set_bucket_default(
                    &bucket,
                    crate::object_lock::BucketObjectLockDefault {
                        mode,
                        retention_days: days,
                    },
                );
            }
            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
        }
        self.backend.put_object_lock_configuration(req).await
    }
3668    async fn get_object_legal_hold(
3669        &self,
3670        req: S3Request<GetObjectLegalHoldInput>,
3671    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
3672        if let Some(mgr) = self.object_lock.as_ref() {
3673            let on = mgr
3674                .get(&req.input.bucket, &req.input.key)
3675                .map(|s| s.legal_hold_on)
3676                .unwrap_or(false);
3677            let status = ObjectLockLegalHoldStatus::from_static(if on {
3678                ObjectLockLegalHoldStatus::ON
3679            } else {
3680                ObjectLockLegalHoldStatus::OFF
3681            });
3682            let output = GetObjectLegalHoldOutput {
3683                legal_hold: Some(ObjectLockLegalHold {
3684                    status: Some(status),
3685                }),
3686            };
3687            return Ok(S3Response::new(output));
3688        }
3689        self.backend.get_object_legal_hold(req).await
3690    }
3691    async fn put_object_legal_hold(
3692        &self,
3693        req: S3Request<PutObjectLegalHoldInput>,
3694    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
3695        if let Some(mgr) = self.object_lock.as_ref() {
3696            let on = req
3697                .input
3698                .legal_hold
3699                .as_ref()
3700                .and_then(|h| h.status.as_ref())
3701                .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
3702                .unwrap_or(false);
3703            mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
3704            return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
3705        }
3706        self.backend.put_object_legal_hold(req).await
3707    }
    /// GetObjectRetention: when an `ObjectLockManager` is attached, answer
    /// from its per-(bucket, key) state; otherwise pass through to the
    /// backend (legacy behaviour).
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let retention = mgr
                .get(&req.input.bucket, &req.input.key)
                // A state entry carrying neither a mode nor a retain-until
                // date (e.g. legal-hold-only) is reported as "no retention"
                // rather than as an empty Retention element.
                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
                .map(|s| {
                    let mode = s.mode.map(|m| {
                        ObjectLockRetentionMode::from_static(match m {
                            crate::object_lock::LockMode::Governance => {
                                ObjectLockRetentionMode::GOVERNANCE
                            }
                            crate::object_lock::LockMode::Compliance => {
                                ObjectLockRetentionMode::COMPLIANCE
                            }
                        })
                    });
                    let until = s.retain_until.map(chrono_utc_to_timestamp);
                    ObjectLockRetention {
                        mode,
                        retain_until_date: until,
                    }
                });
            let output = GetObjectRetentionOutput { retention };
            return Ok(S3Response::new(output));
        }
        self.backend.get_object_retention(req).await
    }
    /// PutObjectRetention: when an `ObjectLockManager` is attached, validate
    /// the request against S3's retention-immutability rules and store the
    /// updated per-object state in the manager; otherwise pass through to
    /// the backend (legacy behaviour).
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            let key = req.input.key.clone();
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let retention = req.input.retention.as_ref().ok_or_else(|| {
                S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "PutObjectRetention requires a Retention element",
                )
            })?;
            // NOTE(review): an unrecognized Mode string maps to None here and
            // is silently ignored (the field keeps its stored value below)
            // rather than rejected — confirm that is intentional.
            let new_mode = retention
                .mode
                .as_ref()
                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
            let new_until = retention
                .retain_until_date
                .as_ref()
                .map(timestamp_to_chrono_utc)
                .unwrap_or(None);
            let now = chrono::Utc::now();
            // No prior state behaves as an unlocked default (all checks
            // below pass vacuously).
            let existing = mgr.get(&bucket, &key).unwrap_or_default();
            // S3 immutability rules:
            //   - Compliance is one-way: once set, mode cannot move to
            //     Governance, and retain-until cannot be shortened.
            //   - Governance can be lengthened freely; shortened only
            //     with bypass=true.
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Compliance
                && existing.is_locked(now)
            {
                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot downgrade Compliance retention to Governance while lock is active",
                    ));
                }
                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                    && next < prev
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot shorten Compliance retention while lock is active",
                    ));
                }
            }
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Governance
                && existing.is_locked(now)
                && !bypass
                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                && next < prev
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
                ));
            }
            // Partial update: only fields present in the request overwrite
            // the stored state; absent fields keep their previous values.
            let mut state = existing;
            if new_mode.is_some() {
                state.mode = new_mode;
            }
            if new_until.is_some() {
                state.retain_until = new_until;
            }
            mgr.set(&bucket, &key, state);
            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
        }
        self.backend.put_object_retention(req).await
    }
3811
3812    // ---- Versioning ----
3813    // list_object_versions is implemented above in the compression-hook
3814    // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
3815    // VersioningManager is attached (v0.5 #34), serves chains directly
3816    // from the in-memory index.
3817    async fn get_bucket_versioning(
3818        &self,
3819        req: S3Request<GetBucketVersioningInput>,
3820    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
3821        // v0.5 #34: when a VersioningManager is attached, the bucket's
3822        // versioning state lives in the manager (= S4-server's
3823        // authoritative source). Pass-through hits the backend only
3824        // when no manager is configured (legacy v0.4 behaviour).
3825        if let Some(mgr) = self.versioning.as_ref() {
3826            let output = match mgr.state(&req.input.bucket).as_aws_status() {
3827                Some(s) => GetBucketVersioningOutput {
3828                    status: Some(BucketVersioningStatus::from(s.to_owned())),
3829                    ..Default::default()
3830                },
3831                None => GetBucketVersioningOutput::default(),
3832            };
3833            return Ok(S3Response::new(output));
3834        }
3835        self.backend.get_bucket_versioning(req).await
3836    }
    /// PutBucketVersioning: MFA-gates `MfaDelete` changes (v0.6 #42), then
    /// records the new `Status` in the attached `VersioningManager`
    /// (v0.5 #34) or, with no manager, passes through to the backend.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        // v0.6 #42: MFA gating on the `PutBucketVersioning` request
        // itself. S3 spec: when the request body carries an
        // `MfaDelete` element (either `Enabled` or `Disabled`), the
        // request must include a valid `x-amz-mfa` token — both for
        // the *first* enable (so the operator can't quietly side-step
        // the gate by never enabling it) and for any subsequent
        // change (so a leaked credential alone can't disable MFA
        // Delete to bypass it on subsequent DELETEs). Requests that
        // omit the `MfaDelete` element entirely (i.e. they flip only
        // `Status`) skip this gate, matching AWS.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && let Some(target_enabled) = req
                .input
                .versioning_configuration
                .mfa_delete
                .as_ref()
                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
        {
            let bucket = req.input.bucket.clone();
            let header = req.input.mfa.as_deref();
            let secret = mgr.lookup_secret(&bucket);
            // Verification requires BOTH the `x-amz-mfa` header and a
            // registered device secret for the bucket; the device serial
            // must match and the TOTP code must verify against the current
            // clock.
            let verified = match (header, secret.as_ref()) {
                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
                    Ok((serial, code)) => {
                        serial == s.serial
                            && crate::mfa::verify_totp(
                                &s.secret_base32,
                                &code,
                                current_unix_secs(),
                            )
                    }
                    Err(_) => false,
                },
                _ => false,
            };
            if !verified {
                crate::metrics::record_mfa_delete_denial(&bucket);
                // Distinguish "no header at all" from "header present but
                // unparsable or wrong" only in the returned error kind.
                let err = if header.is_none() {
                    crate::mfa::MfaError::Missing
                } else {
                    crate::mfa::MfaError::InvalidCode
                };
                return Err(mfa_error_to_s3(err));
            }
            mgr.set_bucket_state(&bucket, target_enabled);
        }
        // v0.5 #34: when a VersioningManager is attached it owns the state:
        // the new status is stored in the manager and the request returns
        // immediately WITHOUT consulting the backend. Only manager-less
        // deployments forward to the backend below.
        if let Some(mgr) = self.versioning.as_ref() {
            // Anything other than Enabled/Suspended (including an absent
            // Status element) is stored as Unversioned.
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        self.backend.put_bucket_versioning(req).await
    }
3913
3914    // ---- Bucket location ----
    /// Pure delegation: bucket location is answered by the backend; S4 has
    /// no region state of its own.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
3921
3922    // ---- Bucket policy ----
    /// Pure delegation: bucket policy documents are stored backend-side.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    /// Pure delegation: bucket policy writes go straight to the backend.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    /// Pure delegation: bucket policy deletion goes straight to the backend.
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    /// Pure delegation: policy status (public / not public) is computed by
    /// the backend.
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }
3947
3948    // ---- Bucket ACL ----
    /// Pure delegation: bucket ACL reads go straight to the backend; S4
    /// stores no ACL state of its own.
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    /// Pure delegation: bucket ACL writes go straight to the backend.
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
3961
3962    // ---- Bucket CORS (v0.6 #38) ----
3963    async fn get_bucket_cors(
3964        &self,
3965        req: S3Request<GetBucketCorsInput>,
3966    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
3967        if let Some(mgr) = self.cors.as_ref() {
3968            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
3969                S3Error::with_message(
3970                    S3ErrorCode::NoSuchCORSConfiguration,
3971                    "The CORS configuration does not exist".to_string(),
3972                )
3973            })?;
3974            let rules: Vec<CORSRule> = cfg
3975                .rules
3976                .into_iter()
3977                .map(|r| CORSRule {
3978                    allowed_headers: if r.allowed_headers.is_empty() {
3979                        None
3980                    } else {
3981                        Some(r.allowed_headers)
3982                    },
3983                    allowed_methods: r.allowed_methods,
3984                    allowed_origins: r.allowed_origins,
3985                    expose_headers: if r.expose_headers.is_empty() {
3986                        None
3987                    } else {
3988                        Some(r.expose_headers)
3989                    },
3990                    id: r.id,
3991                    max_age_seconds: r.max_age_seconds.map(|s| s as i32),
3992                })
3993                .collect();
3994            return Ok(S3Response::new(GetBucketCorsOutput {
3995                cors_rules: Some(rules),
3996            }));
3997        }
3998        self.backend.get_bucket_cors(req).await
3999    }
4000    async fn put_bucket_cors(
4001        &self,
4002        req: S3Request<PutBucketCorsInput>,
4003    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
4004        if let Some(mgr) = self.cors.as_ref() {
4005            let cfg = crate::cors::CorsConfig {
4006                rules: req
4007                    .input
4008                    .cors_configuration
4009                    .cors_rules
4010                    .into_iter()
4011                    .map(|r| crate::cors::CorsRule {
4012                        allowed_origins: r.allowed_origins,
4013                        allowed_methods: r.allowed_methods,
4014                        allowed_headers: r.allowed_headers.unwrap_or_default(),
4015                        expose_headers: r.expose_headers.unwrap_or_default(),
4016                        max_age_seconds: r.max_age_seconds.and_then(|s| {
4017                            if s < 0 { None } else { Some(s as u32) }
4018                        }),
4019                        id: r.id,
4020                    })
4021                    .collect(),
4022            };
4023            mgr.put(&req.input.bucket, cfg);
4024            return Ok(S3Response::new(PutBucketCorsOutput::default()));
4025        }
4026        self.backend.put_bucket_cors(req).await
4027    }
4028    async fn delete_bucket_cors(
4029        &self,
4030        req: S3Request<DeleteBucketCorsInput>,
4031    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
4032        if let Some(mgr) = self.cors.as_ref() {
4033            mgr.delete(&req.input.bucket);
4034            return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
4035        }
4036        self.backend.delete_bucket_cors(req).await
4037    }
4038
4039    // ---- Bucket lifecycle (v0.6 #37) ----
4040    async fn get_bucket_lifecycle_configuration(
4041        &self,
4042        req: S3Request<GetBucketLifecycleConfigurationInput>,
4043    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
4044        if let Some(mgr) = self.lifecycle.as_ref() {
4045            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
4046                S3Error::with_message(
4047                    S3ErrorCode::NoSuchLifecycleConfiguration,
4048                    "The lifecycle configuration does not exist".to_string(),
4049                )
4050            })?;
4051            let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
4052            return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
4053                rules: Some(rules),
4054                transition_default_minimum_object_size: None,
4055            }));
4056        }
4057        self.backend.get_bucket_lifecycle_configuration(req).await
4058    }
4059    async fn put_bucket_lifecycle_configuration(
4060        &self,
4061        req: S3Request<PutBucketLifecycleConfigurationInput>,
4062    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
4063        if let Some(mgr) = self.lifecycle.as_ref() {
4064            let bucket = req.input.bucket.clone();
4065            let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
4066            let cfg = dto_lifecycle_to_internal(&dto_cfg);
4067            mgr.put(&bucket, cfg);
4068            return Ok(S3Response::new(
4069                PutBucketLifecycleConfigurationOutput::default(),
4070            ));
4071        }
4072        self.backend.put_bucket_lifecycle_configuration(req).await
4073    }
4074    async fn delete_bucket_lifecycle(
4075        &self,
4076        req: S3Request<DeleteBucketLifecycleInput>,
4077    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
4078        if let Some(mgr) = self.lifecycle.as_ref() {
4079            mgr.delete(&req.input.bucket);
4080            return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
4081        }
4082        self.backend.delete_bucket_lifecycle(req).await
4083    }
4084
4085    // ---- Bucket tagging (v0.6 #39) ----
4086    async fn get_bucket_tagging(
4087        &self,
4088        req: S3Request<GetBucketTaggingInput>,
4089    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
4090        let Some(mgr) = self.tagging.as_ref() else {
4091            return self.backend.get_bucket_tagging(req).await;
4092        };
4093        let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
4094        Ok(S3Response::new(GetBucketTaggingOutput {
4095            tag_set: tagset_to_aws(&tags),
4096        }))
4097    }
4098    async fn put_bucket_tagging(
4099        &self,
4100        req: S3Request<PutBucketTaggingInput>,
4101    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
4102        let Some(mgr) = self.tagging.as_ref() else {
4103            return self.backend.put_bucket_tagging(req).await;
4104        };
4105        let bucket = req.input.bucket.clone();
4106        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
4107            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
4108        })?;
4109        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4110        mgr.put_bucket_tags(&bucket, parsed);
4111        Ok(S3Response::new(PutBucketTaggingOutput::default()))
4112    }
4113    async fn delete_bucket_tagging(
4114        &self,
4115        req: S3Request<DeleteBucketTaggingInput>,
4116    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
4117        let Some(mgr) = self.tagging.as_ref() else {
4118            return self.backend.delete_bucket_tagging(req).await;
4119        };
4120        let bucket = req.input.bucket.clone();
4121        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
4122        mgr.delete_bucket_tags(&bucket);
4123        Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
4124    }
4125
    // ---- Bucket encryption ----
    // Pure passthrough: bucket-level default-encryption settings are owned
    // by the backend. (Object-level SSE handling lives in the object paths,
    // not here.)
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }

    // ---- Bucket logging ----
    // Pure passthrough: server-access-logging configuration is delegated to
    // the backend unchanged.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
4159
4160    // ---- Bucket notification (v0.6 #35) ----
4161    //
4162    // When a `NotificationManager` is attached, S4 itself owns per-bucket
4163    // notification configurations and the PUT / GET handlers route through
4164    // the manager. The wire DTO's queue / topic configurations map onto
4165    // S4's `Destination::Sqs` / `Destination::Sns`; LambdaFunction and
4166    // EventBridge configurations are accepted on PUT but silently dropped
4167    // (out of scope for v0.6 #35). When no manager is attached the legacy
4168    // backend-passthrough behaviour applies.
4169    async fn get_bucket_notification_configuration(
4170        &self,
4171        req: S3Request<GetBucketNotificationConfigurationInput>,
4172    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
4173        if let Some(mgr) = self.notifications.as_ref() {
4174            let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
4175            let dto = notif_to_dto(&cfg);
4176            return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
4177                event_bridge_configuration: dto.event_bridge_configuration,
4178                lambda_function_configurations: dto.lambda_function_configurations,
4179                queue_configurations: dto.queue_configurations,
4180                topic_configurations: dto.topic_configurations,
4181            }));
4182        }
4183        self.backend
4184            .get_bucket_notification_configuration(req)
4185            .await
4186    }
4187    async fn put_bucket_notification_configuration(
4188        &self,
4189        req: S3Request<PutBucketNotificationConfigurationInput>,
4190    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
4191        if let Some(mgr) = self.notifications.as_ref() {
4192            let cfg = notif_from_dto(&req.input.notification_configuration);
4193            mgr.put(&req.input.bucket, cfg);
4194            return Ok(S3Response::new(
4195                PutBucketNotificationConfigurationOutput::default(),
4196            ));
4197        }
4198        self.backend
4199            .put_bucket_notification_configuration(req)
4200            .await
4201    }
4202
    // ---- Bucket request payment ----
    // Pure passthrough: requester-pays configuration is owned by the backend.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }

    // ---- Bucket website ----
    // Pure passthrough: static-website hosting configuration is delegated to
    // the backend unchanged.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
4236
4237    // ---- Bucket replication (v0.6 #40) ----
4238    async fn get_bucket_replication(
4239        &self,
4240        req: S3Request<GetBucketReplicationInput>,
4241    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4242        if let Some(mgr) = self.replication.as_ref() {
4243            return match mgr.get(&req.input.bucket) {
4244                Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4245                    replication_configuration: Some(replication_to_dto(&cfg)),
4246                })),
4247                None => Err(S3Error::with_message(
4248                    S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4249                    format!("no replication configuration on bucket {}", req.input.bucket),
4250                )),
4251            };
4252        }
4253        self.backend.get_bucket_replication(req).await
4254    }
4255    async fn put_bucket_replication(
4256        &self,
4257        req: S3Request<PutBucketReplicationInput>,
4258    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4259        if let Some(mgr) = self.replication.as_ref() {
4260            let cfg = replication_from_dto(&req.input.replication_configuration);
4261            mgr.put(&req.input.bucket, cfg);
4262            return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4263        }
4264        self.backend.put_bucket_replication(req).await
4265    }
4266    async fn delete_bucket_replication(
4267        &self,
4268        req: S3Request<DeleteBucketReplicationInput>,
4269    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
4270        if let Some(mgr) = self.replication.as_ref() {
4271            mgr.delete(&req.input.bucket);
4272            return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
4273        }
4274        self.backend.delete_bucket_replication(req).await
4275    }
4276
    // ---- Bucket accelerate ----
    // Pure passthrough: transfer-acceleration configuration is owned by the
    // backend.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }

    // ---- Bucket ownership controls ----
    // Pure passthrough: object-ownership settings are delegated unchanged.
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }

    // ---- Public access block ----
    // Pure passthrough: public-access-block settings are owned by the
    // backend.
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
4330
4331    // ====================================================================
4332    // v0.6 #41: S3 Select — server-side SQL filter on object body.
4333    //
4334    // Fetch the object via the regular `get_object` path (so SSE-C /
4335    // SSE-S4 / SSE-KMS / S4 codec all decompress + decrypt transparently),
4336    // run a small SQL subset (CSV + JSON Lines, equality / inequality /
4337    // LIKE / AND / OR / NOT) over the in-memory body, and stream the
4338    // matched rows back as AWS event-stream `Records` + `Stats` + `End`
4339    // frames.
4340    //
4341    // Limitations (deliberate, documented):
4342    //   - Parquet input is rejected with NotImplemented.
4343    //   - Aggregates / GROUP BY / JOIN / ORDER BY / LIMIT are rejected at
4344    //     parse time as InvalidRequest (s3s 0.13 doesn't expose AWS's
4345    //     domain-specific `InvalidSqlExpression` code).
4346    //   - The body is fully buffered before SQL evaluation (S3 Select
4347    //     streaming-during-evaluation is v0.7 scope).
4348    //   - GPU-accelerated WHERE evaluation is stubbed out (always None).
4349    async fn select_object_content(
4350        &self,
4351        req: S3Request<SelectObjectContentInput>,
4352    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
4353        use crate::select::{
4354            EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
4355            run_select_jsonlines,
4356        };
4357
4358        let select_bucket = req.input.bucket.clone();
4359        let select_key = req.input.key.clone();
4360        self.enforce_rate_limit(&req, &select_bucket)?;
4361        self.enforce_policy(
4362            &req,
4363            "s3:GetObject",
4364            &select_bucket,
4365            Some(&select_key),
4366        )?;
4367
4368        let request = req.input.request;
4369        let sql = request.expression.clone();
4370        if request.expression_type.as_str() != "SQL" {
4371            return Err(S3Error::with_message(
4372                S3ErrorCode::InvalidExpressionType,
4373                format!(
4374                    "ExpressionType must be SQL, got: {}",
4375                    request.expression_type.as_str()
4376                ),
4377            ));
4378        }
4379
4380        let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
4381            SelectInputFormat::JsonLines
4382        } else if let Some(csv) = request.input_serialization.csv.as_ref() {
4383            let has_header = csv
4384                .file_header_info
4385                .as_ref()
4386                .map(|h| {
4387                    let s = h.as_str();
4388                    s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
4389                })
4390                .unwrap_or(false);
4391            let delim = csv
4392                .field_delimiter
4393                .as_deref()
4394                .and_then(|s| s.chars().next())
4395                .unwrap_or(',');
4396            SelectInputFormat::Csv {
4397                has_header,
4398                delimiter: delim,
4399            }
4400        } else if request.input_serialization.parquet.is_some() {
4401            return Err(S3Error::with_message(
4402                S3ErrorCode::NotImplemented,
4403                "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
4404            ));
4405        } else {
4406            return Err(S3Error::with_message(
4407                S3ErrorCode::InvalidRequest,
4408                "InputSerialization requires exactly one of CSV / JSON / Parquet",
4409            ));
4410        };
4411        if let Some(ct) = request.input_serialization.compression_type.as_ref()
4412            && !ct.as_str().eq_ignore_ascii_case("NONE")
4413        {
4414            return Err(S3Error::with_message(
4415                S3ErrorCode::NotImplemented,
4416                format!(
4417                    "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
4418                    ct.as_str()
4419                ),
4420            ));
4421        }
4422
4423        let output_format = if request.output_serialization.json.is_some() {
4424            SelectOutputFormat::Json
4425        } else if request.output_serialization.csv.is_some() {
4426            SelectOutputFormat::Csv
4427        } else {
4428            return Err(S3Error::with_message(
4429                S3ErrorCode::InvalidRequest,
4430                "OutputSerialization requires exactly one of CSV / JSON",
4431            ));
4432        };
4433
4434        let get_input = GetObjectInput {
4435            bucket: select_bucket.clone(),
4436            key: select_key.clone(),
4437            sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
4438            sse_customer_key: req.input.sse_customer_key.clone(),
4439            sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
4440            ..Default::default()
4441        };
4442        let get_req = S3Request {
4443            input: get_input,
4444            method: http::Method::GET,
4445            uri: format!("/{}/{}", select_bucket, select_key)
4446                .parse()
4447                .map_err(|e| {
4448                    S3Error::with_message(
4449                        S3ErrorCode::InternalError,
4450                        format!("constructing inner GET URI: {e}"),
4451                    )
4452                })?,
4453            headers: http::HeaderMap::new(),
4454            extensions: http::Extensions::new(),
4455            credentials: req.credentials.clone(),
4456            region: req.region.clone(),
4457            service: req.service.clone(),
4458            trailing_headers: None,
4459        };
4460        let mut get_resp = self.get_object(get_req).await?;
4461        let blob = get_resp.output.body.take().ok_or_else(|| {
4462            S3Error::with_message(
4463                S3ErrorCode::InternalError,
4464                "Select: object body was empty after GET",
4465            )
4466        })?;
4467        let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
4468            .await
4469            .map_err(internal("collect Select body"))?;
4470        let scanned = body_bytes.len() as u64;
4471
4472        let matched_payload = match input_format {
4473            SelectInputFormat::JsonLines => {
4474                run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
4475                    |e| select_error_to_s3(e, "JSON Lines"),
4476                )?
4477            }
4478            SelectInputFormat::Csv { .. } => {
4479                run_select_csv(&sql, &body_bytes, input_format, output_format)
4480                    .map_err(|e| select_error_to_s3(e, "CSV"))?
4481            }
4482        };
4483
4484        let returned = matched_payload.len() as u64;
4485        let processed = scanned;
4486        let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
4487        if !matched_payload.is_empty() {
4488            events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
4489                payload: Some(bytes::Bytes::from(matched_payload)),
4490            })));
4491        }
4492        events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
4493            details: Some(Stats {
4494                bytes_scanned: Some(scanned as i64),
4495                bytes_processed: Some(processed as i64),
4496                bytes_returned: Some(returned as i64),
4497            }),
4498        })));
4499        events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
4500        // Touch EventStreamWriter so the public API stays linked into the
4501        // build (the actual wire framing is delegated to s3s).
4502        let _writer = EventStreamWriter::new();
4503
4504        let stream =
4505            SelectObjectContentEventStream::new(futures::stream::iter(events));
4506        let output = SelectObjectContentOutput {
4507            payload: Some(stream),
4508        };
4509        Ok(S3Response::new(output))
4510    }
4511
4512    // ---- Bucket Inventory configuration (v0.6 #36) ----
4513    //
4514    // When an `InventoryManager` is attached, S4-server owns the
4515    // configuration store and these handlers no longer pass through to
4516    // the backend. The mapping between the s3s-typed
4517    // `InventoryConfiguration` and the inventory module's internal
4518    // `InventoryConfig` is intentionally lossy: only the fields S4
4519    // actually uses for periodic CSV emission survive the round trip
4520    // (id, source bucket, destination bucket / prefix, format, included
4521    // versions, schedule frequency). Optional fields, encryption, and
4522    // filter prefixes are accepted on PUT and re-surfaced on GET via
4523    // a best-effort default-shape `InventoryConfiguration` so the
4524    // client sees a roundtrip-clean response.
4525    async fn put_bucket_inventory_configuration(
4526        &self,
4527        req: S3Request<PutBucketInventoryConfigurationInput>,
4528    ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
4529        if let Some(mgr) = self.inventory.as_ref() {
4530            let cfg = inv_from_dto(
4531                &req.input.bucket,
4532                &req.input.id,
4533                &req.input.inventory_configuration,
4534            );
4535            mgr.put(cfg);
4536            return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
4537        }
4538        self.backend.put_bucket_inventory_configuration(req).await
4539    }
4540
4541    async fn get_bucket_inventory_configuration(
4542        &self,
4543        req: S3Request<GetBucketInventoryConfigurationInput>,
4544    ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
4545        if let Some(mgr) = self.inventory.as_ref() {
4546            let cfg = mgr.get(&req.input.bucket, &req.input.id);
4547            if let Some(cfg) = cfg {
4548                let out = GetBucketInventoryConfigurationOutput {
4549                    inventory_configuration: Some(inv_to_dto(&cfg)),
4550                };
4551                return Ok(S3Response::new(out));
4552            }
4553            // AWS returns `NoSuchConfiguration` (404) when the id has no
4554            // matching inventory configuration on the bucket. The
4555            // generated `S3ErrorCode` enum doesn't expose a typed variant
4556            // for this code, so we round-trip through `from_bytes` which
4557            // wraps unknown codes as `Custom(...)` (= the AWS-canonical
4558            // error-code string survives into the XML response envelope).
4559            let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
4560                .unwrap_or(S3ErrorCode::NoSuchKey);
4561            return Err(S3Error::with_message(
4562                code,
4563                format!(
4564                    "no inventory configuration with id={} on bucket={}",
4565                    req.input.id, req.input.bucket
4566                ),
4567            ));
4568        }
4569        self.backend.get_bucket_inventory_configuration(req).await
4570    }
4571
4572    async fn list_bucket_inventory_configurations(
4573        &self,
4574        req: S3Request<ListBucketInventoryConfigurationsInput>,
4575    ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
4576        if let Some(mgr) = self.inventory.as_ref() {
4577            let list = mgr.list_for_bucket(&req.input.bucket);
4578            let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
4579            let out = ListBucketInventoryConfigurationsOutput {
4580                continuation_token: req.input.continuation_token.clone(),
4581                inventory_configuration_list: if dto_list.is_empty() {
4582                    None
4583                } else {
4584                    Some(dto_list)
4585                },
4586                is_truncated: Some(false),
4587                next_continuation_token: None,
4588            };
4589            return Ok(S3Response::new(out));
4590        }
4591        self.backend.list_bucket_inventory_configurations(req).await
4592    }
4593
4594    async fn delete_bucket_inventory_configuration(
4595        &self,
4596        req: S3Request<DeleteBucketInventoryConfigurationInput>,
4597    ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
4598        if let Some(mgr) = self.inventory.as_ref() {
4599            mgr.delete(&req.input.bucket, &req.input.id);
4600            return Ok(S3Response::new(
4601                DeleteBucketInventoryConfigurationOutput::default(),
4602            ));
4603        }
4604        self.backend.delete_bucket_inventory_configuration(req).await
4605    }
4606}
4607
4608// ---------------------------------------------------------------------------
4609// v0.6 #36: Convert between the s3s-typed `InventoryConfiguration` (the wire
4610// surface) and our internal `crate::inventory::InventoryConfig`. Only the
4611// fields S4 actually uses for CSV emission survive the round trip; the
4612// missing fields (filter prefix, optional fields, encryption) are dropped on
4613// PUT and re-rendered as the AWS-default shape on GET so the client sees a
4614// well-formed `InventoryConfiguration`.
4615// ---------------------------------------------------------------------------
4616
4617fn inv_from_dto(
4618    bucket: &str,
4619    id: &str,
4620    dto: &InventoryConfiguration,
4621) -> crate::inventory::InventoryConfig {
4622    let frequency_hours = match dto.schedule.frequency.as_str() {
4623        "Weekly" => 24 * 7,
4624        // Daily is the default; anything S4 doesn't recognise (incl.
4625        // empty, which is the s3s-default) maps to Daily so the
4626        // operator's PUT doesn't silently turn into a no-op cadence.
4627        _ => 24,
4628    };
4629    // Parquet/ORC are not supported (issue #36 scope); we still accept
4630    // the PUT so callers don't fail-loud, but we record CSV and rely on
4631    // the operator catching the discrepancy on GET.
4632    let format = crate::inventory::InventoryFormat::Csv;
4633    crate::inventory::InventoryConfig {
4634        id: id.to_owned(),
4635        bucket: bucket.to_owned(),
4636        destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
4637        destination_prefix: dto
4638            .destination
4639            .s3_bucket_destination
4640            .prefix
4641            .clone()
4642            .unwrap_or_default(),
4643        frequency_hours,
4644        format,
4645        included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
4646            dto.included_object_versions.as_str(),
4647        ),
4648    }
4649}
4650
4651fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
4652    InventoryConfiguration {
4653        id: cfg.id.clone(),
4654        is_enabled: true,
4655        included_object_versions: InventoryIncludedObjectVersions::from(
4656            cfg.included_object_versions.as_aws_str().to_owned(),
4657        ),
4658        destination: InventoryDestination {
4659            s3_bucket_destination: InventoryS3BucketDestination {
4660                account_id: None,
4661                bucket: cfg.destination_bucket.clone(),
4662                encryption: None,
4663                format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
4664                prefix: if cfg.destination_prefix.is_empty() {
4665                    None
4666                } else {
4667                    Some(cfg.destination_prefix.clone())
4668                },
4669            },
4670        },
4671        schedule: InventorySchedule {
4672            // `frequency_hours == 168` -> Weekly; everything else maps to
4673            // Daily for the wire response (the manager keeps the precise
4674            // hour count internally for due-checking).
4675            frequency: InventoryFrequency::from(
4676                if cfg.frequency_hours == 24 * 7 {
4677                    "Weekly"
4678                } else {
4679                    "Daily"
4680                }
4681                .to_owned(),
4682            ),
4683        },
4684        filter: None,
4685        optional_fields: None,
4686    }
4687}
4688
4689// ---------------------------------------------------------------------------
4690// v0.6 #35: Convert between the s3s-typed `NotificationConfiguration` (the
4691// wire surface) and our internal `crate::notifications::NotificationConfig`.
4692//
4693// We support TopicConfiguration (-> Destination::Sns) and QueueConfiguration
4694// (-> Destination::Sqs). LambdaFunction and EventBridge configurations are
4695// silently dropped on PUT (out of scope for v0.6 #35); the GET response only
4696// surfaces topic / queue rules.
4697//
4698// The webhook destination has no AWS-native wire form: operators configure
4699// webhooks via the JSON snapshot file (`--notifications-state-file`) or by
4700// poking `NotificationManager::put` directly from a custom binary. This
4701// keeps the wire surface AWS-compatible while still letting the always-
4702// available `Webhook` destination be reachable.
4703// ---------------------------------------------------------------------------
4704
4705fn notif_from_dto(
4706    dto: &NotificationConfiguration,
4707) -> crate::notifications::NotificationConfig {
4708    let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
4709    if let Some(topics) = dto.topic_configurations.as_ref() {
4710        for (idx, t) in topics.iter().enumerate() {
4711            let events = events_from_dto(&t.events);
4712            let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
4713            rules.push(crate::notifications::NotificationRule {
4714                id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
4715                events,
4716                destination: crate::notifications::Destination::Sns {
4717                    topic_arn: t.topic_arn.clone(),
4718                },
4719                filter_prefix: prefix,
4720                filter_suffix: suffix,
4721            });
4722        }
4723    }
4724    if let Some(queues) = dto.queue_configurations.as_ref() {
4725        for (idx, q) in queues.iter().enumerate() {
4726            let events = events_from_dto(&q.events);
4727            let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
4728            rules.push(crate::notifications::NotificationRule {
4729                id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
4730                events,
4731                destination: crate::notifications::Destination::Sqs {
4732                    queue_arn: q.queue_arn.clone(),
4733                },
4734                filter_prefix: prefix,
4735                filter_suffix: suffix,
4736            });
4737        }
4738    }
4739    crate::notifications::NotificationConfig { rules }
4740}
4741
4742fn notif_to_dto(
4743    cfg: &crate::notifications::NotificationConfig,
4744) -> NotificationConfiguration {
4745    let mut topics: Vec<TopicConfiguration> = Vec::new();
4746    let mut queues: Vec<QueueConfiguration> = Vec::new();
4747    for rule in &cfg.rules {
4748        let events: Vec<Event> = rule
4749            .events
4750            .iter()
4751            .map(|e| Event::from(e.as_aws_str().to_owned()))
4752            .collect();
4753        let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
4754        match &rule.destination {
4755            crate::notifications::Destination::Sns { topic_arn } => {
4756                topics.push(TopicConfiguration {
4757                    events,
4758                    filter,
4759                    id: Some(rule.id.clone()),
4760                    topic_arn: topic_arn.clone(),
4761                });
4762            }
4763            crate::notifications::Destination::Sqs { queue_arn } => {
4764                queues.push(QueueConfiguration {
4765                    events,
4766                    filter,
4767                    id: Some(rule.id.clone()),
4768                    queue_arn: queue_arn.clone(),
4769                });
4770            }
4771            // Webhook destinations have no AWS wire equivalent — they
4772            // round-trip through the JSON snapshot only. Skip them on the
4773            // GET surface (an SDK consumer wouldn't know what to do with
4774            // them anyway).
4775            crate::notifications::Destination::Webhook { .. } => {}
4776        }
4777    }
4778    NotificationConfiguration {
4779        event_bridge_configuration: None,
4780        lambda_function_configurations: None,
4781        queue_configurations: if queues.is_empty() { None } else { Some(queues) },
4782        topic_configurations: if topics.is_empty() { None } else { Some(topics) },
4783    }
4784}
4785
4786fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
4787    events
4788        .iter()
4789        .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
4790        .collect()
4791}
4792
4793fn filter_from_dto(
4794    f: Option<&NotificationConfigurationFilter>,
4795) -> (Option<String>, Option<String>) {
4796    let Some(f) = f else {
4797        return (None, None);
4798    };
4799    let Some(key) = f.key.as_ref() else {
4800        return (None, None);
4801    };
4802    let Some(rules) = key.filter_rules.as_ref() else {
4803        return (None, None);
4804    };
4805    let mut prefix = None;
4806    let mut suffix = None;
4807    for r in rules {
4808        let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
4809        let value = r.value.clone();
4810        match name.as_deref() {
4811            Some("prefix") => prefix = value,
4812            Some("suffix") => suffix = value,
4813            _ => {}
4814        }
4815    }
4816    (prefix, suffix)
4817}
4818
4819fn filter_to_dto(
4820    prefix: Option<&str>,
4821    suffix: Option<&str>,
4822) -> Option<NotificationConfigurationFilter> {
4823    if prefix.is_none() && suffix.is_none() {
4824        return None;
4825    }
4826    let mut rules: Vec<FilterRule> = Vec::new();
4827    if let Some(p) = prefix {
4828        rules.push(FilterRule {
4829            name: Some(FilterRuleName::from("prefix".to_owned())),
4830            value: Some(p.to_owned()),
4831        });
4832    }
4833    if let Some(s) = suffix {
4834        rules.push(FilterRule {
4835            name: Some(FilterRuleName::from("suffix".to_owned())),
4836            value: Some(s.to_owned()),
4837        });
4838    }
4839    Some(NotificationConfigurationFilter {
4840        key: Some(S3KeyFilter {
4841            filter_rules: Some(rules),
4842        }),
4843    })
4844}
4845
4846// ---------------------------------------------------------------------------
4847// v0.6 #40: Convert between the s3s-typed `ReplicationConfiguration` (the
4848// wire surface) and our internal `crate::replication::ReplicationConfig`.
4849// AWS's `ReplicationRuleFilter` is a sum type — `Prefix | Tag | And { Prefix,
4850// Tags }`; we flatten it into the single `(prefix, tag-vec)` representation
// the matcher needs. Sub-blocks that v0.6 #40 does not implement
4852// (DeleteMarkerReplication / SourceSelectionCriteria / ReplicationTime /
4853// Metrics / EncryptionConfiguration) round-trip as `None` on GET — operators
4854// who set them on PUT see them silently dropped, mirroring "feature not
4855// supported in this release" semantics.
4856// ---------------------------------------------------------------------------
4857
4858fn replication_from_dto(
4859    dto: &ReplicationConfiguration,
4860) -> crate::replication::ReplicationConfig {
4861    let rules = dto
4862        .rules
4863        .iter()
4864        .enumerate()
4865        .map(|(idx, r)| {
4866            let id = r
4867                .id
4868                .as_ref()
4869                .map(|s| s.as_str().to_owned())
4870                .unwrap_or_else(|| format!("rule-{idx}"));
4871            let priority = r.priority.unwrap_or(0).max(0) as u32;
4872            let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
4873            let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
4874            let destination_bucket = r.destination.bucket.clone();
4875            let destination_storage_class = r
4876                .destination
4877                .storage_class
4878                .as_ref()
4879                .map(|s| s.as_str().to_owned());
4880            crate::replication::ReplicationRule {
4881                id,
4882                priority,
4883                status_enabled,
4884                filter,
4885                destination_bucket,
4886                destination_storage_class,
4887            }
4888        })
4889        .collect();
4890    crate::replication::ReplicationConfig {
4891        role: dto.role.clone(),
4892        rules,
4893    }
4894}
4895
4896fn replication_to_dto(
4897    cfg: &crate::replication::ReplicationConfig,
4898) -> ReplicationConfiguration {
4899    let rules = cfg
4900        .rules
4901        .iter()
4902        .map(|r| {
4903            let status = if r.status_enabled {
4904                ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
4905            } else {
4906                ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
4907            };
4908            let destination = Destination {
4909                access_control_translation: None,
4910                account: None,
4911                bucket: r.destination_bucket.clone(),
4912                encryption_configuration: None,
4913                metrics: None,
4914                replication_time: None,
4915                storage_class: r
4916                    .destination_storage_class
4917                    .as_ref()
4918                    .map(|s| StorageClass::from(s.clone())),
4919            };
4920            let filter = Some(replication_filter_to_dto(&r.filter));
4921            ReplicationRule {
4922                delete_marker_replication: None,
4923                destination,
4924                existing_object_replication: None,
4925                filter,
4926                id: Some(r.id.clone()),
4927                prefix: None,
4928                priority: Some(r.priority as i32),
4929                source_selection_criteria: None,
4930                status,
4931            }
4932        })
4933        .collect();
4934    ReplicationConfiguration {
4935        role: cfg.role.clone(),
4936        rules,
4937    }
4938}
4939
4940fn replication_filter_from_dto(
4941    f: Option<&ReplicationRuleFilter>,
4942    rule_level_prefix: Option<&str>,
4943) -> crate::replication::ReplicationFilter {
4944    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
4945    let mut tags: Vec<(String, String)> = Vec::new();
4946    if let Some(f) = f {
4947        if let Some(p) = f.prefix.as_ref()
4948            && prefix.is_none()
4949        {
4950            prefix = Some(p.clone());
4951        }
4952        if let Some(t) = f.tag.as_ref()
4953            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
4954        {
4955            tags.push((k.clone(), v.clone()));
4956        }
4957        if let Some(and) = f.and.as_ref() {
4958            if let Some(p) = and.prefix.as_ref()
4959                && prefix.is_none()
4960            {
4961                prefix = Some(p.clone());
4962            }
4963            if let Some(ts) = and.tags.as_ref() {
4964                for t in ts {
4965                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
4966                        tags.push((k.clone(), v.clone()));
4967                    }
4968                }
4969            }
4970        }
4971    }
4972    crate::replication::ReplicationFilter { prefix, tags }
4973}
4974
4975fn replication_filter_to_dto(
4976    f: &crate::replication::ReplicationFilter,
4977) -> ReplicationRuleFilter {
4978    if f.tags.is_empty() {
4979        ReplicationRuleFilter {
4980            and: None,
4981            prefix: f.prefix.clone(),
4982            tag: None,
4983        }
4984    } else if f.tags.len() == 1 && f.prefix.is_none() {
4985        let (k, v) = &f.tags[0];
4986        ReplicationRuleFilter {
4987            and: None,
4988            prefix: None,
4989            tag: Some(Tag {
4990                key: Some(k.clone()),
4991                value: Some(v.clone()),
4992            }),
4993        }
4994    } else {
4995        let tags: Vec<Tag> = f
4996            .tags
4997            .iter()
4998            .map(|(k, v)| Tag {
4999                key: Some(k.clone()),
5000                value: Some(v.clone()),
5001            })
5002            .collect();
5003        ReplicationRuleFilter {
5004            and: Some(ReplicationRuleAndOperator {
5005                prefix: f.prefix.clone(),
5006                tags: Some(tags),
5007            }),
5008            prefix: None,
5009            tag: None,
5010        }
5011    }
5012}
5013
5014// ---------------------------------------------------------------------------
5015// v0.6 #37: Convert between the s3s-typed `BucketLifecycleConfiguration`
5016// (the wire surface) and our internal `crate::lifecycle::LifecycleConfig`.
5017// The internal representation flattens AWS's "Filter | And" disjunction
5018// into a single `LifecycleFilter` struct of optional fields plus a tag
5019// vector. Fields S4's evaluator does not consume
5020// (`expired_object_delete_marker`, `noncurrent_version_transitions`,
5021// `transition_default_minimum_object_size`, the storage class on the
5022// noncurrent expiration) are dropped on PUT and re-rendered as their
5023// AWS-default shape on GET so the client always sees a well-formed
5024// configuration.
5025// ---------------------------------------------------------------------------
5026
5027fn dto_lifecycle_to_internal(
5028    dto: &BucketLifecycleConfiguration,
5029) -> crate::lifecycle::LifecycleConfig {
5030    crate::lifecycle::LifecycleConfig {
5031        rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
5032    }
5033}
5034
5035fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
5036    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
5037    let filter = rule
5038        .filter
5039        .as_ref()
5040        .map(dto_filter_to_internal)
5041        .unwrap_or_default();
5042    let expiration_days = rule
5043        .expiration
5044        .as_ref()
5045        .and_then(|e| e.days)
5046        .and_then(|d| u32::try_from(d).ok());
5047    let expiration_date = rule
5048        .expiration
5049        .as_ref()
5050        .and_then(|e| e.date.as_ref())
5051        .and_then(timestamp_to_chrono_utc);
5052    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
5053        .transitions
5054        .as_ref()
5055        .map(|ts| {
5056            ts.iter()
5057                .filter_map(|t| {
5058                    let days = u32::try_from(t.days?).ok()?;
5059                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
5060                    Some(crate::lifecycle::TransitionRule {
5061                        days,
5062                        storage_class,
5063                    })
5064                })
5065                .collect()
5066        })
5067        .unwrap_or_default();
5068    let noncurrent_version_expiration_days = rule
5069        .noncurrent_version_expiration
5070        .as_ref()
5071        .and_then(|n| n.noncurrent_days)
5072        .and_then(|d| u32::try_from(d).ok());
5073    let abort_incomplete_multipart_upload_days = rule
5074        .abort_incomplete_multipart_upload
5075        .as_ref()
5076        .and_then(|a| a.days_after_initiation)
5077        .and_then(|d| u32::try_from(d).ok());
5078    crate::lifecycle::LifecycleRule {
5079        id: rule.id.clone().unwrap_or_default(),
5080        status,
5081        filter,
5082        expiration_days,
5083        expiration_date,
5084        transitions,
5085        noncurrent_version_expiration_days,
5086        abort_incomplete_multipart_upload_days,
5087    }
5088}
5089
5090fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
5091    let mut prefix = filter.prefix.clone();
5092    let mut tags: Vec<(String, String)> = Vec::new();
5093    let mut size_gt: Option<u64> = filter
5094        .object_size_greater_than
5095        .and_then(|n| u64::try_from(n).ok());
5096    let mut size_lt: Option<u64> = filter
5097        .object_size_less_than
5098        .and_then(|n| u64::try_from(n).ok());
5099    if let Some(t) = &filter.tag
5100        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
5101    {
5102        tags.push((k.clone(), v.clone()));
5103    }
5104    if let Some(and) = &filter.and {
5105        if prefix.is_none() {
5106            prefix = and.prefix.clone();
5107        }
5108        if size_gt.is_none() {
5109            size_gt = and
5110                .object_size_greater_than
5111                .and_then(|n| u64::try_from(n).ok());
5112        }
5113        if size_lt.is_none() {
5114            size_lt = and
5115                .object_size_less_than
5116                .and_then(|n| u64::try_from(n).ok());
5117        }
5118        if let Some(ts) = &and.tags {
5119            for t in ts {
5120                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
5121                    tags.push((k.clone(), v.clone()));
5122                }
5123            }
5124        }
5125    }
5126    crate::lifecycle::LifecycleFilter {
5127        prefix,
5128        tags,
5129        object_size_greater_than: size_gt,
5130        object_size_less_than: size_lt,
5131    }
5132}
5133
5134fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
5135    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
5136        Some(LifecycleExpiration {
5137            date: rule.expiration_date.map(chrono_utc_to_timestamp),
5138            days: rule.expiration_days.map(|d| d as i32),
5139            expired_object_delete_marker: None,
5140        })
5141    } else {
5142        None
5143    };
5144    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
5145        None
5146    } else {
5147        Some(
5148            rule.transitions
5149                .iter()
5150                .map(|t| Transition {
5151                    date: None,
5152                    days: Some(t.days as i32),
5153                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
5154                })
5155                .collect(),
5156        )
5157    };
5158    let noncurrent_version_expiration =
5159        rule.noncurrent_version_expiration_days
5160            .map(|d| NoncurrentVersionExpiration {
5161                newer_noncurrent_versions: None,
5162                noncurrent_days: Some(d as i32),
5163            });
5164    let abort_incomplete_multipart_upload =
5165        rule.abort_incomplete_multipart_upload_days
5166            .map(|d| AbortIncompleteMultipartUpload {
5167                days_after_initiation: Some(d as i32),
5168            });
5169    let filter = if rule.filter.tags.is_empty()
5170        && rule.filter.object_size_greater_than.is_none()
5171        && rule.filter.object_size_less_than.is_none()
5172    {
5173        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
5174            and: None,
5175            object_size_greater_than: None,
5176            object_size_less_than: None,
5177            prefix: Some(p.clone()),
5178            tag: None,
5179        })
5180    } else if rule.filter.tags.len() == 1
5181        && rule.filter.prefix.is_none()
5182        && rule.filter.object_size_greater_than.is_none()
5183        && rule.filter.object_size_less_than.is_none()
5184    {
5185        let (k, v) = rule.filter.tags[0].clone();
5186        Some(LifecycleRuleFilter {
5187            and: None,
5188            object_size_greater_than: None,
5189            object_size_less_than: None,
5190            prefix: None,
5191            tag: Some(Tag {
5192                key: Some(k),
5193                value: Some(v),
5194            }),
5195        })
5196    } else {
5197        let tags = if rule.filter.tags.is_empty() {
5198            None
5199        } else {
5200            Some(
5201                rule.filter
5202                    .tags
5203                    .iter()
5204                    .map(|(k, v)| Tag {
5205                        key: Some(k.clone()),
5206                        value: Some(v.clone()),
5207                    })
5208                    .collect(),
5209            )
5210        };
5211        Some(LifecycleRuleFilter {
5212            and: Some(LifecycleRuleAndOperator {
5213                object_size_greater_than: rule
5214                    .filter
5215                    .object_size_greater_than
5216                    .and_then(|n| i64::try_from(n).ok()),
5217                object_size_less_than: rule
5218                    .filter
5219                    .object_size_less_than
5220                    .and_then(|n| i64::try_from(n).ok()),
5221                prefix: rule.filter.prefix.clone(),
5222                tags,
5223            }),
5224            object_size_greater_than: None,
5225            object_size_less_than: None,
5226            prefix: None,
5227            tag: None,
5228        })
5229    };
5230    LifecycleRule {
5231        abort_incomplete_multipart_upload,
5232        expiration,
5233        filter,
5234        id: if rule.id.is_empty() {
5235            None
5236        } else {
5237            Some(rule.id.clone())
5238        },
5239        noncurrent_version_expiration,
5240        noncurrent_version_transitions: None,
5241        prefix: None,
5242        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
5243        transitions,
5244    }
5245}
5246
5247// (timestamp <-> chrono helpers `timestamp_to_chrono_utc` /
5248// `chrono_utc_to_timestamp` are defined earlier in this file for the
5249// tagging/notifications work; the lifecycle DTO converters reuse them.)
5250
5251// ---------------------------------------------------------------------------
5252// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
5253//
5254// Kept as a self-contained block at the bottom of the file so it doesn't
5255// touch the existing `S4Service` struct, `new()`, or any of the per-op
5256// handlers above. The hook is wired in by the binary at server-build time
5257// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
5258//
5259// Lifecycle:
5260//   1. `SigV4aGate::new(store)` is constructed once at boot from the
5261//      operator-supplied credential directory.
5262//   2. For each incoming request, `SigV4aGate::pre_route(&req,
5263//      &requested_region, &canonical_request_bytes)` is invoked BEFORE
5264//      the request hits the S3 framework. If the request claims SigV4a
5265//      and verifies, control returns to the framework. Otherwise a 403
5266//      `SignatureDoesNotMatch` is produced.
5267//   3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
5268// ---------------------------------------------------------------------------
5269
/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
///
/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
/// `pre_route` entry point that returns `Ok(())` for both
/// "request is plain SigV4 — pass through" and "request is SigV4a and
/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    /// Credential lookup table, queried by access-key-id in `pre_route`.
    /// `Arc`-backed, so clones of the gate share one store.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
5281
5282impl SigV4aGate {
5283    #[must_use]
5284    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
5285        Self { store }
5286    }
5287
5288    /// Inspect an incoming HTTP request. Behaviour:
5289    ///
5290    /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
5291    ///   prefix) → returns `Ok(())`; the framework's existing SigV4
5292    ///   path handles the request.
5293    /// - SigV4a + valid signature + region match → `Ok(())`.
5294    /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
5295    /// - SigV4a + bad signature / region mismatch → `Err` with
5296    ///   `SignatureDoesNotMatch`.
5297    ///
5298    /// `canonical_request_bytes` is the SigV4a string-to-sign (or
5299    /// canonical-request bytes; the caller decides) that the framework
5300    /// has already produced for this request. Keeping it as a parameter
5301    /// instead of rebuilding it inside the hook avoids duplicating the
5302    /// canonicalisation logic.
5303    pub fn pre_route<B>(
5304        &self,
5305        req: &http::Request<B>,
5306        requested_region: &str,
5307        canonical_request_bytes: &[u8],
5308    ) -> Result<(), SigV4aGateError> {
5309        if !crate::sigv4a::detect(req) {
5310            return Ok(());
5311        }
5312        let auth_hdr = req
5313            .headers()
5314            .get(http::header::AUTHORIZATION)
5315            .and_then(|v| v.to_str().ok())
5316            .ok_or(SigV4aGateError::MissingAuthorization)?;
5317        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
5318            .ok_or(SigV4aGateError::MalformedAuthorization)?;
5319        let region_set = req
5320            .headers()
5321            .get(crate::sigv4a::REGION_SET_HEADER)
5322            .and_then(|v| v.to_str().ok())
5323            .unwrap_or("*");
5324        let key = self
5325            .store
5326            .get(&parsed.access_key_id)
5327            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
5328        crate::sigv4a::verify(
5329            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
5330            &parsed.signature_der,
5331            key,
5332            region_set,
5333            requested_region,
5334        )
5335        .map_err(SigV4aGateError::Verify)?;
5336        Ok(())
5337    }
5338}
5339
/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
/// HTTP 403 with one of the two AWS-standard error codes
/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`).
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    /// Request claimed SigV4a but carried no `Authorization` header.
    #[error("missing Authorization header")]
    MissingAuthorization,
    /// `Authorization` header present but did not parse as SigV4a.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    /// The store holds no credential for this access-key-id — the one
    /// variant that maps to `InvalidAccessKeyId`.
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    /// `crate::sigv4a::verify` rejected the signature (or region).
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
5354
5355impl SigV4aGateError {
5356    /// AWS S3 error code that should accompany a 403 response.
5357    #[must_use]
5358    pub fn s3_error_code(&self) -> &'static str {
5359        match self {
5360            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
5361            _ => "SignatureDoesNotMatch",
5362        }
5363    }
5364}
5365
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn manifest_roundtrip_via_metadata() {
        let original = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &original);
        let back = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(back.codec, original.codec);
        assert_eq!(back.original_size, original.original_size);
        assert_eq!(back.compressed_size, original.compressed_size);
        assert_eq!(back.crc32c, original.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let meta: Option<Metadata> = None;
        assert!(extract_manifest(&meta).is_none());
    }

    #[test]
    fn partial_metadata_yields_none() {
        // Only the codec key, none of the sizes/crc — must not parse.
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(meta)).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").unwrap();
        if let s3s::dto::Range::Int { first, last } = parsed {
            assert_eq!((first, last), (10, Some(20)));
        } else {
            panic!("expected Int range");
        }
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(err.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let err = parse_copy_source_range("10-20").unwrap_err();
        assert!(err.contains("must start with 'bytes='"));
    }

    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        // S3 upload_part_copy requires a closed N-M range; both open-ended
        // and suffix forms must be rejected for this header.
        for bad in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(bad).is_err());
        }
    }

    // v0.7 #49: safe_object_uri must round-trip every legal S3 key
    // (spaces, slashes, control chars, raw UTF-8 included) into a
    // parseable `http::Uri` instead of panicking like the previous
    // `format!(...).parse().unwrap()` call sites did.

    #[test]
    fn safe_object_uri_basic_ascii() {
        let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
        assert_eq!(uri.path(), "/bucket/key");
    }

    #[test]
    fn safe_object_uri_encodes_spaces() {
        let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
        let path = uri.path();
        assert!(path.starts_with("/bucket/"));
        // RFC 3986 path-segment encoding turns ' ' into %20.
        assert!(
            path.contains("%20"),
            "expected percent-encoded space, got {}",
            path
        );
    }

    #[test]
    fn safe_object_uri_preserves_slashes() {
        // '/' is a legal S3 key character acting as a logical separator;
        // escaping it would change the perceived hierarchy, so the helper
        // must pass it through untouched.
        let uri = safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
        assert_eq!(uri.path(), "/bucket/key/with/slashes");
    }

    #[test]
    fn safe_object_uri_handles_newline_without_panic() {
        // Whether the newline encodes (%0A) or the parse rejects it, the
        // helper MUST NOT panic. Either outcome is acceptable.
        let _ = safe_object_uri("bucket", "key\n");
    }

    #[test]
    fn safe_object_uri_handles_null_byte_without_panic() {
        let _ = safe_object_uri("bucket", "key\0bad");
    }

    #[test]
    fn safe_object_uri_handles_unicode_without_panic() {
        // RTL override, BOM, plain Japanese — none should panic.
        for key in ["rtl\u{202E}override", "\u{FEFF}bom-key", "日本語キー"] {
            let _ = safe_object_uri("bucket", key);
        }
    }

    #[test]
    fn safe_object_uri_no_panic_for_every_byte() {
        // Exhaustive 1-byte keys 0x00..=0xFF; bytes that aren't valid
        // UTF-8 on their own go through `String::from_utf8_lossy` so the
        // helper always sees a real `&str`. None may panic.
        for b in u8::MIN..=u8::MAX {
            let key = String::from_utf8_lossy(&[b]).into_owned();
            let _ = safe_object_uri("bucket", &key);
        }
    }
}