// s4_server/service.rs
1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//!   `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//!   複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//!   manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//!   manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//!   Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use base64::Engine as _;
34use bytes::BytesMut;
35use s3s::dto::*;
36use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
37use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
38use s4_codec::multipart::{
39    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
40    write_frame,
41};
42use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
43use std::time::Instant;
44use tracing::{debug, info};
45
46use crate::blob::{
47    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
48};
49use crate::streaming::{
50    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
51    supports_streaming_compress, supports_streaming_decompress,
52};
53
/// Maximum number of bytes from the head of a PUT body handed to the
/// dispatcher's codec-selection sampling.
const SAMPLE_BYTES: usize = 4096;
56
/// v0.4 #20: captured at the start of a handler, before the request is
/// consumed by the backend call, so the matching `record_access` at
/// end-of-request can fill in the structured access log entry.
struct AccessLogPreamble {
    /// Leftmost entry of `X-Forwarded-For` (the original client), trimmed.
    remote_ip: Option<String>,
    /// SigV4 access key id of the caller, when credentials were presented.
    requester: Option<String>,
    /// `"<METHOD> <path>"`, e.g. `"GET /bucket/key"`.
    request_uri: String,
    /// Raw `User-Agent` header value, when present and valid UTF-8.
    user_agent: Option<String>,
}
66
/// The S4 gateway service: delegates the S3 API to `backend` while the
/// `put_object` / `get_object` (and multipart) paths run transparent
/// compression through the codec registry, plus the optional layers
/// documented per-field below. See the module docs for the op coverage map.
pub struct S4Service<B: S3> {
    /// Downstream S3 implementation that actually stores the bytes.
    backend: B,
    /// Codec table: compresses on PUT, decompresses on GET.
    registry: Arc<CodecRegistry>,
    /// Chooses a codec from the head-of-body sample on PUT.
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Cap for bodies collected into memory (default 5 GiB, the S3
    /// single-PUT limit).
    max_body_bytes: usize,
    /// v0.2 #7: optional bucket policy; `None` disables enforcement.
    policy: Option<crate::policy::SharedPolicy>,
    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
    /// gating "deny if not over TLS" can do their job. Defaults to `false`
    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
    secure_transport: bool,
    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// v0.4 #20: optional S3-style access log emitter.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// v0.4 #21 / v0.5 #29: optional server-side encryption keyring
    /// (AES-256-GCM). When set, every PUT body gets wrapped in S4E2
    /// (with the keyring's active key id) after the compress + framing
    /// steps; every GET that sniffs as S4E1/S4E2 is decrypted before
    /// frame parsing. A `with_sse_key(...)` call wraps the supplied
    /// key in a 1-slot keyring so single-key (v0.4) operators get the
    /// same behaviour they had before, just on the v2 frame.
    sse_keyring: Option<crate::sse::SharedSseKeyring>,
    /// v0.5 #34: optional first-class versioning state machine. When
    /// `Some(...)`, S4-server itself owns the per-bucket versioning
    /// state + per-(bucket, key) version chain; PUT / GET / DELETE /
    /// list_object_versions / get_bucket_versioning /
    /// put_bucket_versioning handlers consult the manager instead of
    /// passing through. When `None` (default), the legacy
    /// backend-passthrough behaviour applies so existing v0.4
    /// deployments are unaffected until they explicitly call
    /// `with_versioning(...)`.
    versioning: Option<Arc<crate::versioning::VersioningManager>>,
    /// v0.5 #28: optional SSE-KMS envelope-encryption backend. When
    /// `Some(...)`, PUTs carrying `x-amz-server-side-encryption: aws:kms`
    /// generate a fresh DEK via the backend, encrypt the body with it
    /// (S4E4 frame), and persist only the wrapped DEK. GETs sniffing as
    /// S4E4 unwrap the DEK through the same backend before decrypt.
    /// `kms_default_key_id` is used when the request omits an explicit
    /// `x-amz-server-side-encryption-aws-kms-key-id` (mirrors AWS S3
    /// bucket-default behaviour).
    kms: Option<Arc<dyn crate::kms::KmsBackend>>,
    /// Fallback KMS key id when a PUT asks for `aws:kms` without naming one.
    kms_default_key_id: Option<String>,
    /// v0.5 #30: optional Object Lock (WORM) enforcement layer. When
    /// `Some(...)`, `delete_object` and overwrite-style `put_object`
    /// consult the manager and refuse the operation with HTTP 403
    /// `AccessDenied` while the object is locked (Compliance until
    /// expiry, Governance unless the bypass header is set, or any time
    /// a legal hold is on). PUT also auto-applies the bucket-default
    /// retention to brand-new objects when configured. When `None`
    /// (default), the legacy backend-passthrough behaviour applies, so
    /// existing v0.4 deployments are unaffected until they explicitly
    /// call `with_object_lock(...)`.
    object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
    /// v0.5 #32: when `true`, every PUT must carry an SSE indicator
    /// (`x-amz-server-side-encryption`, the SSE-C customer-key headers,
    /// or be matched against a configured server-managed keyring/KMS).
    /// Set by `--compliance-mode strict` after the boot-time
    /// prerequisite check passes.
    compliance_strict: bool,
}
128
129impl<B: S3> S4Service<B> {
    /// AWS S3 single-operation PUT API limit (5 GiB).
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
132
133    pub fn new(
134        backend: B,
135        registry: Arc<CodecRegistry>,
136        dispatcher: Arc<dyn CodecDispatcher>,
137    ) -> Self {
138        Self {
139            backend,
140            registry,
141            dispatcher,
142            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
143            policy: None,
144            secure_transport: false,
145            rate_limits: None,
146            access_log: None,
147            sse_keyring: None,
148            versioning: None,
149            kms: None,
150            kms_default_key_id: None,
151            object_lock: None,
152            compliance_strict: false,
153        }
154    }
155
156    /// v0.5 #32: enable strict compliance mode. Every PUT must carry an
157    /// SSE indicator (server-side encryption header or SSE-C customer
158    /// key); requests without one are rejected with 400 InvalidRequest.
159    /// Boot-time prerequisite checking lives in the binary
160    /// (`validate_compliance_mode`) so this flag is purely the runtime
161    /// switch.
162    #[must_use]
163    pub fn with_compliance_strict(mut self, on: bool) -> Self {
164        self.compliance_strict = on;
165        self
166    }
167
168    /// v0.5 #30: attach the in-memory Object Lock (WORM) enforcement
169    /// manager. Once set, `delete_object` and overwrite-path
170    /// `put_object` refuse operations on locked keys with HTTP 403
171    /// `AccessDenied`; new PUTs to a bucket with a default retention
172    /// policy auto-create per-object lock state.
173    #[must_use]
174    pub fn with_object_lock(
175        mut self,
176        mgr: Arc<crate::object_lock::ObjectLockManager>,
177    ) -> Self {
178        self.object_lock = Some(mgr);
179        self
180    }
181
182    /// v0.5 #28: attach an SSE-KMS backend. `default_key_id` is used
183    /// when a PUT requests SSE-KMS without naming a specific KMS key
184    /// (operators set this to mirror AWS S3's bucket-default key).
185    #[must_use]
186    pub fn with_kms_backend(
187        mut self,
188        kms: Arc<dyn crate::kms::KmsBackend>,
189        default_key_id: Option<String>,
190    ) -> Self {
191        self.kms = Some(kms);
192        self.kms_default_key_id = default_key_id;
193        self
194    }
195
196    /// v0.5 #34: attach the first-class versioning state machine. Once
197    /// set, this `S4Service` owns the per-bucket versioning state +
198    /// per-(bucket, key) version chain; `put_object` / `get_object` /
199    /// `delete_object` / `list_object_versions` /
200    /// `get_bucket_versioning` / `put_bucket_versioning` consult the
201    /// manager instead of passing through to the backend. The backend
202    /// is still used as the byte store: Suspended / Unversioned buckets
203    /// keep using `<key>` directly (legacy), Enabled buckets redirect
204    /// each version's bytes to a shadow key
205    /// (`<key>.__s4ver__/<version-id>`) so older versions survive newer
206    /// PUTs to the same logical key.
207    #[must_use]
208    pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
209        self.versioning = Some(mgr);
210        self
211    }
212
213    /// v0.4 #21 (kept for back-compat): attach a single SSE-S4 key.
214    /// Internally wraps it in a 1-slot keyring with id=1 active, so
215    /// new objects ride the v0.5 S4E2 frame while previously-written
216    /// S4E1 bytes (this same key) still decrypt via the keyring's S4E1
217    /// fallback path. Operators wanting true rotation should call
218    /// [`Self::with_sse_keyring`] instead.
219    #[must_use]
220    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
221        let keyring = crate::sse::SseKeyring::new(1, key);
222        self.sse_keyring = Some(std::sync::Arc::new(keyring));
223        self
224    }
225
226    /// v0.5 #29: attach a multi-key SSE-S4 keyring. PUT encrypts under
227    /// the active key (S4E2 frame stamped with that key's id); GET
228    /// dispatches on the body's magic — S4E1 falls back to trying every
229    /// key in the ring (active first) so v0.4 objects survive a
230    /// migration; S4E2 looks up the explicit key_id from the header.
231    #[must_use]
232    pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
233        self.sse_keyring = Some(keyring);
234        self
235    }
236
237    /// v0.4 #20: attach an S3-style access-log emitter. Each completed
238    /// PUT / GET / DELETE / List handler emits one entry into the
239    /// emitter's buffer; a background flusher (started separately, see
240    /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
241    /// rotated `.log` files into the configured directory.
242    #[must_use]
243    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
244        self.access_log = Some(log);
245        self
246    }
247
248    /// Capture the per-request access-log preamble before the request is
249    /// consumed by the backend call. Returns `None` if no access logger
250    /// is configured (cheap early-out so the handler doesn't pay the
251    /// header-clone cost when access logging is off).
252    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
253        self.access_log.as_ref()?;
254        Some(AccessLogPreamble {
255            remote_ip: req
256                .headers
257                .get("x-forwarded-for")
258                .and_then(|v| v.to_str().ok())
259                .and_then(|raw| raw.split(',').next())
260                .map(|s| s.trim().to_owned()),
261            requester: Self::principal_of(req).map(str::to_owned),
262            request_uri: format!("{} {}", req.method, req.uri.path()),
263            user_agent: req
264                .headers
265                .get("user-agent")
266                .and_then(|v| v.to_str().ok())
267                .map(str::to_owned),
268        })
269    }
270
271    /// Internal — called by handlers at end-of-request with a captured
272    /// preamble. Best-effort: swallows the await fast (clones Arc +
273    /// pushes), no error propagation back to the request path.
274    #[allow(clippy::too_many_arguments)]
275    async fn record_access(
276        &self,
277        preamble: Option<AccessLogPreamble>,
278        operation: &'static str,
279        bucket: &str,
280        key: Option<&str>,
281        http_status: u16,
282        bytes_sent: u64,
283        object_size: u64,
284        total_time_ms: u64,
285        error_code: Option<&str>,
286    ) {
287        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
288            return;
289        };
290        log.record(crate::access_log::AccessLogEntry {
291            time: std::time::SystemTime::now(),
292            bucket: bucket.to_owned(),
293            remote_ip: p.remote_ip,
294            requester: p.requester,
295            operation,
296            key: key.map(str::to_owned),
297            request_uri: p.request_uri,
298            http_status,
299            error_code: error_code.map(str::to_owned),
300            bytes_sent,
301            object_size,
302            total_time_ms,
303            user_agent: p.user_agent,
304        })
305        .await;
306    }
307
308    /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
309    /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
310    /// throttle-checked before the policy gate; throttled requests return
311    /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
312    /// `s4_rate_limit_throttled_total{principal,bucket}`.
313    #[must_use]
314    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
315        self.rate_limits = Some(rl);
316        self
317    }
318
319    /// Helper used by request handlers to apply the rate limit. Returns
320    /// `Ok(())` when allowed (or no rate limiter is configured), or a
321    /// `SlowDown` S3Error otherwise.
322    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
323        let Some(rl) = self.rate_limits.as_ref() else {
324            return Ok(());
325        };
326        let principal_id = Self::principal_of(req);
327        if !rl.check(principal_id, bucket) {
328            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
329            return Err(S3Error::with_message(
330                S3ErrorCode::SlowDown,
331                format!("rate-limited: bucket={bucket}"),
332            ));
333        }
334        Ok(())
335    }
336
337    /// Tell the policy evaluator that the listener is reached over TLS
338    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
339    /// resolves to `true`. Defaults to `false`.
340    #[must_use]
341    pub fn with_secure_transport(mut self, on: bool) -> Self {
342        self.secure_transport = on;
343        self
344    }
345
346    #[must_use]
347    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
348        self.max_body_bytes = n;
349        self
350    }
351
352    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
353    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
354    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
355    /// When `None` (the default), no policy enforcement happens.
356    #[must_use]
357    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
358        self.policy = Some(policy);
359        self
360    }
361
362    /// Pull the SigV4 access key id off the request's credentials, if any.
363    /// Used as the `principal_id` for policy evaluation.
364    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
365        req.credentials.as_ref().map(|c| c.access_key.as_str())
366    }
367
368    /// v0.3 #13: build the per-request policy context from the incoming
369    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
370    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
371    /// production deployments are behind an LB / reverse proxy that sets
372    /// this), `aws:CurrentTime` from the system clock, and
373    /// `aws:SecureTransport` from the per-listener TLS flag.
374    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
375        let user_agent = req
376            .headers
377            .get("user-agent")
378            .and_then(|v| v.to_str().ok())
379            .map(str::to_owned);
380        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
381        // is the original client. Trim and parse leniently.
382        let source_ip = req
383            .headers
384            .get("x-forwarded-for")
385            .and_then(|v| v.to_str().ok())
386            .and_then(|raw| raw.split(',').next())
387            .and_then(|s| s.trim().parse().ok());
388        crate::policy::RequestContext {
389            source_ip,
390            user_agent,
391            request_time: Some(std::time::SystemTime::now()),
392            secure_transport: self.secure_transport,
393            extra: Default::default(),
394        }
395    }
396
397    /// Helper used by request handlers to enforce the optional policy.
398    /// Returns `Ok(())` when allowed (or no policy is configured), or an
399    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
400    /// counter on deny.
401    fn enforce_policy<I>(
402        &self,
403        req: &S3Request<I>,
404        action: &'static str,
405        bucket: &str,
406        key: Option<&str>,
407    ) -> S3Result<()> {
408        let Some(policy) = self.policy.as_ref() else {
409            return Ok(());
410        };
411        let principal_id = Self::principal_of(req);
412        let ctx = self.request_context(req);
413        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
414        if decision.allow {
415            Ok(())
416        } else {
417            crate::metrics::record_policy_denial(action, bucket);
418            tracing::info!(
419                action,
420                bucket,
421                key = ?key,
422                principal = ?principal_id,
423                source_ip = ?ctx.source_ip,
424                user_agent = ?ctx.user_agent,
425                secure_transport = ctx.secure_transport,
426                matched_sid = ?decision.matched_sid,
427                effect = ?decision.matched_effect,
428                "S4 policy denied request"
429            );
430            Err(S3Error::with_message(
431                S3ErrorCode::AccessDenied,
432                format!("denied by S4 policy: {action} on bucket={bucket}"),
433            ))
434        }
435    }
436
    /// Test helper: consume the service and recover the wrapped backend.
    /// Not used in production code paths.
    pub fn into_backend(self) -> B {
        self.backend
    }
