// s4_server/service.rs
//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
//!
//! ## カバー範囲 (Phase 1 月 2)
//!
//! - 圧縮 hook あり: `put_object`, `get_object`
//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
//!   `list_parts`
//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
//!
//! ## アーキテクチャ
//!
//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
//!   複数 codec で書かれた object を透過的に GET できる
//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
//!   manifest を S3 metadata に書いて backend に forward
//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
//!   manifest 指定の codec で解凍 → 元の bytes を return
//!
//! ## 既知の制限事項
//!
//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
//!   Streaming-aware 圧縮は Phase 2。

use std::sync::Arc;
use std::time::Instant;

use base64::Engine as _;
use bytes::BytesMut;
use s3s::dto::*;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
use s4_codec::multipart::{
    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
    write_frame,
};
use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
use tracing::{debug, info};

use crate::blob::{
    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
};
use crate::streaming::{
    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
    supports_streaming_compress, supports_streaming_decompress,
};
53
/// Maximum number of bytes from the head of a PUT body that is handed to
/// the codec dispatcher as a sample for codec selection.
const SAMPLE_BYTES: usize = 4096;
56
/// v0.4 #20: captured at the start of a handler, before the request is
/// consumed by the backend call, so the matching `record_access` at
/// end-of-request can fill in the structured access log entry.
struct AccessLogPreamble {
    /// Remote peer address, when the listener reported one.
    remote_ip: Option<String>,
    /// Authenticated principal, when the request carried credentials.
    requester: Option<String>,
    /// Request URI as received.
    request_uri: String,
    /// `User-Agent` header value, when present.
    user_agent: Option<String>,
}
66
67pub struct S4Service<B: S3> {
68    /// Wrapped in `Arc` so the v0.6 #40 cross-bucket replication
69    /// dispatcher can clone it into a detached `tokio::spawn` task
70    /// (Arc::clone is cheap; backend trait methods take `&self` so no
71    /// other handler is affected by the indirection).
72    backend: Arc<B>,
73    registry: Arc<CodecRegistry>,
74    dispatcher: Arc<dyn CodecDispatcher>,
75    max_body_bytes: usize,
76    policy: Option<crate::policy::SharedPolicy>,
77    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
78    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
79    /// gating "deny if not over TLS" can do their job. Defaults to `false`
80    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
81    secure_transport: bool,
82    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
83    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
84    /// v0.4 #20: optional S3-style access log emitter.
85    access_log: Option<crate::access_log::SharedAccessLog>,
86    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
87    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
88    /// (with the keyring's active key id) after the compress + framing
89    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
90    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
91    /// key in a 1-slot keyring so single-key (v0.4) operators get the
92    /// same behaviour they had before, just on the v2 frame.
93    sse_keyring: Option<crate::sse::SharedSseKeyring>,
94    /// v0.5 #34: optional first-class versioning state machine. When
95    /// `Some(...)`, S4-server itself owns the per-bucket versioning
96    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
97    /// list_object_versions / get_bucket_versioning /
98    /// put_bucket_versioning handlers consult the manager instead of
99    /// passing through. When `None` (default), the legacy
100    /// backend-passthrough behaviour applies so existing v0.4
101    /// deployments are unaffected until they explicitly call
102    /// `with_versioning(...)`.
103    versioning: Option<Arc<crate::versioning::VersioningManager>>,
104    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
105    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
106    /// generate a fresh DEK via the backend, encrypt the body with it
107    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
108    /// S4E4 unwrap the DEK through the same backend before decrypt.
109    /// `kms_default_key_id` is used when the request omits an explicit
110    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
111    /// bucket-default behaviour).
112    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
113    kms_default_key_id: Option<String>,
114    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
115    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
116    /// consult the manager and refuse the operation with HTTP 403
117    /// `AccessDenied` while the object is locked (Compliance until
118    /// expiry, Governance unless the bypass header is set, or any time
119    /// a legal hold is on). PUT also auto-applies the bucket-default
120    /// retention to brand-new objects when configured. When `None`
121    /// (default), the legacy backend-passthrough behaviour applies, so
122    /// existing v0.4 deployments are unaffected until they explicitly
123    /// call `with_object_lock(...)`.
124    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
125    /// v0.6 #38: optional first-class CORS bucket configuration manager.
126    /// When `Some(...)`, S4-server itself owns per-bucket CORS rules and
127    /// `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
128    /// consult the manager instead of passing through to the backend.
129    /// `handle_preflight` (public method on `S4Service`) routes OPTIONS-
130    /// style preflight matching through the same store; the actual HTTP
131    /// OPTIONS routing wire-up at the listener level is a follow-up
132    /// (s3s framework does not surface OPTIONS as a typed handler).
133    cors: Option<Arc<crate::cors::CorsManager>>,
134    /// v0.6 #36: optional first-class S3 Inventory manager. When
135    /// `Some(...)`, S4-server itself owns per-(bucket, id) inventory
136    /// configurations and `put_bucket_inventory_configuration` /
137    /// `get_bucket_inventory_configuration` /
138    /// `list_bucket_inventory_configurations` /
139    /// `delete_bucket_inventory_configuration` consult the manager
140    /// instead of passing through to the backend. The actual periodic
141    /// CSV emission is driven by a tokio task in `main.rs` that calls
142    /// `InventoryManager::run_once_for_test` on a fixed cadence; the
143    /// service handlers below only deal with config-level CRUD.
144    inventory: Option<Arc<crate::inventory::InventoryManager>>,
145    /// v0.6 #35: optional first-class S3 bucket-notification manager.
146    /// When `Some(...)`, S4-server itself owns per-bucket notification
147    /// configurations and `put_bucket_notification_configuration` /
148    /// `get_bucket_notification_configuration` consult the manager
149    /// instead of passing through to the backend. Successful PUT /
150    /// DELETE handlers fire matching destinations on a detached tokio
151    /// task (best-effort; see `crate::notifications::dispatch_event`).
152    notifications: Option<Arc<crate::notifications::NotificationManager>>,
153    /// v0.6 #37: optional first-class S3 Lifecycle configuration
154    /// manager. When `Some(...)`, S4-server itself owns per-bucket
155    /// lifecycle rules and `put_bucket_lifecycle_configuration` /
156    /// `get_bucket_lifecycle_configuration` /
157    /// `delete_bucket_lifecycle` consult the manager instead of
158    /// passing through to the backend. The actual background scanner
159    /// (list_objects_v2 -> evaluate -> delete / metadata-rewrite per
160    /// rule) is a v0.7+ follow-up; the test path
161    /// `S4Service::run_lifecycle_once_for_test` exercises the
162    /// evaluator end-to-end so this v0.6 #37 wiring is enough to ship
163    /// the configuration-management half without putting a
164    /// half-wired bucket-walk in front of users.
165    lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
166    /// v0.6 #39: optional first-class object + bucket Tagging manager.
167    /// When `Some(...)`, S4-server itself owns per-(bucket, key) and
168    /// per-bucket tag state — `PutObjectTagging` /
169    /// `GetObjectTagging` / `DeleteObjectTagging` /
170    /// `PutBucketTagging` / `GetBucketTagging` /
171    /// `DeleteBucketTagging` route through the manager (replacing the
172    /// previous backend-passthrough behaviour). `put_object` also
173    /// pre-parses the `x-amz-tagging` header / `Tagging` input field
174    /// so the IAM policy evaluator can gate on
175    /// `s3:RequestObjectTag/<key>` and `s3:ExistingObjectTag/<key>`.
176    /// On a successful PUT the parsed tags are persisted; on a
177    /// successful DELETE the matching tag entry is dropped.
178    tagging: Option<Arc<crate::tagging::TagManager>>,
179    /// v0.6 #40: optional first-class cross-bucket replication manager.
180    /// When `Some(...)`, S4-server itself owns per-bucket replication
181    /// rules; `PutBucketReplication` / `GetBucketReplication` /
182    /// `DeleteBucketReplication` route through the manager (replacing
183    /// the previous backend-passthrough behaviour). On every successful
184    /// `put_object` the manager's rule list is consulted; the
185    /// highest-priority matching enabled rule wins, the per-key status
186    /// is recorded as `Pending`, and the source body and metadata are
187    /// handed to a detached tokio task that PUTs to the destination
188    /// bucket through the same backend. The replica is stamped with
189    /// `x-amz-replication-status: REPLICA` in its metadata; the
190    /// source-side status is updated to `Completed` on success or
191    /// `Failed` after the 3-attempt retry budget is exhausted (drop
192    /// counter bumps in either-side case so dashboards see the loss).
193    /// `head_object` / `get_object` echo the recorded status back as
194    /// `x-amz-replication-status` so consumers can poll progress.
195    /// Limited to single-instance (same `S4Service`) replication; true
196    /// cross-region (multi-instance) is a v0.7+ follow-up.
197    replication: Option<Arc<crate::replication::ReplicationManager>>,
198    /// v0.6 #42: optional MFA-Delete enforcement layer. When `Some(...)`,
199    /// every DELETE / DELETE-version / delete-marker / `PutBucketVersioning`
200    /// request against a bucket whose MFA-Delete state is `Enabled`
201    /// must carry `x-amz-mfa: <serial> <code>` (RFC 6238 6-digit TOTP);
202    /// missing or invalid tokens return HTTP 403 `AccessDenied`. When
203    /// `None` (default), the gate is a no-op so existing v0.4 / v0.5
204    /// deployments are unaffected until they explicitly call
205    /// `with_mfa_delete(...)`.
206    mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
207    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
208    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
209    /// or be matched against a configured server-managed keyring/KMS).
210    /// Set by `--compliance-mode strict` after the boot-time
211    /// prerequisite check passes.
212    compliance_strict: bool,
213}
214
215impl<B: S3> S4Service<B> {
216    /// AWS S3 単発 PUT の API 上限 (5 GiB)
217    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
218
219    pub fn new(
220        backend: B,
221        registry: Arc<CodecRegistry>,
222        dispatcher: Arc<dyn CodecDispatcher>,
223    ) -> Self {
224        Self {
225            backend: Arc::new(backend),
226            registry,
227            dispatcher,
228            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
229            policy: None,
230            secure_transport: false,
231            rate_limits: None,
232            access_log: None,
233            sse_keyring: None,
234            versioning: None,
235            kms: None,
236            kms_default_key_id: None,
237            object_lock: None,
238            cors: None,
239            inventory: None,
240            notifications: None,
241            lifecycle: None,
242            tagging: None,
243            replication: None,
244            mfa_delete: None,
245            compliance_strict: false,
246        }
247    }
248
249    /// v0.6 #39: attach the in-memory object + bucket Tagging manager.
250    /// Once set, `Put/Get/Delete` `Object/Bucket Tagging` route
251    /// through the manager (instead of forwarding to the backend),
252    /// and `put_object`'s `x-amz-tagging` parse path becomes the
253    /// source of `s3:RequestObjectTag/<key>` for the IAM policy
254    /// evaluator. The manager itself is shared via `Arc`.
255    #[must_use]
256    pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
257        self.tagging = Some(mgr);
258        self
259    }
260
261    /// v0.6 #39: borrow the attached tagging manager (test /
262    /// introspection — the snapshotter in `main.rs`, when wired,
263    /// will keep its own `Arc` clone).
264    #[must_use]
265    pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
266        self.tagging.as_ref()
267    }
268
269    /// v0.6 #36: attach the in-memory S3 Inventory manager. Once set,
270    /// `put_bucket_inventory_configuration` /
271    /// `get_bucket_inventory_configuration` /
272    /// `list_bucket_inventory_configurations` /
273    /// `delete_bucket_inventory_configuration` route through the
274    /// manager. The actual periodic CSV / manifest emission is
275    /// orchestrated by a tokio task started in `main.rs`; the manager
276    /// itself is shared between the handler and the scheduler via
277    /// `Arc`.
278    #[must_use]
279    pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
280        self.inventory = Some(mgr);
281        self
282    }
283
284    /// v0.6 #36: borrow the attached inventory manager (test /
285    /// introspection — the background scheduler in `main.rs` keeps its
286    /// own `Arc` clone, so this accessor is for the test path that
287    /// invokes `run_once_for_test` directly).
288    #[must_use]
289    pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
290        self.inventory.as_ref()
291    }
292
293    /// v0.6 #37: attach the in-memory S3 Lifecycle configuration
294    /// manager. Once set, `put_bucket_lifecycle_configuration` /
295    /// `get_bucket_lifecycle_configuration` / `delete_bucket_lifecycle`
296    /// route through the manager (replacing the previous backend-
297    /// passthrough behaviour). The actual periodic scanner that walks
298    /// the source bucket and invokes Expiration / Transition /
299    /// NoncurrentExpiration actions is a v0.7+ follow-up — see
300    /// [`Self::run_lifecycle_once_for_test`] for the in-memory test
301    /// path that exercises the evaluator end-to-end.
302    #[must_use]
303    pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
304        self.lifecycle = Some(mgr);
305        self
306    }
307
308    /// v0.6 #37: borrow the attached lifecycle manager (test /
309    /// introspection — the background scheduler in `main.rs` keeps its
310    /// own `Arc` clone, so this accessor is for the test path that
311    /// invokes the evaluator directly).
312    #[must_use]
313    pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
314        self.lifecycle.as_ref()
315    }
316
317    /// v0.6 #37: synchronous test entry that runs the lifecycle evaluator
318    /// against a caller-provided list of `(key, age, size, tags)` tuples
319    /// and returns the `(key, action)` pairs that should fire. The actual
320    /// backend invocation (S3.delete_object / metadata rewrite) is left
321    /// to the caller — the unit + E2E tests use this to verify the
322    /// evaluator without spawning the (deferred) background scanner.
323    /// Returns an empty `Vec` when no lifecycle manager is attached or
324    /// no rule matches.
325    #[must_use]
326    pub fn run_lifecycle_once_for_test(
327        &self,
328        bucket: &str,
329        objects: &[crate::lifecycle::EvaluateBatchEntry],
330    ) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
331        let Some(mgr) = self.lifecycle.as_ref() else {
332            return Vec::new();
333        };
334        crate::lifecycle::evaluate_batch(mgr, bucket, objects)
335    }
336
337    /// v0.6 #35: attach the in-memory bucket-notification manager. Once
338    /// set, `put_bucket_notification_configuration` /
339    /// `get_bucket_notification_configuration` route through the manager
340    /// (replacing the previous backend-passthrough behaviour); successful
341    /// `put_object` / `delete_object` calls fire matching destinations
342    /// on a detached tokio task via
343    /// `crate::notifications::dispatch_event` (best-effort, fire-and-
344    /// forget — failures bump the manager's `dropped_total` counter and
345    /// log at warn but do NOT fail the originating S3 request).
346    #[must_use]
347    pub fn with_notifications(
348        mut self,
349        mgr: Arc<crate::notifications::NotificationManager>,
350    ) -> Self {
351        self.notifications = Some(mgr);
352        self
353    }
354
355    /// v0.6 #35: borrow the attached notifications manager (test /
356    /// introspection — used by the metrics layer to read
357    /// `dropped_total`).
358    #[must_use]
359    pub fn notifications_manager(
360        &self,
361    ) -> Option<&Arc<crate::notifications::NotificationManager>> {
362        self.notifications.as_ref()
363    }
364
365    /// v0.6 #35: internal helper used by the DELETE handlers to fire a
366    /// matching notification on a detached tokio task. No-op when no
367    /// manager is attached or no rule on the bucket matches the given
368    /// (event, key) tuple.
369    fn fire_delete_notification(
370        &self,
371        bucket: &str,
372        key: &str,
373        event: crate::notifications::EventType,
374        version_id: Option<String>,
375    ) {
376        let Some(mgr) = self.notifications.as_ref() else {
377            return;
378        };
379        let dests = mgr.match_destinations(bucket, &event, key);
380        if dests.is_empty() {
381            return;
382        }
383        tokio::spawn(crate::notifications::dispatch_event(
384            Arc::clone(mgr),
385            bucket.to_owned(),
386            key.to_owned(),
387            event,
388            None,
389            None,
390            version_id,
391            format!("S4-{}", uuid::Uuid::new_v4()),
392        ));
393    }
394
395    /// v0.6 #40: attach the in-memory cross-bucket replication manager.
396    /// Once set, `put_bucket_replication` / `get_bucket_replication` /
397    /// `delete_bucket_replication` route through the manager (replacing
398    /// the previous backend-passthrough behaviour); a successful
399    /// `put_object` whose key matches an enabled rule fires a detached
400    /// tokio task that PUTs the same body + metadata to the rule's
401    /// destination bucket, stamping the replica with
402    /// `x-amz-replication-status: REPLICA`. Failures after the retry
403    /// budget bump the manager's `dropped_total` counter and are
404    /// surfaced in the `s4_replication_dropped_total` Prometheus
405    /// counter; successes bump `s4_replication_replicated_total`.
406    #[must_use]
407    pub fn with_replication(
408        mut self,
409        mgr: Arc<crate::replication::ReplicationManager>,
410    ) -> Self {
411        self.replication = Some(mgr);
412        self
413    }
414
415    /// v0.6 #40: borrow the attached replication manager (test /
416    /// introspection — used by the metrics layer to read
417    /// `dropped_total`).
418    #[must_use]
419    pub fn replication_manager(
420        &self,
421    ) -> Option<&Arc<crate::replication::ReplicationManager>> {
422        self.replication.as_ref()
423    }
424
425    /// v0.6 #40: internal helper used by the PUT handlers to fire a
426    /// detached cross-bucket replication task. No-op when no manager
427    /// is attached, the source backend PUT failed, or no rule on the
428    /// source bucket matches the (key, tags) tuple. The `body` is the
429    /// post-compression / post-encryption `Bytes` that was sent to
430    /// the source backend (refcount-cloned), and `metadata` is the
431    /// metadata map that already includes the manifest /
432    /// `s4-encrypted` markers — the replica decodes through the same
433    /// path. The destination PUT runs through `Arc<B>::put_object`.
434    fn spawn_replication_if_matched(
435        &self,
436        source_bucket: &str,
437        source_key: &str,
438        request_tags: &Option<crate::tagging::TagSet>,
439        body: &bytes::Bytes,
440        metadata: &Option<std::collections::HashMap<String, String>>,
441        backend_ok: bool,
442    ) where
443        B: Send + Sync + 'static,
444    {
445        if !backend_ok {
446            return;
447        }
448        let Some(mgr) = self.replication.as_ref() else {
449            return;
450        };
451        // Pull the request's tags into the (k, v) shape the matcher
452        // expects. The tagging manager would have the canonical
453        // post-PUT view but at this point in the pipeline it's
454        // already been written above; for the rule-match decision
455        // the request's tags are sufficient (= the tags this PUT
456        // applies, S3 PutObject is full-replace on tags).
457        let object_tags: Vec<(String, String)> = request_tags
458            .as_ref()
459            .map(|ts| ts.iter().cloned().collect())
460            .unwrap_or_default();
461        let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
462            return;
463        };
464        // Eagerly mark the source key as Pending so a HEAD between
465        // the source PUT returning and the spawned task completing
466        // surfaces the in-flight state.
467        mgr.record_status(
468            source_bucket,
469            source_key,
470            crate::replication::ReplicationStatus::Pending,
471        );
472        let mgr_cl = Arc::clone(mgr);
473        let backend = Arc::clone(&self.backend);
474        let body_cl = body.clone();
475        let metadata_cl = metadata.clone();
476        let source_bucket_cl = source_bucket.to_owned();
477        let source_key_cl = source_key.to_owned();
478        tokio::spawn(async move {
479            let do_put = move |dest_bucket: String,
480                               dest_key: String,
481                               dest_body: bytes::Bytes,
482                               dest_meta: Option<std::collections::HashMap<String, String>>| {
483                let backend = Arc::clone(&backend);
484                async move {
485                    let req = S3Request {
486                        input: PutObjectInput {
487                            bucket: dest_bucket,
488                            key: dest_key,
489                            body: Some(bytes_to_blob(dest_body)),
490                            metadata: dest_meta,
491                            ..Default::default()
492                        },
493                        method: http::Method::PUT,
494                        uri: "/".parse().unwrap(),
495                        headers: http::HeaderMap::new(),
496                        extensions: http::Extensions::new(),
497                        credentials: None,
498                        region: None,
499                        service: None,
500                        trailing_headers: None,
501                    };
502                    backend
503                        .put_object(req)
504                        .await
505                        .map(|_| ())
506                        .map_err(|e| format!("destination put_object: {e}"))
507                }
508            };
509            crate::replication::replicate_object(
510                rule,
511                source_bucket_cl,
512                source_key_cl,
513                body_cl,
514                metadata_cl,
515                do_put,
516                mgr_cl,
517            )
518            .await;
519        });
520    }
521
522    /// v0.6 #42: attach the in-memory MFA-Delete enforcement manager.
523    /// Once set, every DELETE / DELETE-version / delete-marker /
524    /// `PutBucketVersioning` request against a bucket whose MFA-Delete
525    /// state is `Enabled` requires a valid `x-amz-mfa: <serial> <code>`
526    /// header (RFC 6238 6-digit TOTP); the gate is a no-op for buckets
527    /// where MFA-Delete is `Disabled` (S3 default).
528    #[must_use]
529    pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
530        self.mfa_delete = Some(mgr);
531        self
532    }
533
534    /// v0.6 #42: borrow the attached MFA-Delete manager (test /
535    /// introspection — used by the snapshot path in `main.rs` to call
536    /// `to_json` for restart-recoverable state).
537    #[must_use]
538    pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
539        self.mfa_delete.as_ref()
540    }
541
542    /// v0.6 #38: attach the in-memory CORS configuration manager. Once
543    /// set, `put_bucket_cors` / `get_bucket_cors` / `delete_bucket_cors`
544    /// route through the manager instead of forwarding to the backend,
545    /// and [`Self::handle_preflight`] becomes useful for the (future)
546    /// listener-side OPTIONS interceptor.
547    #[must_use]
548    pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
549        self.cors = Some(mgr);
550        self
551    }
552
553    /// v0.6 #38: Borrow the attached CORS manager (test / introspection).
554    #[must_use]
555    pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
556        self.cors.as_ref()
557    }
558
559    /// v0.6 #38: evaluate a CORS preflight request against the bucket's
560    /// configured rules and, if a rule matches, return the headers that
561    /// the (future) listener-side OPTIONS interceptor must put on the
562    /// 200 response: `Access-Control-Allow-Origin`, `Access-Control-
563    /// Allow-Methods`, `Access-Control-Allow-Headers`, optionally
564    /// `Access-Control-Max-Age` and `Access-Control-Expose-Headers`.
565    ///
566    /// Returns `None` when no manager is attached, no config is
567    /// registered for the bucket, or no rule matches the (origin,
568    /// method, headers) triple. The caller is responsible for turning
569    /// `None` into the appropriate 403 response.
570    ///
571    /// **Note:** the OPTIONS routing itself (i.e. wiring this method
572    /// into the hyper-util listener path) is a follow-up — s3s does not
573    /// surface OPTIONS as a typed S3 handler, so this method is
574    /// currently call-able only from inside other handlers and tests.
575    #[must_use]
576    pub fn handle_preflight(
577        &self,
578        bucket: &str,
579        origin: &str,
580        method: &str,
581        request_headers: &[String],
582    ) -> Option<std::collections::HashMap<String, String>> {
583        let mgr = self.cors.as_ref()?;
584        let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
585        let mut h = std::collections::HashMap::new();
586        // Echo the matched origin back. If the rule used "*" we still
587        // echo "*" (S3 spec — the spec does not require us to echo the
588        // *requesting* origin when the wildcard matched).
589        let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
590            "*".to_string()
591        } else {
592            origin.to_string()
593        };
594        h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
595        h.insert(
596            "Access-Control-Allow-Methods".to_string(),
597            rule.allowed_methods.join(", "),
598        );
599        if !rule.allowed_headers.is_empty() {
600            // For the Allow-Headers response, echo back the rule's
601            // pattern list verbatim (S3 echoes the configured list,
602            // including "*" if present). Browsers honour exact-match
603            // rules.
604            h.insert(
605                "Access-Control-Allow-Headers".to_string(),
606                rule.allowed_headers.join(", "),
607            );
608        }
609        if let Some(secs) = rule.max_age_seconds {
610            h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
611        }
612        if !rule.expose_headers.is_empty() {
613            h.insert(
614                "Access-Control-Expose-Headers".to_string(),
615                rule.expose_headers.join(", "),
616            );
617        }
618        Some(h)
619    }
620
621    /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
622    /// SSE indicator (server-side encryption header or SSE-C customer
623    /// key); requests without one are rejected with 400 InvalidRequest.
624    /// Boot-time prerequisite checking lives in the binary
625    /// (`validate_compliance_mode`) so this flag is purely the runtime
626    /// switch.
627    #[must_use]
628    pub fn with_compliance_strict(mut self, on: bool) -> Self {
629        self.compliance_strict = on;
630        self
631    }
632
633    /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
634    /// manager. Once set, `delete_object` and overwrite-path
635    /// `put_object` refuse operations on locked keys with HTTP 403
636    /// `AccessDenied`; new PUTs to a bucket with a default retention
637    /// policy auto-create per-object lock state.
638    #[must_use]
639    pub fn with_object_lock(
640        mut self,
641        mgr: Arc<crate::object_lock::ObjectLockManager>,
642    ) -> Self {
643        self.object_lock = Some(mgr);
644        self
645    }
646
647    /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
648    /// when a PUT requests SSE-KMS without naming a specific KMS key
649    /// (operators set this to mirror AWS S3's bucket-default key).
650    #[must_use]
651    pub fn with_kms_backend(
652        mut self,
653        kms: Arc<dyn crate::kms::KmsBackend>,
654        default_key_id: Option<String>,
655    ) -> Self {
656        self.kms = Some(kms);
657        self.kms_default_key_id = default_key_id;
658        self
659    }
660
661    /// v0.5 #34: attach the first-class versioning state machine. Once
662    /// set, this `S4Service` owns the per-bucket versioning state +
663    /// per-(bucket, key) version chain; `put_object` / `get_object` /
664    /// `delete_object` / `list_object_versions` /
665    /// `get_bucket_versioning` / `put_bucket_versioning` consult the
666    /// manager instead of passing through to the backend. The backend
667    /// is still used as the byte store: Suspended / Unversioned buckets
668    /// keep using `<key>` directly (legacy), Enabled buckets redirect
669    /// each version's bytes to a shadow key
670    /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
671    /// PUTs to the same logical key.
672    #[must_use]
673    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
674        self.versioning = Some(mgr);
675        self
676    }
677
678    /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
679    /// Internally wraps it in a 1-slot keyring with id=1 active, so
680    /// new objects ride the v0.5 S4E2 frame while previously-written
681    /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
682    /// fallback path. Operators wanting true rotation should call
683    /// [`Self::with_sse_keyring`] instead.
684    #[must_use]
685    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
686        let keyring = crate::sse::SseKeyring::new(1, key);
687        self.sse_keyring = Some(std::sync::Arc::new(keyring));
688        self
689    }
690
691    /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
692    /// the active key (S4E2 frame stamped with that key's id); GET
693    /// dispatches on the body's magic — S4E1 falls back to trying every
694    /// key in the ring (active first) so v0.4 objects survive a
695    /// migration; S4E2 looks up the explicit key_id from the header.
696    #[must_use]
697    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
698        self.sse_keyring = Some(keyring);
699        self
700    }
701
702    /// v0.4 #20: attach an S3-style access-log emitter. Each completed
703    /// PUT / GET / DELETE / List handler emits one entry into the
704    /// emitter's buffer; a background flusher (started separately, see
705    /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
706    /// rotated `.log` files into the configured directory.
707    #[must_use]
708    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
709        self.access_log = Some(log);
710        self
711    }
712
713    /// Capture the per-request access-log preamble before the request is
714    /// consumed by the backend call. Returns `None` if no access logger
715    /// is configured (cheap early-out so the handler doesn't pay the
716    /// header-clone cost when access logging is off).
717    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
718        self.access_log.as_ref()?;
719        Some(AccessLogPreamble {
720            remote_ip: req
721                .headers
722                .get("x-forwarded-for")
723                .and_then(|v| v.to_str().ok())
724                .and_then(|raw| raw.split(',').next())
725                .map(|s| s.trim().to_owned()),
726            requester: Self::principal_of(req).map(str::to_owned),
727            request_uri: format!("{} {}", req.method, req.uri.path()),
728            user_agent: req
729                .headers
730                .get("user-agent")
731                .and_then(|v| v.to_str().ok())
732                .map(str::to_owned),
733        })
734    }
735
736    /// Internal — called by handlers at end-of-request with a captured
737    /// preamble. Best-effort: swallows the await fast (clones Arc +
738    /// pushes), no error propagation back to the request path.
739    #[allow(clippy::too_many_arguments)]
740    async fn record_access(
741        &self,
742        preamble: Option<AccessLogPreamble>,
743        operation: &'static str,
744        bucket: &str,
745        key: Option<&str>,
746        http_status: u16,
747        bytes_sent: u64,
748        object_size: u64,
749        total_time_ms: u64,
750        error_code: Option<&str>,
751    ) {
752        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
753            return;
754        };
755        log.record(crate::access_log::AccessLogEntry {
756            time: std::time::SystemTime::now(),
757            bucket: bucket.to_owned(),
758            remote_ip: p.remote_ip,
759            requester: p.requester,
760            operation,
761            key: key.map(str::to_owned),
762            request_uri: p.request_uri,
763            http_status,
764            error_code: error_code.map(str::to_owned),
765            bytes_sent,
766            object_size,
767            total_time_ms,
768            user_agent: p.user_agent,
769        })
770        .await;
771    }
772
773    /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
774    /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
775    /// throttle-checked before the policy gate; throttled requests return
776    /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
777    /// `s4_rate_limit_throttled_total{principal,bucket}`.
778    #[must_use]
779    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
780        self.rate_limits = Some(rl);
781        self
782    }
783
784    /// Helper used by request handlers to apply the rate limit. Returns
785    /// `Ok(())` when allowed (or no rate limiter is configured), or a
786    /// `SlowDown` S3Error otherwise.
787    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
788        let Some(rl) = self.rate_limits.as_ref() else {
789            return Ok(());
790        };
791        let principal_id = Self::principal_of(req);
792        if !rl.check(principal_id, bucket) {
793            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
794            return Err(S3Error::with_message(
795                S3ErrorCode::SlowDown,
796                format!("rate-limited: bucket={bucket}"),
797            ));
798        }
799        Ok(())
800    }
801
802    /// Tell the policy evaluator that the listener is reached over TLS
803    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
804    /// resolves to `true`. Defaults to `false`.
805    #[must_use]
806    pub fn with_secure_transport(mut self, on: bool) -> Self {
807        self.secure_transport = on;
808        self
809    }
810
811    #[must_use]
812    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
813        self.max_body_bytes = n;
814        self
815    }
816
817    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
818    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
819    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
820    /// When `None` (the default), no policy enforcement happens.
821    #[must_use]
822    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
823        self.policy = Some(policy);
824        self
825    }
826
827    /// Pull the SigV4 access key id off the request's credentials, if any.
828    /// Used as the `principal_id` for policy evaluation.
829    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
830        req.credentials.as_ref().map(|c| c.access_key.as_str())
831    }
832
833    /// v0.3 #13: build the per-request policy context from the incoming
834    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
835    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
836    /// production deployments are behind an LB / reverse proxy that sets
837    /// this), `aws:CurrentTime` from the system clock, and
838    /// `aws:SecureTransport` from the per-listener TLS flag.
839    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
840        let user_agent = req
841            .headers
842            .get("user-agent")
843            .and_then(|v| v.to_str().ok())
844            .map(str::to_owned);
845        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
846        // is the original client. Trim and parse leniently.
847        let source_ip = req
848            .headers
849            .get("x-forwarded-for")
850            .and_then(|v| v.to_str().ok())
851            .and_then(|raw| raw.split(',').next())
852            .and_then(|s| s.trim().parse().ok());
853        crate::policy::RequestContext {
854            source_ip,
855            user_agent,
856            request_time: Some(std::time::SystemTime::now()),
857            secure_transport: self.secure_transport,
858            existing_object_tags: None,
859            request_object_tags: None,
860            extra: Default::default(),
861        }
862    }
863
    /// Helper used by request handlers to enforce the optional policy.
    /// Returns `Ok(())` when allowed (or no policy is configured), or an
    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
    /// counter on deny.
    ///
    /// Thin wrapper over [`Self::enforce_policy_with_extra`] with no tag
    /// context (both tag arguments `None`).
    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        self.enforce_policy_with_extra(req, action, bucket, key, None, None)
    }