441
    /// Sidecar fast path for Range GETs: Range-GET only the frames that
    /// cover the requested span from the backend, then frame-parse +
    /// decompress + slice the result. The **bandwidth-saving** variant of
    /// a Range request — only `plan`'s compressed byte span travels over
    /// the backend link instead of the whole object.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial GET against the backend for exactly the compressed byte
        // span the plan calls for (inclusive last byte, hence the -1).
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame parse + decompress: each frame carries its own codec id in
        // the header, so the registry routes per frame.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        // The fetched frames may cover more than the client asked for;
        // slice down to the plan's exact in-combined window.
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Assemble the 206 response. The checksum/ETag fields describe the
        // stored (compressed) object, not the decompressed slice we return,
        // so clear them to avoid client-side validation failures.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
550
551    /// `<key>.s4index` sidecar object を backend に書く。失敗しても本体 PUT は
552    /// 成功扱いにしたいので、err は warn ログのみ (Range GET の partial path が
553    /// 使えなくなるが、full read fallback で意味的には正しい結果を返す)。
554    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
555        let bytes = encode_index(index);
556        let len = bytes.len() as i64;
557        let put_input = PutObjectInput {
558            bucket: bucket.into(),
559            key: sidecar_key(key),
560            body: Some(bytes_to_blob(bytes)),
561            content_length: Some(len),
562            content_type: Some("application/x-s4-index".into()),
563            ..Default::default()
564        };
565        let put_req = S3Request {
566            input: put_input,
567            method: http::Method::PUT,
568            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
569            headers: http::HeaderMap::new(),
570            extensions: http::Extensions::new(),
571            credentials: None,
572            region: None,
573            service: None,
574            trailing_headers: None,
575        };
576        if let Err(e) = self.backend.put_object(put_req).await {
577            tracing::warn!(
578                bucket,
579                key,
580                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
581            );
582        }
583    }
584
585    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
586    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
587        let get_input = GetObjectInput {
588            bucket: bucket.into(),
589            key: sidecar_key(key),
590            ..Default::default()
591        };
592        let get_req = S3Request {
593            input: get_input,
594            method: http::Method::GET,
595            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
596            headers: http::HeaderMap::new(),
597            extensions: http::Extensions::new(),
598            credentials: None,
599            region: None,
600            service: None,
601            trailing_headers: None,
602        };
603        let resp = self.backend.get_object(get_req).await.ok()?;
604        let blob = resp.output.body?;
605        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
606        decode_index(bytes).ok()
607    }
608
609    /// Multipart object (frame 列) を解凍 → 元 bytes を再構築。
610    ///
611    /// **per-frame codec dispatch**: 各 frame header に codec_id が入っているので、
612    /// frame ごとに registry が違う codec を呼ぶことができる。同一 object 内で
613    /// 異なる codec が混在していても透過的に解凍可能 (parquet 風 mixed columns 等)。
614    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
615        let mut out = BytesMut::new();
616        for frame in FrameIter::new(bytes) {
617            let (header, payload) = frame.map_err(|e| {
618                S3Error::with_message(
619                    S3ErrorCode::InternalError,
620                    format!("multipart frame parse: {e}"),
621                )
622            })?;
623            let chunk_manifest = ChunkManifest {
624                codec: header.codec,
625                original_size: header.original_size,
626                compressed_size: header.compressed_size,
627                crc32c: header.crc32c,
628            };
629            let decompressed = self
630                .registry
631                .decompress(payload, &chunk_manifest)
632                .await
633                .map_err(internal("multipart frame decompress"))?;
634            out.extend_from_slice(&decompressed);
635        }
636        Ok(out.freeze())
637    }
638}
639
640/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
641/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
642/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
643/// reject the other variants for parity with AWS.
644fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
645    let rest = s
646        .strip_prefix("bytes=")
647        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
648    let (a, b) = rest
649        .split_once('-')
650        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
651    let first: u64 = a
652        .parse()
653        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
654    let last: u64 = b
655        .parse()
656        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
657    if last < first {
658        return Err(format!("CopySourceRange last < first: {s:?}"));
659    }
660    Ok(s3s::dto::Range::Int {
661        first,
662        last: Some(last),
663    })
664}
665
/// v0.5 #34: synthesize the backend storage key for a given
/// (logical key, version-id) pair on an Enabled-versioning bucket.
///
/// Uses the `__s4ver__/` infix because:
/// - it's not a substring of `.s4index` / `.s4ver` natural keys (no false-positive
///   listing filter collisions)
/// - directory-style separator keeps S3 console "browse by prefix" UX intact
///   (versions roll up under one virtual folder per object)
/// - human-readable on debug logs / `aws s3 ls`
///
/// `list_objects` / `list_objects_v2` / `list_object_versions` MUST filter
/// keys containing `.__s4ver__/` from results so customers don't see internal
/// shadow objects.
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
    // ".__s4ver__/" is 11 bytes; presize to avoid reallocations.
    let mut shadow = String::with_capacity(key.len() + 11 + version_id.len());
    shadow.push_str(key);
    shadow.push_str(".__s4ver__/");
    shadow.push_str(version_id);
    shadow
}
682
/// Test for the marker substring used by [`versioned_shadow_key`]. Cheap str
/// scan; both the list_objects filter and the GET passthrough check use this.
fn is_versioning_shadow_key(key: &str) -> bool {
    key.find(".__s4ver__/").is_some()
}
688
689fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
690    metadata
691        .as_ref()
692        .and_then(|m| m.get(META_MULTIPART))
693        .map(|v| v == "true")
694        .unwrap_or(false)
695}
696
/// Metadata key: codec id used to compress the stored bytes.
const META_CODEC: &str = "s4-codec";
/// Metadata key: size of the original (uncompressed) body in bytes.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Metadata key: size of the compressed body in bytes.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// Metadata key: CRC32C of the original body, for integrity checks.
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object written via multipart upload in the per-part frame
/// format. GET checks this flag to start the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks that even a single-PUT object was written in the S4F2
/// framed format. Legacy v0.1 single-PUTs are raw compressed bytes (flag
/// absent). GET checks this flag to route through the framed path
/// (= the same FrameIter parse as multipart).
const META_FRAMED: &str = "s4-framed";
708
709fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
710    metadata
711        .as_ref()
712        .and_then(|m| m.get(META_FRAMED))
713        .map(|v| v == "true")
714        .unwrap_or(false)
715}
716
717/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
718fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
719    metadata
720        .as_ref()
721        .and_then(|m| m.get("s4-encrypted"))
722        .map(|v| v == "aes-256-gcm")
723        .unwrap_or(false)
724}
725
726/// v0.5 #27: pull the three SSE-C headers off an input struct. The S3
727/// contract is "all three or none" — partial sets are a 400.
728///
729/// Returns `Ok(None)` when no SSE-C headers were sent (server-managed or
730/// no encryption), `Ok(Some(material))` on validated client key, and
731/// `Err` for malformed or partial inputs.
732fn extract_sse_c_material(
733    algorithm: &Option<String>,
734    key: &Option<String>,
735    md5: &Option<String>,
736) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
737    match (algorithm, key, md5) {
738        (None, None, None) => Ok(None),
739        (Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
740            .map(Some)
741            .map_err(sse_c_error_to_s3),
742        _ => Err(S3Error::with_message(
743            S3ErrorCode::InvalidRequest,
744            "SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
745        )),
746    }
747}
748
749/// v0.5 #28: detect SSE-KMS request — `x-amz-server-side-encryption: aws:kms`.
750/// Returns the key-id to wrap under, falling back to the gateway default.
751fn extract_kms_key_id(
752    sse: &Option<ServerSideEncryption>,
753    sse_kms_key_id: &Option<String>,
754    gateway_default: Option<&str>,
755) -> Option<String> {
756    let asks_for_kms = sse
757        .as_ref()
758        .map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
759        .unwrap_or(false);
760    if !asks_for_kms {
761        return None;
762    }
763    sse_kms_key_id
764        .clone()
765        .or_else(|| gateway_default.map(str::to_owned))
766}
767
768/// v0.5 #28: map kms module errors to AWS-shaped S3 error codes.
769/// `KeyNotFound` is operator misconfig (400); `BackendUnavailable` is a
770/// transient KMS outage (503). Other variants are 500 InternalError.
771fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
772    use crate::kms::KmsError as K;
773    match e {
774        K::KeyNotFound { key_id } => S3Error::with_message(
775            S3ErrorCode::InvalidArgument,
776            format!("KMS key not found: {key_id}"),
777        ),
778        K::BackendUnavailable { message } => S3Error::with_message(
779            S3ErrorCode::ServiceUnavailable,
780            format!("KMS backend unavailable: {message}"),
781        ),
782        other => S3Error::with_message(
783            S3ErrorCode::InternalError,
784            format!("KMS error: {other}"),
785        ),
786    }
787}
788
789/// v0.5 #27: map sse module errors to AWS-shaped S3 error codes.
790/// `WrongCustomerKey` → 403 AccessDenied (matches AWS behaviour);
791/// `InvalidCustomerKey` / algorithm / required / unexpected → 400.
792fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
793    use crate::sse::SseError as E;
794    match e {
795        E::WrongCustomerKey => S3Error::with_message(
796            S3ErrorCode::AccessDenied,
797            "SSE-C key does not match the key used at PUT time",
798        ),
799        E::InvalidCustomerKey { reason } => S3Error::with_message(
800            S3ErrorCode::InvalidArgument,
801            format!("SSE-C: {reason}"),
802        ),
803        E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
804            S3ErrorCode::InvalidArgument,
805            format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
806        ),
807        E::CustomerKeyRequired => S3Error::with_message(
808            S3ErrorCode::InvalidRequest,
809            "object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
810        ),
811        E::CustomerKeyUnexpected => S3Error::with_message(
812            S3ErrorCode::InvalidRequest,
813            "object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
814        ),
815        other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
816    }
817}
818
819fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
820    let m = metadata.as_ref()?;
821    let codec = m
822        .get(META_CODEC)
823        .and_then(|s| s.parse::<CodecKind>().ok())?;
824    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
825    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
826    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
827    Some(ChunkManifest {
828        codec,
829        original_size,
830        compressed_size,
831        crc32c,
832    })
833}
834
835fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
836    let meta = metadata.get_or_insert_with(Default::default);
837    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
838    meta.insert(
839        META_ORIGINAL_SIZE.into(),
840        manifest.original_size.to_string(),
841    );
842    meta.insert(
843        META_COMPRESSED_SIZE.into(),
844        manifest.compressed_size.to_string(),
845    );
846    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
847}
848
849fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
850    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
851}
852
853/// v0.5 #30: parse the `x-amz-bypass-governance-retention` header into a
854/// boolean flag. AWS S3 accepts `true` (case-insensitive); any other value
855/// (including missing) is treated as `false`.
856fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
857    headers
858        .get("x-amz-bypass-governance-retention")
859        .and_then(|v| v.to_str().ok())
860        .map(|s| s.eq_ignore_ascii_case("true"))
861        .unwrap_or(false)
862}
863
864/// Convert s3s `Timestamp` into a `chrono::DateTime<Utc>` by formatting it
865/// as an RFC3339 string and re-parsing through `chrono`. The string format
866/// avoids pulling the `time` crate (transitive dep of s3s, not declared by
867/// s4-server) into our direct deps. Returns `None` if the format/parse fails
868/// or the value is outside `chrono`'s supported range.
869fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
870    let mut buf = Vec::new();
871    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
872    let s = std::str::from_utf8(&buf).ok()?;
873    chrono::DateTime::parse_from_rfc3339(s)
874        .ok()
875        .map(|dt| dt.with_timezone(&chrono::Utc))
876}
877
878/// Inverse of [`timestamp_to_chrono_utc`] — emit RFC3339 (the s3s
879/// `DateTime` wire format) and re-parse via `Timestamp::parse`.
880fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
881    // chrono's RFC3339 output format matches s3s' parser ("...Z" with
882    // optional sub-second precision). Fall back to UNIX_EPOCH if anything
883    // unexpected happens — we never produce malformed strings, so this
884    // branch is unreachable in practice.
885    let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
886    Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
887}
888
889/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
890/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
891/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
892pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
893    if total == 0 {
894        return Err("cannot range-get zero-length object".into());
895    }
896    match range {
897        s3s::dto::Range::Int { first, last } => {
898            let start = *first;
899            let end_inclusive = match last {
900                Some(l) => (*l).min(total - 1),
901                None => total - 1,
902            };
903            if start > end_inclusive || start >= total {
904                return Err(format!(
905                    "range bytes={start}-{:?} out of object size {total}",
906                    last
907                ));
908            }
909            Ok((start, end_inclusive + 1))
910        }
911        s3s::dto::Range::Suffix { length } => {
912            let len = (*length).min(total);
913            Ok((total - len, total))
914        }
915    }
916}
917
918#[async_trait::async_trait]
919impl<B: S3> S3 for S4Service<B> {
    // === PUT path (compression applied) ===
921    #[tracing::instrument(
922        name = "s4.put_object",
923        skip(self, req),
924        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
925    )]
926    async fn put_object(
927        &self,
928        mut req: S3Request<PutObjectInput>,
929    ) -> S3Result<S3Response<PutObjectOutput>> {
930        let put_start = Instant::now();
931        let put_bucket = req.input.bucket.clone();
932        let put_key = req.input.key.clone();
933        let access_preamble = self.access_log_preamble(&req);
934        self.enforce_rate_limit(&req, &put_bucket)?;
935        self.enforce_policy(&req, "s3:PutObject", &put_bucket, Some(&put_key))?;
936        // v0.5 #30: an Object Lock-protected key cannot be overwritten by
937        // a non-versioned PUT (Suspended / Unversioned bucket). Enabled
938        // bucket PUTs are exempt because they materialise a fresh
939        // version under a shadow key (`<key>.__s4ver__/<vid>`) — the
940        // locked version's bytes are untouched. The check mirrors the
941        // delete path (Compliance never bypassable, Governance via the
942        // bypass header, legal hold never).
943        if let Some(mgr) = self.object_lock.as_ref()
944            && let Some(state) = mgr.get(&put_bucket, &put_key)
945        {
946            let bucket_versioned_enabled = self
947                .versioning
948                .as_ref()
949                .map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
950                .unwrap_or(false);
951            if !bucket_versioned_enabled {
952                let bypass = parse_bypass_governance_header(&req.headers);
953                let now = chrono::Utc::now();
954                if !state.can_delete(now, bypass) {
955                    crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
956                    return Err(S3Error::with_message(
957                        S3ErrorCode::AccessDenied,
958                        "Access Denied because object protected by object lock",
959                    ));
960                }
961            }
962        }
963        // v0.5 #30: per-PUT explicit retention / legal hold (S3
964        // `x-amz-object-lock-mode`, `x-amz-object-lock-retain-until-date`,
965        // `x-amz-object-lock-legal-hold`). Captured before the body
966        // moves into the backend; persisted into the manager only on
967        // backend success below.
968        let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
969            .input
970            .object_lock_mode
971            .as_ref()
972            .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
973        let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
974            .input
975            .object_lock_retain_until_date
976            .as_ref()
977            .and_then(timestamp_to_chrono_utc);
978        let explicit_legal_hold_on: Option<bool> = req
979            .input
980            .object_lock_legal_hold_status
981            .as_ref()
982            .map(|s| s.as_str().eq_ignore_ascii_case("ON"));
983        if let Some(blob) = req.input.body.take() {
984            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
985            // compress fast path、そうでなければ従来の collect-then-compress。
986            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
987                .await
988                .map_err(internal("peek put sample"))?;
989            let sample_len = sample.len().min(SAMPLE_BYTES);
990            let kind = self.dispatcher.pick(&sample[..sample_len]).await;
991
992            // Passthrough buys nothing from S4F2 wrapping (no compression =
993            // no per-chunk frame to skip past) and the +28-byte header
994            // overhead breaks size-sensitive callers that expect a true
995            // pass-through. So passthrough always uses the legacy raw-blob
996            // path; only compressing codecs go through the framed path.
997            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
998            let (compressed, manifest, is_framed) = if use_framed {
999                // streaming fast path: input は memory に collect しない
1000                let chained = chain_sample_with_rest(sample, rest_stream);
1001                debug!(
1002                    bucket = ?req.input.bucket,
1003                    key = ?req.input.key,
1004                    codec = kind.as_str(),
1005                    path = "streaming-framed",
1006                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
1007                );
1008                // v0.4 #16: pick the chunk size based on the request's
1009                // Content-Length when known, falling back to the 4 MiB
1010                // default for chunked transfers.
1011                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
1012                let (body, manifest) = streaming_compress_to_frames(
1013                    chained,
1014                    Arc::clone(&self.registry),
1015                    kind,
1016                    chunk_size,
1017                )
1018                .await
1019                .map_err(internal("streaming framed compress"))?;
1020                (body, manifest, true)
1021            } else {
1022                // GPU codec 等で streaming-aware でないものは bytes-buffered path
1023                // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
1024                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
1025                    .await
1026                    .map_err(internal("collect put body (buffered path)"))?;
1027                debug!(
1028                    bucket = ?req.input.bucket,
1029                    key = ?req.input.key,
1030                    bytes = bytes.len(),
1031                    codec = kind.as_str(),
1032                    path = "buffered",
1033                    "S4 put_object: compressing (buffered, raw blob)"
1034                );
1035                let (body, m) = self
1036                    .registry
1037                    .compress(bytes, kind)
1038                    .await
1039                    .map_err(internal("registry compress"))?;
1040                (body, m, false)
1041            };
1042
1043            write_manifest(&mut req.input.metadata, &manifest);
1044            if is_framed {
1045                // v0.2 #4: framed body であることを GET 側に伝える meta flag。
1046                req.input
1047                    .metadata
1048                    .get_or_insert_with(Default::default)
1049                    .insert(META_FRAMED.into(), "true".into());
1050            }
1051            // 重要: content_length を圧縮後サイズで更新する。
1052            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
1053            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
1054            req.input.content_length = Some(compressed.len() as i64);
1055            // body を書き換えたので、客側が送ってきた original body 用の
1056            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
1057            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
1058            // ChunkManifest.crc32c で担保している。
1059            req.input.checksum_algorithm = None;
1060            req.input.checksum_crc32 = None;
1061            req.input.checksum_crc32c = None;
1062            req.input.checksum_crc64nvme = None;
1063            req.input.checksum_sha1 = None;
1064            req.input.checksum_sha256 = None;
1065            req.input.content_md5 = None;
1066            let original_size = manifest.original_size;
1067            let compressed_size = manifest.compressed_size;
1068            let codec_label = manifest.codec.as_str();
1069            // framed body は GET 側で sidecar partial-fetch を効かせるため
1070            // build_index_from_body で sidecar を組み立てて backend に PUT する。
1071            let sidecar_index = if is_framed {
1072                s4_codec::index::build_index_from_body(&compressed).ok()
1073            } else {
1074                None
1075            };
1076            // v0.4 #21 / v0.5 #29 / v0.5 #27: encrypt-after-compress.
1077            // Precedence:
1078            //   - SSE-C headers present → per-request customer key (S4E3)
1079            //   - server-managed keyring configured → active key (S4E2)
1080            //   - neither → no encryption (raw compressed body)
1081            // The `s4-encrypted: aes-256-gcm` metadata flag is set in
1082            // both encrypted modes; the on-disk frame magic distinguishes
1083            // S4E1 / S4E2 / S4E3 so GET picks the right decrypt path.
1084            let sse_c_material = extract_sse_c_material(
1085                &req.input.sse_customer_algorithm,
1086                &req.input.sse_customer_key,
1087                &req.input.sse_customer_key_md5,
1088            )?;
1089            // v0.5 #28: SSE-KMS request? Resolves to None unless the
1090            // request asks for `aws:kms` AND a key id is available
1091            // (explicit header or gateway default). When set, we'll
1092            // generate a per-object DEK below.
1093            let kms_key_id = extract_kms_key_id(
1094                &req.input.server_side_encryption,
1095                &req.input.ssekms_key_id,
1096                self.kms_default_key_id.as_deref(),
1097            );
1098            // v0.5 #32: in compliance-strict mode, every PUT must
1099            // declare SSE — either client-supplied (SSE-C), KMS, or by
1100            // virtue of a server-side keyring being configured (which
1101            // applies SSE-S4 to every PUT automatically). Requests that
1102            // would otherwise land as plain compressed bytes are
1103            // rejected with 400 InvalidRequest.
1104            if self.compliance_strict
1105                && sse_c_material.is_none()
1106                && kms_key_id.is_none()
1107                && self.sse_keyring.is_none()
1108                && req
1109                    .input
1110                    .server_side_encryption
1111                    .as_ref()
1112                    .map(|s| s.as_str())
1113                    != Some(ServerSideEncryption::AES256)
1114            {
1115                return Err(S3Error::with_message(
1116                    S3ErrorCode::InvalidRequest,
1117                    "compliance-mode strict: PUT must include x-amz-server-side-encryption \
1118                     (AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
1119                ));
1120            }
1121            // SSE-C and SSE-KMS are mutually exclusive on a single PUT
1122            // (AWS S3 returns 400 InvalidArgument). SSE-C wins by spec.
1123            if sse_c_material.is_some() && kms_key_id.is_some() {
1124                return Err(S3Error::with_message(
1125                    S3ErrorCode::InvalidArgument,
1126                    "SSE-C and SSE-KMS cannot be used together on the same PUT",
1127                ));
1128            }
1129            // KMS path needs to call generate_dek().await before the
1130            // body_to_send branch; capture the result here.
1131            let kms_wrap = if let Some(ref key_id) = kms_key_id {
1132                let kms = self.kms.as_ref().ok_or_else(|| {
1133                    S3Error::with_message(
1134                        S3ErrorCode::InvalidRequest,
1135                        "SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1136                    )
1137                })?;
1138                let (dek, wrapped) = kms
1139                    .generate_dek(key_id)
1140                    .await
1141                    .map_err(kms_error_to_s3)?;
1142                if dek.len() != 32 {
1143                    return Err(S3Error::with_message(
1144                        S3ErrorCode::InternalError,
1145                        format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
1146                    ));
1147                }
1148                let mut dek_arr = [0u8; 32];
1149                dek_arr.copy_from_slice(&dek);
1150                Some((dek_arr, wrapped))
1151            } else {
1152                None
1153            };
1154            let body_to_send = if let Some(ref m) = sse_c_material {
1155                req.input
1156                    .metadata
1157                    .get_or_insert_with(Default::default)
1158                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
1159                crate::sse::encrypt_with_source(
1160                    &compressed,
1161                    crate::sse::SseSource::CustomerKey {
1162                        key: &m.key,
1163                        key_md5: &m.key_md5,
1164                    },
1165                )
1166            } else if let Some((ref dek, ref wrapped)) = kms_wrap {
1167                req.input
1168                    .metadata
1169                    .get_or_insert_with(Default::default)
1170                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
1171                crate::sse::encrypt_with_source(
1172                    &compressed,
1173                    crate::sse::SseSource::Kms { dek, wrapped },
1174                )
1175            } else if let Some(keyring) = self.sse_keyring.as_ref() {
1176                req.input
1177                    .metadata
1178                    .get_or_insert_with(Default::default)
1179                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
1180                crate::sse::encrypt_v2(&compressed, keyring)
1181            } else {
1182                compressed.clone()
1183            };
1184            req.input.body = Some(bytes_to_blob(body_to_send));
1185            // v0.5 #34: pre-allocate a version-id when the bucket is
1186            // Enabled, then redirect the backend storage key to the
1187            // shadow path so older versions survive newer PUTs.
1188            // Suspended / Unversioned buckets keep using the plain
1189            // `<key>` (S3 spec: Suspended overwrites the same backend
1190            // object). Pre-allocation (instead of recording after PUT)
1191            // ensures the shadow key + the response's
1192            // `x-amz-version-id` use the same vid.
1193            let pending_version: Option<crate::versioning::PutOutcome> = self
1194                .versioning
1195                .as_ref()
1196                .map(|mgr| mgr.state(&put_bucket))
1197                .map(|state| match state {
1198                    crate::versioning::VersioningState::Enabled => {
1199                        crate::versioning::PutOutcome {
1200                            version_id: crate::versioning::VersioningManager::new_version_id(),
1201                            versioned_response: true,
1202                        }
1203                    }
1204                    crate::versioning::VersioningState::Suspended
1205                    | crate::versioning::VersioningState::Unversioned => {
1206                        crate::versioning::PutOutcome {
1207                            version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1208                            versioned_response: false,
1209                        }
1210                    }
1211                });
1212            if let Some(ref pv) = pending_version
1213                && pv.versioned_response
1214            {
1215                req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1216            }
1217            let mut backend_resp = self.backend.put_object(req).await;
1218            if let Some(idx) = sidecar_index
1219                && backend_resp.is_ok()
1220                && idx.entries.len() > 1
1221            {
1222                // 1 chunk しかない (small object) なら sidecar は意味がない (=
1223                // partial fetch しても full body と同じ範囲) ので省略。
1224                // Sidecar は user-visible key で書く (latest version の
1225                // partial fetch path 用)。Old versions の Range GET は今 task
1226                // の scope 外 (full read fallback でも意味的には正しい)。
1227                self.write_sidecar(&put_bucket, &put_key, &idx).await;
1228            }
1229            // v0.5 #34: commit the new version into the manager only on
1230            // backend success. Use the pre-allocated vid so the response
1231            // header and the chain entry agree.
1232            if let (Some(mgr), Some(pv), Ok(resp)) = (
1233                self.versioning.as_ref(),
1234                pending_version.as_ref(),
1235                backend_resp.as_mut(),
1236            ) {
1237                let etag = resp
1238                    .output
1239                    .e_tag
1240                    .clone()
1241                    .map(ETag::into_value)
1242                    .unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
1243                let now = chrono::Utc::now();
1244                mgr.commit_put_with_version(
1245                    &put_bucket,
1246                    &put_key,
1247                    crate::versioning::VersionEntry {
1248                        version_id: pv.version_id.clone(),
1249                        etag,
1250                        size: original_size,
1251                        is_delete_marker: false,
1252                        created_at: now,
1253                    },
1254                );
1255                if pv.versioned_response {
1256                    resp.output.version_id = Some(pv.version_id.clone());
1257                }
1258            }
1259            // v0.5 #27: AWS S3 echoes the SSE-C headers back on success
1260            // so the client knows the server actually applied the
1261            // requested algorithm and which key fingerprint matched.
1262            if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
1263                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
1264                resp.output.sse_customer_key_md5 = Some(
1265                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
1266                );
1267            }
1268            // v0.5 #28: SSE-KMS echo — `aws:kms` + the canonical key id
1269            // the backend returned (AWS KMS returns the ARN even when
1270            // the request used an alias).
1271            if let (Some((_, wrapped)), Ok(resp)) =
1272                (kms_wrap.as_ref(), backend_resp.as_mut())
1273            {
1274                resp.output.server_side_encryption =
1275                    Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
1276                resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
1277            }
1278            // v0.5 #30: persist any per-PUT explicit retention / legal
1279            // hold the client supplied, then auto-apply the bucket
1280            // default (no-op when state is already populated). The
1281            // explicit fields take precedence — the bucket-default
1282            // helper bails out as soon as it sees any retention.
1283            if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
1284                if explicit_lock_mode.is_some()
1285                    || explicit_retain_until.is_some()
1286                    || explicit_legal_hold_on.is_some()
1287                {
1288                    let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
1289                    if let Some(m) = explicit_lock_mode {
1290                        state.mode = Some(m);
1291                    }
1292                    if let Some(u) = explicit_retain_until {
1293                        state.retain_until = Some(u);
1294                    }
1295                    if let Some(lh) = explicit_legal_hold_on {
1296                        state.legal_hold_on = lh;
1297                    }
1298                    mgr.set(&put_bucket, &put_key, state);
1299                }
1300                mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
1301            }
1302            let _ = (original_size, compressed_size); // mute unused warnings
1303            let elapsed = put_start.elapsed();
1304            crate::metrics::record_put(
1305                codec_label,
1306                original_size,
1307                compressed_size,
1308                elapsed.as_secs_f64(),
1309                backend_resp.is_ok(),
1310            );
1311            // v0.4 #20: structured access-log entry (best-effort).
1312            self.record_access(
1313                access_preamble,
1314                "REST.PUT.OBJECT",
1315                &put_bucket,
1316                Some(&put_key),
1317                if backend_resp.is_ok() { 200 } else { 500 },
1318                compressed_size,
1319                original_size,
1320                elapsed.as_millis() as u64,
1321                backend_resp.as_ref().err().map(|e| e.code().as_str()),
1322            )
1323            .await;
1324            info!(
1325                op = "put_object",
1326                bucket = %put_bucket,
1327                key = %put_key,
1328                codec = codec_label,
1329                bytes_in = original_size,
1330                bytes_out = compressed_size,
1331                ratio = format!(
1332                    "{:.3}",
1333                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
1334                ),
1335                latency_ms = elapsed.as_millis() as u64,
1336                ok = backend_resp.is_ok(),
1337                "S4 put completed"
1338            );
1339            return backend_resp;
1340        }
1341        // Body-less PUT (rare: zero-length object). Mirror the body-full
1342        // versioning hooks so list_object_versions / GET-by-version still see
1343        // empty-body objects in the chain.
1344        let pending_version: Option<crate::versioning::PutOutcome> = self
1345            .versioning
1346            .as_ref()
1347            .map(|mgr| mgr.state(&put_bucket))
1348            .map(|state| match state {
1349                crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
1350                    version_id: crate::versioning::VersioningManager::new_version_id(),
1351                    versioned_response: true,
1352                },
1353                _ => crate::versioning::PutOutcome {
1354                    version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
1355                    versioned_response: false,
1356                },
1357            });
1358        if let Some(ref pv) = pending_version
1359            && pv.versioned_response
1360        {
1361            req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
1362        }
1363        let mut backend_resp = self.backend.put_object(req).await;
1364        if let (Some(mgr), Some(pv), Ok(resp)) = (
1365            self.versioning.as_ref(),
1366            pending_version.as_ref(),
1367            backend_resp.as_mut(),
1368        ) {
1369            let etag = resp
1370                .output
1371                .e_tag
1372                .clone()
1373                .map(ETag::into_value)
1374                .unwrap_or_default();
1375            let now = chrono::Utc::now();
1376            mgr.commit_put_with_version(
1377                &put_bucket,
1378                &put_key,
1379                crate::versioning::VersionEntry {
1380                    version_id: pv.version_id.clone(),
1381                    etag,
1382                    size: 0,
1383                    is_delete_marker: false,
1384                    created_at: now,
1385                },
1386            );
1387            if pv.versioned_response {
1388                resp.output.version_id = Some(pv.version_id.clone());
1389            }
1390        }
1391        // v0.5 #30: same explicit-then-default lock-state commit as the
1392        // body-bearing branch above, so a zero-length PUT also picks up
1393        // bucket-default retention.
1394        if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
1395            if explicit_lock_mode.is_some()
1396                || explicit_retain_until.is_some()
1397                || explicit_legal_hold_on.is_some()
1398            {
1399                let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
1400                if let Some(m) = explicit_lock_mode {
1401                    state.mode = Some(m);
1402                }
1403                if let Some(u) = explicit_retain_until {
1404                    state.retain_until = Some(u);
1405                }
1406                if let Some(lh) = explicit_legal_hold_on {
1407                    state.legal_hold_on = lh;
1408                }
1409                mgr.set(&put_bucket, &put_key, state);
1410            }
1411            mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
1412        }
1413        backend_resp
1414    }
1415
1416    // === 圧縮を解く path (GET) ===
1417    #[tracing::instrument(
1418        name = "s4.get_object",
1419        skip(self, req),
1420        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
1421    )]
1422    async fn get_object(
1423        &self,
1424        mut req: S3Request<GetObjectInput>,
1425    ) -> S3Result<S3Response<GetObjectOutput>> {
1426        let get_start = Instant::now();
1427        let get_bucket = req.input.bucket.clone();
1428        let get_key = req.input.key.clone();
1429        self.enforce_rate_limit(&req, &get_bucket)?;
1430        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
1431        // Range request の事前検出 (decompress 後 slice する path に使う)。
1432        let range_request = req.input.range.take();
1433        // v0.5 #27: pull SSE-C material from the input headers before
1434        // the request is moved into the backend. A header parse error
1435        // fails fast (no body fetch). The material is consumed below
1436        // when decrypting an S4E3-framed body; the SSE-C headers on
1437        // `req.input` are cleared so the backend doesn't see them.
1438        let sse_c_alg = req.input.sse_customer_algorithm.take();
1439        let sse_c_key = req.input.sse_customer_key.take();
1440        let sse_c_md5 = req.input.sse_customer_key_md5.take();
1441        let get_sse_c_material =
1442            extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
1443
1444        // v0.5 #34: route the GET through the VersioningManager when
1445        // attached AND the bucket is in a versioning-aware state.
1446        // Resolves which version to fetch (explicit `?versionId=` query
1447        // param vs. chain latest), translates a delete-marker into 404
1448        // NoSuchKey, and rewrites the backend storage key to the shadow
1449        // path (`<key>.__s4ver__/<vid>`) for non-null Enabled-bucket
1450        // versions. `resolved_version_id` is stamped onto the response
1451        // so clients see a coherent `x-amz-version-id` header.
1452        //
1453        // When the bucket is Unversioned (or no manager attached), the
1454        // chain-resolution step is skipped and the request flows
1455        // through the existing single-key path unchanged.
1456        let resolved_version_id: Option<String> = match self.versioning.as_ref() {
1457            Some(mgr)
1458                if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
1459            {
1460                let req_vid = req.input.version_id.take();
1461                let entry = match req_vid.as_deref() {
1462                    Some(vid) => mgr.lookup_version(&get_bucket, &get_key, vid).ok_or_else(
1463                        || S3Error::with_message(
1464                            S3ErrorCode::NoSuchVersion,
1465                            format!("no such version: {vid}"),
1466                        ),
1467                    )?,
1468                    None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
1469                        S3Error::with_message(
1470                            S3ErrorCode::NoSuchKey,
1471                            format!("no such key: {get_key}"),
1472                        )
1473                    })?,
1474                };
1475                if entry.is_delete_marker {
1476                    // S3 spec: GET without versionId on a
1477                    // delete-marker latest → 404 NoSuchKey + the
1478                    // response carries `x-amz-delete-marker: true`.
1479                    // GET with explicit versionId pointing at a delete
1480                    // marker → 405 MethodNotAllowed; we surface
1481                    // NoSuchKey here for both since s3s collapses them
1482                    // into the same not-found error path.
1483                    return Err(S3Error::with_message(
1484                        S3ErrorCode::NoSuchKey,
1485                        format!("delete marker is the current version of {get_key}"),
1486                    ));
1487                }
1488                if entry.version_id != crate::versioning::NULL_VERSION_ID {
1489                    req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
1490                }
1491                Some(entry.version_id)
1492            }
1493            _ => None,
1494        };
1495
1496        // ====== Range GET の partial-fetch fast path (sidecar index 利用) ======
1497        // sidecar `<key>.s4index` が存在し、multipart-framed object であれば
1498        // 必要 frame だけを backend に Range GET し帯域節約する。
1499        if let Some(ref r) = range_request
1500            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
1501        {
1502            let total = index.total_original_size();
1503            let (start, end_exclusive) = match resolve_range(r, total) {
1504                Ok(v) => v,
1505                Err(e) => {
1506                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
1507                }
1508            };
1509            if let Some(plan) = index.lookup_range(start, end_exclusive) {
1510                return self
1511                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
1512                    .await;
1513            }
1514        }
1515        let mut resp = self.backend.get_object(req).await?;
1516        // v0.5 #34: stamp the resolved version-id so the client sees a
1517        // coherent `x-amz-version-id` header (only for chains owned by
1518        // the manager — Unversioned buckets / no-manager paths never
1519        // set this).
1520        if let Some(ref vid) = resolved_version_id {
1521            resp.output.version_id = Some(vid.clone());
1522        }
1523        let is_multipart = is_multipart_object(&resp.output.metadata);
1524        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
1525        // v0.2 #4: framed-v2 single-PUT は多 frame parse が必要なので
1526        // multipart と同じ path に流す。
1527        let needs_frame_parse = is_multipart || is_framed_v2;
1528        let manifest_opt = extract_manifest(&resp.output.metadata);
1529
1530        if !needs_frame_parse && manifest_opt.is_none() {
1531            // S4 が書いていないオブジェクトは透過 (raw bucket pre-existing object 等)
1532            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
1533            return Ok(resp);
1534        }
1535
1536        if let Some(blob) = resp.output.body.take() {
1537            // v0.4 #21 / v0.5 #27: if the object was stored under SSE
1538            // (metadata flag `s4-encrypted: aes-256-gcm`), decrypt
1539            // before any frame parse / streaming decompress. Encrypted
1540            // bodies are opaque to the codec; this also forces the
1541            // buffered path because AES-GCM needs the full body for tag
1542            // verify. SSE-C uses the per-request customer key, SSE-S4
1543            // falls back to the configured keyring.
1544            let blob = if is_sse_encrypted(&resp.output.metadata) {
1545                let body = collect_blob(blob, self.max_body_bytes)
1546                    .await
1547                    .map_err(internal("collect SSE-encrypted body"))?;
1548                // v0.5 #28: peek the frame magic to route the right
1549                // decrypt path. S4E4 means SSE-KMS — unwrap the DEK
1550                // through the KMS backend (async). S4E1/E2/E3 take the
1551                // sync path (keyring or customer key).
1552                let plain = match crate::sse::peek_magic(&body) {
1553                    Some("S4E4") => {
1554                        let kms = self.kms.as_ref().ok_or_else(|| {
1555                            S3Error::with_message(
1556                                S3ErrorCode::InvalidRequest,
1557                                "object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
1558                            )
1559                        })?;
1560                        let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
1561                        crate::sse::decrypt_with_kms(&body, kms_ref)
1562                            .await
1563                            .map_err(|e| match e {
1564                                crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
1565                                other => S3Error::with_message(
1566                                    S3ErrorCode::InternalError,
1567                                    format!("SSE-KMS decrypt failed: {other}"),
1568                                ),
1569                            })?
1570                    }
1571                    _ => {
1572                        if let Some(ref m) = get_sse_c_material {
1573                            crate::sse::decrypt(
1574                                &body,
1575                                crate::sse::SseSource::CustomerKey {
1576                                    key: &m.key,
1577                                    key_md5: &m.key_md5,
1578                                },
1579                            )
1580                            .map_err(sse_c_error_to_s3)?
1581                        } else {
1582                            let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
1583                                S3Error::with_message(
1584                                    S3ErrorCode::InvalidRequest,
1585                                    "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
1586                                )
1587                            })?;
1588                            crate::sse::decrypt(&body, keyring).map_err(|e| {
1589                                S3Error::with_message(
1590                                    S3ErrorCode::InternalError,
1591                                    format!("SSE-S4 decrypt failed: {e}"),
1592                                )
1593                            })?
1594                        }
1595                    }
1596                };
1597                // v0.5 #28: parse out the on-disk wrapped DEK's key id
1598                // so the GET response can echo `x-amz-server-side-encryption-aws-kms-key-id`.
1599                if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
1600                    && let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
1601                {
1602                    resp.output.server_side_encryption = Some(
1603                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
1604                    );
1605                    resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
1606                }
1607                bytes_to_blob(plain)
1608            } else if let Some(ref m) = get_sse_c_material {
1609                // Client sent SSE-C headers for an unencrypted object —
1610                // mirror AWS S3's 400 InvalidRequest.
1611                let _ = m;
1612                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
1613            } else {
1614                blob
1615            };
1616            // v0.5 #27: SSE-C echo on success — algorithm + key MD5
1617            // tell the client that the supplied key was the one used.
1618            if let Some(ref m) = get_sse_c_material {
1619                resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
1620                resp.output.sse_customer_key_md5 = Some(
1621                    base64::engine::general_purpose::STANDARD.encode(m.key_md5),
1622                );
1623            }
1624            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
1625            // 大規模 object (e.g. 5 GB) を memory に collect すると OOM するので、
1626            // codec が streaming-aware なら body を chunk-by-chunk で decompress して
1627            // 即座に client に流す。
1628            //
1629            // ただし Range request 時は streaming できない (slice するため total bytes
1630            // が必要) → buffered path に fall through。
1631            if range_request.is_none()
1632                && !needs_frame_parse
1633                && let Some(ref m) = manifest_opt
1634                && supports_streaming_decompress(m.codec)
1635                && m.codec == CodecKind::CpuZstd
1636            {
1637                let decompressed_blob = cpu_zstd_decompress_stream(blob);
1638                resp.output.content_length = Some(m.original_size as i64);
1639                resp.output.checksum_crc32 = None;
1640                resp.output.checksum_crc32c = None;
1641                resp.output.checksum_crc64nvme = None;
1642                resp.output.checksum_sha1 = None;
1643                resp.output.checksum_sha256 = None;
1644                resp.output.e_tag = None;
1645                resp.output.body = Some(decompressed_blob);
1646                let elapsed = get_start.elapsed();
1647                crate::metrics::record_get(
1648                    m.codec.as_str(),
1649                    m.compressed_size,
1650                    m.original_size,
1651                    elapsed.as_secs_f64(),
1652                    true,
1653                );
1654                info!(
1655                    op = "get_object",
1656                    bucket = %get_bucket,
1657                    key = %get_key,
1658                    codec = m.codec.as_str(),
1659                    bytes_in = m.compressed_size,
1660                    bytes_out = m.original_size,
1661                    path = "streaming",
1662                    setup_latency_ms = elapsed.as_millis() as u64,
1663                    "S4 get started (streaming)"
1664                );
1665                return Ok(resp);
1666            }
1667            // Passthrough: そのまま流す (Range なしの場合のみ streaming)
1668            if range_request.is_none()
1669                && !needs_frame_parse
1670                && let Some(ref m) = manifest_opt
1671                && m.codec == CodecKind::Passthrough
1672            {
1673                resp.output.content_length = Some(m.original_size as i64);
1674                resp.output.checksum_crc32 = None;
1675                resp.output.checksum_crc32c = None;
1676                resp.output.checksum_crc64nvme = None;
1677                resp.output.checksum_sha1 = None;
1678                resp.output.checksum_sha256 = None;
1679                resp.output.e_tag = None;
1680                resp.output.body = Some(blob);
1681                debug!("S4 get_object: passthrough streaming");
1682                return Ok(resp);
1683            }
1684
1685            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
1686            let bytes = collect_blob(blob, self.max_body_bytes)
1687                .await
1688                .map_err(internal("collect get body"))?;
1689
1690            let decompressed = if needs_frame_parse {
1691                // multipart objects と framed-v2 single-PUT objects は同じ
1692                // S4F2 frame 列なので decompress_multipart で統一処理
1693                self.decompress_multipart(bytes).await?
1694            } else {
1695                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
1696                self.registry
1697                    .decompress(bytes, manifest)
1698                    .await
1699                    .map_err(internal("registry decompress"))?
1700            };
1701
1702            // Range request があれば slice。なければ full body を返す。
1703            let total_size = decompressed.len() as u64;
1704            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
1705                let (start, end) = resolve_range(r, total_size)
1706                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
1707                let sliced = decompressed.slice(start as usize..end as usize);
1708                resp.output.content_range = Some(format!(
1709                    "bytes {start}-{}/{total_size}",
1710                    end.saturating_sub(1)
1711                ));
1712                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
1713            } else {
1714                (decompressed, None)
1715            };
1716            // 解凍後の真のサイズを返す (S3 client は content_length を信頼するので
1717            // 圧縮 size のままだと downstream が body を途中で切ってしまう)
1718            resp.output.content_length = Some(final_bytes.len() as i64);
1719            // 圧縮済 bytes の checksum を返すと AWS SDK 側で StreamingError
1720            // (ChecksumMismatch) になる。ETag も backend が返した「圧縮済 bytes の
1721            // MD5/checksum」なので意味的にズレる — クリアして S4 自身の crc32c
1722            // (manifest 内 / frame 内) で integrity を保証する設計にする。
1723            resp.output.checksum_crc32 = None;
1724            resp.output.checksum_crc32c = None;
1725            resp.output.checksum_crc64nvme = None;
1726            resp.output.checksum_sha1 = None;
1727            resp.output.checksum_sha256 = None;
1728            resp.output.e_tag = None;
1729            let returned_size = final_bytes.len() as u64;
1730            let codec_label = manifest_opt
1731                .as_ref()
1732                .map(|m| m.codec.as_str())
1733                .unwrap_or("multipart");
1734            resp.output.body = Some(bytes_to_blob(final_bytes));
1735            if let Some(status) = status_override {
1736                resp.status = Some(status);
1737            }
1738            let elapsed = get_start.elapsed();
1739            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
1740            info!(
1741                op = "get_object",
1742                bucket = %get_bucket,
1743                key = %get_key,
1744                codec = codec_label,
1745                bytes_out = returned_size,
1746                total_object_size = total_size,
1747                range = range_request.is_some(),
1748                path = "buffered",
1749                latency_ms = elapsed.as_millis() as u64,
1750                "S4 get completed (buffered)"
1751            );
1752        }
1753        Ok(resp)
1754    }
1755
1756    // === passthrough delegations ===
1757    async fn head_bucket(
1758        &self,
1759        req: S3Request<HeadBucketInput>,
1760    ) -> S3Result<S3Response<HeadBucketOutput>> {
1761        self.backend.head_bucket(req).await
1762    }
1763    async fn list_buckets(
1764        &self,
1765        req: S3Request<ListBucketsInput>,
1766    ) -> S3Result<S3Response<ListBucketsOutput>> {
1767        self.backend.list_buckets(req).await
1768    }
1769    async fn create_bucket(
1770        &self,
1771        req: S3Request<CreateBucketInput>,
1772    ) -> S3Result<S3Response<CreateBucketOutput>> {
1773        self.backend.create_bucket(req).await
1774    }
1775    async fn delete_bucket(
1776        &self,
1777        req: S3Request<DeleteBucketInput>,
1778    ) -> S3Result<S3Response<DeleteBucketOutput>> {
1779        self.backend.delete_bucket(req).await
1780    }
1781    async fn head_object(
1782        &self,
1783        req: S3Request<HeadObjectInput>,
1784    ) -> S3Result<S3Response<HeadObjectOutput>> {
1785        let mut resp = self.backend.head_object(req).await?;
1786        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
1787            // 客側には decompress 後の意味のある content_length / checksum を返す。
1788            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
1789            // (S4 は manifest 内の crc32c で integrity を担保する)。
1790            resp.output.content_length = Some(manifest.original_size as i64);
1791            resp.output.checksum_crc32 = None;
1792            resp.output.checksum_crc32c = None;
1793            resp.output.checksum_crc64nvme = None;
1794            resp.output.checksum_sha1 = None;
1795            resp.output.checksum_sha256 = None;
1796            resp.output.e_tag = None;
1797        }
1798        Ok(resp)
1799    }
    /// DELETE entry point: rate-limit/policy guards → WORM object-lock
    /// guard → versioning router (delete markers / specific-version
    /// deletes) → legacy physical delete with best-effort sidecar cleanup.
    async fn delete_object(
        &self,
        mut req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // v0.5 #30: refuse the delete while a WORM lock is in effect.
        // Compliance can never be bypassed; Governance can be overridden
        // via `x-amz-bypass-governance-retention: true`; legal hold
        // never. The check happens before the versioning router so a
        // locked object can't be soft-deleted (delete-marker push) on an
        // Enabled bucket either — S3 spec says lock applies to all
        // delete forms.
        if let Some(mgr) = self.object_lock.as_ref()
            && let Some(state) = mgr.get(&bucket, &key)
        {
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let now = chrono::Utc::now();
            if !state.can_delete(now, bypass) {
                crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Access Denied because object protected by object lock",
                ));
            }
        }
        // v0.5 #34: route DELETE through the VersioningManager when the
        // bucket is in a versioning-aware state.
        //
        // - Enabled bucket, no version_id → push a delete marker into
        //   the chain. NO backend object is touched (older versions
        //   stay reachable via specific-version GET).
        // - Enabled / Suspended bucket, with version_id → physical
        //   delete. Backend bytes at the shadow key (or `<key>` for
        //   `null`) are removed; chain entry is dropped. If the deleted
        //   entry was a delete marker, no backend bytes exist for it
        //   (record-only).
        // - Suspended bucket, no version_id → push a "null" delete
        //   marker (S3 spec); backend bytes at `<key>` are physically
        //   removed (same as legacy).
        // - Unversioned bucket → fall through to legacy passthrough.
        if let Some(mgr) = self.versioning.as_ref() {
            let state = mgr.state(&bucket);
            if state != crate::versioning::VersioningState::Unversioned {
                let req_vid = req.input.version_id.take();
                if let Some(vid) = req_vid {
                    // Specific-version DELETE: touch backend bytes only
                    // when the entry was a real version (not a delete
                    // marker, which has no backend bytes).
                    let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
                    let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
                        key.clone()
                    } else {
                        versioned_shadow_key(&key, &vid)
                    };
                    let was_real_version = outcome
                        .as_ref()
                        .map(|o| !o.is_delete_marker)
                        .unwrap_or(false);
                    if was_real_version {
                        // Best-effort backend cleanup; missing bytes
                        // are not an error (e.g. shadow key already
                        // GC'd).
                        let backend_input = DeleteObjectInput {
                            bucket: bucket.clone(),
                            key: backend_target,
                            ..Default::default()
                        };
                        // Synthetic internal request — reuses the caller's
                        // uri/headers/credentials but a fresh extensions map.
                        let backend_req = S3Request {
                            input: backend_input,
                            method: http::Method::DELETE,
                            uri: req.uri.clone(),
                            headers: req.headers.clone(),
                            extensions: http::Extensions::new(),
                            credentials: req.credentials.clone(),
                            region: req.region.clone(),
                            service: req.service.clone(),
                            trailing_headers: None,
                        };
                        let _ = self.backend.delete_object(backend_req).await;
                    }
                    let mut output = DeleteObjectOutput {
                        version_id: Some(vid.clone()),
                        ..Default::default()
                    };
                    if let Some(o) = outcome.as_ref()
                        && o.is_delete_marker
                    {
                        output.delete_marker = Some(true);
                    }
                    return Ok(S3Response::new(output));
                }
                // No version_id: record a delete marker (state-aware).
                let outcome = mgr.record_delete(&bucket, &key);
                if state == crate::versioning::VersioningState::Suspended {
                    // Suspended buckets also evict the prior `<key>`
                    // bytes (the previous null version is gone too).
                    let backend_input = DeleteObjectInput {
                        bucket: bucket.clone(),
                        key: key.clone(),
                        ..Default::default()
                    };
                    let backend_req = S3Request {
                        input: backend_input,
                        method: http::Method::DELETE,
                        uri: req.uri.clone(),
                        headers: req.headers.clone(),
                        extensions: http::Extensions::new(),
                        credentials: req.credentials.clone(),
                        region: req.region.clone(),
                        service: req.service.clone(),
                        trailing_headers: None,
                    };
                    let _ = self.backend.delete_object(backend_req).await;
                }
                let output = DeleteObjectOutput {
                    delete_marker: Some(true),
                    version_id: outcome.version_id,
                    ..Default::default()
                };
                return Ok(S3Response::new(output));
            }
        }
        // Legacy / Unversioned path: physical delete on the backend +
        // best-effort sidecar cleanup (mirrors v0.4 behaviour).
        let resp = self.backend.delete_object(req).await?;
        // v0.5 #30: drop any per-object lock state once the delete has
        // succeeded so the freed key can be re-armed by a future PUT
        // under the bucket default. Reaching here implies the lock had
        // already passed `can_delete` above, so this is purely cleanup.
        if let Some(mgr) = self.object_lock.as_ref() {
            mgr.clear(&bucket, &key);
        }
        // Best-effort removal of the `<key>.s4index` sidecar; the delete's
        // result is deliberately ignored (the sidecar may not exist).
        let sidecar_input = DeleteObjectInput {
            bucket: bucket.clone(),
            key: sidecar_key(&key),
            ..Default::default()
        };
        // Synthetic internal request: empty headers, no client credentials
        // are forwarded to the backend for this cleanup call.
        let sidecar_req = S3Request {
            input: sidecar_input,
            method: http::Method::DELETE,
            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let _ = self.backend.delete_object(sidecar_req).await;
        Ok(resp)
    }
1954    async fn delete_objects(
1955        &self,
1956        req: S3Request<DeleteObjectsInput>,
1957    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
1958        self.backend.delete_objects(req).await
1959    }
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // S4-aware copy: when the source object carries s4-* metadata, make
        // sure it is preserved on the destination.
        //
        // - MetadataDirective::COPY (default): the backend copies the source
        //   metadata verbatim, so the S4 metadata travels along automatically.
        //   No intervention needed.
        // - MetadataDirective::REPLACE: the client-supplied metadata replaces
        //   the source's → if the s4-* metadata is dropped the destination can
        //   never be decompressed (silent corruption). S4 fetches the source
        //   metadata via HEAD and force-merges the s4-* fields into
        //   input.metadata.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            // Internal HEAD against the source, reusing the caller's
            // credentials/headers so backend-side auth still applies.
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                // NB: this loop's `key` shadows the copy-source `key` above;
                // the debug! below (after the loop) sees the outer one again.
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // If the client explicitly supplied this key, keep
                        // their value (overwrite allowed); otherwise insert
                        // the source's value.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
2032    async fn list_objects(
2033        &self,
2034        req: S3Request<ListObjectsInput>,
2035    ) -> S3Result<S3Response<ListObjectsOutput>> {
2036        self.enforce_rate_limit(&req, &req.input.bucket)?;
2037        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2038        let mut resp = self.backend.list_objects(req).await?;
2039        // S4 内部 object (`*.s4index` sidecar、`.__s4ver__/` shadow versions
2040        // — v0.5 #34) を顧客から隠す。
2041        if let Some(contents) = resp.output.contents.as_mut() {
2042            contents.retain(|o| {
2043                o.key
2044                    .as_ref()
2045                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2046                    .unwrap_or(true)
2047            });
2048        }
2049        Ok(resp)
2050    }
2051    async fn list_objects_v2(
2052        &self,
2053        req: S3Request<ListObjectsV2Input>,
2054    ) -> S3Result<S3Response<ListObjectsV2Output>> {
2055        self.enforce_rate_limit(&req, &req.input.bucket)?;
2056        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2057        let mut resp = self.backend.list_objects_v2(req).await?;
2058        if let Some(contents) = resp.output.contents.as_mut() {
2059            let before = contents.len();
2060            contents.retain(|o| {
2061                o.key
2062                    .as_ref()
2063                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2064                    .unwrap_or(true)
2065            });
2066            // key_count も補正 (S3 spec compliance)
2067            if let Some(kc) = resp.output.key_count.as_mut() {
2068                *kc -= (before - contents.len()) as i32;
2069            }
2070        }
2071        Ok(resp)
2072    }
2073    /// v0.4 #17: filter S4-internal sidecars from versioned listings.
2074    /// v0.5 #34: when a [`crate::versioning::VersioningManager`] is
2075    /// attached AND the bucket is in a versioning-aware state, build
2076    /// the `Versions` / `DeleteMarkers` arrays directly from the
2077    /// in-memory chain (paginated + ordered the S3 way: key asc,
2078    /// version newest-first inside each key). Otherwise fall back to
2079    /// passthrough + sidecar-filter (legacy v0.4 behaviour).
2080    async fn list_object_versions(
2081        &self,
2082        req: S3Request<ListObjectVersionsInput>,
2083    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
2084        self.enforce_rate_limit(&req, &req.input.bucket)?;
2085        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
2086        // v0.5 #34: VersioningManager-owned path.
2087        if let Some(mgr) = self.versioning.as_ref()
2088            && mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
2089        {
2090            let max_keys = req.input.max_keys.unwrap_or(1000) as usize;
2091            let page = mgr.list_versions(
2092                &req.input.bucket,
2093                req.input.prefix.as_deref(),
2094                req.input.key_marker.as_deref(),
2095                req.input.version_id_marker.as_deref(),
2096                max_keys,
2097            );
2098            let versions: Vec<ObjectVersion> = page
2099                .versions
2100                .into_iter()
2101                .map(|e| ObjectVersion {
2102                    key: Some(e.key),
2103                    version_id: Some(e.version_id),
2104                    is_latest: Some(e.is_latest),
2105                    e_tag: Some(ETag::Strong(e.etag)),
2106                    size: Some(e.size as i64),
2107                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2108                    ..Default::default()
2109                })
2110                .collect();
2111            let delete_markers: Vec<DeleteMarkerEntry> = page
2112                .delete_markers
2113                .into_iter()
2114                .map(|e| DeleteMarkerEntry {
2115                    key: Some(e.key),
2116                    version_id: Some(e.version_id),
2117                    is_latest: Some(e.is_latest),
2118                    last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
2119                    ..Default::default()
2120                })
2121                .collect();
2122            let output = ListObjectVersionsOutput {
2123                name: Some(req.input.bucket.clone()),
2124                prefix: req.input.prefix.clone(),
2125                key_marker: req.input.key_marker.clone(),
2126                version_id_marker: req.input.version_id_marker.clone(),
2127                max_keys: req.input.max_keys,
2128                versions: if versions.is_empty() {
2129                    None
2130                } else {
2131                    Some(versions)
2132                },
2133                delete_markers: if delete_markers.is_empty() {
2134                    None
2135                } else {
2136                    Some(delete_markers)
2137                },
2138                is_truncated: Some(page.is_truncated),
2139                next_key_marker: page.next_key_marker,
2140                next_version_id_marker: page.next_version_id_marker,
2141                ..Default::default()
2142            };
2143            return Ok(S3Response::new(output));
2144        }
2145        // Legacy passthrough path (v0.4 #17 sidecar filter retained).
2146        let mut resp = self.backend.list_object_versions(req).await?;
2147        if let Some(versions) = resp.output.versions.as_mut() {
2148            versions.retain(|v| {
2149                v.key
2150                    .as_ref()
2151                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2152                    .unwrap_or(true)
2153            });
2154        }
2155        if let Some(markers) = resp.output.delete_markers.as_mut() {
2156            markers.retain(|m| {
2157                m.key
2158                    .as_ref()
2159                    .map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
2160                    .unwrap_or(true)
2161            });
2162        }
2163        Ok(resp)
2164    }
2165
2166    async fn create_multipart_upload(
2167        &self,
2168        mut req: S3Request<CreateMultipartUploadInput>,
2169    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
2170        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
2171        // frame parse を起動するため、object metadata に flag を立てる。
2172        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
2173        let codec_kind = self.registry.default_kind();
2174        let meta = req.input.metadata.get_or_insert_with(Default::default);
2175        meta.insert(META_MULTIPART.into(), "true".into());
2176        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
2177        debug!(
2178            bucket = ?req.input.bucket,
2179            key = ?req.input.key,
2180            codec = codec_kind.as_str(),
2181            "S4 create_multipart_upload: marking object for per-part compression"
2182        );
2183        self.backend.create_multipart_upload(req).await
2184    }
2185
2186    async fn upload_part(
2187        &self,
2188        mut req: S3Request<UploadPartInput>,
2189    ) -> S3Result<S3Response<UploadPartOutput>> {
2190        // 各 part を圧縮して frame header 付きで forward。GET 時に
2191        // `decompress_multipart` が frame iter で順に解凍する。
2192        // **per-part codec dispatch**: dispatcher が body 先頭 sample から
2193        // codec を選ぶので、parquet 風の mixed-content multipart で part ごとに
2194        // 最適 codec を使える (整数列 part → Bitcomp、text 列 part → zstd 等)。
2195        if let Some(blob) = req.input.body.take() {
2196            let bytes = collect_blob(blob, self.max_body_bytes)
2197                .await
2198                .map_err(internal("collect upload_part body"))?;
2199            let sample_len = bytes.len().min(SAMPLE_BYTES);
2200            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
2201            let original_size = bytes.len() as u64;
2202            let (compressed, manifest) = self
2203                .registry
2204                .compress(bytes, codec_kind)
2205                .await
2206                .map_err(internal("registry compress part"))?;
2207            let header = FrameHeader {
2208                codec: codec_kind,
2209                original_size,
2210                compressed_size: compressed.len() as u64,
2211                crc32c: manifest.crc32c,
2212            };
2213            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
2214            write_frame(&mut framed, header, &compressed);
2215            // v0.2 #5: heuristic-based padding skip for likely-final parts.
2216            //
2217            // AWS SDK / aws-cli / boto3 always send the final (and only the
2218            // final) part below the configured part_size. So if the raw user
2219            // part is already smaller than S3's 5 MiB multipart minimum, this
2220            // is overwhelmingly likely to be the final part — and the final
2221            // part is exempt from S3's size constraint. Skipping padding here
2222            // saves up to ~5 MiB per object on highly compressible workloads.
2223            //
2224            // If a misbehaving client sends a tiny **non-final** part, S3
2225            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
2226            // identical outcome to a vanilla S3 PUT, just earlier than
2227            // padding-then-complete would catch it.
2228            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
2229            if !likely_final {
2230                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
2231            }
2232            let framed_bytes = framed.freeze();
2233            let new_len = framed_bytes.len() as i64;
2234            // 同じ wire 互換問題が multipart にもある (content-length / checksum)
2235            req.input.content_length = Some(new_len);
2236            req.input.checksum_algorithm = None;
2237            req.input.checksum_crc32 = None;
2238            req.input.checksum_crc32c = None;
2239            req.input.checksum_crc64nvme = None;
2240            req.input.checksum_sha1 = None;
2241            req.input.checksum_sha256 = None;
2242            req.input.content_md5 = None;
2243            req.input.body = Some(bytes_to_blob(framed_bytes));
2244            debug!(
2245                part_number = ?req.input.part_number,
2246                upload_id = ?req.input.upload_id,
2247                original_size,
2248                framed_size = new_len,
2249                "S4 upload_part: framed compressed payload"
2250            );
2251        }
2252        self.backend.upload_part(req).await
2253    }
2254    async fn complete_multipart_upload(
2255        &self,
2256        req: S3Request<CompleteMultipartUploadInput>,
2257    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
2258        let bucket = req.input.bucket.clone();
2259        let key = req.input.key.clone();
2260        let resp = self.backend.complete_multipart_upload(req).await?;
2261        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
2262        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
2263        // partial fetch path が利用可能になる (Range request の帯域節約)。
2264        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
2265        // できれば爆速になるので 1 回の cost は payback される
2266        let bucket_clone = bucket.clone();
2267        let key_clone = key.clone();
2268        let get_input = GetObjectInput {
2269            bucket: bucket_clone.clone(),
2270            key: key_clone.clone(),
2271            ..Default::default()
2272        };
2273        let get_req = S3Request {
2274            input: get_input,
2275            method: http::Method::GET,
2276            uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
2277            headers: http::HeaderMap::new(),
2278            extensions: http::Extensions::new(),
2279            credentials: None,
2280            region: None,
2281            service: None,
2282            trailing_headers: None,
2283        };
2284        if let Ok(get_resp) = self.backend.get_object(get_req).await
2285            && let Some(blob) = get_resp.output.body
2286            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
2287            && let Ok(index) = build_index_from_body(&body)
2288        {
2289            self.write_sidecar(&bucket, &key, &index).await;
2290        }
2291        Ok(resp)
2292    }
    /// Pure passthrough: S4 keeps no per-upload state, so the backend's
    /// abort semantics apply unchanged. Any already-uploaded (framed,
    /// compressed) parts are left to the backend to clean up.
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    /// Pure passthrough listing of in-progress multipart uploads.
    /// Compression does not alter upload ids or keys, so no S4 filtering
    /// is applied here.
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    /// Pure passthrough. NOTE(review): `upload_part` stores framed,
    /// compressed bytes, so the part sizes the backend reports here are the
    /// framed sizes, not the original client payload sizes — confirm this is
    /// acceptable for SDK-side integrity checks.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