877
    /// v0.6 #39: variant of [`Self::enforce_policy`] that lets the
    /// caller plumb tag context (existing-on-object + on-request) into
    /// the policy evaluator. Both arguments default to `None`, in
    /// which case the resulting `RequestContext` is identical to
    /// [`Self::enforce_policy`]'s — so for handlers that don't deal
    /// with tags this is a transparent no-op.
    fn enforce_policy_with_extra<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
        request_tags: Option<&crate::tagging::TagSet>,
        existing_tags: Option<&crate::tagging::TagSet>,
    ) -> S3Result<()> {
        // No policy configured → everything is allowed.
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        // Base context (source IP, UA, time, TLS flag), then overlay tags.
        let mut ctx = self.request_context(req);
        if let Some(t) = request_tags {
            ctx.request_object_tags = Some(t.clone());
        }
        if let Some(t) = existing_tags {
            ctx.existing_object_tags = Some(t.clone());
        }
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            // Deny: bump the Prometheus counter and log the full decision
            // context before surfacing 403 AccessDenied to the client.
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }
927
928    /// テスト用: backend を取り戻す (test helper、production では使わない).
929    /// v0.6 #40 で `backend` が `Arc<B>` 化したので `Arc::try_unwrap` で
930    /// 1-clone の場合のみ返す。共有されている (= replication dispatcher が
931    /// 同じ Arc を持っていて未完了) 場合は `Err` を返さず panic させる
932    /// (test 用途専用 helper の caller 契約を維持)。
933    pub fn into_backend(self) -> B {
934        Arc::try_unwrap(self.backend)
935            .unwrap_or_else(|_| panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)"))
936    }
937
    /// Sidecar fast path for Range GETs: Range-GET only the frames that
    /// cover the requested span from the backend, then frame-parse +
    /// decompress + slice the result. **Bandwidth-saving** variant of a
    /// Range request — the full-read fallback elsewhere returns the same
    /// bytes, just fetches more.
    ///
    /// `plan` carries both the compressed byte window to fetch
    /// (`byte_start..byte_end_exclusive`) and where the client's span
    /// lands inside the decompressed concatenation
    /// (`slice_*_in_combined`). `client_start`/`client_end_exclusive`/
    /// `total_original` are only used to synthesize the Content-Range
    /// header; `get_start` feeds the latency metric.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial-GET only the needed compressed byte range from the backend.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame parse + decompress: each frame header carries its own codec
        // fields, so the manifest is rebuilt per frame.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // Cut the client's requested span out of the decompressed frames.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Assemble the response. Checksums / ETag are cleared: the backend's
        // values describe the stored compressed object, not the plaintext
        // slice we are about to return.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