2311
2312    // =========================================================================
2313    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
2314    // 持たないので、backend (= AWS S3) の動作と完全に同一。
2315    //
2316    // 既知の制限事項:
    // - copy_object: source object が S4-compressed の場合、backend が bytes を
    //   copy するだけでも metadata (s4-codec etc) は一緒に copied される
    //   (AWS S3 default = MetadataDirective COPY)。GET は manifest 経由で正しく
    //   decompress できる。MetadataDirective REPLACE の場合も、copy_object が
    //   source の s4-* metadata を HEAD で取得して destination に merge するので
    //   圧縮 metadata は保持される (上の copy_object 実装を参照)
2322    // - list_object_versions: versioning enabled bucket では各 version も S4
2323    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
2324    // =========================================================================
2325
2326    // ---- Object ACL / tagging / attributes ----
    /// Pure passthrough: S4 adds no compression hook for ACLs, so backend
    /// behaviour is authoritative.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    /// Pure passthrough: ACL writes do not interact with S4 compression
    /// metadata.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
    /// Pure passthrough: tags live alongside (not inside) S4's s4-* user
    /// metadata, so no translation is needed.
    async fn get_object_tagging(
        &self,
        req: S3Request<GetObjectTaggingInput>,
    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
        self.backend.get_object_tagging(req).await
    }
    /// Pure passthrough: tag writes cannot clobber s4-* metadata (tags and
    /// user metadata are separate stores in S3).
    async fn put_object_tagging(
        &self,
        req: S3Request<PutObjectTaggingInput>,
    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
        self.backend.put_object_tagging(req).await
    }
    /// Pure passthrough: removing tags leaves S4 compression metadata
    /// untouched.
    async fn delete_object_tagging(
        &self,
        req: S3Request<DeleteObjectTaggingInput>,
    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
        self.backend.delete_object_tagging(req).await
    }
    /// Pure passthrough. NOTE(review): for S4-compressed objects the backend
    /// reports attributes of the stored bytes (compressed / framed) — e.g.
    /// ObjectSize and ETag — not of the original payload. Confirm whether a
    /// later phase should rewrite these fields from the s4-* metadata.
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    /// Pure passthrough: archive restore operates on the stored (compressed)
    /// bytes; decompression still happens at GET time via the manifest.
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
2369    async fn upload_part_copy(
2370        &self,
2371        req: S3Request<UploadPartCopyInput>,
2372    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
2373        // v0.2 #6: byte-range aware copy when the source is S4-framed.
2374        //
2375        // For a framed source (multipart upload OR single-PUT framed-v2),
2376        // a naive byte-range passthrough would copy compressed bytes that
2377        // don't align with S4 frame boundaries — silently corrupting the
2378        // result. Instead we GET the source through S4 (which handles
2379        // decompression + Range), re-compress + re-frame as a new part,
2380        // and forward as upload_part. For non-framed sources (S4-untouched
2381        // raw objects), passthrough is correct and we keep the original
2382        // (cheaper) code path.
2383        let CopySource::Bucket {
2384            bucket: src_bucket,
2385            key: src_key,
2386            ..
2387        } = &req.input.copy_source
2388        else {
2389            return self.backend.upload_part_copy(req).await;
2390        };
2391        let src_bucket = src_bucket.to_string();
2392        let src_key = src_key.to_string();
2393
2394        // Probe metadata to decide whether the source needs S4-aware copy.
2395        let head_input = HeadObjectInput {
2396            bucket: src_bucket.clone(),
2397            key: src_key.clone(),
2398            ..Default::default()
2399        };
2400        let head_req = S3Request {
2401            input: head_input,
2402            method: http::Method::HEAD,
2403            uri: req.uri.clone(),
2404            headers: req.headers.clone(),
2405            extensions: http::Extensions::new(),
2406            credentials: req.credentials.clone(),
2407            region: req.region.clone(),
2408            service: req.service.clone(),
2409            trailing_headers: None,
2410        };
2411        let needs_s4_copy = match self.backend.head_object(head_req).await {
2412            Ok(h) => {
2413                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
2414            }
2415            Err(_) => false,
2416        };
2417        if !needs_s4_copy {
2418            return self.backend.upload_part_copy(req).await;
2419        }
2420
2421        // Resolve the optional source byte range to pass to GET.
2422        let source_range = req
2423            .input
2424            .copy_source_range
2425            .as_ref()
2426            .map(|r| parse_copy_source_range(r))
2427            .transpose()
2428            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
2429
2430        // GET source via S4 (handles decompression + sidecar partial fetch
2431        // when range is present). The result is the requested user-visible
2432        // byte range, fully decompressed.
2433        let mut get_input = GetObjectInput {
2434            bucket: src_bucket.clone(),
2435            key: src_key.clone(),
2436            ..Default::default()
2437        };
2438        get_input.range = source_range;
2439        let get_req = S3Request {
2440            input: get_input,
2441            method: http::Method::GET,
2442            uri: req.uri.clone(),
2443            headers: req.headers.clone(),
2444            extensions: http::Extensions::new(),
2445            credentials: req.credentials.clone(),
2446            region: req.region.clone(),
2447            service: req.service.clone(),
2448            trailing_headers: None,
2449        };
2450        let get_resp = self.get_object(get_req).await?;
2451        let blob = get_resp.output.body.ok_or_else(|| {
2452            S3Error::with_message(
2453                S3ErrorCode::InternalError,
2454                "upload_part_copy: empty body from source GET",
2455            )
2456        })?;
2457        let bytes = collect_blob(blob, self.max_body_bytes)
2458            .await
2459            .map_err(internal("collect upload_part_copy source body"))?;
2460
2461        // Compress + frame as a fresh part (mirrors upload_part path).
2462        let sample_len = bytes.len().min(SAMPLE_BYTES);
2463        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
2464        let original_size = bytes.len() as u64;
2465        let (compressed, manifest) = self
2466            .registry
2467            .compress(bytes, codec_kind)
2468            .await
2469            .map_err(internal("registry compress upload_part_copy"))?;
2470        let header = FrameHeader {
2471            codec: codec_kind,
2472            original_size,
2473            compressed_size: compressed.len() as u64,
2474            crc32c: manifest.crc32c,
2475        };
2476        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
2477        write_frame(&mut framed, header, &compressed);
2478        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
2479        if !likely_final {
2480            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
2481        }
2482        let framed_bytes = framed.freeze();
2483        let framed_len = framed_bytes.len() as i64;
2484
2485        // Forward as upload_part to the destination multipart upload.
2486        let part_input = UploadPartInput {
2487            bucket: req.input.bucket.clone(),
2488            key: req.input.key.clone(),
2489            part_number: req.input.part_number,
2490            upload_id: req.input.upload_id.clone(),
2491            body: Some(bytes_to_blob(framed_bytes)),
2492            content_length: Some(framed_len),
2493            ..Default::default()
2494        };
2495        let part_req = S3Request {
2496            input: part_input,
2497            method: http::Method::PUT,
2498            uri: req.uri.clone(),
2499            headers: req.headers.clone(),
2500            extensions: http::Extensions::new(),
2501            credentials: req.credentials.clone(),
2502            region: req.region.clone(),
2503            service: req.service.clone(),
2504            trailing_headers: None,
2505        };
2506        let upload_resp = self.backend.upload_part(part_req).await?;
2507
2508        let copy_output = UploadPartCopyOutput {
2509            copy_part_result: Some(CopyPartResult {
2510                e_tag: upload_resp.output.e_tag.clone(),
2511                ..Default::default()
2512            }),
2513            ..Default::default()
2514        };
2515        Ok(S3Response::new(copy_output))
2516    }
2517
2518    // ---- Object lock / retention / legal hold (v0.5 #30) ----
2519    //
2520    // When an `ObjectLockManager` is attached the configuration / per-object
2521    // state lives in the manager and these handlers serve directly from it;
2522    // when no manager is attached they fall back to the backend (legacy
2523    // passthrough so v0.4 deployments are unaffected).
2524    async fn get_object_lock_configuration(
2525        &self,
2526        req: S3Request<GetObjectLockConfigurationInput>,
2527    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
2528        if let Some(mgr) = self.object_lock.as_ref() {
2529            let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
2530                ObjectLockConfiguration {
2531                    object_lock_enabled: Some(ObjectLockEnabled::from_static(
2532                        ObjectLockEnabled::ENABLED,
2533                    )),
2534                    rule: Some(ObjectLockRule {
2535                        default_retention: Some(DefaultRetention {
2536                            days: Some(d.retention_days as i32),
2537                            mode: Some(ObjectLockRetentionMode::from_static(
2538                                match d.mode {
2539                                    crate::object_lock::LockMode::Governance => {
2540                                        ObjectLockRetentionMode::GOVERNANCE
2541                                    }
2542                                    crate::object_lock::LockMode::Compliance => {
2543                                        ObjectLockRetentionMode::COMPLIANCE
2544                                    }
2545                                },
2546                            )),
2547                            years: None,
2548                        }),
2549                    }),
2550                }
2551            });
2552            let output = GetObjectLockConfigurationOutput {
2553                object_lock_configuration: cfg,
2554            };
2555            return Ok(S3Response::new(output));
2556        }
2557        self.backend.get_object_lock_configuration(req).await
2558    }
2559    async fn put_object_lock_configuration(
2560        &self,
2561        req: S3Request<PutObjectLockConfigurationInput>,
2562    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
2563        if let Some(mgr) = self.object_lock.as_ref() {
2564            let bucket = req.input.bucket.clone();
2565            if let Some(cfg) = req.input.object_lock_configuration.as_ref()
2566                && let Some(rule) = cfg.rule.as_ref()
2567                && let Some(d) = rule.default_retention.as_ref()
2568            {
2569                let mode = d
2570                    .mode
2571                    .as_ref()
2572                    .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
2573                    .ok_or_else(|| {
2574                        S3Error::with_message(
2575                            S3ErrorCode::InvalidRequest,
2576                            "Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
2577                        )
2578                    })?;
2579                // S3 spec: exactly one of Days / Years (we accept Days
2580                // outright and convert Years → Days for storage; Years
2581                // is just a UX shorthand on the wire).
2582                let days: u32 = match (d.days, d.years) {
2583                    (Some(d), None) if d > 0 => d as u32,
2584                    (None, Some(y)) if y > 0 => (y as u32).saturating_mul(365),
2585                    _ => {
2586                        return Err(S3Error::with_message(
2587                            S3ErrorCode::InvalidRequest,
2588                            "Object Lock default retention requires exactly one of Days or Years (positive integer)",
2589                        ));
2590                    }
2591                };
2592                mgr.set_bucket_default(
2593                    &bucket,
2594                    crate::object_lock::BucketObjectLockDefault {
2595                        mode,
2596                        retention_days: days,
2597                    },
2598                );
2599            }
2600            return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
2601        }
2602        self.backend.put_object_lock_configuration(req).await
2603    }
2604    async fn get_object_legal_hold(
2605        &self,
2606        req: S3Request<GetObjectLegalHoldInput>,
2607    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
2608        if let Some(mgr) = self.object_lock.as_ref() {
2609            let on = mgr
2610                .get(&req.input.bucket, &req.input.key)
2611                .map(|s| s.legal_hold_on)
2612                .unwrap_or(false);
2613            let status = ObjectLockLegalHoldStatus::from_static(if on {
2614                ObjectLockLegalHoldStatus::ON
2615            } else {
2616                ObjectLockLegalHoldStatus::OFF
2617            });
2618            let output = GetObjectLegalHoldOutput {
2619                legal_hold: Some(ObjectLockLegalHold {
2620                    status: Some(status),
2621                }),
2622            };
2623            return Ok(S3Response::new(output));
2624        }
2625        self.backend.get_object_legal_hold(req).await
2626    }
2627    async fn put_object_legal_hold(
2628        &self,
2629        req: S3Request<PutObjectLegalHoldInput>,
2630    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
2631        if let Some(mgr) = self.object_lock.as_ref() {
2632            let on = req
2633                .input
2634                .legal_hold
2635                .as_ref()
2636                .and_then(|h| h.status.as_ref())
2637                .map(|s| s.as_str().eq_ignore_ascii_case("ON"))
2638                .unwrap_or(false);
2639            mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
2640            return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
2641        }
2642        self.backend.put_object_legal_hold(req).await
2643    }
2644    async fn get_object_retention(
2645        &self,
2646        req: S3Request<GetObjectRetentionInput>,
2647    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
2648        if let Some(mgr) = self.object_lock.as_ref() {
2649            let retention = mgr
2650                .get(&req.input.bucket, &req.input.key)
2651                .filter(|s| s.mode.is_some() || s.retain_until.is_some())
2652                .map(|s| {
2653                    let mode = s.mode.map(|m| {
2654                        ObjectLockRetentionMode::from_static(match m {
2655                            crate::object_lock::LockMode::Governance => {
2656                                ObjectLockRetentionMode::GOVERNANCE
2657                            }
2658                            crate::object_lock::LockMode::Compliance => {
2659                                ObjectLockRetentionMode::COMPLIANCE
2660                            }
2661                        })
2662                    });
2663                    let until = s.retain_until.map(chrono_utc_to_timestamp);
2664                    ObjectLockRetention {
2665                        mode,
2666                        retain_until_date: until,
2667                    }
2668                });
2669            let output = GetObjectRetentionOutput { retention };
2670            return Ok(S3Response::new(output));
2671        }
2672        self.backend.get_object_retention(req).await
2673    }
    /// PutObjectRetention — when an `ObjectLockManager` is attached, S3
    /// Object Lock immutability semantics are enforced in-process and the
    /// merged state is stored in the manager; otherwise the request is
    /// delegated to the backend.
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        if let Some(mgr) = self.object_lock.as_ref() {
            let bucket = req.input.bucket.clone();
            let key = req.input.key.clone();
            // x-amz-bypass-governance-retention — only consulted when
            // shortening an active Governance lock (checked further down).
            let bypass = req.input.bypass_governance_retention.unwrap_or(false);
            let retention = req.input.retention.as_ref().ok_or_else(|| {
                S3Error::with_message(
                    S3ErrorCode::InvalidRequest,
                    "PutObjectRetention requires a Retention element",
                )
            })?;
            // Unrecognised mode strings fall through to None; the merge
            // below then keeps the field's previous value.
            let new_mode = retention
                .mode
                .as_ref()
                .and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
            // `timestamp_to_chrono_utc` returns Option, so
            // `.map(..).unwrap_or(None)` flattens Option<Option<_>>
            // (equivalent to `and_then` over the optional field).
            let new_until = retention
                .retain_until_date
                .as_ref()
                .map(timestamp_to_chrono_utc)
                .unwrap_or(None);
            let now = chrono::Utc::now();
            let existing = mgr.get(&bucket, &key).unwrap_or_default();
            // S3 immutability rules:
            //   - Compliance is one-way: once set, mode cannot move to
            //     Governance, and retain-until cannot be shortened.
            //   - Governance can be lengthened freely; shortened only
            //     with bypass=true.
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Compliance
                && existing.is_locked(now)
            {
                if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot downgrade Compliance retention to Governance while lock is active",
                    ));
                }
                if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                    && next < prev
                {
                    return Err(S3Error::with_message(
                        S3ErrorCode::AccessDenied,
                        "Cannot shorten Compliance retention while lock is active",
                    ));
                }
            }
            if let Some(existing_mode) = existing.mode
                && existing_mode == crate::object_lock::LockMode::Governance
                && existing.is_locked(now)
                && !bypass
                && let (Some(prev), Some(next)) = (existing.retain_until, new_until)
                && next < prev
            {
                return Err(S3Error::with_message(
                    S3ErrorCode::AccessDenied,
                    "Shortening Governance retention requires x-amz-bypass-governance-retention: true",
                ));
            }
            // Merge semantics: fields present in the request overwrite;
            // absent fields keep their existing values.
            let mut state = existing;
            if new_mode.is_some() {
                state.mode = new_mode;
            }
            if new_until.is_some() {
                state.retain_until = new_until;
            }
            mgr.set(&bucket, &key, state);
            return Ok(S3Response::new(PutObjectRetentionOutput::default()));
        }
        self.backend.put_object_retention(req).await
    }
2747
2748    // ---- Versioning ----
2749    // list_object_versions is implemented above in the compression-hook
2750    // section so it filters S4-internal sidecars (v0.4 #17) AND, when a
2751    // VersioningManager is attached (v0.5 #34), serves chains directly
2752    // from the in-memory index.
2753    async fn get_bucket_versioning(
2754        &self,
2755        req: S3Request<GetBucketVersioningInput>,
2756    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
2757        // v0.5 #34: when a VersioningManager is attached, the bucket's
2758        // versioning state lives in the manager (= S4-server's
2759        // authoritative source). Pass-through hits the backend only
2760        // when no manager is configured (legacy v0.4 behaviour).
2761        if let Some(mgr) = self.versioning.as_ref() {
2762            let output = match mgr.state(&req.input.bucket).as_aws_status() {
2763                Some(s) => GetBucketVersioningOutput {
2764                    status: Some(BucketVersioningStatus::from(s.to_owned())),
2765                    ..Default::default()
2766                },
2767                None => GetBucketVersioningOutput::default(),
2768            };
2769            return Ok(S3Response::new(output));
2770        }
2771        self.backend.get_bucket_versioning(req).await
2772    }
    /// PutBucketVersioning — stores the new state in the attached
    /// `VersioningManager` when one is configured; otherwise delegates.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        // v0.5 #34: when a VersioningManager is attached it is the sole
        // owner of versioning state — the new state is stored in the
        // manager and the request is NOT forwarded to the backend. The
        // pass-through below runs only when no manager is configured
        // (legacy v0.4 behaviour).
        if let Some(mgr) = self.versioning.as_ref() {
            // NOTE(review): an absent or unrecognised Status silently
            // maps to Unversioned; real S3 rejects malformed input —
            // confirm this lenient fallback is intended.
            let new_state = match req
                .input
                .versioning_configuration
                .status
                .as_ref()
                .map(|s| s.as_str())
            {
                Some(s) if s.eq_ignore_ascii_case("Enabled") => {
                    crate::versioning::VersioningState::Enabled
                }
                Some(s) if s.eq_ignore_ascii_case("Suspended") => {
                    crate::versioning::VersioningState::Suspended
                }
                _ => crate::versioning::VersioningState::Unversioned,
            };
            mgr.set_state(&req.input.bucket, new_state);
            return Ok(S3Response::new(PutBucketVersioningOutput::default()));
        }
        self.backend.put_bucket_versioning(req).await
    }