1046
1047    /// `<key>.s4index` sidecar object を backend に書く。失敗しても本体 PUT は
1048    /// 成功扱いにしたいので、err は warn ログのみ (Range GET の partial path が
1049    /// 使えなくなるが、full read fallback で意味的には正しい結果を返す)。
1050    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
1051        let bytes = encode_index(index);
1052        let len = bytes.len() as i64;
1053        let put_input = PutObjectInput {
1054            bucket: bucket.into(),
1055            key: sidecar_key(key),
1056            body: Some(bytes_to_blob(bytes)),
1057            content_length: Some(len),
1058            content_type: Some("application/x-s4-index".into()),
1059            ..Default::default()
1060        };
1061        let put_req = S3Request {
1062            input: put_input,
1063            method: http::Method::PUT,
1064            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
1065            headers: http::HeaderMap::new(),
1066            extensions: http::Extensions::new(),
1067            credentials: None,
1068            region: None,
1069            service: None,
1070            trailing_headers: None,
1071        };
1072        if let Err(e) = self.backend.put_object(put_req).await {
1073            tracing::warn!(
1074                bucket,
1075                key,
1076                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
1077            );
1078        }
1079    }
1080
1081    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
1082    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
1083        let get_input = GetObjectInput {
1084            bucket: bucket.into(),
1085            key: sidecar_key(key),
1086            ..Default::default()
1087        };
1088        let get_req = S3Request {
1089            input: get_input,
1090            method: http::Method::GET,
1091            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
1092            headers: http::HeaderMap::new(),
1093            extensions: http::Extensions::new(),
1094            credentials: None,
1095            region: None,
1096            service: None,
1097            trailing_headers: None,
1098        };
1099        let resp = self.backend.get_object(get_req).await.ok()?;
1100        let blob = resp.output.body?;
1101        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
1102        decode_index(bytes).ok()
1103    }
1104
    /// Decompress a multipart object (a sequence of frames) and rebuild the
    /// original bytes.
    ///
    /// **Per-frame codec dispatch**: every frame header carries its own
    /// codec id, so the registry can invoke a different codec per frame.
    /// Objects mixing codecs internally (parquet-style mixed columns etc.)
    /// still decompress transparently.
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            // Rebuild the per-chunk manifest from the frame header so the
            // registry can pick the right codec and verify sizes/CRC.
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
1134}
1135
1136/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
1137/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
1138/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
1139/// reject the other variants for parity with AWS.
1140fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
1141    let rest = s
1142        .strip_prefix("bytes=")
1143        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
1144    let (a, b) = rest
1145        .split_once('-')
1146        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
1147    let first: u64 = a
1148        .parse()
1149        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
1150    let last: u64 = b
1151        .parse()
1152        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
1153    if last < first {
1154        return Err(format!("CopySourceRange last < first: {s:?}"));
1155    }
1156    Ok(s3s::dto::Range::Int {
1157        first,
1158        last: Some(last),
1159    })
1160}
1161
/// v0.5 #34: synthesize the backend storage key for a given
/// (logical key, version-id) pair on an Enabled-versioning bucket.
///
/// Uses the `__s4ver__/` infix because:
/// - it's not a substring of `.s4index` / `.s4ver` natural keys (no
///   false-positive listing filter collisions)
/// - the directory-style separator keeps S3 console "browse by prefix" UX
///   intact (versions roll up under one virtual folder per object)
/// - it stays human-readable in debug logs / `aws s3 ls`
///
/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
/// keys containing `.__s4ver__/` from results so customers don't see
/// internal shadow objects.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    const INFIX: &str = ".__s4ver__/";
    let mut shadow = String::with_capacity(key.len() + INFIX.len() + version_id.len());
    shadow.push_str(key);
    shadow.push_str(INFIX);
    shadow.push_str(version_id);
    shadow
}
1178
/// Test for the marker substring used by [`versioned_shadow_key`]. Cheap
/// substring scan; both the list_objects filter and the GET passthrough
/// check rely on this.
fn is_versioning_shadow_key(key: &str) -> bool {
    const MARKER: &str = ".__s4ver__/";
    key.contains(MARKER)
}
1184
/// v0.6 #42: wall-clock seconds since the UNIX epoch — fed to
/// `mfa::check_mfa` so the TOTP verifier can match the client's
/// authenticator app's view of "now".
fn current_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock before 1970 is impossible in practice; returning 0 makes
        // the verifier reject rather than panic.
        Err(_) => 0,
    }
}
1196
1197/// v0.6 #42: translate an `MfaError` into the matching S3 wire error.
1198///
1199/// - `Missing` / `SerialMismatch` / `InvalidCode` → `403 AccessDenied`
1200///   (S3 spec for MFA Delete: every gating failure surfaces as
1201///   `AccessDenied`, not a separate `MFA*` code).
1202/// - `Malformed` → `400 InvalidRequest` (the request itself is
1203///   syntactically broken, not a permission issue).
1204fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
1205    match e {
1206        crate::mfa::MfaError::Missing => S3Error::with_message(
1207            S3ErrorCode::AccessDenied,
1208            "MFA token required for this operation",
1209        ),
1210        crate::mfa::MfaError::Malformed => S3Error::with_message(
1211            S3ErrorCode::InvalidRequest,
1212            "malformed x-amz-mfa header",
1213        ),
1214        crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
1215            S3ErrorCode::AccessDenied,
1216            "MFA serial does not match configured device",
1217        ),
1218        crate::mfa::MfaError::InvalidCode => S3Error::with_message(
1219            S3ErrorCode::AccessDenied,
1220            "invalid MFA code",
1221        ),
1222    }
1223}
1224
1225fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
1226    metadata
1227        .as_ref()
1228        .and_then(|m| m.get(META_MULTIPART))
1229        .map(|v| v == "true")
1230        .unwrap_or(false)
1231}
1232
/// S3 user-metadata keys carrying the single-PUT `ChunkManifest` fields.
const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object written by multipart upload in the per-part frame
/// format; GET checks this flag to engage the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object written in the S4F2 framed format.
/// Legacy v0.1 single-PUTs are raw compressed bytes (no flag). GET checks
/// this flag to route to the framed path (= the same FrameIter parse as
/// multipart).
const META_FRAMED: &str = "s4-framed";
1244
1245fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
1246    metadata
1247        .as_ref()
1248        .and_then(|m| m.get(META_FRAMED))
1249        .map(|v| v == "true")
1250        .unwrap_or(false)
1251}
1252
1253/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
1254fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
1255    metadata
1256        .as_ref()
1257        .and_then(|m| m.get("s4-encrypted"))
1258        .map(|v| v == "aes-256-gcm")
1259        .unwrap_or(false)
1260}
1261
1262/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
1263/// contract is "all three or none" — partial sets are a 400.
1264///
1265/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
1266/// no encryption), `Ok(Some(material))` on validated client key, and
1267/// `Err` for malformed or partial inputs.
1268fn extract_sse_c_material(
1269    algorithm: &Option<String>,
1270    key: &Option<String>,
1271    md5: &Option<String>,
1272) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
1273    match (algorithm, key, md5) {
1274        (None, None, None) => Ok(None),
1275        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
1276            .map(Some)
1277            .map_err(sse_c_error_to_s3),
1278        _ => Err(S3Error::with_message(
1279            S3ErrorCode::InvalidRequest,
1280            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
1281        )),
1282    }
1283}
1284
1285/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
1286/// Returns the key-id to wrap under, falling back to the gateway default.
1287fn extract_kms_key_id(
1288    sse: &Option<ServerSideEncryption>,
1289    sse_kms_key_id: &Option<String>,
1290    gateway_default: Option<&str>,
1291) -> Option<String> {
1292    let asks_for_kms = sse
1293        .as_ref()
1294        .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
1295        .unwrap_or(false);
1296    if !asks_for_kms {
1297        return None;
1298    }
1299    sse_kms_key_id
1300        .clone()
1301        .or_else(|| gateway_default.map(str::to_owned))
1302}
1303
1304/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
1305/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
1306/// transient KMS outage (503). Other variants are 500 InternalError.
1307fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
1308    use crate::kms::KmsError as K;
1309    match e {
1310        K::KeyNotFound { key_id } => S3Error::with_message(
1311            S3ErrorCode::InvalidArgument,
1312            format!("KMS key not found: {key_id}"),
1313        ),
1314        K::BackendUnavailable { message } => S3Error::with_message(
1315            S3ErrorCode::ServiceUnavailable,
1316            format!("KMS backend unavailable: {message}"),
1317        ),
1318        other => S3Error::with_message(
1319            S3ErrorCode::InternalError,
1320            format!("KMS error: {other}"),
1321        ),
1322    }
1323}
1324
1325/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
1326/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
1327/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
1328fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
1329    use crate::sse::SseError as E;
1330    match e {
1331        E::WrongCustomerKey => S3Error::with_message(
1332            S3ErrorCode::AccessDenied,
1333            "SSE-C key does not match the key used at PUT time",
1334        ),
1335        E::InvalidCustomerKey { reason } => S3Error::with_message(
1336            S3ErrorCode::InvalidArgument,
1337            format!("SSE-C: {reason}"),
1338        ),
1339        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
1340            S3ErrorCode::InvalidArgument,
1341            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
1342        ),
1343        E::CustomerKeyRequired => S3Error::with_message(
1344            S3ErrorCode::InvalidRequest,
1345            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
1346        ),
1347        E::CustomerKeyUnexpected => S3Error::with_message(
1348            S3ErrorCode::InvalidRequest,
1349            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
1350        ),
1351        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
1352    }
1353}
1354
1355fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
1356    let m = metadata.as_ref()?;
1357    let codec = m
1358        .get(META_CODEC)
1359        .and_then(|s| s.parse::<CodecKind>().ok())?;
1360    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
1361    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
1362    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
1363    Some(ChunkManifest {
1364        codec,
1365        original_size,
1366        compressed_size,
1367        crc32c,
1368    })
1369}
1370
1371fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
1372    let meta = metadata.get_or_insert_with(Default::default);
1373    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
1374    meta.insert(
1375        META_ORIGINAL_SIZE.into(),
1376        manifest.original_size.to_string(),
1377    );
1378    meta.insert(
1379        META_COMPRESSED_SIZE.into(),
1380        manifest.compressed_size.to_string(),
1381    );
1382    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
1383}
1384
1385fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
1386    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
1387}
1388
1389/// v0.6 #41: map a `select::SelectError` to the S3 error surface. AWS
1390/// uses a domain-specific `InvalidSqlExpression` code for parse / unsupported
1391/// errors, but s3s 0.13 doesn't expose that as a typed variant — we
1392/// fall back to the well-known `InvalidRequest` 400 with a descriptive
1393/// message that includes the original error context.
1394fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
1395    use crate::select::SelectError;
1396    match e {
1397        SelectError::Parse(msg) => S3Error::with_message(
1398            S3ErrorCode::InvalidRequest,
1399            format!("SQL parse error: {msg}"),
1400        ),
1401        SelectError::UnsupportedFeature(msg) => S3Error::with_message(
1402            S3ErrorCode::InvalidRequest,
1403            format!("unsupported SQL feature: {msg}"),
1404        ),
1405        SelectError::RowEval(msg) => S3Error::with_message(
1406            S3ErrorCode::InvalidRequest,
1407            format!("SQL row evaluation error: {msg}"),
1408        ),
1409        SelectError::InputFormat(msg) => S3Error::with_message(
1410            S3ErrorCode::InvalidRequest,
1411            format!("{fmt} input format error: {msg}"),
1412        ),
1413    }
1414}
1415
1416/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
1417/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
1418/// (including missing) is treated as `false`.
1419fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
1420    headers
1421        .get("x-amz-bypass-governance-retention")
1422        .and_then(|v| v.to_str().ok())
1423        .map(|s| s.eq_ignore_ascii_case("true"))
1424        .unwrap_or(false)
1425}
1426
1427/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
1428/// as an RFC3339 string and re-parsing through `chrono`. The string format
1429/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
1430/// s4-server) into our direct deps. Returns `None` if the format/parse fails
1431/// or the value is outside `chrono`'s supported range.
1432fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
1433    let mut buf = Vec::new();
1434    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
1435    let s = std::str::from_utf8(&buf).ok()?;
1436    chrono::DateTime::parse_from_rfc3339(s)
1437        .ok()
1438        .map(|dt| dt.with_timezone(&chrono::Utc))
1439}
1440
1441/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
1442/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
1443fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
1444    // chrono's RFC3339 output format matches s3s' parser ("...Z" with
1445    // optional sub-second precision). Fall back to UNIX_EPOCH if anything
1446    // unexpected happens — we never produce malformed strings, so this
1447    // branch is unreachable in practice.
1448    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
1449    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
1450}
1451
1452/// v0.6 #39: convert our internal [`crate::tagging::TagSet`] into the
1453/// s3s `Vec<Tag>` wire shape used on `GetObject/BucketTaggingOutput`.
1454/// Both halves of every pair land in the `Some(_)` slot — AWS marks
1455/// the field optional but always populates it on response.
1456fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
1457    set.iter()
1458        .map(|(k, v)| Tag {
1459            key: Some(k.clone()),
1460            value: Some(v.clone()),
1461        })
1462        .collect()
1463}
1464
1465/// v0.6 #39: inverse of [`tagset_to_aws`] for input handlers. Missing
1466/// keys / values become empty strings (mirrors AWS, which rejects
1467/// `<Key/>` with InvalidTag at the parser layer; downstream
1468/// `TagSet::validate` then enforces our size limits).
1469fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
1470    let pairs = tags
1471        .iter()
1472        .map(|t| {
1473            (
1474                t.key.clone().unwrap_or_default(),
1475                t.value.clone().unwrap_or_default(),
1476            )
1477        })
1478        .collect();
1479    crate::tagging::TagSet::from_pairs(pairs)
1480}
1481
1482/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
1483/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
1484/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
1485pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
1486    if total == 0 {
1487        return Err("cannot range-get zero-length object".into());
1488    }
1489    match range {
1490        s3s::dto::Range::Int { first, last } => {
1491            let start = *first;
1492            let end_inclusive = match last {
1493                Some(l) => (*l).min(total - 1),
1494                None => total - 1,
1495            };
1496            if start > end_inclusive || start >= total {
1497                return Err(format!(
1498                    "range bytes={start}-{:?} out of object size {total}",
1499                    last
1500                ));
1501            }
1502            Ok((start, end_inclusive + 1))
1503        }
1504        s3s::dto::Range::Suffix { length } => {
1505            let len = (*length).min(total);
1506            Ok((total - len, total))
1507        }
1508    }
1509}
1510
1511#[async_trait::async_trait]
1512impl<B: S3> S3 for S4Service<B> {
1513    // === 圧縮を挟む path (PUT) ===
1514    #[tracing::instrument(
1515        name = "s4.put_object",
1516        skip(self, req),
1517        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
1518    )]
1519    async fn put_object(
1520        &self,
1521        mut req: S3Request<PutObjectInput>,
1522    ) -> S3Result<S3Response<PutObjectOutput>> {
1523        let put_start = Instant::now();
1524        let put_bucket = req.input.bucket.clone();
1525        let put_key = req.input.key.clone();
1526        let access_preamble = self.access_log_preamble(&req);
1527        self.enforce_rate_limit(&req, &put_bucket)?;
1528        // v0.6 #39: parse `x-amz-tagging` (URL-encoded query string) so
1529        // the IAM policy gate sees the request's tags via
1530        // `s3:RequestObjectTag/<key>`. `existing_object_tags` is also
1531        // resolved from the Tagging manager (when wired) so
1532        // `s3:ExistingObjectTag/<key>` works on overwrite.
1533        let request_tags: Option<crate::tagging::TagSet> = req
1534            .input
1535            .tagging
1536            .as_deref()
1537            .map(crate::tagging::parse_tagging_header)
1538            .transpose()
1539            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
1540        let existing_tags: Option<crate::tagging::TagSet> = self
1541            .tagging
1542            .as_ref()
1543            .and_then(|m| m.get_object_tags(&put_bucket, &put_key));
1544        self.enforce_policy_with_extra(
1545            &req,
1546            "s3:PutObject",
1547            &put_bucket,
1548            Some(&put_key),
1549            request_tags.as_ref(),
1550            existing_tags.as_ref(),
1551        )?;
1552        // v0.5 #30: an Object Lock-protected key cannot be overwritten by
1553        // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
1554        // bucket PUTs are exempt because they materialise a fresh
1555        // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
1556        // locked version's bytes are untouched. The check mirrors the
1557        // delete path (Compliance never bypassable, Governance via the
1558        // bypass header, legal hold never).
1559        if let Some(mgr) = self.object_lock.as_ref()
1560            && let Some(state) = mgr.get(&put_bucket, &put_key)
1561        {
1562            let bucket_versioned_enabled = self
1563                .versioning
1564                .as_ref()
1565                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
1566                .unwrap_or(false);
1567            if !bucket_versioned_enabled {
1568                let bypass = parse_bypass_governance_header(&req.headers);
1569                let now = chrono::Utc::now();
1570                if !state.can_delete(now, bypass) {
1571                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
1572                    return Err(S3Error::with_message(
1573                        S3ErrorCode::AccessDenied,
1574                        "Access Denied because object protected by object lock",
1575                    ));
1576                }
1577            }
1578        }
1579        // v0.5 #30: per-PUT explicit retention / legal hold (S3
1580        // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
1581        // `x-amz-object-lock-legal-hold`). Captured before the body
1582        // moves into the backend; persisted into the manager only on
1583        // backend success below.
1584        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
1585            .input
1586            .object_lock_mode
1587            .as_ref()
1588            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
1589        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
1590            .input
1591            .object_lock_retain_until_date
1592            .as_ref()
1593            .and_then(timestamp_to_chrono_utc);
1594        let explicit_legal_hold_on: Option<bool> = req
1595            .input
1596            .object_lock_legal_hold_status
1597            .as_ref()
1598            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
1599        if let Some(blob) = req.input.body.take() {
1600            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
1601            // compress fast path、そうでなければ従来の collect-then-compress。
1602            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
1603                .await
1604                .map_err(internal("peek put sample"))?;
1605            let sample_len = sample.len().min(SAMPLE_BYTES);
1606            let kind = self.dispatcher.pick(&sample[..sample_len]).await;
1607
1608            // Passthrough buys nothing from S4F2 wrapping (no compression =
1609            // no per-chunk frame to skip past) and the +28-byte header
1610            // overhead breaks size-sensitive callers that expect a true
1611            // pass-through. So passthrough always uses the legacy raw-blob
1612            // path; only compressing codecs go through the framed path.
1613            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
1614            let (compressed, manifest, is_framed) = if use_framed {
1615                // streaming fast path: input は memory に collect しない
1616                let chained = chain_sample_with_rest(sample, rest_stream);
1617                debug!(
1618                    bucket = ?req.input.bucket,
1619                    key = ?req.input.key,
1620                    codec = kind.as_str(),
1621                    path = "streaming-framed",
1622                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1623                );
1624                // v0.4 #16: pick the chunk size based on the request's
1625                // Content-Length when known, falling back to the 4 MiB
1626                // default for chunked transfers.
1627                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1628                let (body, manifest) = streaming_compress_to_frames(
1629                    chained,
1630                    Arc::clone(&self.registry),
1631                    kind,
1632                    chunk_size,
1633                )
1634                .await
1635                .map_err(internal("streaming framed compress"))?;
1636                (body, manifest, true)
1637            } else {
1638                // GPU codec 等で streaming-aware でないものは bytes-buffered path
1639                // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1640                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1641                    .await
1642                    .map_err(internal("collect put body (buffered path)"))?;
1643                debug!(
1644                    bucket = ?req.input.bucket,
1645                    key = ?req.input.key,
1646                    bytes = bytes.len(),
1647                    codec = kind.as_str(),
1648                    path = "buffered",
1649                    "S4 put_object: compressing (buffered, raw blob)"
1650                );
1651                let (body, m) = self
1652                    .registry
1653                    .compress(bytes, kind)
1654                    .await
1655                    .map_err(internal("registry compress"))?;
1656                (body, m, false)
1657            };
1658
1659            write_manifest(&mut req.input.metadata, &manifest);
1660            if is_framed {
1661                // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1662                req.input
1663                    .metadata
1664                    .get_or_insert_with(Default::default)
1665                    .insert(META_FRAMED.into(), "true".into());
1666            }
1667            // 重要: content_length を圧縮後サイズで更新する。
1668            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1669            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1670            req.input.content_length = Some(compressed.len() as i64);
1671            // body を書き換えたので、客側が送ってきた original body 用の
1672            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1673            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1674            // ChunkManifest.crc32c で担保している。
1675            req.input.checksum_algorithm = None;
1676            req.input.checksum_crc32 = None;
1677            req.input.checksum_crc32c = None;
1678            req.input.checksum_crc64nvme = None;
1679            req.input.checksum_sha1 = None;
1680            req.input.checksum_sha256 = None;
1681            req.input.content_md5 = None;
1682            let original_size = manifest.original_size;
1683            let compressed_size = manifest.compressed_size;
1684            let codec_label = manifest.codec.as_str();
1685            // framed body は GET 側で sidecar partial-fetch を効かせるため
1686            // build_index_from_body で sidecar を組み立てて backend に PUT する。
1687            let sidecar_index = if is_framed {
1688                s4_codec::index::build_index_from_body(&compressed).ok()
1689            } else {
1690                None
1691            };
1692            // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
1693            // Precedence:
1694            //   - SSE-C headers present → per-request customer key (S4E3)
1695            //   - server-managed keyring configured → active key (S4E2)
1696            //   - neither → no encryption (raw compressed body)
1697            // The `s4-encrypted: aes-256-gcm` metadata flag is set in
1698            // both encrypted modes; the on-disk frame magic distinguishes
1699            // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
1700            let sse_c_material = extract_sse_c_material(
1701                &req.input.sse_customer_algorithm,
1702                &req.input.sse_customer_key,
1703                &req.input.sse_customer_key_md5,
1704            )?;
1705            // v0.5 #28: SSE-KMS request? Resolves to None unless the
1706            // request asks for `aws:kms` AND a key id is available
1707            // (explicit header or gateway default). When set, we'll
1708            // generate a per-object DEK below.
1709            let kms_key_id = extract_kms_key_id(
1710                &req.input.server_side_encryption,
1711                &req.input.ssekms_key_id,
1712                self.kms_default_key_id.as_deref(),
1713            );
1714            // v0.5 #32: in compliance-strict mode, every PUT must
1715            // declare SSE — either client-supplied (SSE-C), KMS, or by
1716            // virtue of a server-side keyring being configured (which
1717            // applies SSE-S4 to every PUT automatically). Requests that
1718            // would otherwise land as plain compressed bytes are
1719            // rejected with 400 InvalidRequest.
1720            if self.compliance_strict
1721                && sse_c_material.is_none()
1722                && kms_key_id.is_none()
1723                && self.sse_keyring.is_none()
1724                && req
1725                    .input
1726                    .server_side_encryption
1727                    .as_ref()
1728                    .map(|s| s.as_str())
1729                    != Some(ServerSideEncryption::AES256)
1730            {
1731                return Err(S3Error::with_message(
1732                    S3ErrorCode::InvalidRequest,
1733                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1734                     (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1735                ));
1736            }
1737            // SSE-C and SSE-KMS are mutually exclusive on a single PUT
1738            // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
1739            if sse_c_material.is_some() && kms_key_id.is_some() {
1740                return Err(S3Error::with_message(
1741                    S3ErrorCode::InvalidArgument,
1742                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
1743                ));
1744            }
1745            // KMS path needs to call generate_dek().await before the
1746            // body_to_send branch; capture the result here.
1747            let kms_wrap = if let Some(ref key_id) = kms_key_id {
1748                let kms = self.kms.as_ref().ok_or_else(|| {
1749                    S3Error::with_message(
1750                        S3ErrorCode::InvalidRequest,
1751                        "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1752                    )
1753                })?;
1754                let (dek, wrapped) = kms
1755                    .generate_dek(key_id)
1756                    .await
1757                    .map_err(kms_error_to_s3)?;
1758                if dek.len() != 32 {
1759                    return Err(S3Error::with_message(
1760                        S3ErrorCode::InternalError,
1761                        format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1762                    ));
1763                }
1764                let mut dek_arr = [0u8; 32];
1765                dek_arr.copy_from_slice(&dek);
1766                Some((dek_arr, wrapped))
1767            } else {
1768                None
1769            };
1770            let body_to_send = if let Some(ref m) = sse_c_material {
1771                req.input
1772                    .metadata
1773                    .get_or_insert_with(Default::default)
1774                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
1775                crate::sse::encrypt_with_source(
1776                    &compressed,
1777                    crate::sse::SseSource::CustomerKey {
1778                        key: &m.key,
1779                        key_md5: &m.key_md5,
1780                    },
1781                )
1782            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
1783                req.input
1784                    .metadata
1785                    .get_or_insert_with(Default::default)
1786                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
1787                crate::sse::encrypt_with_source(
1788                    &compressed,
1789                    crate::sse::SseSource::Kms { dek, wrapped },
1790                )
1791            } else if let Some(keyring) = self.sse_keyring.as_ref() {
1792                req.input
1793                    .metadata
1794                    .get_or_insert_with(Default::default)
1795                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
1796                crate::sse::encrypt_v2(&compressed, keyring)
1797            } else {
1798                compressed.clone()
1799            };
1800            // v0.6 #40: capture the about-to-be-sent body + metadata so
1801            // the replication dispatcher (run after the source PUT
1802            // succeeds) can hand the same backend bytes to the
1803            // destination bucket. `Bytes` clone is cheap (refcounted).
1804            let replication_body = body_to_send.clone();
1805            let replication_metadata = req.input.metadata.clone();
1806            req.input.body = Some(bytes_to_blob(body_to_send));
1807            // v0.5 #34: pre-allocate a version-id when the bucket is
1808            // Enabled, then redirect the backend storage key to the
1809            // shadow path so older versions survive newer PUTs.
1810            // Suspended / Unversioned buckets keep using the plain
1811            // `<key>` (S3 spec: Suspended overwrites the same backend
1812            // object). Pre-allocation (instead of recording after PUT)
1813            // ensures the shadow key + the response's
1814            // `x-amz-version-id` use the same vid.
1815            let pending_version: Option<crate::versioning::PutOutcome> = self
1816                .versioning
1817                .as_ref()
1818                .map(|mgr| mgr.state(&put_bucket))
1819                .map(|state| match state {
1820                    crate::versioning::VersioningState::Enabled => {
1821                        crate::versioning::PutOutcome {
1822                            version_id: crate::versioning::VersioningManager::new_version_id(),
1823                            versioned_response: true,
1824                        }
1825                    }
1826                    crate::versioning::VersioningState::Suspended
1827                    | crate::versioning::VersioningState::Unversioned => {
1828                        crate::versioning::PutOutcome {
1829                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1830                            versioned_response: false,
1831                        }
1832                    }
1833                });
1834            if let Some(ref pv) = pending_version
1835                && pv.versioned_response
1836            {
1837                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1838            }
1839            let mut backend_resp = self.backend.put_object(req).await;
1840            if let Some(idx) = sidecar_index
1841                && backend_resp.is_ok()
1842                && idx.entries.len() > 1
1843            {
1844                // 1 chunk しかない (small object) なら sidecar は意味がない (=
1845                // partial fetch しても full body と同じ範囲) ので省略。
1846                // Sidecar は user-visible key で書く (latest version の
1847                // partial fetch path 用)。Old versions の Range GET は今 task
1848                // の scope 外 (full read fallback でも意味的には正しい)。
1849                self.write_sidecar(&put_bucket, &put_key, &idx).await;
1850            }
1851            // v0.5 #34: commit the new version into the manager only on
1852            // backend success. Use the pre-allocated vid so the response
1853            // header and the chain entry agree.
1854            if let (Some(mgr), Some(pv), Ok(resp)) = (
1855                self.versioning.as_ref(),
1856                pending_version.as_ref(),
1857                backend_resp.as_mut(),
1858            ) {
1859                let etag = resp
1860                    .output
1861                    .e_tag
1862                    .clone()
1863                    .map(ETag::into_value)
1864                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
1865                let now = chrono::Utc::now();
1866                mgr.commit_put_with_version(
1867                    &put_bucket,
1868                    &put_key,
1869                    crate::versioning::VersionEntry {
1870                        version_id: pv.version_id.clone(),
1871                        etag,
1872                        size: original_size,
1873                        is_delete_marker: false,
1874                        created_at: now,
1875                    },
1876                );
1877                if pv.versioned_response {
1878                    resp.output.version_id = Some(pv.version_id.clone());
1879                }
1880            }
1881            // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
1882            // so the client knows the server actually applied the
1883            // requested algorithm and which key fingerprint matched.
1884            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
1885                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
1886                resp.output.sse_customer_key_md5 = Some(
1887                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
1888                );
1889            }
1890            // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
1891            // the backend returned (AWS KMS returns the ARN even when
1892            // the request used an alias).
1893            if let (Some((_, wrapped)), Ok(resp)) =
1894                (kms_wrap.as_ref(), backend_resp.as_mut())
1895            {
1896                resp.output.server_side_encryption =
1897                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
1898                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
1899            }
1900            // v0.5 #30: persist any per-PUT explicit retention / legal
1901            // hold the client supplied, then auto-apply the bucket
1902            // default (no-op when state is already populated). The
1903            // explicit fields take precedence — the bucket-default
1904            // helper bails out as soon as it sees any retention.
1905            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
1906                if explicit_lock_mode.is_some()
1907                    || explicit_retain_until.is_some()
1908                    || explicit_legal_hold_on.is_some()
1909                {
1910                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
1911                    if let Some(m) = explicit_lock_mode {
1912                        state.mode = Some(m);
1913                    }
1914                    if let Some(u) = explicit_retain_until {
1915                        state.retain_until = Some(u);
1916                    }
1917                    if let Some(lh) = explicit_legal_hold_on {
1918                        state.legal_hold_on = lh;
1919                    }
1920                    mgr.set(&put_bucket, &put_key, state);
1921                }
1922                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
1923            }
1924            let _ = (original_size, compressed_size); // mute unused warnings
1925            let elapsed = put_start.elapsed();
1926            crate::metrics::record_put(
1927                codec_label,
1928                original_size,
1929                compressed_size,
1930                elapsed.as_secs_f64(),
1931                backend_resp.is_ok(),
1932            );
1933            // v0.4 #20: structured access-log entry (best-effort).
1934            self.record_access(
1935                access_preamble,
1936                "REST.PUT.OBJECT",
1937                &put_bucket,
1938                Some(&put_key),
1939                if backend_resp.is_ok() { 200 } else { 500 },
1940                compressed_size,
1941                original_size,
1942                elapsed.as_millis() as u64,
1943                backend_resp.as_ref().err().map(|e| e.code().as_str()),
1944            )
1945            .await;
1946            info!(
1947                op = "put_object",
1948                bucket = %put_bucket,
1949                key = %put_key,
1950                codec = codec_label,
1951                bytes_in = original_size,
1952                bytes_out = compressed_size,
1953                ratio = format!(
1954                    "{:.3}",
1955                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
1956                ),
1957                latency_ms = elapsed.as_millis() as u64,
1958                ok = backend_resp.is_ok(),
1959                "S4 put completed"
1960            );
1961            // v0.6 #35: fire bucket-notification destinations (best-effort,
1962            // detached). Skipped when no manager is attached or when the
1963            // bucket has no rule matching `s3:ObjectCreated:Put` for this
1964            // key.
1965            if backend_resp.is_ok()
1966                && let Some(mgr) = self.notifications.as_ref()
1967            {
1968                let dests = mgr.match_destinations(
1969                    &put_bucket,
1970                    &crate::notifications::EventType::ObjectCreatedPut,
1971                    &put_key,
1972                );
1973                if !dests.is_empty() {
1974                    let etag = backend_resp
1975                        .as_ref()
1976                        .ok()
1977                        .and_then(|r| r.output.e_tag.clone())
1978                        .map(ETag::into_value);
1979                    let version_id = pending_version
1980                        .as_ref()
1981                        .filter(|pv| pv.versioned_response)
1982                        .map(|pv| pv.version_id.clone());
1983                    tokio::spawn(crate::notifications::dispatch_event(
1984                        Arc::clone(mgr),
1985                        put_bucket.clone(),
1986                        put_key.clone(),
1987                        crate::notifications::EventType::ObjectCreatedPut,
1988                        Some(original_size),
1989                        etag,
1990                        version_id,
1991                        format!("S4-{}", uuid::Uuid::new_v4()),
1992                    ));
1993                }
1994            }
1995            // v0.6 #39: persist parsed `x-amz-tagging` tags into the
1996            // tagging manager on a successful PUT. AWS PutObject's
1997            // tagging is a full-replace operation (not a merge), so
1998            // any pre-existing entry for `(bucket, key)` is overwritten.
1999            if backend_resp.is_ok()
2000                && let (Some(mgr), Some(tags)) =
2001                    (self.tagging.as_ref(), request_tags.clone())
2002            {
2003                mgr.put_object_tags(&put_bucket, &put_key, tags);
2004            }
2005            // v0.6 #40: cross-bucket replication fire-point. On
2006            // successful source PUT, consult the replication manager;
2007            // when an enabled rule matches, mark the source key
2008            // `Pending` and spawn a detached task that PUTs the same
2009            // backend bytes + metadata to the rule's destination
2010            // bucket. The dispatcher itself records `Completed` /
2011            // `Failed` and bumps the drop counter on retry-budget
2012            // exhaustion.
2013            self.spawn_replication_if_matched(
2014                &put_bucket,
2015                &put_key,
2016                &request_tags,
2017                &replication_body,
2018                &replication_metadata,
2019                backend_resp.is_ok(),
2020            );
2021            return backend_resp;
2022        }
2023        // Body-less PUT (rare: zero-length object). Mirror the body-full
2024        // versioning hooks so list_object_versions / GET-by-version still see
2025        // empty-body objects in the chain.
2026        let pending_version: Option<crate::versioning::PutOutcome> = self
2027            .versioning
2028            .as_ref()
2029            .map(|mgr| mgr.state(&put_bucket))
2030            .map(|state| match state {
2031                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
2032                    version_id: crate::versioning::VersioningManager::new_version_id(),
2033                    versioned_response: true,
2034                },
2035                _ => crate::versioning::PutOutcome {
2036                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
2037                    versioned_response: false,
2038                },
2039            });
2040        if let Some(ref pv) = pending_version
2041            && pv.versioned_response
2042        {
2043            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
2044        }
2045        let mut backend_resp = self.backend.put_object(req).await;
2046        if let (Some(mgr), Some(pv), Ok(resp)) = (
2047            self.versioning.as_ref(),
2048            pending_version.as_ref(),
2049            backend_resp.as_mut(),
2050        ) {
2051            let etag = resp
2052                .output
2053                .e_tag
2054                .clone()
2055                .map(ETag::into_value)
2056                .unwrap_or_default();
2057            let now = chrono::Utc::now();
2058            mgr.commit_put_with_version(
2059                &put_bucket,
2060                &put_key,
2061                crate::versioning::VersionEntry {
2062                    version_id: pv.version_id.clone(),
2063                    etag,
2064                    size: 0,
2065                    is_delete_marker: false,
2066                    created_at: now,
2067                },
2068            );
2069            if pv.versioned_response {
2070                resp.output.version_id = Some(pv.version_id.clone());
2071            }
2072        }
2073        // v0.5 #30: same explicit-then-default lock-state commit as the
2074        // body-bearing branch above, so a zero-length PUT also picks up
2075        // bucket-default retention.
2076        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
2077            if explicit_lock_mode.is_some()
2078                || explicit_retain_until.is_some()
2079                || explicit_legal_hold_on.is_some()
2080            {
2081                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
2082                if let Some(m) = explicit_lock_mode {
2083                    state.mode = Some(m);
2084                }
2085                if let Some(u) = explicit_retain_until {
2086                    state.retain_until = Some(u);
2087                }
2088                if let Some(lh) = explicit_legal_hold_on {
2089                    state.legal_hold_on = lh;
2090                }
2091                mgr.set(&put_bucket, &put_key, state);
2092            }
2093            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
2094        }
2095        // v0.6 #35: same notification fire-point as the body-bearing PUT
2096        // branch above (zero-length objects still match `ObjectCreated:Put`
2097        // rules per the AWS event taxonomy).
2098        if backend_resp.is_ok()
2099            && let Some(mgr) = self.notifications.as_ref()
2100        {
2101            let dests = mgr.match_destinations(
2102                &put_bucket,
2103                &crate::notifications::EventType::ObjectCreatedPut,
2104                &put_key,
2105            );
2106            if !dests.is_empty() {
2107                let etag = backend_resp
2108                    .as_ref()
2109                    .ok()
2110                    .and_then(|r| r.output.e_tag.clone())
2111                    .map(ETag::into_value);
2112                let version_id = pending_version
2113                    .as_ref()
2114                    .filter(|pv| pv.versioned_response)
2115                    .map(|pv| pv.version_id.clone());
2116                tokio::spawn(crate::notifications::dispatch_event(
2117                    Arc::clone(mgr),
2118                    put_bucket.clone(),
2119                    put_key.clone(),
2120                    crate::notifications::EventType::ObjectCreatedPut,
2121                    Some(0),
2122                    etag,
2123                    version_id,
2124                    format!("S4-{}", uuid::Uuid::new_v4()),
2125                ));
2126            }
2127        }
2128        // v0.6 #39: persist parsed `x-amz-tagging` for the body-less
2129        // (zero-length) PUT branch too — same shape as the body-bearing
2130        // branch above.
2131        if backend_resp.is_ok()
2132            && let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
2133        {
2134            mgr.put_object_tags(&put_bucket, &put_key, tags);
2135        }
2136        // v0.6 #40: cross-bucket replication for the zero-length PUT
2137        // branch — same shape as the body-bearing branch above.
2138        self.spawn_replication_if_matched(
2139            &put_bucket,
2140            &put_key,
2141            &request_tags,
2142            &bytes::Bytes::new(),
2143            &None,
2144            backend_resp.is_ok(),
2145        );
2146        backend_resp
2147    }
2148
    // === Decompression path (GET) ===
    //
    // Flow: rate-limit/policy checks → version resolution (optional) →
    // sidecar-index partial-fetch fast path for Range GETs → backend GET →
    // SSE decrypt (if flagged) → one of three return paths:
    //   1. streaming zstd decompress (no Range, single manifest, CpuZstd)
    //   2. passthrough streaming (no Range, Passthrough codec)
    //   3. buffered: collect → frame-parse or registry decompress → optional
    //      Range slice → rewrite length/checksums → return
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // Detect a Range request up front. The range header is *taken* off
        // the input so the backend returns the full compressed body; the
        // range is re-applied after decompression (slice path below), or
        // served via the sidecar-index partial fetch when possible.
        let range_request = req.input.range.take();
        // v0.5 #27: pull SSE-C material from the input headers before
        // the request is moved into the backend. A header parse error
        // fails fast (no body fetch). The material is consumed below
        // when decrypting an S4E3-framed body; the SSE-C headers on
        // `req.input` are cleared so the backend doesn't see them.
        let sse_c_alg = req.input.sse_customer_algorithm.take();
        let sse_c_key = req.input.sse_customer_key.take();
        let sse_c_md5 = req.input.sse_customer_key_md5.take();
        let get_sse_c_material =
            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;

        // v0.5 #34: route the GET through the VersioningManager when
        // attached AND the bucket is in a versioning-aware state.
        // Resolves which version to fetch (explicit `?versionId=` query
        // param vs. chain latest), translates a delete-marker into 404
        // NoSuchKey, and rewrites the backend storage key to the shadow
        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
        // versions. `resolved_version_id` is stamped onto the response
        // so clients see a coherent `x-amz-version-id` header.
        //
        // When the bucket is Unversioned (or no manager attached), the
        // chain-resolution step is skipped and the request flows
        // through the existing single-key path unchanged.
        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
            Some(mgr)
                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
            {
                let req_vid = req.input.version_id.take();
                let entry = match req_vid.as_deref() {
                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
                        || S3Error::with_message(
                            S3ErrorCode::NoSuchVersion,
                            format!("no such version: {vid}"),
                        ),
                    )?,
                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::NoSuchKey,
                            format!("no such key: {get_key}"),
                        )
                    })?,
                };
                if entry.is_delete_marker {
                    // S3 spec: GET without versionId on a
                    // delete-marker latest → 404 NoSuchKey + the
                    // response carries `x-amz-delete-marker: true`.
                    // GET with explicit versionId pointing at a delete
                    // marker → 405 MethodNotAllowed; we surface
                    // NoSuchKey here for both since s3s collapses them
                    // into the same not-found error path.
                    return Err(S3Error::with_message(
                        S3ErrorCode::NoSuchKey,
                        format!("delete marker is the current version of {get_key}"),
                    ));
                }
                if entry.version_id != crate::versioning::NULL_VERSION_ID {
                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
                }
                Some(entry.version_id)
            }
            _ => None,
        };

        // ====== Range GET partial-fetch fast path (uses the sidecar index) ======
        // When a sidecar `<key>.s4index` exists and the object is
        // multipart-framed, only the frames covering the requested range are
        // Range-GET from the backend, saving backend bandwidth.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            // `resolve_range` yields (start, end) with an exclusive end
            // (see `end_exclusive` usage below); a malformed/unsatisfiable
            // range maps to InvalidRange.
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
            // No plan from the index → fall through to the full-body path.
        }
        let mut resp = self.backend.get_object(req).await?;
        // v0.5 #34: stamp the resolved version-id so the client sees a
        // coherent `x-amz-version-id` header (only for chains owned by
        // the manager — Unversioned buckets / no-manager paths never
        // set this).
        if let Some(ref vid) = resolved_version_id {
            resp.output.version_id = Some(vid.clone());
        }
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        // v0.2 #4: framed-v2 single-PUT objects need multi-frame parsing,
        // so they take the same path as multipart objects.
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            // Objects not written by S4 pass through untouched (e.g.
            // pre-existing objects in a raw bucket).
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
            // before any frame parse / streaming decompress. Encrypted
            // bodies are opaque to the codec; this also forces the
            // buffered path because AES-GCM needs the full body for tag
            // verify. SSE-C uses the per-request customer key, SSE-S4
            // falls back to the configured keyring.
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                // v0.5 #28: peek the frame magic to route the right
                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
                // through the KMS backend (async). S4E1/E2/E3 take the
                // sync path (keyring or customer key).
                let plain = match crate::sse::peek_magic(&body) {
                    Some("S4E4") => {
                        let kms = self.kms.as_ref().ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::InvalidRequest,
                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
                            )
                        })?;
                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
                        crate::sse::decrypt_with_kms(&body, kms_ref)
                            .await
                            .map_err(|e| match e {
                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
                                other => S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-KMS decrypt failed: {other}"),
                                ),
                            })?
                    }
                    _ => {
                        if let Some(ref m) = get_sse_c_material {
                            crate::sse::decrypt(
                                &body,
                                crate::sse::SseSource::CustomerKey {
                                    key: &m.key,
                                    key_md5: &m.key_md5,
                                },
                            )
                            .map_err(sse_c_error_to_s3)?
                        } else {
                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
                                S3Error::with_message(
                                    S3ErrorCode::InvalidRequest,
                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                                )
                            })?;
                            crate::sse::decrypt(&body, keyring).map_err(|e| {
                                S3Error::with_message(
                                    S3ErrorCode::InternalError,
                                    format!("SSE-S4 decrypt failed: {e}"),
                                )
                            })?
                        }
                    }
                };
                // v0.5 #28: parse out the on-disk wrapped DEK's key id
                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
                {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
                }
                bytes_to_blob(plain)
            } else if let Some(ref m) = get_sse_c_material {
                // Client sent SSE-C headers for an unencrypted object —
                // mirror AWS S3's 400 InvalidRequest.
                let _ = m;
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
            } else {
                blob
            };
            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
            // tell the client that the supplied key was the one used.
            if let Some(ref m) = get_sse_c_material {
                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
                resp.output.sse_customer_key_md5 = Some(
                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
                );
            }
            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
            // Collecting a large object (e.g. 5 GB) into memory risks OOM,
            // so when the codec is streaming-aware the body is decompressed
            // chunk-by-chunk and flowed to the client immediately.
            //
            // Range requests cannot stream (slicing needs the total byte
            // count), so they fall through to the buffered path.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                // Report the decompressed length; clear compressed-bytes
                // checksums/ETag (they describe the stored form, not what
                // the client receives).
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                // NOTE: this measures setup latency only — the body keeps
                // streaming after this log line ("get started").
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough codec: stream the stored bytes as-is (streaming
            // only when no Range was requested).
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                // Multipart objects and framed-v2 single-PUT objects share
                // the same S4F2 frame sequence, so both go through
                // decompress_multipart.
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            // If a Range was requested, slice the decompressed bytes;
            // otherwise return the full body.
            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                // Content-Range uses an inclusive last byte, hence end - 1.
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            // Return the true post-decompression size (clients trust
            // content_length; leaving the compressed size would make
            // downstream readers truncate the body).
            resp.output.content_length = Some(final_bytes.len() as i64);
            // Returning the compressed-bytes checksums would trigger a
            // StreamingError (ChecksumMismatch) in the AWS SDK. The ETag is
            // likewise the backend's MD5/checksum of the *compressed* bytes
            // and is semantically wrong here — clear them all; S4's own
            // crc32c (in the manifest / in each frame) guarantees integrity.
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        // v0.6 #40: echo the recorded `x-amz-replication-status` so
        // consumers can poll progress (PENDING / COMPLETED / FAILED).
        if let Some(mgr) = self.replication.as_ref()
            && let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
        {
            resp.output.replication_status =
                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
        }
        Ok(resp)
    }