2803
2804    // ---- Bucket location ----
    // Pure delegation — bucket location is backend-owned; no S4 involvement.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
2811
2812    // ---- Bucket policy ----
    // Pure delegation — bucket policy is backend-owned.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    // Pure delegation — bucket policy is backend-owned.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
    // Pure delegation — bucket policy is backend-owned.
    async fn delete_bucket_policy(
        &self,
        req: S3Request<DeleteBucketPolicyInput>,
    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
        self.backend.delete_bucket_policy(req).await
    }
    // Pure delegation — policy status is derived by the backend.
    async fn get_bucket_policy_status(
        &self,
        req: S3Request<GetBucketPolicyStatusInput>,
    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
        self.backend.get_bucket_policy_status(req).await
    }
2837
2838    // ---- Bucket ACL ----
    // Pure delegation — bucket ACLs are backend-owned (Phase 2 scope).
    async fn get_bucket_acl(
        &self,
        req: S3Request<GetBucketAclInput>,
    ) -> S3Result<S3Response<GetBucketAclOutput>> {
        self.backend.get_bucket_acl(req).await
    }
    // Pure delegation — bucket ACLs are backend-owned (Phase 2 scope).
    async fn put_bucket_acl(
        &self,
        req: S3Request<PutBucketAclInput>,
    ) -> S3Result<S3Response<PutBucketAclOutput>> {
        self.backend.put_bucket_acl(req).await
    }