2496
2497    // === passthrough delegations ===
2498    async fn head_bucket(
2499        &self,
2500        req: S3Request<HeadBucketInput>,
2501    ) -> S3Result<S3Response<HeadBucketOutput>> {
2502        self.backend.head_bucket(req).await
2503    }
2504    async fn list_buckets(
2505        &self,
2506        req: S3Request<ListBucketsInput>,
2507    ) -> S3Result<S3Response<ListBucketsOutput>> {
2508        self.backend.list_buckets(req).await
2509    }
2510    async fn create_bucket(
2511        &self,
2512        req: S3Request<CreateBucketInput>,
2513    ) -> S3Result<S3Response<CreateBucketOutput>> {
2514        self.backend.create_bucket(req).await
2515    }
2516    async fn delete_bucket(
2517        &self,
2518        req: S3Request<DeleteBucketInput>,
2519    ) -> S3Result<S3Response<DeleteBucketOutput>> {
2520        self.backend.delete_bucket(req).await
2521    }
2522    async fn head_object(
2523        &self,
2524        req: S3Request<HeadObjectInput>,
2525    ) -> S3Result<S3Response<HeadObjectOutput>> {
2526        // v0.6 #40: capture bucket/key before req is consumed so the
2527        // replication-status echo can look the entry up.
2528        let head_bucket = req.input.bucket.clone();
2529        let head_key = req.input.key.clone();
2530        let mut resp = self.backend.head_object(req).await?;
2531        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
2532            // 客側には decompress 後の意味のある content_length / checksum を返す。
2533            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
2534            // (S4 は manifest 内の crc32c で integrity を担保する)。
2535            resp.output.content_length = Some(manifest.original_size as i64);
2536            resp.output.checksum_crc32 = None;
2537            resp.output.checksum_crc32c = None;
2538            resp.output.checksum_crc64nvme = None;
2539            resp.output.checksum_sha1 = None;
2540            resp.output.checksum_sha256 = None;
2541            resp.output.e_tag = None;
2542        }
2543        // v0.6 #40: echo `x-amz-replication-status` (PENDING / COMPLETED
2544        // / FAILED) so consumers can poll progress without a GET.
2545        if let Some(mgr) = self.replication.as_ref()
2546            && let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
2547        {
2548            resp.output.replication_status =
2549                Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
2550        }
2551        Ok(resp)
2552    }
2553    async fn delete_object(
2554        &self,
2555        mut req: S3Request<DeleteObjectInput>,
2556    ) -> S3Result<S3Response<DeleteObjectOutput>> {
2557        let bucket = req.input.bucket.clone();
2558        let key = req.input.key.clone();
2559        self.enforce_rate_limit(&req, &bucket)?;
2560        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
2561        // v0.6 #42: MFA Delete enforcement. When the bucket has
2562        // MFA-Delete = Enabled, every DELETE / DELETE-version /
2563        // delete-marker form needs `x-amz-mfa: <serial> <code>` (RFC 6238
2564        // 6-digit TOTP). Runs *before* the WORM / versioning routers so
2565        // a missing token is denied for free regardless of which delete
2566        // path the request would otherwise take.
2567        if let Some(mgr) = self.mfa_delete.as_ref()
2568            && mgr.is_enabled(&bucket)
2569        {
2570            let header = req.input.mfa.as_deref();
2571            if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
2572                crate::metrics::record_mfa_delete_denial(&bucket);
2573                return Err(mfa_error_to_s3(e));
2574            }
2575        }
2576        // v0.5 #30: refuse the delete while a WORM lock is in effect.
2577        // Compliance can never be bypassed; Governance can be overridden
2578        // via `x-amz-bypass-governance-retention: true`; legal hold
2579        // never. The check happens before the versioning router so a
2580        // locked object can't be soft-deleted (delete-marker push) on an
2581        // Enabled bucket either — S3 spec says lock applies to all
2582        // delete forms.
2583        if let Some(mgr) = self.object_lock.as_ref()
2584            && let Some(state) = mgr.get(&bucket, &key)
2585        {
2586            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
2587            let now = chrono::Utc::now();
2588            if !state.can_delete(now, bypass) {
2589                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
2590                return Err(S3Error::with_message(
2591                    S3ErrorCode::AccessDenied,
2592                    "Access Denied because object protected by object lock",
2593                ));
2594            }
2595        }
2596        // v0.5 #34: route DELETE through the VersioningManager when the
2597        // bucket is in a versioning-aware state.
2598        //
2599        // - Enabled bucket, no version_id → push a delete marker into
2600        //   the chain. NO backend object is touched (older versions
2601        //   stay reachable via specific-version GET).
2602        // - Enabled / Suspended bucket, with version_id → physical
2603        //   delete. Backend bytes at the shadow key (or `<key>` for
2604        //   `null`) are removed; chain entry is dropped. If the deleted
2605        //   entry was a delete marker, no backend bytes exist for it
2606        //   (record-only).
2607        // - Suspended bucket, no version_id → push a "null" delete
2608        //   marker (S3 spec); backend bytes at `<key>` are physically
2609        //   removed (same as legacy).
2610        // - Unversioned bucket → fall through to legacy passthrough.
2611        if let Some(mgr) = self.versioning.as_ref() {
2612            let state = mgr.state(&bucket);
2613            if state != crate::versioning::VersioningState::Unversioned {
2614                let req_vid = req.input.version_id.take();
2615                if let Some(vid) = req_vid {
2616                    // Specific-version DELETE: touch backend bytes only
2617                    // when the entry was a real version (not a delete
2618                    // marker, which has no backend bytes).
2619                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
2620                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
2621                        key.clone()
2622                    } else {
2623                        versioned_shadow_key(&key, &vid)
2624                    };
2625                    let was_real_version = outcome
2626                        .as_ref()
2627                        .map(|o| !o.is_delete_marker)
2628                        .unwrap_or(false);
2629                    if was_real_version {
2630                        // Best-effort backend cleanup; missing bytes
2631                        // are not an error (e.g. shadow key already
2632                        // GC'd).
2633                        let backend_input = DeleteObjectInput {
2634                            bucket: bucket.clone(),
2635                            key: backend_target,
2636                            ..Default::default()
2637                        };
2638                        let backend_req = S3Request {
2639                            input: backend_input,
2640                            method: http::Method::DELETE,
2641                            uri: req.uri.clone(),
2642                            headers: req.headers.clone(),
2643                            extensions: http::Extensions::new(),
2644                            credentials: req.credentials.clone(),
2645                            region: req.region.clone(),
2646                            service: req.service.clone(),
2647                            trailing_headers: None,
2648                        };
2649                        let _ = self.backend.delete_object(backend_req).await;
2650                    }
2651                    let mut output = DeleteObjectOutput {
2652                        version_id: Some(vid.clone()),
2653                        ..Default::default()
2654                    };
2655                    if let Some(o) = outcome.as_ref()
2656                        && o.is_delete_marker
2657                    {
2658                        output.delete_marker = Some(true);
2659                    }
2660                    // v0.6 #35: specific-version DELETE always counts as
2661                    // a hard `ObjectRemoved:Delete` event (the chain
2662                    // entry, marker or not, is gone after this call).
2663                    self.fire_delete_notification(
2664                        &bucket,
2665                        &key,
2666                        crate::notifications::EventType::ObjectRemovedDelete,
2667                        Some(vid.clone()),
2668                    );
2669                    return Ok(S3Response::new(output));
2670                }
2671                // No version_id: record a delete marker (state-aware).
2672                let outcome = mgr.record_delete(&bucket, &key);
2673                if state == crate::versioning::VersioningState::Suspended {
2674                    // Suspended buckets also evict the prior `<key>`
2675                    // bytes (the previous null version is gone too).
2676                    let backend_input = DeleteObjectInput {
2677                        bucket: bucket.clone(),
2678                        key: key.clone(),
2679                        ..Default::default()
2680                    };
2681                    let backend_req = S3Request {
2682                        input: backend_input,
2683                        method: http::Method::DELETE,
2684                        uri: req.uri.clone(),
2685                        headers: req.headers.clone(),
2686                        extensions: http::Extensions::new(),
2687                        credentials: req.credentials.clone(),
2688                        region: req.region.clone(),
2689                        service: req.service.clone(),
2690                        trailing_headers: None,
2691                    };
2692                    let _ = self.backend.delete_object(backend_req).await;
2693                }
2694                let output = DeleteObjectOutput {
2695                    delete_marker: Some(true),
2696                    version_id: outcome.version_id.clone(),
2697                    ..Default::default()
2698                };
2699                // v0.6 #35: versioned bucket DELETE without a version-id
2700                // creates a delete marker — the dedicated AWS event
2701                // taxonomy entry. Suspended-state buckets also push a
2702                // (null) marker, so the same event fires there.
2703                self.fire_delete_notification(
2704                    &bucket,
2705                    &key,
2706                    crate::notifications::EventType::ObjectRemovedDeleteMarker,
2707                    outcome.version_id,
2708                );
2709                return Ok(S3Response::new(output));
2710            }
2711        }
2712        // Legacy / Unversioned path: physical delete on the backend +
2713        // best-effort sidecar cleanup (mirrors v0.4 behaviour).
2714        let resp = self.backend.delete_object(req).await?;
2715        // v0.5 #30: drop any per-object lock state once the delete has
2716        // succeeded so the freed key can be re-armed by a future PUT
2717        // under the bucket default. Reaching here implies the lock had
2718        // already passed `can_delete` above, so this is purely cleanup.
2719        if let Some(mgr) = self.object_lock.as_ref() {
2720            mgr.clear(&bucket, &key);
2721        }
2722        // v0.6 #39: drop any object-level tag set on physical delete —
2723        // the freed key starts a fresh tag history if a future PUT
2724        // re-creates it. (Versioned-delete branches above return early
2725        // and do NOT touch tags, mirroring AWS where tag state is
2726        // attached to the logical key, not the version chain.)
2727        if let Some(mgr) = self.tagging.as_ref() {
2728            mgr.delete_object_tags(&bucket, &key);
2729        }
2730        let sidecar_input = DeleteObjectInput {
2731            bucket: bucket.clone(),
2732            key: sidecar_key(&key),
2733            ..Default::default()
2734        };
2735        let sidecar_req = S3Request {
2736            input: sidecar_input,
2737            method: http::Method::DELETE,
2738            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
2739            headers: http::HeaderMap::new(),
2740            extensions: http::Extensions::new(),
2741            credentials: None,
2742            region: None,
2743            service: None,
2744            trailing_headers: None,
2745        };
2746        let _ = self.backend.delete_object(sidecar_req).await;
2747        // v0.6 #35: legacy unversioned-bucket hard delete fires the
2748        // canonical `ObjectRemoved:Delete` event.
2749        self.fire_delete_notification(
2750            &bucket,
2751            &key,
2752            crate::notifications::EventType::ObjectRemovedDelete,
2753            None,
2754        );
2755        Ok(resp)
2756    }
2757    async fn delete_objects(
2758        &self,
2759        req: S3Request<DeleteObjectsInput>,
2760    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
2761        // v0.6 #42: MFA Delete applies once to the whole batch (S3 spec:
2762        // when MFA-Delete is on the bucket, a missing / invalid token
2763        // fails the entire DeleteObjects request, not per-object).
2764        if let Some(mgr) = self.mfa_delete.as_ref()
2765            && mgr.is_enabled(&req.input.bucket)
2766        {
2767            let header = req.input.mfa.as_deref();
2768            if let Err(e) =
2769                crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
2770            {
2771                crate::metrics::record_mfa_delete_denial(&req.input.bucket);
2772                return Err(mfa_error_to_s3(e));
2773            }
2774        }
2775        self.backend.delete_objects(req).await
2776    }
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // S4-aware copy: when the source object carries s4-* metadata, make
        // sure it is preserved onto the destination.
        //
        // - MetadataDirective::COPY (default): the backend copies the source
        //   metadata verbatim, so the s4-* entries come along automatically —
        //   no intervention needed.
        // - MetadataDirective::REPLACE: the caller-supplied metadata replaces
        //   the source's. If the s4-* entries are lost, the destination can
        //   no longer be decompressed (silent corruption). So S4 HEADs the
        //   source and force-merges its s4-* fields into `input.metadata`.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            // HEAD goes straight to the backend (not through S4) so we see
            // the stored metadata, reusing the caller's credentials/region.
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            // Best-effort: a failed HEAD silently skips the merge (the copy
            // itself still proceeds below).
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // If the caller explicitly supplied the same key, keep
                        // their value (explicit override wins); otherwise
                        // insert the source's value.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