2851
2852    // ---- Bucket CORS ----
    // Pure delegation — CORS configuration is backend-owned.
    async fn get_bucket_cors(
        &self,
        req: S3Request<GetBucketCorsInput>,
    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
        self.backend.get_bucket_cors(req).await
    }
    // Pure delegation — CORS configuration is backend-owned.
    async fn put_bucket_cors(
        &self,
        req: S3Request<PutBucketCorsInput>,
    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
        self.backend.put_bucket_cors(req).await
    }
    // Pure delegation — CORS configuration is backend-owned.
    async fn delete_bucket_cors(
        &self,
        req: S3Request<DeleteBucketCorsInput>,
    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
        self.backend.delete_bucket_cors(req).await
    }
2871
2872    // ---- Bucket lifecycle ----
    // Pure delegation — lifecycle rules are backend-owned (Phase 2 scope).
    async fn get_bucket_lifecycle_configuration(
        &self,
        req: S3Request<GetBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
        self.backend.get_bucket_lifecycle_configuration(req).await
    }
    // Pure delegation — lifecycle rules are backend-owned (Phase 2 scope).
    async fn put_bucket_lifecycle_configuration(
        &self,
        req: S3Request<PutBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
        self.backend.put_bucket_lifecycle_configuration(req).await
    }
    // Pure delegation — lifecycle rules are backend-owned (Phase 2 scope).
    async fn delete_bucket_lifecycle(
        &self,
        req: S3Request<DeleteBucketLifecycleInput>,
    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
        self.backend.delete_bucket_lifecycle(req).await
    }