2849    async fn list_objects(
2850        &self,
2851        req: S3Request<ListObjectsInput>,
2852    ) -> S3Result<S3Response<ListObjectsOutput>> {
2853        self.enforce_rate_limit(&req, &req.input.bucket)?;
2854        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2855        let mut resp = self.backend.list_objects(req).await?;
2856        // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
2857        // — v0.5 #34) を顧客から隠す。
2858        if let Some(contents) = resp.output.contents.as_mut() {
2859            contents.retain(|o| {
2860                o.key
2861                    .as_ref()
2862                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2863                    .unwrap_or(true)
2864            });
2865        }
2866        Ok(resp)
2867    }
2868    async fn list_objects_v2(
2869        &self,
2870        req: S3Request<ListObjectsV2Input>,
2871    ) -> S3Result<S3Response<ListObjectsV2Output>> {
2872        self.enforce_rate_limit(&req, &req.input.bucket)?;
2873        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2874        let mut resp = self.backend.list_objects_v2(req).await?;
2875        if let Some(contents) = resp.output.contents.as_mut() {
2876            let before = contents.len();
2877            contents.retain(|o| {
2878                o.key
2879                    .as_ref()
2880                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2881                    .unwrap_or(true)
2882            });
2883            // key_count も補正 (S3 spec compliance)
2884            if let Some(kc) = resp.output.key_count.as_mut() {
2885                *kc -= (before - contents.len()) as i32;
2886            }
2887        }
2888        Ok(resp)
2889    }
2890    /// v0.4 #17: filter S4-internal sidecars from versioned listings.
2891    /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
2892    /// attached AND the bucket is in a versioning-aware state, build
2893    /// the `Versions` / `DeleteMarkers` arrays directly from the
2894    /// in-memory chain (paginated + ordered the S3 way: key asc,
2895    /// version newest-first inside each key). Otherwise fall back to
2896    /// passthrough + sidecar-filter (legacy v0.4 behaviour).
2897    async fn list_object_versions(
2898        &self,
2899        req: S3Request<ListObjectVersionsInput>,
2900    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
2901        self.enforce_rate_limit(&req, &req.input.bucket)?;
2902        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2903        // v0.5 #34: VersioningManager-owned path.
2904        if let Some(mgr) = self.versioning.as_ref()
2905            && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
2906        {
2907            let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
2908            let page = mgr.list_versions(
2909                &req.input.bucket,
2910                req.input.prefix.as_deref(),
2911                req.input.key_marker.as_deref(),
2912                req.input.version_id_marker.as_deref(),
2913                max_keys,
2914            );
2915            let versions: Vec<ObjectVersion> = page
2916                .versions
2917                .into_iter()
2918                .map(|e| ObjectVersion {
2919                    key: Some(e.key),
2920                    version_id: Some(e.version_id),
2921                    is_latest: Some(e.is_latest),
2922                    e_tag: Some(ETag::Strong(e.etag)),
2923                    size: Some(e.size as i64),
2924                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2925                    ..Default::default()
2926                })
2927                .collect();
2928            let delete_markers: Vec<DeleteMarkerEntry> = page
2929                .delete_markers
2930                .into_iter()
2931                .map(|e| DeleteMarkerEntry {
2932                    key: Some(e.key),
2933                    version_id: Some(e.version_id),
2934                    is_latest: Some(e.is_latest),
2935                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2936                    ..Default::default()
2937                })
2938                .collect();
2939            let output = ListObjectVersionsOutput {
2940                name: Some(req.input.bucket.clone()),
2941                prefix: req.input.prefix.clone(),
2942                key_marker: req.input.key_marker.clone(),
2943                version_id_marker: req.input.version_id_marker.clone(),
2944                max_keys: req.input.max_keys,
2945                versions: if versions.is_empty() {
2946                    None
2947                } else {
2948                    Some(versions)
2949                },
2950                delete_markers: if delete_markers.is_empty() {
2951                    None
2952                } else {
2953                    Some(delete_markers)
2954                },
2955                is_truncated: Some(page.is_truncated),
2956                next_key_marker: page.next_key_marker,
2957                next_version_id_marker: page.next_version_id_marker,
2958                ..Default::default()
2959            };
2960            return Ok(S3Response::new(output));
2961        }
2962        // Legacy passthrough path (v0.4 #17 sidecar filter retained).
2963        let mut resp = self.backend.list_object_versions(req).await?;
2964        if let Some(versions) = resp.output.versions.as_mut() {
2965            versions.retain(|v| {
2966                v.key
2967                    .as_ref()
2968                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2969                    .unwrap_or(true)
2970            });
2971        }
2972        if let Some(markers) = resp.output.delete_markers.as_mut() {
2973            markers.retain(|m| {
2974                m.key
2975                    .as_ref()
2976                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2977                    .unwrap_or(true)
2978            });
2979        }
2980        Ok(resp)
2981    }
2982
2983    async fn create_multipart_upload(
2984        &self,
2985        mut req: S3Request<CreateMultipartUploadInput>,
2986    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
2987        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
2988        // frame parse を起動するため、object metadata に flag を立てる。
2989        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
2990        let codec_kind = self.registry.default_kind();
2991        let meta = req.input.metadata.get_or_insert_with(Default::default);
2992        meta.insert(META_MULTIPART.into(), "true".into());
2993        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
2994        debug!(
2995            bucket = ?req.input.bucket,
2996            key = ?req.input.key,
2997            codec = codec_kind.as_str(),
2998            "S4 create_multipart_upload: marking object for per-part compression"
2999        );
3000        self.backend.create_multipart_upload(req).await
3001    }
3002
    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        // Compress each part and forward it with a frame header prepended; on
        // GET, `decompress_multipart` walks the frames in order with a frame
        // iterator and inflates each one.
        // **Per-part codec dispatch**: the dispatcher picks a codec from a
        // sample of the part body, so parquet-like mixed-content multipart
        // uploads can use the best codec per part (integer-column part →
        // Bitcomp, text-column part → zstd, and so on).
        if let Some(blob) = req.input.body.take() {
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
            let original_size = bytes.len() as u64;
            let (compressed, manifest) = self
                .registry
                .compress(bytes, codec_kind)
                .await
                .map_err(internal("registry compress part"))?;
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // v0.2 #5: heuristic-based padding skip for likely-final parts.
            //
            // AWS SDK / aws-cli / boto3 always send the final (and only the
            // final) part below the configured part_size. So if the raw user
            // part is already smaller than S3's 5 MiB multipart minimum, this
            // is overwhelmingly likely to be the final part — and the final
            // part is exempt from S3's size constraint. Skipping padding here
            // saves up to ~5 MiB per object on highly compressible workloads.
            //
            // If a misbehaving client sends a tiny **non-final** part, S3
            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
            // identical outcome to a vanilla S3 PUT, just earlier than
            // padding-then-complete would catch it.
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            // The same wire-compatibility issue as the single-PUT path applies
            // to multipart: the framed body no longer matches the caller's
            // content-length or checksums, so rewrite the length and clear
            // every client-supplied integrity field.
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
3071    async fn complete_multipart_upload(
3072        &self,
3073        req: S3Request<CompleteMultipartUploadInput>,
3074    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
3075        let bucket = req.input.bucket.clone();
3076        let key = req.input.key.clone();
3077        let resp = self.backend.complete_multipart_upload(req).await?;
3078        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
3079        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
3080        // partial fetch path が利用可能になる (Range request の帯域節約)。
3081        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
3082        // できれば爆速になるので 1 回の cost は payback される
3083        let bucket_clone = bucket.clone();
3084        let key_clone = key.clone();
3085        let get_input = GetObjectInput {
3086            bucket: bucket_clone.clone(),
3087            key: key_clone.clone(),
3088            ..Default::default()
3089        };
3090        let get_req = S3Request {
3091            input: get_input,
3092            method: http::Method::GET,
3093            uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
3094            headers: http::HeaderMap::new(),
3095            extensions: http::Extensions::new(),
3096            credentials: None,
3097            region: None,
3098            service: None,
3099            trailing_headers: None,
3100        };
3101        if let Ok(get_resp) = self.backend.get_object(get_req).await
3102            && let Some(blob) = get_resp.output.body
3103            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
3104            && let Ok(index) = build_index_from_body(&body)
3105        {
3106            self.write_sidecar(&bucket, &key, &index).await;
3107        }
3108        Ok(resp)
3109    }
    /// Pure delegation — no S4 compression hook; behaviour is identical to
    /// the backend's `AbortMultipartUpload`.
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    /// Pure delegation — in-progress uploads are listed straight from the
    /// backend. (Note: unlike `list_objects*`, no S4-internal-key filtering
    /// is applied here.)
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    /// Pure delegation. Part sizes reported by the backend are the *framed,
    /// compressed* sizes written by `upload_part`, not the caller's original
    /// part sizes.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
3128
3129    // =========================================================================
3130    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
3131    // 持たないので、backend (= AWS S3) の動作と完全に同一。
3132    //
3133    // 既知の制限事項:
    // - copy_object / upload_part_copy: S4-compressed sources are handled.
    //   With the default MetadataDirective COPY, the backend copies the
    //   s4-* metadata (s4-codec etc) along with the bytes, so GET can
    //   decompress via the manifest. `copy_object` additionally force-merges
    //   the s4-* fields when MetadataDirective REPLACE would otherwise drop
    //   them (see handler above), and `upload_part_copy` re-frames framed
    //   sources instead of a raw byte-range passthrough (v0.2 #6).
3139    // - list_object_versions: versioning enabled bucket では各 version も S4
3140    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
3141    // =========================================================================
3142
3143    // ---- Object ACL / tagging / attributes ----
    /// Pure delegation — object ACLs are owned entirely by the backend.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    /// Pure delegation — object ACLs are owned entirely by the backend.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
3156    // v0.6 #39: object tagging — when a `TagManager` is attached the
3157    // configuration / per-(bucket, key) state lives in the manager and
3158    // these handlers serve directly from it; when no manager is
3159    // attached they fall back to the backend (legacy passthrough so
3160    // v0.5 deployments are unaffected).
3161    async fn get_object_tagging(
3162        &self,
3163        req: S3Request<GetObjectTaggingInput>,
3164    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
3165        let Some(mgr) = self.tagging.as_ref() else {
3166            return self.backend.get_object_tagging(req).await;
3167        };
3168        let tags = mgr
3169            .get_object_tags(&req.input.bucket, &req.input.key)
3170            .unwrap_or_default();
3171        Ok(S3Response::new(GetObjectTaggingOutput {
3172            tag_set: tagset_to_aws(&tags),
3173            ..Default::default()
3174        }))
3175    }
3176    async fn put_object_tagging(
3177        &self,
3178        req: S3Request<PutObjectTaggingInput>,
3179    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
3180        let Some(mgr) = self.tagging.as_ref() else {
3181            return self.backend.put_object_tagging(req).await;
3182        };
3183        let bucket = req.input.bucket.clone();
3184        let key = req.input.key.clone();
3185        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
3186            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
3187        })?;
3188        // v0.6 #39: gate via IAM policy with both the request tags
3189        // (`s3:RequestObjectTag/<key>`) and any existing tags on the
3190        // target object (`s3:ExistingObjectTag/<key>`).
3191        let existing = mgr.get_object_tags(&bucket, &key);
3192        self.enforce_policy_with_extra(
3193            &req,
3194            "s3:PutObjectTagging",
3195            &bucket,
3196            Some(&key),
3197            Some(&parsed),
3198            existing.as_ref(),
3199        )?;
3200        mgr.put_object_tags(&bucket, &key, parsed);
3201        Ok(S3Response::new(PutObjectTaggingOutput::default()))
3202    }
3203    async fn delete_object_tagging(
3204        &self,
3205        req: S3Request<DeleteObjectTaggingInput>,
3206    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
3207        let Some(mgr) = self.tagging.as_ref() else {
3208            return self.backend.delete_object_tagging(req).await;
3209        };
3210        let bucket = req.input.bucket.clone();
3211        let key = req.input.key.clone();
3212        let existing = mgr.get_object_tags(&bucket, &key);
3213        self.enforce_policy_with_extra(
3214            &req,
3215            "s3:DeleteObjectTagging",
3216            &bucket,
3217            Some(&key),
3218            None,
3219            existing.as_ref(),
3220        )?;
3221        mgr.delete_object_tags(&bucket, &key);
3222        Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
3223    }
    /// Pure delegation. NOTE(review): sizes/checksums reported here come
    /// from the backend's stored (compressed) representation, not the
    /// original user bytes — confirm whether Phase 2 should rewrite them
    /// from the s4-* manifest.
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    /// Pure delegation — archive restore is handled entirely by the backend.
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
3236    async fn upload_part_copy(
3237        &self,
3238        req: S3Request<UploadPartCopyInput>,
3239    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
3240        // v0.2 #6: byte-range aware copy when the source is S4-framed.
3241        //
3242        // For a framed source (multipart upload OR single-PUT framed-v2),
3243        // a naive byte-range passthrough would copy compressed bytes that
3244        // don't align with S4 frame boundaries — silently corrupting the
3245        // result. Instead we GET the source through S4 (which handles
3246        // decompression + Range), re-compress + re-frame as a new part,
3247        // and forward as upload_part. For non-framed sources (S4-untouched
3248        // raw objects), passthrough is correct and we keep the original
3249        // (cheaper) code path.
3250        let CopySource::Bucket {
3251            bucket: src_bucket,
3252            key: src_key,
3253            ..
3254        } = &req.input.copy_source
3255        else {
3256            return self.backend.upload_part_copy(req).await;
3257        };
3258        let src_bucket = src_bucket.to_string();
3259        let src_key = src_key.to_string();
3260
3261        // Probe metadata to decide whether the source needs S4-aware copy.
3262        let head_input = HeadObjectInput {
3263            bucket: src_bucket.clone(),
3264            key: src_key.clone(),
3265            ..Default::default()
3266        };
3267        let head_req = S3Request {
3268            input: head_input,
3269            method: http::Method::HEAD,
3270            uri: req.uri.clone(),
3271            headers: req.headers.clone(),
3272            extensions: http::Extensions::new(),
3273            credentials: req.credentials.clone(),
3274            region: req.region.clone(),
3275            service: req.service.clone(),
3276            trailing_headers: None,
3277        };
3278        let needs_s4_copy = match self.backend.head_object(head_req).await {
3279            Ok(h) => {
3280                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
3281            }
3282            Err(_) => false,
3283        };
3284        if !needs_s4_copy {
3285            return self.backend.upload_part_copy(req).await;
3286        }
3287
3288        // Resolve the optional source byte range to pass to GET.
3289        let source_range = req
3290            .input
3291            .copy_source_range
3292            .as_ref()
3293            .map(|r| parse_copy_source_range(r))
3294            .transpose()
3295            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
3296
3297        // GET source via S4 (handles decompression + sidecar partial fetch
3298        // when range is present). The result is the requested user-visible
3299        // byte range, fully decompressed.
3300        let mut get_input = GetObjectInput {
3301            bucket: src_bucket.clone(),
3302            key: src_key.clone(),
3303            ..Default::default()
3304        };
3305        get_input.range = source_range;
3306        let get_req = S3Request {
3307            input: get_input,
3308            method: http::Method::GET,
3309            uri: req.uri.clone(),
3310            headers: req.headers.clone(),
3311            extensions: http::Extensions::new(),
3312            credentials: req.credentials.clone(),
3313            region: req.region.clone(),
3314            service: req.service.clone(),
3315            trailing_headers: None,
3316        };
3317        let get_resp = self.get_object(get_req).await?;
3318        let blob = get_resp.output.body.ok_or_else(|| {
3319            S3Error::with_message(
3320                S3ErrorCode::InternalError,
3321                "upload_part_copy: empty body from source GET",
3322            )
3323        })?;
3324        let bytes = collect_blob(blob, self.max_body_bytes)
3325            .await
3326            .map_err(internal("collect upload_part_copy source body"))?;
3327
3328        // Compress + frame as a fresh part (mirrors upload_part path).
3329        let sample_len = bytes.len().min(SAMPLE_BYTES);
3330        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
3331        let original_size = bytes.len() as u64;
3332        let (compressed, manifest) = self
3333            .registry
3334            .compress(bytes, codec_kind)
3335            .await
3336            .map_err(internal("registry compress upload_part_copy"))?;
3337        let header = FrameHeader {
3338            codec: codec_kind,
3339            original_size,
3340            compressed_size: compressed.len() as u64,
3341            crc32c: manifest.crc32c,
3342        };
3343        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
3344        write_frame(&mut framed, header, &compressed);
3345        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
3346        if !likely_final {
3347            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
3348        }
3349        let framed_bytes = framed.freeze();
3350        let framed_len = framed_bytes.len() as i64;
3351
3352        // Forward as upload_part to the destination multipart upload.
3353        let part_input = UploadPartInput {
3354            bucket: req.input.bucket.clone(),
3355            key: req.input.key.clone(),
3356            part_number: req.input.part_number,
3357            upload_id: req.input.upload_id.clone(),
3358            body: Some(bytes_to_blob(framed_bytes)),
3359            content_length: Some(framed_len),
3360            ..Default::default()
3361        };
3362        let part_req = S3Request {
3363            input: part_input,
3364            method: http::Method::PUT,
3365            uri: req.uri.clone(),
3366            headers: req.headers.clone(),
3367            extensions: http::Extensions::new(),
3368            credentials: req.credentials.clone(),
3369            region: req.region.clone(),
3370            service: req.service.clone(),
3371            trailing_headers: None,
3372        };
3373        let upload_resp = self.backend.upload_part(part_req).await?;
3374
3375        let copy_output = UploadPartCopyOutput {
3376            copy_part_result: Some(CopyPartResult {
3377                e_tag: upload_resp.output.e_tag.clone(),
3378                ..Default::default()
3379            }),
3380            ..Default::default()
3381        };
3382        Ok(S3Response::new(copy_output))
3383    }
3384
3385    // ---- Object lock / retention / legal hold (v0.5 #30) ----
3386    //
3387    // When an `ObjectLockManager` is attached the configuration / per-object
3388    // state lives in the manager and these handlers serve directly from it;
3389    // when no manager is attached they fall back to the backend (legacy
3390    // passthrough so v0.4 deployments are unaffected).
3391    async fn get_object_lock_configuration(
3392        &self,
3393        req: S3Request<GetObjectLockConfigurationInput>,
3394    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
3395        if let Some(mgr) = self.object_lock.as_ref() {
3396            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
3397                ObjectLockConfiguration {
3398                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
3399                        ObjectLockEnabled::ENABLED,
3400                    )),
3401                    rule: Some(ObjectLockRule {
3402                        default_retention: Some(DefaultRetention {
3403                            days: Some(d.retention_days as i32),
3404                            mode: Some(ObjectLockRetentionMode::from_static(
3405                                match d.mode {
3406                                    crate::object_lock::LockMode::Governance => {
3407                                        ObjectLockRetentionMode::GOVERNANCE
3408                                    }
3409                                    crate::object_lock::LockMode::Compliance => {
3410                                        ObjectLockRetentionMode::COMPLIANCE
3411                                    }
3412                                },
3413                            )),
3414                            years: None,
3415                        }),
3416                    }),
3417                }
3418            });
3419            let output = GetObjectLockConfigurationOutput {
3420                object_lock_configuration: cfg,
3421            };
3422            return Ok(S3Response::new(output));
3423        }
3424        self.backend.get_object_lock_configuration(req).await
3425    }
    /// PUT bucket Object Lock configuration.
    ///
    /// With an `ObjectLockManager` attached, a fully-specified default
    /// retention rule (Mode + exactly one of Days/Years) is validated and
    /// stored in the manager; without a manager the request is passed
    /// through to the backend unchanged.
    ///
    /// NOTE(review): the `ObjectLockEnabled` element of the incoming
    /// configuration is not inspected here — confirm whether it should be
    /// required to be "Enabled" as on AWS.
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            // Only a configuration carrying a default-retention rule mutates
            // manager state; a configuration without one is accepted as a
            // successful no-op.
            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
                && let Some(rule) = cfg.rule.as_ref()
                && let Some(d) = rule.default_retention.as_ref()
            {
                // An absent or unparsable Mode is a hard error.
                let mode = d
                    .mode
                    .as_ref()
                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
                    .ok_or_else(|| {
                        S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
                        )
                    })?;
                // S3 spec: exactly one of Days / Years (we accept Days
                // outright and convert Years → Days for storage; Years
                // is just a UX shorthand on the wire).
                let days: u32 = match (d.days, d.years) {
                    (Some(d), None) if d > 0 => d as u32,
                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
                    _ => {
                        return Err(S3Error::with_message(
                            S3ErrorCode::InvalidRequest,
                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
                        ));
                    }
                };
                mgr.set_bucket_default(
                    &bucket,
                    crate::object_lock::BucketObjectLockDefault {
                        mode,
                        retention_days: days,
                    },
                );
            }
            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
        }
        self.backend.put_object_lock_configuration(req).await
    }
3471    async fn get_object_legal_hold(
3472        &self,
3473        req: S3Request<GetObjectLegalHoldInput>,
3474    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
3475        if let Some(mgr) = self.object_lock.as_ref() {
3476            let on = mgr
3477                .get(&req.input.bucket, &req.input.key)
3478                .map(|s| s.legal_hold_on)
3479                .unwrap_or(false);
3480            let status = ObjectLockLegalHoldStatus::from_static(if on {
3481                ObjectLockLegalHoldStatus::ON
3482            } else {
3483                ObjectLockLegalHoldStatus::OFF
3484            });
3485            let output = GetObjectLegalHoldOutput {
3486                legal_hold: Some(ObjectLockLegalHold {
3487                    status: Some(status),
3488                }),
3489            };
3490            return Ok(S3Response::new(output));
3491        }
3492        self.backend.get_object_legal_hold(req).await
3493    }
3494    async fn put_object_legal_hold(
3495        &self,
3496        req: S3Request<PutObjectLegalHoldInput>,
3497    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
3498        if let Some(mgr) = self.object_lock.as_ref() {
3499            let on = req
3500                .input
3501                .legal_hold
3502                .as_ref()
3503                .and_then(|h| h.status.as_ref())
3504                .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
3505                .unwrap_or(false);
3506            mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
3507            return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
3508        }
3509        self.backend.put_object_legal_hold(req).await
3510    }
3511    async fn get_object_retention(
3512        &self,
3513        req: S3Request<GetObjectRetentionInput>,
3514    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
3515        if let Some(mgr) = self.object_lock.as_ref() {
3516            let retention = mgr
3517                .get(&req.input.bucket, &req.input.key)
3518                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
3519                .map(|s| {
3520                    let mode = s.mode.map(|m| {
3521                        ObjectLockRetentionMode::from_static(match m {
3522                            crate::object_lock::LockMode::Governance => {
3523                                ObjectLockRetentionMode::GOVERNANCE
3524                            }
3525                            crate::object_lock::LockMode::Compliance => {
3526                                ObjectLockRetentionMode::COMPLIANCE
3527                            }
3528                        })
3529                    });
3530                    let until = s.retain_until.map(chrono_utc_to_timestamp);
3531                    ObjectLockRetention {
3532                        mode,
3533                        retain_until_date: until,
3534                    }
3535                });
3536            let output = GetObjectRetentionOutput { retention };
3537            return Ok(S3Response::new(output));
3538        }
3539        self.backend.get_object_retention(req).await
3540    }
    /// PUT per-object retention.
    ///
    /// With an `ObjectLockManager` attached, validates the request against
    /// S3's retention-immutability rules (see inline comments) and then
    /// merges it into the stored state: only fields present in the request
    /// overwrite the existing mode / retain-until. Without a manager the
    /// request is passed through to the backend.
    ///
    /// NOTE(review): an unrecognized Mode string degrades to `None` here
    /// and therefore leaves the stored mode untouched — AWS would reject
    /// such a request; confirm which behaviour is intended.
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            let key = req.input.key.clone();
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            // A Retention element is mandatory for this operation.
            let retention = req.input.retention.as_ref().ok_or_else(|| {
                S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "PutObjectRetention requires a Retention element",
                )
            })?;
            let new_mode = retention
                .mode
                .as_ref()
                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
            let new_until = retention
                .retain_until_date
                .as_ref()
                .map(timestamp_to_chrono_utc)
                .unwrap_or(None);
            let now = chrono::Utc::now();
            let existing = mgr.get(&bucket, &key).unwrap_or_default();
            // S3 immutability rules:
            //   - Compliance is one-way: once set, mode cannot move to
            //     Governance, and retain-until cannot be shortened.
            //   - Governance can be lengthened freely; shortened only
            //     with bypass=true.
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Compliance
                && existing.is_locked(now)
            {
                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot downgrade Compliance retention to Governance while lock is active",
                    ));
                }
                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                    && next < prev
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot shorten Compliance retention while lock is active",
                    ));
                }
            }
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Governance
                && existing.is_locked(now)
                && !bypass
                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                && next < prev
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
                ));
            }
            // Merge: only fields the request actually supplied replace the
            // corresponding stored fields.
            let mut state = existing;
            if new_mode.is_some() {
                state.mode = new_mode;
            }
            if new_until.is_some() {
                state.retain_until = new_until;
            }
            mgr.set(&bucket, &key, state);
            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
        }
        self.backend.put_object_retention(req).await
    }
3614
3615    // ---- Versioning ----
3616    // list_object_versions is implemented above in the compression-hook
3617    // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
3618    // VersioningManager is attached (v0.5 #34), serves chains directly
3619    // from the in-memory index.
3620    async fn get_bucket_versioning(
3621        &self,
3622        req: S3Request<GetBucketVersioningInput>,
3623    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
3624        // v0.5 #34: when a VersioningManager is attached, the bucket's
3625        // versioning state lives in the manager (= S4-server's
3626        // authoritative source). Pass-through hits the backend only
3627        // when no manager is configured (legacy v0.4 behaviour).
3628        if let Some(mgr) = self.versioning.as_ref() {
3629            let output = match mgr.state(&req.input.bucket).as_aws_status() {
3630                Some(s) => GetBucketVersioningOutput {
3631                    status: Some(BucketVersioningStatus::from(s.to_owned())),
3632                    ..Default::default()
3633                },
3634                None => GetBucketVersioningOutput::default(),
3635            };
3636            return Ok(S3Response::new(output));
3637        }
3638        self.backend.get_bucket_versioning(req).await
3639    }
    /// PUT bucket versioning state, with optional MFA-Delete gating.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        // v0.6 #42: MFA gating on the `PutBucketVersioning` request
        // itself. S3 spec: when the request body carries an
        // `MfaDelete` element (either `Enabled` or `Disabled`), the
        // request must include a valid `x-amz-mfa` token — both for
        // the *first* enable (so the operator can't quietly side-step
        // the gate by never enabling it) and for any subsequent
        // change (so a leaked credential alone can't disable MFA
        // Delete to bypass it on subsequent DELETEs). Requests that
        // omit the `MfaDelete` element entirely (i.e. they flip only
        // `Status`) skip this gate, matching AWS.
        if let Some(mgr) = self.mfa_delete.as_ref()
            && let Some(target_enabled) = req
                .input
                .versioning_configuration
                .mfa_delete
                .as_ref()
                .map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
        {
            let bucket = req.input.bucket.clone();
            let header = req.input.mfa.as_deref();
            let secret = mgr.lookup_secret(&bucket);
            // Verification requires both a parsable header and a registered
            // secret; the TOTP code is checked against the current time.
            let verified = match (header, secret.as_ref()) {
                (Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
                    Ok((serial, code)) => {
                        serial == s.serial
                            && crate::mfa::verify_totp(
                                &s.secret_base32,
                                &code,
                                current_unix_secs(),
                            )
                    }
                    Err(_) => false,
                },
                _ => false,
            };
            if !verified {
                crate::metrics::record_mfa_delete_denial(&bucket);
                // Distinguish "no header at all" from "header present but
                // invalid" for the error code.
                let err = if header.is_none() {
                    crate::mfa::MfaError::Missing
                } else {
                    crate::mfa::MfaError::InvalidCode
                };
                return Err(mfa_error_to_s3(err));
            }
            mgr.set_bucket_state(&bucket, target_enabled);
        }
        // v0.5 #34: when a VersioningManager is attached it owns the
        // bucket state: the new state is stashed in the manager and the
        // handler returns immediately.
        // NOTE(review): an earlier comment here claimed the request is
        // also forwarded to the backend (soft-fail) to keep downstream
        // state in sync, but the code below returns early without ever
        // calling the backend — confirm which behaviour is intended.
        if let Some(mgr) = self.versioning.as_ref() {
            // Any status other than Enabled / Suspended (including absent)
            // resets the bucket to Unversioned.
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        self.backend.put_bucket_versioning(req).await
    }
3716
3717    // ---- Bucket location ----
    /// Pure delegation: bucket location is owned by the backend.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
3724
3725    // ---- Bucket policy ----
    /// Pure delegation: bucket policy storage is owned by the backend.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    /// Pure delegation: bucket policy storage is owned by the backend.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    /// Pure delegation: bucket policy storage is owned by the backend.
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    /// Pure delegation: policy status is computed by the backend.
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }
3750
3751    // ---- Bucket ACL ----
    /// Pure delegation: bucket ACLs are owned by the backend.
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    /// Pure delegation: bucket ACLs are owned by the backend.
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
3764
3765    // ---- Bucket CORS (v0.6 #38) ----
3766    async fn get_bucket_cors(
3767        &self,
3768        req: S3Request<GetBucketCorsInput>,
3769    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
3770        if let Some(mgr) = self.cors.as_ref() {
3771            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
3772                S3Error::with_message(
3773                    S3ErrorCode::NoSuchCORSConfiguration,
3774                    "The CORS configuration does not exist".to_string(),
3775                )
3776            })?;
3777            let rules: Vec<CORSRule> = cfg
3778                .rules
3779                .into_iter()
3780                .map(|r| CORSRule {
3781                    allowed_headers: if r.allowed_headers.is_empty() {
3782                        None
3783                    } else {
3784                        Some(r.allowed_headers)
3785                    },
3786                    allowed_methods: r.allowed_methods,
3787                    allowed_origins: r.allowed_origins,
3788                    expose_headers: if r.expose_headers.is_empty() {
3789                        None
3790                    } else {
3791                        Some(r.expose_headers)
3792                    },
3793                    id: r.id,
3794                    max_age_seconds: r.max_age_seconds.map(|s| s as i32),
3795                })
3796                .collect();
3797            return Ok(S3Response::new(GetBucketCorsOutput {
3798                cors_rules: Some(rules),
3799            }));
3800        }
3801        self.backend.get_bucket_cors(req).await
3802    }
3803    async fn put_bucket_cors(
3804        &self,
3805        req: S3Request<PutBucketCorsInput>,
3806    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
3807        if let Some(mgr) = self.cors.as_ref() {
3808            let cfg = crate::cors::CorsConfig {
3809                rules: req
3810                    .input
3811                    .cors_configuration
3812                    .cors_rules
3813                    .into_iter()
3814                    .map(|r| crate::cors::CorsRule {
3815                        allowed_origins: r.allowed_origins,
3816                        allowed_methods: r.allowed_methods,
3817                        allowed_headers: r.allowed_headers.unwrap_or_default(),
3818                        expose_headers: r.expose_headers.unwrap_or_default(),
3819                        max_age_seconds: r.max_age_seconds.and_then(|s| {
3820                            if s < 0 { None } else { Some(s as u32) }
3821                        }),
3822                        id: r.id,
3823                    })
3824                    .collect(),
3825            };
3826            mgr.put(&req.input.bucket, cfg);
3827            return Ok(S3Response::new(PutBucketCorsOutput::default()));
3828        }
3829        self.backend.put_bucket_cors(req).await
3830    }
3831    async fn delete_bucket_cors(
3832        &self,
3833        req: S3Request<DeleteBucketCorsInput>,
3834    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
3835        if let Some(mgr) = self.cors.as_ref() {
3836            mgr.delete(&req.input.bucket);
3837            return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
3838        }
3839        self.backend.delete_bucket_cors(req).await
3840    }
3841
3842    // ---- Bucket lifecycle (v0.6 #37) ----
3843    async fn get_bucket_lifecycle_configuration(
3844        &self,
3845        req: S3Request<GetBucketLifecycleConfigurationInput>,
3846    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
3847        if let Some(mgr) = self.lifecycle.as_ref() {
3848            let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
3849                S3Error::with_message(
3850                    S3ErrorCode::NoSuchLifecycleConfiguration,
3851                    "The lifecycle configuration does not exist".to_string(),
3852                )
3853            })?;
3854            let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
3855            return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
3856                rules: Some(rules),
3857                transition_default_minimum_object_size: None,
3858            }));
3859        }
3860        self.backend.get_bucket_lifecycle_configuration(req).await
3861    }
3862    async fn put_bucket_lifecycle_configuration(
3863        &self,
3864        req: S3Request<PutBucketLifecycleConfigurationInput>,
3865    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
3866        if let Some(mgr) = self.lifecycle.as_ref() {
3867            let bucket = req.input.bucket.clone();
3868            let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
3869            let cfg = dto_lifecycle_to_internal(&dto_cfg);
3870            mgr.put(&bucket, cfg);
3871            return Ok(S3Response::new(
3872                PutBucketLifecycleConfigurationOutput::default(),
3873            ));
3874        }
3875        self.backend.put_bucket_lifecycle_configuration(req).await
3876    }
3877    async fn delete_bucket_lifecycle(
3878        &self,
3879        req: S3Request<DeleteBucketLifecycleInput>,
3880    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
3881        if let Some(mgr) = self.lifecycle.as_ref() {
3882            mgr.delete(&req.input.bucket);
3883            return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
3884        }
3885        self.backend.delete_bucket_lifecycle(req).await
3886    }
3887
3888    // ---- Bucket tagging (v0.6 #39) ----
3889    async fn get_bucket_tagging(
3890        &self,
3891        req: S3Request<GetBucketTaggingInput>,
3892    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
3893        let Some(mgr) = self.tagging.as_ref() else {
3894            return self.backend.get_bucket_tagging(req).await;
3895        };
3896        let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
3897        Ok(S3Response::new(GetBucketTaggingOutput {
3898            tag_set: tagset_to_aws(&tags),
3899        }))
3900    }
3901    async fn put_bucket_tagging(
3902        &self,
3903        req: S3Request<PutBucketTaggingInput>,
3904    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
3905        let Some(mgr) = self.tagging.as_ref() else {
3906            return self.backend.put_bucket_tagging(req).await;
3907        };
3908        let bucket = req.input.bucket.clone();
3909        let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
3910            S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
3911        })?;
3912        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
3913        mgr.put_bucket_tags(&bucket, parsed);
3914        Ok(S3Response::new(PutBucketTaggingOutput::default()))
3915    }
3916    async fn delete_bucket_tagging(
3917        &self,
3918        req: S3Request<DeleteBucketTaggingInput>,
3919    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
3920        let Some(mgr) = self.tagging.as_ref() else {
3921            return self.backend.delete_bucket_tagging(req).await;
3922        };
3923        let bucket = req.input.bucket.clone();
3924        self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
3925        mgr.delete_bucket_tags(&bucket);
3926        Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
3927    }
3928
3929    // ---- Bucket encryption ----
    /// Pure delegation: bucket encryption settings are owned by the backend.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    /// Pure delegation: bucket encryption settings are owned by the backend.
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    /// Pure delegation: bucket encryption settings are owned by the backend.
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }
3948
3949    // ---- Bucket logging ----
    /// Pure delegation: bucket access logging is owned by the backend.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    /// Pure delegation: bucket access logging is owned by the backend.
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
3962
3963    // ---- Bucket notification (v0.6 #35) ----
3964    //
3965    // When a `NotificationManager` is attached, S4 itself owns per-bucket
3966    // notification configurations and the PUT / GET handlers route through
3967    // the manager. The wire DTO's queue / topic configurations map onto
3968    // S4's `Destination::Sqs` / `Destination::Sns`; LambdaFunction and
3969    // EventBridge configurations are accepted on PUT but silently dropped
3970    // (out of scope for v0.6 #35). When no manager is attached the legacy
3971    // backend-passthrough behaviour applies.
3972    async fn get_bucket_notification_configuration(
3973        &self,
3974        req: S3Request<GetBucketNotificationConfigurationInput>,
3975    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
3976        if let Some(mgr) = self.notifications.as_ref() {
3977            let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
3978            let dto = notif_to_dto(&cfg);
3979            return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
3980                event_bridge_configuration: dto.event_bridge_configuration,
3981                lambda_function_configurations: dto.lambda_function_configurations,
3982                queue_configurations: dto.queue_configurations,
3983                topic_configurations: dto.topic_configurations,
3984            }));
3985        }
3986        self.backend
3987            .get_bucket_notification_configuration(req)
3988            .await
3989    }
3990    async fn put_bucket_notification_configuration(
3991        &self,
3992        req: S3Request<PutBucketNotificationConfigurationInput>,
3993    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
3994        if let Some(mgr) = self.notifications.as_ref() {
3995            let cfg = notif_from_dto(&req.input.notification_configuration);
3996            mgr.put(&req.input.bucket, cfg);
3997            return Ok(S3Response::new(
3998                PutBucketNotificationConfigurationOutput::default(),
3999            ));
4000        }
4001        self.backend
4002            .put_bucket_notification_configuration(req)
4003            .await
4004    }
4005
4006    // ---- Bucket request payment ----
    /// Pure delegation: request-payment configuration is owned by the backend.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    /// Pure delegation: request-payment configuration is owned by the backend.
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }
4019
    // ---- Bucket website ----
    //
    // Pure pass-through: static-website configuration is stored and served
    // entirely by the backend.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