2891
2892    // ---- Bucket tagging ----
    // Pure delegation — bucket tags are backend-owned.
    async fn get_bucket_tagging(
        &self,
        req: S3Request<GetBucketTaggingInput>,
    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
        self.backend.get_bucket_tagging(req).await
    }
    // Pure delegation — bucket tags are backend-owned.
    async fn put_bucket_tagging(
        &self,
        req: S3Request<PutBucketTaggingInput>,
    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
        self.backend.put_bucket_tagging(req).await
    }
    // Pure delegation — bucket tags are backend-owned.
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        self.backend.delete_bucket_tagging(req).await
    }
2911
2912    // ---- Bucket encryption ----
    // Pure delegation — server-side encryption config is backend-owned.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    // Pure delegation — server-side encryption config is backend-owned.
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    // Pure delegation — server-side encryption config is backend-owned.
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }
2931
2932    // ---- Bucket logging ----
    // Pure delegation — access-logging config is backend-owned.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    // Pure delegation — access-logging config is backend-owned.
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
2945
2946    // ---- Bucket notification ----
    // Pure delegation — event notification config is backend-owned.
    async fn get_bucket_notification_configuration(
        &self,
        req: S3Request<GetBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
        self.backend
            .get_bucket_notification_configuration(req)
            .await
    }
    // Pure delegation — event notification config is backend-owned.
    async fn put_bucket_notification_configuration(
        &self,
        req: S3Request<PutBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
        self.backend
            .put_bucket_notification_configuration(req)
            .await
    }
2963
2964    // ---- Bucket request payment ----
    // Pure delegation — requester-pays config is backend-owned.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    // Pure delegation — requester-pays config is backend-owned.
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }
2977
2978    // ---- Bucket website ----
    // Pure delegation — static-website config is backend-owned.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    // Pure delegation — static-website config is backend-owned.
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    // Pure delegation — static-website config is backend-owned.
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
2997
2998    // ---- Bucket replication ----
    // Pure delegation — replication config is backend-owned.
    async fn get_bucket_replication(
        &self,
        req: S3Request<GetBucketReplicationInput>,
    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
        self.backend.get_bucket_replication(req).await
    }
    // Pure delegation — replication config is backend-owned.
    async fn put_bucket_replication(
        &self,
        req: S3Request<PutBucketReplicationInput>,
    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
        self.backend.put_bucket_replication(req).await
    }
    // Pure delegation — replication config is backend-owned.
    async fn delete_bucket_replication(
        &self,
        req: S3Request<DeleteBucketReplicationInput>,
    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
        self.backend.delete_bucket_replication(req).await
    }
3017
3018    // ---- Bucket accelerate ----
    // Pure delegation — transfer-acceleration config is backend-owned.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    // Pure delegation — transfer-acceleration config is backend-owned.
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }
3031
3032    // ---- Bucket ownership controls ----
    // Pure delegation — object-ownership controls are backend-owned.
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    // Pure delegation — object-ownership controls are backend-owned.
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    // Pure delegation — object-ownership controls are backend-owned.
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }
3051
3052    // ---- Public access block ----
    // Pure delegation — public-access-block config is backend-owned.
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    // Pure delegation — public-access-block config is backend-owned.
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    // Pure delegation — public-access-block config is backend-owned.
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
3071}
3072
3073// ---------------------------------------------------------------------------
3074// v0.5 #33: SigV4a (asymmetric ECDSA-P256) integration hook.
3075//
3076// Kept as a self-contained block at the bottom of the file so it doesn't
3077// touch the existing `S4Service` struct, `new()`, or any of the per-op
3078// handlers above. The hook is wired in by the binary at server-build time
3079// as a hyper middleware layer (see `main.rs`), NOT inside `S4Service`.
3080//
3081// Lifecycle:
3082//   1. `SigV4aGate::new(store)` is constructed once at boot from the
3083//      operator-supplied credential directory.
3084//   2. For each incoming request, `SigV4aGate::pre_route(&req,
3085//      &requested_region, &canonical_request_bytes)` is invoked BEFORE
3086//      the request hits the S3 framework. If the request claims SigV4a
3087//      and verifies, control returns to the framework. Otherwise a 403
3088//      `SignatureDoesNotMatch` is produced.
3089//   3. Plain SigV4 (HMAC-SHA256) requests pass through untouched.
3090// ---------------------------------------------------------------------------
3091
/// Gate that fronts the S3 service path with SigV4a verification (v0.5 #33).
///
/// Wraps a [`crate::sigv4a::SigV4aCredentialStore`] and exposes a single
/// `pre_route` entry point that returns `Ok(())` for both
/// "request is plain SigV4 — pass through" and "request is SigV4a and
/// verified", and an `Err(...)` containing a 403-equivalent diagnostic
/// otherwise. Cheap to clone (the inner store is `Arc`-backed).
#[derive(Debug, Clone)]
pub struct SigV4aGate {
    // Shared (Arc-backed) credential store; cloning the gate only bumps
    // the refcount, so per-request clones are cheap.
    store: crate::sigv4a::SharedSigV4aCredentialStore,
}
3103
3104impl SigV4aGate {
3105    #[must_use]
3106    pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
3107        Self { store }
3108    }
3109
3110    /// Inspect an incoming HTTP request. Behaviour:
3111    ///
3112    /// - Not SigV4a (no `X-Amz-Region-Set` and no SigV4a `Authorization`
3113    ///   prefix) → returns `Ok(())`; the framework's existing SigV4
3114    ///   path handles the request.
3115    /// - SigV4a + valid signature + region match → `Ok(())`.
3116    /// - SigV4a + unknown access-key-id → `Err` with `InvalidAccessKeyId`.
3117    /// - SigV4a + bad signature / region mismatch → `Err` with
3118    ///   `SignatureDoesNotMatch`.
3119    ///
3120    /// `canonical_request_bytes` is the SigV4a string-to-sign (or
3121    /// canonical-request bytes; the caller decides) that the framework
3122    /// has already produced for this request. Keeping it as a parameter
3123    /// instead of rebuilding it inside the hook avoids duplicating the
3124    /// canonicalisation logic.
3125    pub fn pre_route<B>(
3126        &self,
3127        req: &http::Request<B>,
3128        requested_region: &str,
3129        canonical_request_bytes: &[u8],
3130    ) -> Result<(), SigV4aGateError> {
3131        if !crate::sigv4a::detect(req) {
3132            return Ok(());
3133        }
3134        let auth_hdr = req
3135            .headers()
3136            .get(http::header::AUTHORIZATION)
3137            .and_then(|v| v.to_str().ok())
3138            .ok_or(SigV4aGateError::MissingAuthorization)?;
3139        let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
3140            .ok_or(SigV4aGateError::MalformedAuthorization)?;
3141        let region_set = req
3142            .headers()
3143            .get(crate::sigv4a::REGION_SET_HEADER)
3144            .and_then(|v| v.to_str().ok())
3145            .unwrap_or("*");
3146        let key = self
3147            .store
3148            .get(&parsed.access_key_id)
3149            .ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
3150        crate::sigv4a::verify(
3151            &crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
3152            &parsed.signature_der,
3153            key,
3154            region_set,
3155            requested_region,
3156        )
3157        .map_err(SigV4aGateError::Verify)?;
3158        Ok(())
3159    }
3160}
3161
/// Failure modes from [`SigV4aGate::pre_route`]. All variants map to
/// HTTP 403 with one of the two AWS-standard error codes
/// (`InvalidAccessKeyId` or `SignatureDoesNotMatch`).
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
    // → SignatureDoesNotMatch: request claimed SigV4a but carried no
    // Authorization header at all.
    #[error("missing Authorization header")]
    MissingAuthorization,
    // → SignatureDoesNotMatch: Authorization header present but not
    // parseable as SigV4a credential/signature components.
    #[error("malformed SigV4a Authorization header")]
    MalformedAuthorization,
    // → InvalidAccessKeyId: access key absent from the credential store.
    #[error("unknown SigV4a access-key-id: {0}")]
    UnknownAccessKey(String),
    // → SignatureDoesNotMatch: signature/region verification failed.
    #[error("SigV4a verification failed: {0}")]
    Verify(#[source] crate::sigv4a::SigV4aError),
}
3176
3177impl SigV4aGateError {
3178    /// AWS S3 error code that should accompany a 403 response.
3179    #[must_use]
3180    pub fn s3_error_code(&self) -> &'static str {
3181        match self {
3182            Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
3183            _ => "SignatureDoesNotMatch",
3184        }
3185    }
3186}
3187
#[cfg(test)]
mod tests {
    use super::*;

    /// A manifest written into S3 metadata must survive the round trip
    /// through `write_manifest` / `extract_manifest` unchanged.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let manifest = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut slot: Option<Metadata> = None;
        write_manifest(&mut slot, &manifest);
        let back = extract_manifest(&slot).expect("manifest must round-trip");
        assert_eq!(back.codec, manifest.codec);
        assert_eq!(back.original_size, manifest.original_size);
        assert_eq!(back.compressed_size, manifest.compressed_size);
        assert_eq!(back.crc32c, manifest.crc32c);
    }

    #[test]
    fn missing_metadata_yields_none() {
        let empty: Option<Metadata> = None;
        assert!(extract_manifest(&empty).is_none());
    }

    /// Only the codec key is present; the other manifest keys are
    /// missing, so extraction must fail as a unit.
    #[test]
    fn partial_metadata_yields_none() {
        let mut partial = Metadata::new();
        partial.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(partial)).is_none());
    }

    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").unwrap();
        let s3s::dto::Range::Int { first, last } = parsed else {
            panic!("expected Int range");
        };
        assert_eq!((first, last), (10, Some(20)));
    }

    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let msg = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(msg.contains("last < first"));
    }

    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let msg = parse_copy_source_range("10-20").unwrap_err();
        assert!(msg.contains("must start with 'bytes='"));
    }

    /// S3 upload_part_copy requires a closed N-M range; suffix and
    /// open-ended forms are not allowed for this header.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        for bad in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(bad).is_err(), "{bad}");
        }
    }
}