4039
4040    // ---- Bucket replication (v0.6 #40) ----
4041    async fn get_bucket_replication(
4042        &self,
4043        req: S3Request<GetBucketReplicationInput>,
4044    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
4045        if let Some(mgr) = self.replication.as_ref() {
4046            return match mgr.get(&req.input.bucket) {
4047                Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
4048                    replication_configuration: Some(replication_to_dto(&cfg)),
4049                })),
4050                None => Err(S3Error::with_message(
4051                    S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
4052                    format!("no replication configuration on bucket {}", req.input.bucket),
4053                )),
4054            };
4055        }
4056        self.backend.get_bucket_replication(req).await
4057    }
4058    async fn put_bucket_replication(
4059        &self,
4060        req: S3Request<PutBucketReplicationInput>,
4061    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
4062        if let Some(mgr) = self.replication.as_ref() {
4063            let cfg = replication_from_dto(&req.input.replication_configuration);
4064            mgr.put(&req.input.bucket, cfg);
4065            return Ok(S3Response::new(PutBucketReplicationOutput::default()));
4066        }
4067        self.backend.put_bucket_replication(req).await
4068    }
4069    async fn delete_bucket_replication(
4070        &self,
4071        req: S3Request<DeleteBucketReplicationInput>,
4072    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
4073        if let Some(mgr) = self.replication.as_ref() {
4074            mgr.delete(&req.input.bucket);
4075            return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
4076        }
4077        self.backend.delete_bucket_replication(req).await
4078    }
4079
    // ---- Bucket accelerate ----
    //
    // Pure pass-through: transfer-acceleration configuration is handled
    // entirely by the backend.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }
4093
    // ---- Bucket ownership controls ----
    //
    // Pure pass-through: object-ownership settings are stored and served
    // by the backend.
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }
4113
    // ---- Public access block ----
    //
    // Pure pass-through: public-access-block settings are handled by the
    // backend.
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
4133
4134    // ====================================================================
4135    // v0.6 #41: S3 Select — server-side SQL filter on object body.
4136    //
4137    // Fetch the object via the regular `get_object` path (so SSE-C /
4138    // SSE-S4 / SSE-KMS / S4 codec all decompress + decrypt transparently),
4139    // run a small SQL subset (CSV + JSON Lines, equality / inequality /
4140    // LIKE / AND / OR / NOT) over the in-memory body, and stream the
4141    // matched rows back as AWS event-stream `Records` + `Stats` + `End`
4142    // frames.
4143    //
4144    // Limitations (deliberate, documented):
4145    //   - Parquet input is rejected with NotImplemented.
4146    //   - Aggregates / GROUP BY / JOIN / ORDER BY / LIMIT are rejected at
4147    //     parse time as InvalidRequest (s3s 0.13 doesn't expose AWS's
4148    //     domain-specific `InvalidSqlExpression` code).
4149    //   - The body is fully buffered before SQL evaluation (S3 Select
4150    //     streaming-during-evaluation is v0.7 scope).
4151    //   - GPU-accelerated WHERE evaluation is stubbed out (always None).
    async fn select_object_content(
        &self,
        req: S3Request<SelectObjectContentInput>,
    ) -> S3Result<S3Response<SelectObjectContentOutput>> {
        use crate::select::{
            EventStreamWriter, SelectInputFormat, SelectOutputFormat, run_select_csv,
            run_select_jsonlines,
        };

        // -- Phase 1: auth + throttling. A Select is treated as a read of
        //    the target object (s3:GetObject).
        let select_bucket = req.input.bucket.clone();
        let select_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &select_bucket)?;
        self.enforce_policy(
            &req,
            "s3:GetObject",
            &select_bucket,
            Some(&select_key),
        )?;

        // -- Phase 2: validate the request envelope. Only
        //    ExpressionType=SQL is accepted.
        let request = req.input.request;
        let sql = request.expression.clone();
        if request.expression_type.as_str() != "SQL" {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidExpressionType,
                format!(
                    "ExpressionType must be SQL, got: {}",
                    request.expression_type.as_str()
                ),
            ));
        }

        // -- Phase 3: resolve InputSerialization. JSON wins if both JSON
        //    and CSV blocks are present; Parquet is rejected outright.
        let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
            SelectInputFormat::JsonLines
        } else if let Some(csv) = request.input_serialization.csv.as_ref() {
            // FileHeaderInfo USE / IGNORE (case-insensitive) both mean
            // "row 1 is a header"; anything else means headerless data.
            let has_header = csv
                .file_header_info
                .as_ref()
                .map(|h| {
                    let s = h.as_str();
                    s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
                })
                .unwrap_or(false);
            // Only the first char of FieldDelimiter is used; default ','.
            let delim = csv
                .field_delimiter
                .as_deref()
                .and_then(|s| s.chars().next())
                .unwrap_or(',');
            SelectInputFormat::Csv {
                has_header,
                delimiter: delim,
            }
        } else if request.input_serialization.parquet.is_some() {
            return Err(S3Error::with_message(
                S3ErrorCode::NotImplemented,
                "Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
            ));
        } else {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "InputSerialization requires exactly one of CSV / JSON / Parquet",
            ));
        };
        // Only uncompressed input is supported; any CompressionType other
        // than NONE (case-insensitive) is rejected.
        if let Some(ct) = request.input_serialization.compression_type.as_ref()
            && !ct.as_str().eq_ignore_ascii_case("NONE")
        {
            return Err(S3Error::with_message(
                S3ErrorCode::NotImplemented,
                format!(
                    "InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
                    ct.as_str()
                ),
            ));
        }

        // -- Phase 4: resolve OutputSerialization (JSON wins over CSV when
        //    both are present).
        let output_format = if request.output_serialization.json.is_some() {
            SelectOutputFormat::Json
        } else if request.output_serialization.csv.is_some() {
            SelectOutputFormat::Csv
        } else {
            return Err(S3Error::with_message(
                S3ErrorCode::InvalidRequest,
                "OutputSerialization requires exactly one of CSV / JSON",
            ));
        };

        // -- Phase 5: fetch the object through our own get_object so codec
        //    decompression and SSE decryption apply transparently. SSE-C
        //    headers from the Select request are forwarded.
        let get_input = GetObjectInput {
            bucket: select_bucket.clone(),
            key: select_key.clone(),
            sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
            sse_customer_key: req.input.sse_customer_key.clone(),
            sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{}/{}", select_bucket, select_key)
                .parse()
                .map_err(|e| {
                    S3Error::with_message(
                        S3ErrorCode::InternalError,
                        format!("constructing inner GET URI: {e}"),
                    )
                })?,
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "Select: object body was empty after GET",
            )
        })?;
        // Fully buffer the body (bounded by max_body_bytes) before SQL
        // evaluation — streaming evaluation is out of scope here.
        let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect Select body"))?;
        let scanned = body_bytes.len() as u64;

        // -- Phase 6: run the SQL subset over the in-memory body.
        let matched_payload = match input_format {
            SelectInputFormat::JsonLines => {
                run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
                    |e| select_error_to_s3(e, "JSON Lines"),
                )?
            }
            SelectInputFormat::Csv { .. } => {
                run_select_csv(&sql, &body_bytes, input_format, output_format)
                    .map_err(|e| select_error_to_s3(e, "CSV"))?
            }
        };

        // -- Phase 7: emit the event stream: optional Records, then Stats,
        //    then End. scanned == processed here because the whole body is
        //    evaluated in one pass with no compression.
        let returned = matched_payload.len() as u64;
        let processed = scanned;
        let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
        if !matched_payload.is_empty() {
            events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
                payload: Some(bytes::Bytes::from(matched_payload)),
            })));
        }
        events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
            details: Some(Stats {
                bytes_scanned: Some(scanned as i64),
                bytes_processed: Some(processed as i64),
                bytes_returned: Some(returned as i64),
            }),
        })));
        events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
        // Touch EventStreamWriter so the public API stays linked into the
        // build (the actual wire framing is delegated to s3s).
        let _writer = EventStreamWriter::new();

        let stream =
            SelectObjectContentEventStream::new(futures::stream::iter(events));
        let output = SelectObjectContentOutput {
            payload: Some(stream),
        };
        Ok(S3Response::new(output))
    }
4314
4315    // ---- Bucket Inventory configuration (v0.6 #36) ----
4316    //
4317    // When an `InventoryManager` is attached, S4-server owns the
4318    // configuration store and these handlers no longer pass through to
4319    // the backend. The mapping between the s3s-typed
4320    // `InventoryConfiguration` and the inventory module's internal
4321    // `InventoryConfig` is intentionally lossy: only the fields S4
4322    // actually uses for periodic CSV emission survive the round trip
4323    // (id, source bucket, destination bucket / prefix, format, included
4324    // versions, schedule frequency). Optional fields, encryption, and
4325    // filter prefixes are accepted on PUT and re-surfaced on GET via
4326    // a best-effort default-shape `InventoryConfiguration` so the
4327    // client sees a roundtrip-clean response.
4328    async fn put_bucket_inventory_configuration(
4329        &self,
4330        req: S3Request<PutBucketInventoryConfigurationInput>,
4331    ) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
4332        if let Some(mgr) = self.inventory.as_ref() {
4333            let cfg = inv_from_dto(
4334                &req.input.bucket,
4335                &req.input.id,
4336                &req.input.inventory_configuration,
4337            );
4338            mgr.put(cfg);
4339            return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
4340        }
4341        self.backend.put_bucket_inventory_configuration(req).await
4342    }
4343
4344    async fn get_bucket_inventory_configuration(
4345        &self,
4346        req: S3Request<GetBucketInventoryConfigurationInput>,
4347    ) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
4348        if let Some(mgr) = self.inventory.as_ref() {
4349            let cfg = mgr.get(&req.input.bucket, &req.input.id);
4350            if let Some(cfg) = cfg {
4351                let out = GetBucketInventoryConfigurationOutput {
4352                    inventory_configuration: Some(inv_to_dto(&cfg)),
4353                };
4354                return Ok(S3Response::new(out));
4355            }
4356            // AWS returns `NoSuchConfiguration` (404) when the id has no
4357            // matching inventory configuration on the bucket. The
4358            // generated `S3ErrorCode` enum doesn't expose a typed variant
4359            // for this code, so we round-trip through `from_bytes` which
4360            // wraps unknown codes as `Custom(...)` (= the AWS-canonical
4361            // error-code string survives into the XML response envelope).
4362            let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
4363                .unwrap_or(S3ErrorCode::NoSuchKey);
4364            return Err(S3Error::with_message(
4365                code,
4366                format!(
4367                    "no inventory configuration with id={} on bucket={}",
4368                    req.input.id, req.input.bucket
4369                ),
4370            ));
4371        }
4372        self.backend.get_bucket_inventory_configuration(req).await
4373    }
4374
4375    async fn list_bucket_inventory_configurations(
4376        &self,
4377        req: S3Request<ListBucketInventoryConfigurationsInput>,
4378    ) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
4379        if let Some(mgr) = self.inventory.as_ref() {
4380            let list = mgr.list_for_bucket(&req.input.bucket);
4381            let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
4382            let out = ListBucketInventoryConfigurationsOutput {
4383                continuation_token: req.input.continuation_token.clone(),
4384                inventory_configuration_list: if dto_list.is_empty() {
4385                    None
4386                } else {
4387                    Some(dto_list)
4388                },
4389                is_truncated: Some(false),
4390                next_continuation_token: None,
4391            };
4392            return Ok(S3Response::new(out));
4393        }
4394        self.backend.list_bucket_inventory_configurations(req).await
4395    }
4396
4397    async fn delete_bucket_inventory_configuration(
4398        &self,
4399        req: S3Request<DeleteBucketInventoryConfigurationInput>,
4400    ) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
4401        if let Some(mgr) = self.inventory.as_ref() {
4402            mgr.delete(&req.input.bucket, &req.input.id);
4403            return Ok(S3Response::new(
4404                DeleteBucketInventoryConfigurationOutput::default(),
4405            ));
4406        }
4407        self.backend.delete_bucket_inventory_configuration(req).await
4408    }
4409}
4410
4411// ---------------------------------------------------------------------------
4412// v0.6 #36: Convert between the s3s-typed `InventoryConfiguration` (the wire
4413// surface) and our internal `crate::inventory::InventoryConfig`. Only the
4414// fields S4 actually uses for CSV emission survive the round trip; the
4415// missing fields (filter prefix, optional fields, encryption) are dropped on
4416// PUT and re-rendered as the AWS-default shape on GET so the client sees a
4417// well-formed `InventoryConfiguration`.
4418// ---------------------------------------------------------------------------
4419
4420fn inv_from_dto(
4421    bucket: &str,
4422    id: &str,
4423    dto: &InventoryConfiguration,
4424) -> crate::inventory::InventoryConfig {
4425    let frequency_hours = match dto.schedule.frequency.as_str() {
4426        "Weekly" => 24 * 7,
4427        // Daily is the default; anything S4 doesn't recognise (incl.
4428        // empty, which is the s3s-default) maps to Daily so the
4429        // operator's PUT doesn't silently turn into a no-op cadence.
4430        _ => 24,
4431    };
4432    // Parquet/ORC are not supported (issue #36 scope); we still accept
4433    // the PUT so callers don't fail-loud, but we record CSV and rely on
4434    // the operator catching the discrepancy on GET.
4435    let format = crate::inventory::InventoryFormat::Csv;
4436    crate::inventory::InventoryConfig {
4437        id: id.to_owned(),
4438        bucket: bucket.to_owned(),
4439        destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
4440        destination_prefix: dto
4441            .destination
4442            .s3_bucket_destination
4443            .prefix
4444            .clone()
4445            .unwrap_or_default(),
4446        frequency_hours,
4447        format,
4448        included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
4449            dto.included_object_versions.as_str(),
4450        ),
4451    }
4452}
4453
4454fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
4455    InventoryConfiguration {
4456        id: cfg.id.clone(),
4457        is_enabled: true,
4458        included_object_versions: InventoryIncludedObjectVersions::from(
4459            cfg.included_object_versions.as_aws_str().to_owned(),
4460        ),
4461        destination: InventoryDestination {
4462            s3_bucket_destination: InventoryS3BucketDestination {
4463                account_id: None,
4464                bucket: cfg.destination_bucket.clone(),
4465                encryption: None,
4466                format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
4467                prefix: if cfg.destination_prefix.is_empty() {
4468                    None
4469                } else {
4470                    Some(cfg.destination_prefix.clone())
4471                },
4472            },
4473        },
4474        schedule: InventorySchedule {
4475            // `frequency_hours == 168` -> Weekly; everything else maps to
4476            // Daily for the wire response (the manager keeps the precise
4477            // hour count internally for due-checking).
4478            frequency: InventoryFrequency::from(
4479                if cfg.frequency_hours == 24 * 7 {
4480                    "Weekly"
4481                } else {
4482                    "Daily"
4483                }
4484                .to_owned(),
4485            ),
4486        },
4487        filter: None,
4488        optional_fields: None,
4489    }
4490}
4491
4492// ---------------------------------------------------------------------------
4493// v0.6 #35: Convert between the s3s-typed `NotificationConfiguration` (the
4494// wire surface) and our internal `crate::notifications::NotificationConfig`.
4495//
4496// We support TopicConfiguration (-> Destination::Sns) and QueueConfiguration
4497// (-> Destination::Sqs). LambdaFunction and EventBridge configurations are
4498// silently dropped on PUT (out of scope for v0.6 #35); the GET response only
4499// surfaces topic / queue rules.
4500//
4501// The webhook destination has no AWS-native wire form: operators configure
4502// webhooks via the JSON snapshot file (`--notifications-state-file`) or by
4503// poking `NotificationManager::put` directly from a custom binary. This
4504// keeps the wire surface AWS-compatible while still letting the always-
4505// available `Webhook` destination be reachable.
4506// ---------------------------------------------------------------------------
4507
4508fn notif_from_dto(
4509    dto: &NotificationConfiguration,
4510) -> crate::notifications::NotificationConfig {
4511    let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
4512    if let Some(topics) = dto.topic_configurations.as_ref() {
4513        for (idx, t) in topics.iter().enumerate() {
4514            let events = events_from_dto(&t.events);
4515            let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
4516            rules.push(crate::notifications::NotificationRule {
4517                id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
4518                events,
4519                destination: crate::notifications::Destination::Sns {
4520                    topic_arn: t.topic_arn.clone(),
4521                },
4522                filter_prefix: prefix,
4523                filter_suffix: suffix,
4524            });
4525        }
4526    }
4527    if let Some(queues) = dto.queue_configurations.as_ref() {
4528        for (idx, q) in queues.iter().enumerate() {
4529            let events = events_from_dto(&q.events);
4530            let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
4531            rules.push(crate::notifications::NotificationRule {
4532                id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
4533                events,
4534                destination: crate::notifications::Destination::Sqs {
4535                    queue_arn: q.queue_arn.clone(),
4536                },
4537                filter_prefix: prefix,
4538                filter_suffix: suffix,
4539            });
4540        }
4541    }
4542    crate::notifications::NotificationConfig { rules }
4543}
4544
4545fn notif_to_dto(
4546    cfg: &crate::notifications::NotificationConfig,
4547) -> NotificationConfiguration {
4548    let mut topics: Vec<TopicConfiguration> = Vec::new();
4549    let mut queues: Vec<QueueConfiguration> = Vec::new();
4550    for rule in &cfg.rules {
4551        let events: Vec<Event> = rule
4552            .events
4553            .iter()
4554            .map(|e| Event::from(e.as_aws_str().to_owned()))
4555            .collect();
4556        let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
4557        match &rule.destination {
4558            crate::notifications::Destination::Sns { topic_arn } => {
4559                topics.push(TopicConfiguration {
4560                    events,
4561                    filter,
4562                    id: Some(rule.id.clone()),
4563                    topic_arn: topic_arn.clone(),
4564                });
4565            }
4566            crate::notifications::Destination::Sqs { queue_arn } => {
4567                queues.push(QueueConfiguration {
4568                    events,
4569                    filter,
4570                    id: Some(rule.id.clone()),
4571                    queue_arn: queue_arn.clone(),
4572                });
4573            }
4574            // Webhook destinations have no AWS wire equivalent — they
4575            // round-trip through the JSON snapshot only. Skip them on the
4576            // GET surface (an SDK consumer wouldn't know what to do with
4577            // them anyway).
4578            crate::notifications::Destination::Webhook { .. } => {}
4579        }
4580    }
4581    NotificationConfiguration {
4582        event_bridge_configuration: None,
4583        lambda_function_configurations: None,
4584        queue_configurations: if queues.is_empty() { None } else { Some(queues) },
4585        topic_configurations: if topics.is_empty() { None } else { Some(topics) },
4586    }
4587}
4588
4589fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
4590    events
4591        .iter()
4592        .filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
4593        .collect()
4594}
4595
4596fn filter_from_dto(
4597    f: Option<&NotificationConfigurationFilter>,
4598) -> (Option<String>, Option<String>) {
4599    let Some(f) = f else {
4600        return (None, None);
4601    };
4602    let Some(key) = f.key.as_ref() else {
4603        return (None, None);
4604    };
4605    let Some(rules) = key.filter_rules.as_ref() else {
4606        return (None, None);
4607    };
4608    let mut prefix = None;
4609    let mut suffix = None;
4610    for r in rules {
4611        let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
4612        let value = r.value.clone();
4613        match name.as_deref() {
4614            Some("prefix") => prefix = value,
4615            Some("suffix") => suffix = value,
4616            _ => {}
4617        }
4618    }
4619    (prefix, suffix)
4620}
4621
4622fn filter_to_dto(
4623    prefix: Option<&str>,
4624    suffix: Option<&str>,
4625) -> Option<NotificationConfigurationFilter> {
4626    if prefix.is_none() && suffix.is_none() {
4627        return None;
4628    }
4629    let mut rules: Vec<FilterRule> = Vec::new();
4630    if let Some(p) = prefix {
4631        rules.push(FilterRule {
4632            name: Some(FilterRuleName::from("prefix".to_owned())),
4633            value: Some(p.to_owned()),
4634        });
4635    }
4636    if let Some(s) = suffix {
4637        rules.push(FilterRule {
4638            name: Some(FilterRuleName::from("suffix".to_owned())),
4639            value: Some(s.to_owned()),
4640        });
4641    }
4642    Some(NotificationConfigurationFilter {
4643        key: Some(S3KeyFilter {
4644            filter_rules: Some(rules),
4645        }),
4646    })
4647}
4648
4649// ---------------------------------------------------------------------------
4650// v0.6 #40: Convert between the s3s-typed `ReplicationConfiguration` (the
4651// wire surface) and our internal `crate::replication::ReplicationConfig`.
4652// AWS's `ReplicationRuleFilter` is a sum type — `Prefix | Tag | And { Prefix,
4653// Tags }`; we flatten it into the single `(prefix, tag-vec)` representation
4654// the matcher needs. Sub-blocks v0.6 #40 does not implement
4655// (DeleteMarkerReplication / SourceSelectionCriteria / ReplicationTime /
4656// Metrics / EncryptionConfiguration) round-trip as `None` on GET — operators
4657// who set them on PUT see them silently dropped, mirroring "feature not
4658// supported in this release" semantics.
4659// ---------------------------------------------------------------------------
4660
4661fn replication_from_dto(
4662    dto: &ReplicationConfiguration,
4663) -> crate::replication::ReplicationConfig {
4664    let rules = dto
4665        .rules
4666        .iter()
4667        .enumerate()
4668        .map(|(idx, r)| {
4669            let id = r
4670                .id
4671                .as_ref()
4672                .map(|s| s.as_str().to_owned())
4673                .unwrap_or_else(|| format!("rule-{idx}"));
4674            let priority = r.priority.unwrap_or(0).max(0) as u32;
4675            let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
4676            let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
4677            let destination_bucket = r.destination.bucket.clone();
4678            let destination_storage_class = r
4679                .destination
4680                .storage_class
4681                .as_ref()
4682                .map(|s| s.as_str().to_owned());
4683            crate::replication::ReplicationRule {
4684                id,
4685                priority,
4686                status_enabled,
4687                filter,
4688                destination_bucket,
4689                destination_storage_class,
4690            }
4691        })
4692        .collect();
4693    crate::replication::ReplicationConfig {
4694        role: dto.role.clone(),
4695        rules,
4696    }
4697}
4698
4699fn replication_to_dto(
4700    cfg: &crate::replication::ReplicationConfig,
4701) -> ReplicationConfiguration {
4702    let rules = cfg
4703        .rules
4704        .iter()
4705        .map(|r| {
4706            let status = if r.status_enabled {
4707                ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
4708            } else {
4709                ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
4710            };
4711            let destination = Destination {
4712                access_control_translation: None,
4713                account: None,
4714                bucket: r.destination_bucket.clone(),
4715                encryption_configuration: None,
4716                metrics: None,
4717                replication_time: None,
4718                storage_class: r
4719                    .destination_storage_class
4720                    .as_ref()
4721                    .map(|s| StorageClass::from(s.clone())),
4722            };
4723            let filter = Some(replication_filter_to_dto(&r.filter));
4724            ReplicationRule {
4725                delete_marker_replication: None,
4726                destination,
4727                existing_object_replication: None,
4728                filter,
4729                id: Some(r.id.clone()),
4730                prefix: None,
4731                priority: Some(r.priority as i32),
4732                source_selection_criteria: None,
4733                status,
4734            }
4735        })
4736        .collect();
4737    ReplicationConfiguration {
4738        role: cfg.role.clone(),
4739        rules,
4740    }
4741}
4742
4743fn replication_filter_from_dto(
4744    f: Option<&ReplicationRuleFilter>,
4745    rule_level_prefix: Option<&str>,
4746) -> crate::replication::ReplicationFilter {
4747    let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
4748    let mut tags: Vec<(String, String)> = Vec::new();
4749    if let Some(f) = f {
4750        if let Some(p) = f.prefix.as_ref()
4751            && prefix.is_none()
4752        {
4753            prefix = Some(p.clone());
4754        }
4755        if let Some(t) = f.tag.as_ref()
4756            && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
4757        {
4758            tags.push((k.clone(), v.clone()));
4759        }
4760        if let Some(and) = f.and.as_ref() {
4761            if let Some(p) = and.prefix.as_ref()
4762                && prefix.is_none()
4763            {
4764                prefix = Some(p.clone());
4765            }
4766            if let Some(ts) = and.tags.as_ref() {
4767                for t in ts {
4768                    if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
4769                        tags.push((k.clone(), v.clone()));
4770                    }
4771                }
4772            }
4773        }
4774    }
4775    crate::replication::ReplicationFilter { prefix, tags }
4776}
4777
4778fn replication_filter_to_dto(
4779    f: &crate::replication::ReplicationFilter,
4780) -> ReplicationRuleFilter {
4781    if f.tags.is_empty() {
4782        ReplicationRuleFilter {
4783            and: None,
4784            prefix: f.prefix.clone(),
4785            tag: None,
4786        }
4787    } else if f.tags.len() == 1 && f.prefix.is_none() {
4788        let (k, v) = &f.tags[0];
4789        ReplicationRuleFilter {
4790            and: None,
4791            prefix: None,
4792            tag: Some(Tag {
4793                key: Some(k.clone()),
4794                value: Some(v.clone()),
4795            }),
4796        }
4797    } else {
4798        let tags: Vec<Tag> = f
4799            .tags
4800            .iter()
4801            .map(|(k, v)| Tag {
4802                key: Some(k.clone()),
4803                value: Some(v.clone()),
4804            })
4805            .collect();
4806        ReplicationRuleFilter {
4807            and: Some(ReplicationRuleAndOperator {
4808                prefix: f.prefix.clone(),
4809                tags: Some(tags),
4810            }),
4811            prefix: None,
4812            tag: None,
4813        }
4814    }
4815}
4816
4817// ---------------------------------------------------------------------------
4818// v0.6 #37: Convert between the s3s-typed `BucketLifecycleConfiguration`
4819// (the wire surface) and our internal `crate::lifecycle::LifecycleConfig`.
4820// The internal representation flattens AWS's "Filter | And" disjunction
4821// into a single `LifecycleFilter` struct of optional fields plus a tag
4822// vector. Fields S4's evaluator does not consume
4823// (`expired_object_delete_marker`, `noncurrent_version_transitions`,
4824// `transition_default_minimum_object_size`, the storage class on the
4825// noncurrent expiration) are dropped on PUT and re-rendered as their
4826// AWS-default shape on GET so the client always sees a well-formed
4827// configuration.
4828// ---------------------------------------------------------------------------
4829
4830fn dto_lifecycle_to_internal(
4831    dto: &BucketLifecycleConfiguration,
4832) -> crate::lifecycle::LifecycleConfig {
4833    crate::lifecycle::LifecycleConfig {
4834        rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
4835    }
4836}
4837
4838fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
4839    let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
4840    let filter = rule
4841        .filter
4842        .as_ref()
4843        .map(dto_filter_to_internal)
4844        .unwrap_or_default();
4845    let expiration_days = rule
4846        .expiration
4847        .as_ref()
4848        .and_then(|e| e.days)
4849        .and_then(|d| u32::try_from(d).ok());
4850    let expiration_date = rule
4851        .expiration
4852        .as_ref()
4853        .and_then(|e| e.date.as_ref())
4854        .and_then(timestamp_to_chrono_utc);
4855    let transitions: Vec<crate::lifecycle::TransitionRule> = rule
4856        .transitions
4857        .as_ref()
4858        .map(|ts| {
4859            ts.iter()
4860                .filter_map(|t| {
4861                    let days = u32::try_from(t.days?).ok()?;
4862                    let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
4863                    Some(crate::lifecycle::TransitionRule {
4864                        days,
4865                        storage_class,
4866                    })
4867                })
4868                .collect()
4869        })
4870        .unwrap_or_default();
4871    let noncurrent_version_expiration_days = rule
4872        .noncurrent_version_expiration
4873        .as_ref()
4874        .and_then(|n| n.noncurrent_days)
4875        .and_then(|d| u32::try_from(d).ok());
4876    let abort_incomplete_multipart_upload_days = rule
4877        .abort_incomplete_multipart_upload
4878        .as_ref()
4879        .and_then(|a| a.days_after_initiation)
4880        .and_then(|d| u32::try_from(d).ok());
4881    crate::lifecycle::LifecycleRule {
4882        id: rule.id.clone().unwrap_or_default(),
4883        status,
4884        filter,
4885        expiration_days,
4886        expiration_date,
4887        transitions,
4888        noncurrent_version_expiration_days,
4889        abort_incomplete_multipart_upload_days,
4890    }
4891}
4892
4893fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
4894    let mut prefix = filter.prefix.clone();
4895    let mut tags: Vec<(String, String)> = Vec::new();
4896    let mut size_gt: Option<u64> = filter
4897        .object_size_greater_than
4898        .and_then(|n| u64::try_from(n).ok());
4899    let mut size_lt: Option<u64> = filter
4900        .object_size_less_than
4901        .and_then(|n| u64::try_from(n).ok());
4902    if let Some(t) = &filter.tag
4903        && let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
4904    {
4905        tags.push((k.clone(), v.clone()));
4906    }
4907    if let Some(and) = &filter.and {
4908        if prefix.is_none() {
4909            prefix = and.prefix.clone();
4910        }
4911        if size_gt.is_none() {
4912            size_gt = and
4913                .object_size_greater_than
4914                .and_then(|n| u64::try_from(n).ok());
4915        }
4916        if size_lt.is_none() {
4917            size_lt = and
4918                .object_size_less_than
4919                .and_then(|n| u64::try_from(n).ok());
4920        }
4921        if let Some(ts) = &and.tags {
4922            for t in ts {
4923                if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
4924                    tags.push((k.clone(), v.clone()));
4925                }
4926            }
4927        }
4928    }
4929    crate::lifecycle::LifecycleFilter {
4930        prefix,
4931        tags,
4932        object_size_greater_than: size_gt,
4933        object_size_less_than: size_lt,
4934    }
4935}
4936
/// Render one internal `LifecycleRule` back into the s3s `LifecycleRule`
/// DTO.
///
/// The filter is re-rendered in the most compact AWS shape: bare
/// `prefix` when there are no tags and no size bounds, a single `tag`
/// when that is the only predicate, and a `LifecycleRuleAndOperator`
/// otherwise. Fields S4 never stores (`expired_object_delete_marker`,
/// `noncurrent_version_transitions`, transition dates, the deprecated
/// rule-level `prefix`) come back as `None` — see the block comment
/// above this converter group.
fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
    // Only materialize an Expiration element when the rule carries at
    // least one of the two expiry forms (days / absolute date).
    let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
        Some(LifecycleExpiration {
            date: rule.expiration_date.map(chrono_utc_to_timestamp),
            // NOTE(review): u32 -> i32 cast; values > i32::MAX would go
            // negative on the wire — presumably bounded upstream, confirm.
            days: rule.expiration_days.map(|d| d as i32),
            expired_object_delete_marker: None,
        })
    } else {
        None
    };
    // Empty transition list -> element omitted entirely on the wire.
    let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
        None
    } else {
        Some(
            rule.transitions
                .iter()
                .map(|t| Transition {
                    date: None,
                    days: Some(t.days as i32),
                    storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
                })
                .collect(),
        )
    };
    let noncurrent_version_expiration =
        rule.noncurrent_version_expiration_days
            .map(|d| NoncurrentVersionExpiration {
                newer_noncurrent_versions: None,
                noncurrent_days: Some(d as i32),
            });
    let abort_incomplete_multipart_upload =
        rule.abort_incomplete_multipart_upload_days
            .map(|d| AbortIncompleteMultipartUpload {
                days_after_initiation: Some(d as i32),
            });
    // Pick the filter shape. Branch 1: prefix-only (or no filter at all
    // when the prefix is also absent — this maps to `filter: None`).
    let filter = if rule.filter.tags.is_empty()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: Some(p.clone()),
            tag: None,
        })
    // Branch 2: exactly one tag and nothing else -> single-tag form.
    } else if rule.filter.tags.len() == 1
        && rule.filter.prefix.is_none()
        && rule.filter.object_size_greater_than.is_none()
        && rule.filter.object_size_less_than.is_none()
    {
        let (k, v) = rule.filter.tags[0].clone();
        Some(LifecycleRuleFilter {
            and: None,
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: Some(Tag {
                key: Some(k),
                value: Some(v),
            }),
        })
    // Branch 3: any combination of predicates -> And operator.
    } else {
        let tags = if rule.filter.tags.is_empty() {
            None
        } else {
            Some(
                rule.filter
                    .tags
                    .iter()
                    .map(|(k, v)| Tag {
                        key: Some(k.clone()),
                        value: Some(v.clone()),
                    })
                    .collect(),
            )
        };
        Some(LifecycleRuleFilter {
            and: Some(LifecycleRuleAndOperator {
                // u64 -> i64: out-of-range sizes are dropped, not wrapped.
                object_size_greater_than: rule
                    .filter
                    .object_size_greater_than
                    .and_then(|n| i64::try_from(n).ok()),
                object_size_less_than: rule
                    .filter
                    .object_size_less_than
                    .and_then(|n| i64::try_from(n).ok()),
                prefix: rule.filter.prefix.clone(),
                tags,
            }),
            object_size_greater_than: None,
            object_size_less_than: None,
            prefix: None,
            tag: None,
        })
    };
    LifecycleRule {
        abort_incomplete_multipart_upload,
        expiration,
        filter,
        // Internal model uses "" for "no id"; render that as None.
        id: if rule.id.is_empty() {
            None
        } else {
            Some(rule.id.clone())
        },
        noncurrent_version_expiration,
        noncurrent_version_transitions: None,
        prefix: None,
        status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
        transitions,
    }
}
5049
5050// (timestamp <-> chrono helpers `timestamp_to_chrono_utc` /
5051// `chrono_utc_to_timestamp` are defined earlier in this file for the
5052// tagging/notifications work; the lifecycle DTO converters reuse them.)
5053
5054// ---------------------------------------------------------------------------
5055// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
5056//
5057// Kept as a self-contained block at the bottom of the file so it doesn't
5058// touch the existing `S4Service` struct, `new()`, or any of the per-op
5059// handlers above. The hook is wired in by the binary at server-build time
5060// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
5061//
5062// Lifecycle:
5063//   1. `SigV4aGate::new(store)` is constructed once at boot from the
5064//      operator-supplied credential directory.
5065//   2. For each incoming request, `SigV4aGate::pre_route(&req,
5066//      &requested_region, &canonical_request_bytes)` is invoked BEFORE
5067//      the request hits the S3 framework. If the request claims SigV4a
5068//      and verifies, control returns to the framework. Otherwise a 403
5069//      `SignatureDoesNotMatch` is produced.
5070//   3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
5071// ---------------------------------------------------------------------------
5072
/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
///
/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
/// `pre_route` entry point that returns `Ok(())` for both
/// "request is plain SigV4 — pass through" and "request is SigV4a and
/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
///
/// Constructed once at boot via [`SigV4aGate::new`] and wired in as a
/// middleware layer by the binary, not by `S4Service` (see the block
/// comment above).
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    // Shared, Arc-backed credential store; lookups happen per request.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
5084
5085impl SigV4aGate {
5086    #[must_use]
5087    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
5088        Self { store }
5089    }
5090
5091    /// Inspect an incoming HTTP request. Behaviour:
5092    ///
5093    /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
5094    ///   prefix) → returns `Ok(())`; the framework's existing SigV4
5095    ///   path handles the request.
5096    /// - SigV4a + valid signature + region match → `Ok(())`.
5097    /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
5098    /// - SigV4a + bad signature / region mismatch → `Err` with
5099    ///   `SignatureDoesNotMatch`.
5100    ///
5101    /// `canonical_request_bytes` is the SigV4a string-to-sign (or
5102    /// canonical-request bytes; the caller decides) that the framework
5103    /// has already produced for this request. Keeping it as a parameter
5104    /// instead of rebuilding it inside the hook avoids duplicating the
5105    /// canonicalisation logic.
5106    pub fn pre_route<B>(
5107        &self,
5108        req: &http::Request<B>,
5109        requested_region: &str,
5110        canonical_request_bytes: &[u8],
5111    ) -> Result<(), SigV4aGateError> {
5112        if !crate::sigv4a::detect(req) {
5113            return Ok(());
5114        }
5115        let auth_hdr = req
5116            .headers()
5117            .get(http::header::AUTHORIZATION)
5118            .and_then(|v| v.to_str().ok())
5119            .ok_or(SigV4aGateError::MissingAuthorization)?;
5120        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
5121            .ok_or(SigV4aGateError::MalformedAuthorization)?;
5122        let region_set = req
5123            .headers()
5124            .get(crate::sigv4a::REGION_SET_HEADER)
5125            .and_then(|v| v.to_str().ok())
5126            .unwrap_or("*");
5127        let key = self
5128            .store
5129            .get(&parsed.access_key_id)
5130            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
5131        crate::sigv4a::verify(
5132            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
5133            &parsed.signature_der,
5134            key,
5135            region_set,
5136            requested_region,
5137        )
5138        .map_err(SigV4aGateError::Verify)?;
5139        Ok(())
5140    }
5141}
5142
/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
/// HTTP 403 with one of the two AWS-standard error codes
/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`); see
/// [`SigV4aGateError::s3_error_code`].
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    /// Request claimed SigV4a but carried no `Authorization` header.
    #[error("missing Authorization header")]
    MissingAuthorization,
    /// The `Authorization` header did not parse as a SigV4a credential.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    /// The access-key-id is absent from the credential store
    /// (→ `InvalidAccessKeyId`).
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    /// ECDSA signature or region-set verification failed
    /// (→ `SignatureDoesNotMatch`).
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
5157
5158impl SigV4aGateError {
5159    /// AWS S3 error code that should accompany a 403 response.
5160    #[must_use]
5161    pub fn s3_error_code(&self) -> &'static str {
5162        match self {
5163            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
5164            _ => "SignatureDoesNotMatch",
5165        }
5166    }
5167}
5168
#[cfg(test)]
mod tests {
    use super::*;

    /// PUT stores the manifest in object metadata; GET must recover the
    /// exact same values.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut meta: Option<Metadata> = None;
        write_manifest(&mut meta, &manifest);
        let recovered = extract_manifest(&meta).expect("manifest must round-trip");
        assert_eq!(recovered.codec, manifest.codec);
        assert_eq!(recovered.original_size, manifest.original_size);
        assert_eq!(recovered.compressed_size, manifest.compressed_size);
        assert_eq!(recovered.crc32c, manifest.crc32c);
    }

    /// Objects written without any S4 metadata carry no manifest.
    #[test]
    fn missing_metadata_yields_none() {
        let empty: Option<Metadata> = None;
        assert!(extract_manifest(&empty).is_none());
    }

    /// A codec key alone (no sizes / crc) must not be treated as a
    /// valid manifest.
    #[test]
    fn partial_metadata_yields_none() {
        let mut meta = Metadata::new();
        meta.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(meta)).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").unwrap();
        let s3s::dto::Range::Int { first, last } = parsed else {
            panic!("expected Int range");
        };
        assert_eq!((first, last), (10, Some(20)));
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let msg = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(msg.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let msg = parse_copy_source_range("10-20").unwrap_err();
        assert!(msg.contains("must start with 'bytes='"));
    }

    /// S3 upload_part_copy requires a closed `N-M` range; suffix and
    /// open-ended forms are not allowed for this header.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        for bad in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(bad).is_err(), "{bad}");
        }
    }
}