// s4_server/service.rs
//! `s3s::S3` implementation: delegates to `s3s_aws::Proxy` by default, and calls
//! into `s4_codec::CodecRegistry` on the `put_object` / `get_object` paths.
//!
//! ## Coverage
//!
//! - Compression hooks: `put_object`, `get_object`, `upload_part` (per-part S4F2 frames)
//! - Delegation (some with S4-aware fix-ups for metadata, sidecars, and reported
//!   sizes): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
//!   `list_objects_v2`, `list_object_versions`, `create_multipart_upload`,
//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
//!   `list_parts`
//! - Unsupported (NotImplemented by default): the remaining 80+ ops (Tagging / ACL /
//!   Lifecycle and friends are Phase 2)
//!
//! ## Architecture
//!
//! - `S4Service<B>` holds a backend (`B: S3`), an `Arc<CodecRegistry>`, and an
//!   `Arc<dyn CodecDispatcher>`. The `CodecRegistry` can carry multiple codecs, so a
//!   single S4 instance can transparently GET objects written with different codecs.
//! - PUT: the dispatcher picks a codec from a sample of the body head, the registry
//!   compresses, and the manifest is written into S3 metadata before forwarding to
//!   the backend.
//! - GET: fetch from the backend, restore the manifest from metadata, decompress via
//!   `registry.decompress` with the manifest's codec, and return the original bytes.
//!
//! ## Known limitations
//!
//! - **Multipart parts are buffered in memory**: `upload_part` collects each part
//!   before compressing it (bounded by `max_body_bytes`).
//! - **Non-streaming codecs buffer the whole PUT body**, bounded by `max_body_bytes`
//!   (default 5 GiB, the S3 single-PUT limit). Streaming-aware codecs compress
//!   chunk-by-chunk without collecting the input.
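//!
//! ## Wiring sketch
//!
//! A minimal, illustrative construction of the service. The backend, registry, and
//! dispatcher arguments are placeholders for whatever the server binary actually
//! wires up; only the builder methods shown exist on [`S4Service`].
//!
//! ```rust,ignore
//! use std::sync::Arc;
//! use s4_codec::{CodecDispatcher, CodecRegistry};
//!
//! fn wire(
//!     backend: impl s3s::S3,
//!     registry: Arc<CodecRegistry>,
//!     dispatcher: Arc<dyn CodecDispatcher>,
//! ) {
//!     let svc = S4Service::new(backend, registry, dispatcher)
//!         // cap buffered bodies at 1 GiB instead of the 5 GiB default
//!         .with_max_body_bytes(1 << 30)
//!         // the listener terminates TLS, so aws:SecureTransport resolves to true
//!         .with_secure_transport(true);
//!     // `svc` implements `s3s::S3` and can be handed to the s3s HTTP front end.
//!     let _ = svc;
//! }
//! ```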

use std::sync::Arc;

use bytes::BytesMut;
use s3s::dto::*;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
use s4_codec::multipart::{
    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
    write_frame,
};
use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
use std::time::Instant;
use tracing::{debug, info};

use crate::blob::{
    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
};
use crate::streaming::{
    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
    supports_streaming_compress, supports_streaming_decompress,
};

/// Maximum number of bytes handed to the dispatcher when sampling the head of a PUT body.
const SAMPLE_BYTES: usize = 4096;

/// v0.4 #20: captured at the start of a handler, before the request is
/// consumed by the backend call, so the matching `record_access` at
/// end-of-request can fill in the structured access log entry.
struct AccessLogPreamble {
    remote_ip: Option<String>,
    requester: Option<String>,
    request_uri: String,
    user_agent: Option<String>,
}

pub struct S4Service<B: S3> {
    backend: B,
    registry: Arc<CodecRegistry>,
    dispatcher: Arc<dyn CodecDispatcher>,
    max_body_bytes: usize,
    policy: Option<crate::policy::SharedPolicy>,
    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
    /// gating "deny if not over TLS" can do their job. Defaults to `false`
    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
    secure_transport: bool,
    /// v0.4 #19: optional per-(principal, bucket) token-bucket limiter.
    rate_limits: Option<crate::rate_limit::SharedRateLimits>,
    /// v0.4 #20: optional S3-style access log emitter.
    access_log: Option<crate::access_log::SharedAccessLog>,
    /// v0.4 #21: optional server-side encryption key (AES-256-GCM).
    /// When set, every PUT body gets wrapped in S4E1 after the compress
    /// and framing steps; every GET that smells like S4E1 gets
    /// decrypted before frame parsing.
    sse_key: Option<crate::sse::SharedSseKey>,
}

impl<B: S3> S4Service<B> {
    /// AWS S3 API limit for a single PUT (5 GiB).
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend,
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
            rate_limits: None,
            access_log: None,
            sse_key: None,
        }
    }

    /// v0.4 #21: attach a server-side encryption key. When set, S4 wraps
    /// every PUT body with AES-256-GCM (S4E1 wire format) AFTER
    /// compression + framing, and unwraps it on GET before frame parse.
    /// Encrypt-after-compress preserves the codec ratio (encrypted bytes
    /// don't compress).
    #[must_use]
    pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
        self.sse_key = Some(key);
        self
    }

    /// v0.4 #20: attach an S3-style access-log emitter. Each completed
    /// PUT / GET / DELETE / List handler emits one entry into the
    /// emitter's buffer; a background flusher (started separately, see
    /// [`crate::access_log::AccessLog::spawn_flusher`]) writes hourly
    /// rotated `.log` files into the configured directory.
    #[must_use]
    pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
        self.access_log = Some(log);
        self
    }

    /// Capture the per-request access-log preamble before the request is
    /// consumed by the backend call. Returns `None` if no access logger
    /// is configured (cheap early-out so the handler doesn't pay the
    /// header-clone cost when access logging is off).
    fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
        self.access_log.as_ref()?;
        Some(AccessLogPreamble {
            remote_ip: req
                .headers
                .get("x-forwarded-for")
                .and_then(|v| v.to_str().ok())
                .and_then(|raw| raw.split(',').next())
                .map(|s| s.trim().to_owned()),
            requester: Self::principal_of(req).map(str::to_owned),
            request_uri: format!("{} {}", req.method, req.uri.path()),
            user_agent: req
                .headers
                .get("user-agent")
                .and_then(|v| v.to_str().ok())
                .map(str::to_owned),
        })
    }
    /// Internal: called by handlers at end-of-request with a captured
    /// preamble. Best-effort: the await completes quickly (it clones an Arc
    /// and pushes an entry), and no error propagates back to the request path.
    #[allow(clippy::too_many_arguments)]
    async fn record_access(
        &self,
        preamble: Option<AccessLogPreamble>,
        operation: &'static str,
        bucket: &str,
        key: Option<&str>,
        http_status: u16,
        bytes_sent: u64,
        object_size: u64,
        total_time_ms: u64,
        error_code: Option<&str>,
    ) {
        let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
            return;
        };
        log.record(crate::access_log::AccessLogEntry {
            time: std::time::SystemTime::now(),
            bucket: bucket.to_owned(),
            remote_ip: p.remote_ip,
            requester: p.requester,
            operation,
            key: key.map(str::to_owned),
            request_uri: p.request_uri,
            http_status,
            error_code: error_code.map(str::to_owned),
            bytes_sent,
            object_size,
            total_time_ms,
            user_agent: p.user_agent,
        })
        .await;
    }

    /// v0.4 #19: attach a per-(principal, bucket) token-bucket rate limiter.
    /// When set, every PUT / GET / DELETE / List / Copy / multipart op is
    /// throttle-checked before the policy gate; throttled requests return
    /// `S3ErrorCode::SlowDown` (HTTP 503) and bump
    /// `s4_rate_limit_throttled_total{principal,bucket}`.
    #[must_use]
    pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
        self.rate_limits = Some(rl);
        self
    }

    /// Helper used by request handlers to apply the rate limit. Returns
    /// `Ok(())` when allowed (or no rate limiter is configured), or a
    /// `SlowDown` S3Error otherwise.
    fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
        let Some(rl) = self.rate_limits.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        if !rl.check(principal_id, bucket) {
            crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
            return Err(S3Error::with_message(
                S3ErrorCode::SlowDown,
                format!("rate-limited: bucket={bucket}"),
            ));
        }
        Ok(())
    }

    /// Tell the policy evaluator that the listener is reached over TLS
    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
    /// resolves to `true`. Defaults to `false`.
    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }

    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }

    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
    /// When `None` (the default), no policy enforcement happens.
    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }

    /// Pull the SigV4 access key id off the request's credentials, if any.
    /// Used as the `principal_id` for policy evaluation.
    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
        req.credentials.as_ref().map(|c| c.access_key.as_str())
    }

    /// v0.3 #13: build the per-request policy context from the incoming
    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
    /// production deployments are behind an LB / reverse proxy that sets
    /// this), `aws:CurrentTime` from the system clock, and
    /// `aws:SecureTransport` from the per-listener TLS flag.
    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
        let user_agent = req
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
        // is the original client. Trim and parse leniently.
        let source_ip = req
            .headers
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
            .and_then(|raw| raw.split(',').next())
            .and_then(|s| s.trim().parse().ok());
        crate::policy::RequestContext {
            source_ip,
            user_agent,
            request_time: Some(std::time::SystemTime::now()),
            secure_transport: self.secure_transport,
            extra: Default::default(),
        }
    }

    /// Helper used by request handlers to enforce the optional policy.
    /// Returns `Ok(())` when allowed (or no policy is configured), or an
    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
    /// counter on deny.
    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        let ctx = self.request_context(req);
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }

    /// Test helper: hand the backend back (not used in production).
    pub fn into_backend(self) -> B {
        self.backend
    }

    /// Sidecar fast path: Range-GET only the frames we need from the backend,
    /// then frame-parse + decompress + slice the result. The bandwidth-saving
    /// flavor of a Range request.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial GET against the backend for just the byte range we need
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // frame parse + decompress
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // assemble the response
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }

    /// Write the `<key>.s4index` sidecar object to the backend. A failure here
    /// must not fail the main PUT, so errors are only logged at warn level
    /// (the partial Range GET path is lost, but the full-read fallback still
    /// returns a semantically correct result).
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }

    /// Read the `<key>.s4index` sidecar from the backend. `None` if absent.
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }

    /// Decompress a multipart object (a sequence of frames) and reconstruct the
    /// original bytes.
    ///
    /// **Per-frame codec dispatch**: every frame header carries a codec_id, so
    /// the registry can invoke a different codec for each frame. Objects that
    /// mix codecs internally still decompress transparently (parquet-style
    /// mixed columns and the like).
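    ///
    /// Illustrative on-wire shape of the frame sequence (the field order shown
    /// is an assumption; the authoritative layout lives in `s4_codec::multipart`):
    ///
    /// ```text
    /// [header 0][payload 0][header 1][payload 1]...
    /// header (FRAME_HEADER_BYTES wide): codec, original_size, compressed_size, crc32c
    /// ```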
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
}

/// Parse a CopySourceRange header value into the `s3s::dto::Range` used by the
/// GetObject path. The S3 spec only allows the `bytes=N-M` form for
/// upload_part_copy (no suffix `bytes=-N` and no open-ended `bytes=N-`), so
/// those variants are rejected for parity with AWS.
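///
/// A few illustrative inputs (kept out of the doctest harness):
///
/// ```ignore
/// assert!(parse_copy_source_range("bytes=0-1023").is_ok());
/// assert!(parse_copy_source_range("bytes=500-").is_err()); // open-ended: rejected
/// assert!(parse_copy_source_range("bytes=-500").is_err()); // suffix: rejected
/// ```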
fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
    let rest = s
        .strip_prefix("bytes=")
        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
    let (a, b) = rest
        .split_once('-')
        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
    let first: u64 = a
        .parse()
        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
    let last: u64 = b
        .parse()
        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
    if last < first {
        return Err(format!("CopySourceRange last < first: {s:?}"));
    }
    Ok(s3s::dto::Range::Int {
        first,
        last: Some(last),
    })
}

fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_MULTIPART))
        .map(|v| v == "true")
        .unwrap_or(false)
}

const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object whose multipart upload used the per-part frame format.
/// GET checks this flag to fire up the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object written in the S4F2 framed format.
/// Legacy v0.1 single PUTs are raw compressed bytes (no flag). GET checks this
/// flag to route the body down the framed path (the same FrameIter parse as
/// multipart).
const META_FRAMED: &str = "s4-framed";
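
// For orientation, the user metadata S4 ends up writing for a framed single
// PUT looks roughly like this (all values, including the codec string, are
// made up for illustration):
//
//     s4-codec: cpu-zstd
//     s4-original-size: 1048576
//     s4-compressed-size: 262144
//     s4-crc32c: 305419896
//     s4-framed: true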

fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get(META_FRAMED))
        .map(|v| v == "true")
        .unwrap_or(false)
}

/// v0.4 #21: detect SSE-S4 by the metadata flag we set on PUT.
fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
    metadata
        .as_ref()
        .and_then(|m| m.get("s4-encrypted"))
        .map(|v| v == "aes-256-gcm")
        .unwrap_or(false)
}

fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
    let m = metadata.as_ref()?;
    let codec = m
        .get(META_CODEC)
        .and_then(|s| s.parse::<CodecKind>().ok())?;
    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
    Some(ChunkManifest {
        codec,
        original_size,
        compressed_size,
        crc32c,
    })
}

fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
    let meta = metadata.get_or_insert_with(Default::default);
    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
    meta.insert(
        META_ORIGINAL_SIZE.into(),
        manifest.original_size.to_string(),
    );
    meta.insert(
        META_COMPRESSED_SIZE.into(),
        manifest.compressed_size.to_string(),
    );
    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
}
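
// Illustrative invariant, assuming `ChunkManifest` implements `PartialEq`:
// writing a manifest into empty metadata and reading it back is lossless.
//
//     let mut meta: Option<Metadata> = None;
//     write_manifest(&mut meta, &manifest);
//     assert_eq!(extract_manifest(&meta), Some(manifest));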

fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}

/// Apply a `Range` request to the decompressed object size `total` and return
/// `(start, end_exclusive)`. `Range::Int { first, last }` is `bytes=first-last`
/// (`last` inclusive); `Range::Suffix { length }` is the final `length` bytes.
/// Follows the S3 spec.
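///
/// A couple of illustrative cases (kept out of the doctest harness):
///
/// ```ignore
/// // bytes=0-99 on a 1000-byte object: the first 100 bytes
/// let r = s3s::dto::Range::Int { first: 0, last: Some(99) };
/// assert_eq!(resolve_range(&r, 1000), Ok((0, 100)));
/// // bytes=-100: the final 100 bytes
/// let r = s3s::dto::Range::Suffix { length: 100 };
/// assert_eq!(resolve_range(&r, 1000), Ok((900, 1000)));
/// ```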
pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
    if total == 0 {
        return Err("cannot range-get zero-length object".into());
    }
    match range {
        s3s::dto::Range::Int { first, last } => {
            let start = *first;
            let end_inclusive = match last {
                Some(l) => (*l).min(total - 1),
                None => total - 1,
            };
            if start > end_inclusive || start >= total {
                return Err(format!(
                    "range bytes={start}-{:?} out of object size {total}",
                    last
                ));
            }
            Ok((start, end_inclusive + 1))
        }
        s3s::dto::Range::Suffix { length } => {
            let len = (*length).min(total);
            Ok((total - len, total))
        }
    }
}

#[async_trait::async_trait]
impl<B: S3> S3 for S4Service<B> {
    // === compressing path (PUT) ===
    #[tracing::instrument(
        name = "s4.put_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
    )]
    async fn put_object(
        &self,
        mut req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let put_start = Instant::now();
        let put_bucket = req.input.bucket.clone();
        let put_key = req.input.key.clone();
        let access_preamble = self.access_log_preamble(&req);
        self.enforce_rate_limit(&req, &put_bucket)?;
        self.enforce_policy(&req, "s3:PutObject", &put_bucket, Some(&put_key))?;
        if let Some(blob) = req.input.body.take() {
            // Pick the codec from a 4 KiB head sample. Streaming-aware codecs
            // take the streaming-compress fast path; everything else falls back
            // to the traditional collect-then-compress.
            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
                .await
                .map_err(internal("peek put sample"))?;
            let sample_len = sample.len().min(SAMPLE_BYTES);
            let kind = self.dispatcher.pick(&sample[..sample_len]).await;

            // Passthrough buys nothing from S4F2 wrapping (no compression =
            // no per-chunk frame to skip past) and the +28-byte header
            // overhead breaks size-sensitive callers that expect a true
            // pass-through. So passthrough always uses the legacy raw-blob
            // path; only compressing codecs go through the framed path.
            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
            let (compressed, manifest, is_framed) = if use_framed {
                // streaming fast path: the input is never collected into memory
                let chained = chain_sample_with_rest(sample, rest_stream);
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    codec = kind.as_str(),
                    path = "streaming-framed",
                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
                );
                // v0.4 #16: pick the chunk size based on the request's
                // Content-Length when known, falling back to the 4 MiB
                // default for chunked transfers.
                let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
                let (body, manifest) = streaming_compress_to_frames(
                    chained,
                    Arc::clone(&self.registry),
                    kind,
                    chunk_size,
                )
                .await
                .map_err(internal("streaming framed compress"))?;
                (body, manifest, true)
            } else {
                // Codecs that are not streaming-aware (GPU codecs etc.) take the
                // bytes-buffered path (raw compressed bytes, no framing; this is
                // the back-compat path).
                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
                    .await
                    .map_err(internal("collect put body (buffered path)"))?;
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    bytes = bytes.len(),
                    codec = kind.as_str(),
                    path = "buffered",
                    "S4 put_object: compressing (buffered, raw blob)"
                );
                let (body, m) = self
                    .registry
                    .compress(bytes, kind)
                    .await
                    .map_err(internal("registry compress"))?;
                (body, m, false)
            };

            write_manifest(&mut req.input.metadata, &manifest);
            if is_framed {
                // v0.2 #4: metadata flag telling the GET side this body is framed.
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert(META_FRAMED.into(), "true".into());
            }
            // Important: update content_length to the post-compression size.
            // Omitting this makes the downstream (aws-sdk-s3 → S3) keep waiting
            // for the declared number of bytes and fail with RequestTimeout
            // (per the S3 spec).
            req.input.content_length = Some(compressed.len() as i64);
            // The body has been rewritten, so invalidate the checksum / MD5
            // headers the client computed over the original body (forwarding
            // them as-is makes the downstream S3 return
            // XAmzContentChecksumMismatch). S4's own integrity is covered by
            // ChunkManifest.crc32c.
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            let original_size = manifest.original_size;
            let compressed_size = manifest.compressed_size;
            let codec_label = manifest.codec.as_str();
            // For framed bodies, build the sidecar via build_index_from_body and
            // PUT it to the backend so the GET side can serve partial fetches.
            let sidecar_index = if is_framed {
                s4_codec::index::build_index_from_body(&compressed).ok()
            } else {
                None
            };
            // v0.4 #21: encrypt-after-compress. Wraps the post-framing
            // body with AES-256-GCM (S4E1) when --sse-s4-key is set.
            // Sidecar is built from the *unencrypted* framed body so the
            // S4F2 frame iterator can still parse it post-decrypt on GET.
            let body_to_send = if let Some(key) = self.sse_key.as_ref() {
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert("s4-encrypted".into(), "aes-256-gcm".into());
                crate::sse::encrypt(key, &compressed)
            } else {
                compressed.clone()
            };
            req.input.body = Some(bytes_to_blob(body_to_send));
            let backend_resp = self.backend.put_object(req).await;
            if let Some(idx) = sidecar_index
                && backend_resp.is_ok()
                && idx.entries.len() > 1
            {
                // A single-chunk (small) object gains nothing from a sidecar (a
                // partial fetch would cover the same range as the full body),
                // so skip it.
                self.write_sidecar(&put_bucket, &put_key, &idx).await;
            }
            let elapsed = put_start.elapsed();
            crate::metrics::record_put(
                codec_label,
                original_size,
                compressed_size,
                elapsed.as_secs_f64(),
                backend_resp.is_ok(),
            );
            // v0.4 #20: structured access-log entry (best-effort).
            self.record_access(
                access_preamble,
                "REST.PUT.OBJECT",
                &put_bucket,
                Some(&put_key),
                if backend_resp.is_ok() { 200 } else { 500 },
                compressed_size,
                original_size,
                elapsed.as_millis() as u64,
                backend_resp.as_ref().err().map(|e| e.code().as_str()),
            )
            .await;
            info!(
                op = "put_object",
                bucket = %put_bucket,
                key = %put_key,
                codec = codec_label,
                bytes_in = original_size,
                bytes_out = compressed_size,
                ratio = format!(
                    "{:.3}",
                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
                ),
                latency_ms = elapsed.as_millis() as u64,
                ok = backend_resp.is_ok(),
                "S4 put completed"
            );
            return backend_resp;
        }
        self.backend.put_object(req).await
    }

    // === decompressing path (GET) ===
    #[tracing::instrument(
        name = "s4.get_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
    )]
    async fn get_object(
        &self,
        mut req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let get_start = Instant::now();
        let get_bucket = req.input.bucket.clone();
        let get_key = req.input.key.clone();
        self.enforce_rate_limit(&req, &get_bucket)?;
        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
        // Detect a Range request up front (feeds the slice-after-decompress path).
        let range_request = req.input.range.take();

        // ====== Partial-fetch fast path for Range GETs (via the sidecar index) ======
        // When the `<key>.s4index` sidecar exists and the object is
        // multipart-framed, Range-GET only the frames we need from the backend
        // and save bandwidth.
        if let Some(ref r) = range_request
            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
        {
            let total = index.total_original_size();
            let (start, end_exclusive) = match resolve_range(r, total) {
                Ok(v) => v,
                Err(e) => {
                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
                }
            };
            if let Some(plan) = index.lookup_range(start, end_exclusive) {
                return self
                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
                    .await;
            }
        }
        let mut resp = self.backend.get_object(req).await?;
        let is_multipart = is_multipart_object(&resp.output.metadata);
        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
        // v0.2 #4: framed-v2 single PUTs need multi-frame parsing, so they go
        // down the same path as multipart.
        let needs_frame_parse = is_multipart || is_framed_v2;
        let manifest_opt = extract_manifest(&resp.output.metadata);

        if !needs_frame_parse && manifest_opt.is_none() {
            // Objects S4 didn't write pass through untouched (pre-existing
            // objects in a raw bucket, etc.)
            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
            return Ok(resp);
        }

        if let Some(blob) = resp.output.body.take() {
            // v0.4 #21: if the object was stored under SSE-S4 (metadata
            // flag `s4-encrypted: aes-256-gcm`), we MUST decrypt before
            // any frame parse / streaming decompress. Encrypted bodies
            // are opaque to the codec; this also forces the buffered
            // path because AES-GCM needs the full body for tag verify.
            let blob = if is_sse_encrypted(&resp.output.metadata) {
                let key = self.sse_key.as_ref().ok_or_else(|| {
                    S3Error::with_message(
                        S3ErrorCode::InvalidRequest,
                        "object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
                    )
                })?;
                let body = collect_blob(blob, self.max_body_bytes)
                    .await
                    .map_err(internal("collect SSE-encrypted body"))?;
                let plain = crate::sse::decrypt(key, &body).map_err(|e| {
                    S3Error::with_message(
                        S3ErrorCode::InternalError,
                        format!("SSE-S4 decrypt failed: {e}"),
                    )
                })?;
                bytes_to_blob(plain)
            } else {
                blob
            };
            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
            // Collecting a huge object (e.g. 5 GB) into memory would OOM, so
            // when the codec is streaming-aware, decompress the body
            // chunk-by-chunk and stream it to the client immediately.
            //
            // Range requests cannot stream, though: slicing needs the total
            // byte count, so they fall through to the buffered path.
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && supports_streaming_decompress(m.codec)
                && m.codec == CodecKind::CpuZstd
            {
                let decompressed_blob = cpu_zstd_decompress_stream(blob);
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(decompressed_blob);
                let elapsed = get_start.elapsed();
                crate::metrics::record_get(
                    m.codec.as_str(),
                    m.compressed_size,
                    m.original_size,
                    elapsed.as_secs_f64(),
                    true,
                );
                info!(
                    op = "get_object",
                    bucket = %get_bucket,
                    key = %get_key,
                    codec = m.codec.as_str(),
                    bytes_in = m.compressed_size,
                    bytes_out = m.original_size,
                    path = "streaming",
                    setup_latency_ms = elapsed.as_millis() as u64,
                    "S4 get started (streaming)"
                );
                return Ok(resp);
            }
            // Passthrough: stream the body as-is (streaming only when no Range
            // is present)
            if range_request.is_none()
                && !needs_frame_parse
                && let Some(ref m) = manifest_opt
                && m.codec == CodecKind::Passthrough
            {
                resp.output.content_length = Some(m.original_size as i64);
                resp.output.checksum_crc32 = None;
                resp.output.checksum_crc32c = None;
                resp.output.checksum_crc64nvme = None;
                resp.output.checksum_sha1 = None;
                resp.output.checksum_sha256 = None;
                resp.output.e_tag = None;
                resp.output.body = Some(blob);
                debug!("S4 get_object: passthrough streaming");
                return Ok(resp);
            }

            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect get body"))?;

            let decompressed = if needs_frame_parse {
                // multipart objects and framed-v2 single-PUT objects are the
                // same S4F2 frame sequence, so decompress_multipart handles both
                self.decompress_multipart(bytes).await?
            } else {
                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
                self.registry
                    .decompress(bytes, manifest)
                    .await
                    .map_err(internal("registry decompress"))?
            };

            // Slice if a Range request is present; otherwise return the full body.
            let total_size = decompressed.len() as u64;
            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
                let (start, end) = resolve_range(r, total_size)
                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
                let sliced = decompressed.slice(start as usize..end as usize);
                resp.output.content_range = Some(format!(
                    "bytes {start}-{}/{total_size}",
                    end.saturating_sub(1)
                ));
                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
            } else {
                (decompressed, None)
            };
            // Report the true post-decompression size (S3 clients trust
            // content_length; leaving the compressed size would make
            // downstream cut the body short).
            resp.output.content_length = Some(final_bytes.len() as i64);
            // Returning checksums of the compressed bytes triggers
            // StreamingError (ChecksumMismatch) in the AWS SDK. The ETag is
            // likewise the backend's MD5/checksum of the compressed bytes, so
            // it is semantically off. Clear them all; by design, integrity is
            // guaranteed by S4's own crc32c (in the manifest / in each frame).
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
            let returned_size = final_bytes.len() as u64;
            let codec_label = manifest_opt
                .as_ref()
                .map(|m| m.codec.as_str())
                .unwrap_or("multipart");
            resp.output.body = Some(bytes_to_blob(final_bytes));
            if let Some(status) = status_override {
                resp.status = Some(status);
            }
            let elapsed = get_start.elapsed();
            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
            info!(
                op = "get_object",
                bucket = %get_bucket,
                key = %get_key,
                codec = codec_label,
                bytes_out = returned_size,
                total_object_size = total_size,
                range = range_request.is_some(),
                path = "buffered",
                latency_ms = elapsed.as_millis() as u64,
                "S4 get completed (buffered)"
            );
        }
        Ok(resp)
    }

    // === passthrough delegations ===
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
    async fn head_object(
        &self,
        req: S3Request<HeadObjectInput>,
    ) -> S3Result<S3Response<HeadObjectOutput>> {
        let mut resp = self.backend.head_object(req).await?;
        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
            // Give the client the meaningful post-decompression content_length.
            // The checksums / e_tag the backend returns describe the compressed
            // bytes, so strip them (S4 guarantees integrity via the crc32c in
            // the manifest).
            resp.output.content_length = Some(manifest.original_size as i64);
            resp.output.checksum_crc32 = None;
            resp.output.checksum_crc32c = None;
            resp.output.checksum_crc64nvme = None;
            resp.output.checksum_sha1 = None;
            resp.output.checksum_sha256 = None;
            resp.output.e_tag = None;
        }
        Ok(resp)
    }
    async fn delete_object(
        &self,
        req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let bucket = req.input.bucket.clone();
        let key = req.input.key.clone();
        self.enforce_rate_limit(&req, &bucket)?;
        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
        // Also delete the sidecar, best-effort (failures are ignored; the
        // sidecar may not exist, or IAM may forbid deleting it)
        let resp = self.backend.delete_object(req).await?;
        let sidecar_input = DeleteObjectInput {
            bucket: bucket.clone(),
            key: sidecar_key(&key),
            ..Default::default()
        };
        let sidecar_req = S3Request {
            input: sidecar_input,
            method: http::Method::DELETE,
            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let _ = self.backend.delete_object(sidecar_req).await;
        Ok(resp)
    }
    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        self.backend.delete_objects(req).await
    }
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // copy is conceptually "GetObject src + PutObject dst"; enforce both.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // S4-aware copy: when the source object carries s4-* metadata, make
        // sure it survives onto the destination.
        //
        // - MetadataDirective::COPY (the default): the backend copies the source
        //   metadata wholesale, so the S4 metadata comes along automatically; no
        //   intervention needed
        // - MetadataDirective::REPLACE: the client-supplied metadata overwrites
        //   the source's. If the s4-* metadata is lost, the destination can no
        //   longer be decompressed (silent corruption). So S4 HEADs the source
        //   metadata and force-merges the s4-* fields into input.metadata
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // If the client supplied the same key themselves, leave
                        // it alone (allow the override); otherwise insert the
                        // source value
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
    async fn list_objects(
        &self,
        req: S3Request<ListObjectsInput>,
    ) -> S3Result<S3Response<ListObjectsOutput>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        let mut resp = self.backend.list_objects(req).await?;
        // Hide S4-internal objects (the `*.s4index` sidecars etc.) from the client
        if let Some(contents) = resp.output.contents.as_mut() {
            contents.retain(|o| {
                o.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        Ok(resp)
    }
    async fn list_objects_v2(
        &self,
        req: S3Request<ListObjectsV2Input>,
    ) -> S3Result<S3Response<ListObjectsV2Output>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        let mut resp = self.backend.list_objects_v2(req).await?;
        if let Some(contents) = resp.output.contents.as_mut() {
            let before = contents.len();
            contents.retain(|o| {
                o.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
            // Correct key_count as well (S3 spec compliance)
            if let Some(kc) = resp.output.key_count.as_mut() {
                *kc -= (before - contents.len()) as i32;
            }
        }
        Ok(resp)
    }
    /// v0.4 #17: filter S4-internal sidecars from versioned listings.
    /// Same shape as the `list_objects_v2` filter, but applied to both the
    /// `Versions` and `DeleteMarkers` arrays (each can carry its own
    /// `.s4index` entries on a versioned bucket where the sidecar has
    /// been overwritten).
    async fn list_object_versions(
        &self,
        req: S3Request<ListObjectVersionsInput>,
    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
        self.enforce_rate_limit(&req, &req.input.bucket)?;
        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
        let mut resp = self.backend.list_object_versions(req).await?;
        if let Some(versions) = resp.output.versions.as_mut() {
            versions.retain(|v| {
                v.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        if let Some(markers) = resp.output.delete_markers.as_mut() {
            markers.retain(|m| {
                m.key
                    .as_ref()
                    .map(|k| !k.ends_with(".s4index"))
                    .unwrap_or(true)
            });
        }
        Ok(resp)
    }

    async fn create_multipart_upload(
        &self,
        mut req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        // Multipart objects are written per-part compressed in the frame
        // format. Set a flag in the object metadata so GET fires up the frame
        // parser. The codec recorded here is the registry's default kind;
        // upload_part below still picks the actual codec per part from its
        // own sample.
        let codec_kind = self.registry.default_kind();
        let meta = req.input.metadata.get_or_insert_with(Default::default);
        meta.insert(META_MULTIPART.into(), "true".into());
        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
        debug!(
            bucket = ?req.input.bucket,
            key = ?req.input.key,
            codec = codec_kind.as_str(),
            "S4 create_multipart_upload: marking object for per-part compression"
        );
        self.backend.create_multipart_upload(req).await
    }
1276
1277    async fn upload_part(
1278        &self,
1279        mut req: S3Request<UploadPartInput>,
1280    ) -> S3Result<S3Response<UploadPartOutput>> {
1281        // Compress each part and forward it with a frame header; on GET,
1282        // `decompress_multipart` walks the frames in order via the frame iter.
1283        // **Per-part codec dispatch**: the dispatcher picks a codec from a sample
1284        // at the head of each part body, so Parquet-like mixed-content uploads
1285        // get the optimal codec per part (integer columns → Bitcomp, text → zstd, etc.).
1286        if let Some(blob) = req.input.body.take() {
1287            let bytes = collect_blob(blob, self.max_body_bytes)
1288                .await
1289                .map_err(internal("collect upload_part body"))?;
1290            let sample_len = bytes.len().min(SAMPLE_BYTES);
1291            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
1292            let original_size = bytes.len() as u64;
1293            let (compressed, manifest) = self
1294                .registry
1295                .compress(bytes, codec_kind)
1296                .await
1297                .map_err(internal("registry compress part"))?;
1298            let header = FrameHeader {
1299                codec: codec_kind,
1300                original_size,
1301                compressed_size: compressed.len() as u64,
1302                crc32c: manifest.crc32c,
1303            };
1304            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
1305            write_frame(&mut framed, header, &compressed);
1306            // v0.2 #5: heuristic-based padding skip for likely-final parts.
1307            //
1308            // In practice, the AWS SDKs / aws-cli / boto3 send only the
1309            // final part below the configured part_size. So if the raw user
1310            // part is already smaller than S3's 5 MiB multipart minimum, this
1311            // is overwhelmingly likely to be the final part — and the final
1312            // part is exempt from S3's size constraint. Skipping padding here
1313            // saves up to ~5 MiB per object on highly compressible workloads.
1314            //
1315            // If a misbehaving client sends a tiny **non-final** part, S3
1316            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
1317            // identical outcome to a vanilla S3 PUT, just earlier than
1318            // padding-then-complete would catch it.
1319            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
1320            if !likely_final {
1321                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
1322            }
1323            let framed_bytes = framed.freeze();
1324            let new_len = framed_bytes.len() as i64;
1325            // The same wire-compat issue applies here: framing invalidates client-supplied content-length / checksums, so strip them.
1326            req.input.content_length = Some(new_len);
1327            req.input.checksum_algorithm = None;
1328            req.input.checksum_crc32 = None;
1329            req.input.checksum_crc32c = None;
1330            req.input.checksum_crc64nvme = None;
1331            req.input.checksum_sha1 = None;
1332            req.input.checksum_sha256 = None;
1333            req.input.content_md5 = None;
1334            req.input.body = Some(bytes_to_blob(framed_bytes));
1335            debug!(
1336                part_number = ?req.input.part_number,
1337                upload_id = ?req.input.upload_id,
1338                original_size,
1339                framed_size = new_len,
1340                "S4 upload_part: framed compressed payload"
1341            );
1342        }
1343        self.backend.upload_part(req).await
1344    }
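    // Worked example of the padding decision above (sizes illustrative): a
    // 3 MiB raw part is below S3_MULTIPART_MIN_PART_BYTES, so it is treated
    // as likely-final and forwarded unpadded. An 8 MiB raw part compressing
    // to 1 MiB is framed as FRAME_HEADER_BYTES + 1 MiB, then padded up to the
    // 5 MiB minimum, because S3 enforces the minimum on the uploaded bytes
    // (the framed payload), not on the pre-compression part size.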
1345    async fn complete_multipart_upload(
1346        &self,
1347        req: S3Request<CompleteMultipartUploadInput>,
1348    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1349        let bucket = req.input.bucket.clone();
1350        let key = req.input.key.clone();
1351        let resp = self.backend.complete_multipart_upload(req).await?;
1352        // On CompleteMultipartUpload success, full-fetch the completed object,
1353        // build a frame index, and store it as the `<key>.s4index` sidecar. This
1354        // enables the partial-fetch path for Range GETs (saving Range bandwidth).
1355        // Note: for huge objects this pass is heavy, but once the sidecar exists
1356        // Range queries get very fast, so the one-time cost pays for itself.
1357        let get_input = GetObjectInput {
1358            bucket: bucket.clone(),
1359            key: key.clone(),
1360            ..Default::default()
1361        };
1362        let get_req = S3Request {
1363            input: get_input,
1364            method: http::Method::GET,
1365            // Keys may contain characters that fail URI parsing; this request is
1366            // synthetic and internal, so fall back to "/" rather than panicking.
1367            uri: format!("/{bucket}/{key}").parse().unwrap_or_default(),
1368            headers: http::HeaderMap::new(),
1369            extensions: http::Extensions::new(),
1370            credentials: None,
1371            region: None,
1372            service: None,
1373            trailing_headers: None,
1374        };
1375        if let Ok(get_resp) = self.backend.get_object(get_req).await
1376            && let Some(blob) = get_resp.output.body
1377            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
1378            && let Ok(index) = build_index_from_body(&body)
1379        {
1380            self.write_sidecar(&bucket, &key, &index).await;
1381        }
1382        Ok(resp)
1383    }
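    // Read-side sketch (assumed shape; the real path lives in get_object): a
    // later Range GET fetches `sidecar_key(&key)` first, decodes it with
    // `decode_index`, and asks the resulting `FrameIndex` which compressed
    // byte span covers the requested logical range, so only those frames are
    // fetched from the backend and decompressed.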
1384    async fn abort_multipart_upload(
1385        &self,
1386        req: S3Request<AbortMultipartUploadInput>,
1387    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
1388        self.backend.abort_multipart_upload(req).await
1389    }
1390    async fn list_multipart_uploads(
1391        &self,
1392        req: S3Request<ListMultipartUploadsInput>,
1393    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
1394        self.backend.list_multipart_uploads(req).await
1395    }
1396    async fn list_parts(
1397        &self,
1398        req: S3Request<ListPartsInput>,
1399    ) -> S3Result<S3Response<ListPartsOutput>> {
1400        self.backend.list_parts(req).await
1401    }
1402
1403    // =========================================================================
1404    // Phase 2: pure passthrough delegations. S4 has no compression hook for
1405    // these ops, so behavior is identical to the backend (= AWS S3).
1406    //
1407    // Known limitations:
1408    // - copy_object / upload_part_copy: if the source is S4-compressed, the
1409    //   backend just copies bytes, so the metadata (s4-codec etc.) is copied too
1410    //   (AWS S3 default = MetadataDirective COPY) and GET still decompresses
1411    //   via the manifest. MetadataDirective REPLACE, however, drops the
1412    //   compression metadata and breaks the object; customers need to guard
1413    //   against this operationally.
1414    // - list_object_versions: on a versioning-enabled bucket each version
1415    //   keeps its S4 metadata, so old versions GET correctly through S4.
1416    // =========================================================================
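    // Illustrative client-side footgun for the REPLACE caveat above (the
    // flags are standard aws s3api ones; bucket/key names are hypothetical):
    //
    //     aws s3api copy-object --bucket demo --key dst.bin \
    //         --copy-source demo/src.bin --metadata-directive REPLACE
    //
    // This rewrites the user metadata, dropping the s4-* manifest keys, so a
    // later GET of dst.bin through S4 can no longer decompress it.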
1416
1417    // ---- Object ACL / tagging / attributes ----
1418    async fn get_object_acl(
1419        &self,
1420        req: S3Request<GetObjectAclInput>,
1421    ) -> S3Result<S3Response<GetObjectAclOutput>> {
1422        self.backend.get_object_acl(req).await
1423    }
1424    async fn put_object_acl(
1425        &self,
1426        req: S3Request<PutObjectAclInput>,
1427    ) -> S3Result<S3Response<PutObjectAclOutput>> {
1428        self.backend.put_object_acl(req).await
1429    }
1430    async fn get_object_tagging(
1431        &self,
1432        req: S3Request<GetObjectTaggingInput>,
1433    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
1434        self.backend.get_object_tagging(req).await
1435    }
1436    async fn put_object_tagging(
1437        &self,
1438        req: S3Request<PutObjectTaggingInput>,
1439    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
1440        self.backend.put_object_tagging(req).await
1441    }
1442    async fn delete_object_tagging(
1443        &self,
1444        req: S3Request<DeleteObjectTaggingInput>,
1445    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
1446        self.backend.delete_object_tagging(req).await
1447    }
1448    async fn get_object_attributes(
1449        &self,
1450        req: S3Request<GetObjectAttributesInput>,
1451    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
1452        self.backend.get_object_attributes(req).await
1453    }
1454    async fn restore_object(
1455        &self,
1456        req: S3Request<RestoreObjectInput>,
1457    ) -> S3Result<S3Response<RestoreObjectOutput>> {
1458        self.backend.restore_object(req).await
1459    }
1460    async fn upload_part_copy(
1461        &self,
1462        req: S3Request<UploadPartCopyInput>,
1463    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
1464        // v0.2 #6: byte-range aware copy when the source is S4-framed.
1465        //
1466        // For a framed source (multipart upload OR single-PUT framed-v2),
1467        // a naive byte-range passthrough would copy compressed bytes that
1468        // don't align with S4 frame boundaries — silently corrupting the
1469        // result. Instead we GET the source through S4 (which handles
1470        // decompression + Range), re-compress + re-frame as a new part,
1471        // and forward as upload_part. For non-framed sources (S4-untouched
1472        // raw objects), passthrough is correct and we keep the original
1473        // (cheaper) code path.
1474        let CopySource::Bucket {
1475            bucket: src_bucket,
1476            key: src_key,
1477            ..
1478        } = &req.input.copy_source
1479        else {
1480            return self.backend.upload_part_copy(req).await;
1481        };
1482        let src_bucket = src_bucket.to_string();
1483        let src_key = src_key.to_string();
1484
1485        // Probe metadata to decide whether the source needs S4-aware copy.
1486        let head_input = HeadObjectInput {
1487            bucket: src_bucket.clone(),
1488            key: src_key.clone(),
1489            ..Default::default()
1490        };
1491        let head_req = S3Request {
1492            input: head_input,
1493            method: http::Method::HEAD,
1494            uri: req.uri.clone(),
1495            headers: req.headers.clone(),
1496            extensions: http::Extensions::new(),
1497            credentials: req.credentials.clone(),
1498            region: req.region.clone(),
1499            service: req.service.clone(),
1500            trailing_headers: None,
1501        };
1502        let needs_s4_copy = match self.backend.head_object(head_req).await {
1503            Ok(h) => {
1504                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
1505            }
1506            Err(_) => false,
1507        };
1508        if !needs_s4_copy {
1509            return self.backend.upload_part_copy(req).await;
1510        }
1511
1512        // Resolve the optional source byte range to pass to GET.
1513        let source_range = req
1514            .input
1515            .copy_source_range
1516            .as_ref()
1517            .map(|r| parse_copy_source_range(r))
1518            .transpose()
1519            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
1520
1521        // GET source via S4 (handles decompression + sidecar partial fetch
1522        // when range is present). The result is the requested user-visible
1523        // byte range, fully decompressed.
1524        let get_input = GetObjectInput {
1525            bucket: src_bucket.clone(),
1526            key: src_key.clone(),
1527            range: source_range,
1528            ..Default::default()
1529        };
1530        let get_req = S3Request {
1531            input: get_input,
1532            method: http::Method::GET,
1533            uri: req.uri.clone(),
1534            headers: req.headers.clone(),
1535            extensions: http::Extensions::new(),
1536            credentials: req.credentials.clone(),
1537            region: req.region.clone(),
1538            service: req.service.clone(),
1539            trailing_headers: None,
1540        };
1541        let get_resp = self.get_object(get_req).await?;
1542        let blob = get_resp.output.body.ok_or_else(|| {
1543            S3Error::with_message(
1544                S3ErrorCode::InternalError,
1545                "upload_part_copy: empty body from source GET",
1546            )
1547        })?;
1548        let bytes = collect_blob(blob, self.max_body_bytes)
1549            .await
1550            .map_err(internal("collect upload_part_copy source body"))?;
1551
1552        // Compress + frame as a fresh part (mirrors upload_part path).
1553        let sample_len = bytes.len().min(SAMPLE_BYTES);
1554        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
1555        let original_size = bytes.len() as u64;
1556        let (compressed, manifest) = self
1557            .registry
1558            .compress(bytes, codec_kind)
1559            .await
1560            .map_err(internal("registry compress upload_part_copy"))?;
1561        let header = FrameHeader {
1562            codec: codec_kind,
1563            original_size,
1564            compressed_size: compressed.len() as u64,
1565            crc32c: manifest.crc32c,
1566        };
1567        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
1568        write_frame(&mut framed, header, &compressed);
1569        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
1570        if !likely_final {
1571            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
1572        }
1573        let framed_bytes = framed.freeze();
1574        let framed_len = framed_bytes.len() as i64;
1575
1576        // Forward as upload_part to the destination multipart upload.
1577        let part_input = UploadPartInput {
1578            bucket: req.input.bucket.clone(),
1579            key: req.input.key.clone(),
1580            part_number: req.input.part_number,
1581            upload_id: req.input.upload_id.clone(),
1582            body: Some(bytes_to_blob(framed_bytes)),
1583            content_length: Some(framed_len),
1584            ..Default::default()
1585        };
1586        let part_req = S3Request {
1587            input: part_input,
1588            method: http::Method::PUT,
1589            uri: req.uri.clone(),
1590            headers: req.headers.clone(),
1591            extensions: http::Extensions::new(),
1592            credentials: req.credentials.clone(),
1593            region: req.region.clone(),
1594            service: req.service.clone(),
1595            trailing_headers: None,
1596        };
1597        let upload_resp = self.backend.upload_part(part_req).await?;
1598
1599        let copy_output = UploadPartCopyOutput {
1600            copy_part_result: Some(CopyPartResult {
1601                e_tag: upload_resp.output.e_tag.clone(),
1602                ..Default::default()
1603            }),
1604            ..Default::default()
1605        };
1606        Ok(S3Response::new(copy_output))
1607    }
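    // Decision summary for upload_part_copy (restates the branches above):
    //
    //   HEAD on the source says        | action taken
    //   -------------------------------+--------------------------------------
    //   multipart or framed-v2 flags   | GET via S4, re-compress, re-frame,
    //                                  | forward as upload_part
    //   neither flag (raw object)      | passthrough upload_part_copy
    //   HEAD itself fails              | passthrough (backend surfaces the error)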
1608
1609    // ---- Object lock / retention / legal hold ----
1610    async fn get_object_lock_configuration(
1611        &self,
1612        req: S3Request<GetObjectLockConfigurationInput>,
1613    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
1614        self.backend.get_object_lock_configuration(req).await
1615    }
1616    async fn put_object_lock_configuration(
1617        &self,
1618        req: S3Request<PutObjectLockConfigurationInput>,
1619    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
1620        self.backend.put_object_lock_configuration(req).await
1621    }
1622    async fn get_object_legal_hold(
1623        &self,
1624        req: S3Request<GetObjectLegalHoldInput>,
1625    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
1626        self.backend.get_object_legal_hold(req).await
1627    }
1628    async fn put_object_legal_hold(
1629        &self,
1630        req: S3Request<PutObjectLegalHoldInput>,
1631    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
1632        self.backend.put_object_legal_hold(req).await
1633    }
1634    async fn get_object_retention(
1635        &self,
1636        req: S3Request<GetObjectRetentionInput>,
1637    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
1638        self.backend.get_object_retention(req).await
1639    }
1640    async fn put_object_retention(
1641        &self,
1642        req: S3Request<PutObjectRetentionInput>,
1643    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
1644        self.backend.put_object_retention(req).await
1645    }
1646
1647    // ---- Versioning ----
1648    // list_object_versions is implemented above in the compression-hook
1649    // section so it filters S4-internal sidecars (v0.4 #17). Per-version
1650    // S4 metadata is preserved by the backend automatically.
1651    async fn get_bucket_versioning(
1652        &self,
1653        req: S3Request<GetBucketVersioningInput>,
1654    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
1655        self.backend.get_bucket_versioning(req).await
1656    }
1657    async fn put_bucket_versioning(
1658        &self,
1659        req: S3Request<PutBucketVersioningInput>,
1660    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
1661        self.backend.put_bucket_versioning(req).await
1662    }
1663
1664    // ---- Bucket location ----
1665    async fn get_bucket_location(
1666        &self,
1667        req: S3Request<GetBucketLocationInput>,
1668    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
1669        self.backend.get_bucket_location(req).await
1670    }
1671
1672    // ---- Bucket policy ----
1673    async fn get_bucket_policy(
1674        &self,
1675        req: S3Request<GetBucketPolicyInput>,
1676    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
1677        self.backend.get_bucket_policy(req).await
1678    }
1679    async fn put_bucket_policy(
1680        &self,
1681        req: S3Request<PutBucketPolicyInput>,
1682    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
1683        self.backend.put_bucket_policy(req).await
1684    }
1685    async fn delete_bucket_policy(
1686        &self,
1687        req: S3Request<DeleteBucketPolicyInput>,
1688    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
1689        self.backend.delete_bucket_policy(req).await
1690    }
1691    async fn get_bucket_policy_status(
1692        &self,
1693        req: S3Request<GetBucketPolicyStatusInput>,
1694    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
1695        self.backend.get_bucket_policy_status(req).await
1696    }
1697
1698    // ---- Bucket ACL ----
1699    async fn get_bucket_acl(
1700        &self,
1701        req: S3Request<GetBucketAclInput>,
1702    ) -> S3Result<S3Response<GetBucketAclOutput>> {
1703        self.backend.get_bucket_acl(req).await
1704    }
1705    async fn put_bucket_acl(
1706        &self,
1707        req: S3Request<PutBucketAclInput>,
1708    ) -> S3Result<S3Response<PutBucketAclOutput>> {
1709        self.backend.put_bucket_acl(req).await
1710    }
1711
1712    // ---- Bucket CORS ----
1713    async fn get_bucket_cors(
1714        &self,
1715        req: S3Request<GetBucketCorsInput>,
1716    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
1717        self.backend.get_bucket_cors(req).await
1718    }
1719    async fn put_bucket_cors(
1720        &self,
1721        req: S3Request<PutBucketCorsInput>,
1722    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
1723        self.backend.put_bucket_cors(req).await
1724    }
1725    async fn delete_bucket_cors(
1726        &self,
1727        req: S3Request<DeleteBucketCorsInput>,
1728    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
1729        self.backend.delete_bucket_cors(req).await
1730    }
1731
1732    // ---- Bucket lifecycle ----
1733    async fn get_bucket_lifecycle_configuration(
1734        &self,
1735        req: S3Request<GetBucketLifecycleConfigurationInput>,
1736    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
1737        self.backend.get_bucket_lifecycle_configuration(req).await
1738    }
1739    async fn put_bucket_lifecycle_configuration(
1740        &self,
1741        req: S3Request<PutBucketLifecycleConfigurationInput>,
1742    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
1743        self.backend.put_bucket_lifecycle_configuration(req).await
1744    }
1745    async fn delete_bucket_lifecycle(
1746        &self,
1747        req: S3Request<DeleteBucketLifecycleInput>,
1748    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
1749        self.backend.delete_bucket_lifecycle(req).await
1750    }
1751
1752    // ---- Bucket tagging ----
1753    async fn get_bucket_tagging(
1754        &self,
1755        req: S3Request<GetBucketTaggingInput>,
1756    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
1757        self.backend.get_bucket_tagging(req).await
1758    }
1759    async fn put_bucket_tagging(
1760        &self,
1761        req: S3Request<PutBucketTaggingInput>,
1762    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
1763        self.backend.put_bucket_tagging(req).await
1764    }
1765    async fn delete_bucket_tagging(
1766        &self,
1767        req: S3Request<DeleteBucketTaggingInput>,
1768    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
1769        self.backend.delete_bucket_tagging(req).await
1770    }
1771
1772    // ---- Bucket encryption ----
1773    async fn get_bucket_encryption(
1774        &self,
1775        req: S3Request<GetBucketEncryptionInput>,
1776    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
1777        self.backend.get_bucket_encryption(req).await
1778    }
1779    async fn put_bucket_encryption(
1780        &self,
1781        req: S3Request<PutBucketEncryptionInput>,
1782    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
1783        self.backend.put_bucket_encryption(req).await
1784    }
1785    async fn delete_bucket_encryption(
1786        &self,
1787        req: S3Request<DeleteBucketEncryptionInput>,
1788    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
1789        self.backend.delete_bucket_encryption(req).await
1790    }
1791
1792    // ---- Bucket logging ----
1793    async fn get_bucket_logging(
1794        &self,
1795        req: S3Request<GetBucketLoggingInput>,
1796    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
1797        self.backend.get_bucket_logging(req).await
1798    }
1799    async fn put_bucket_logging(
1800        &self,
1801        req: S3Request<PutBucketLoggingInput>,
1802    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
1803        self.backend.put_bucket_logging(req).await
1804    }
1805
1806    // ---- Bucket notification ----
1807    async fn get_bucket_notification_configuration(
1808        &self,
1809        req: S3Request<GetBucketNotificationConfigurationInput>,
1810    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
1811        self.backend
1812            .get_bucket_notification_configuration(req)
1813            .await
1814    }
1815    async fn put_bucket_notification_configuration(
1816        &self,
1817        req: S3Request<PutBucketNotificationConfigurationInput>,
1818    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
1819        self.backend
1820            .put_bucket_notification_configuration(req)
1821            .await
1822    }
1823
1824    // ---- Bucket request payment ----
1825    async fn get_bucket_request_payment(
1826        &self,
1827        req: S3Request<GetBucketRequestPaymentInput>,
1828    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
1829        self.backend.get_bucket_request_payment(req).await
1830    }
1831    async fn put_bucket_request_payment(
1832        &self,
1833        req: S3Request<PutBucketRequestPaymentInput>,
1834    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
1835        self.backend.put_bucket_request_payment(req).await
1836    }
1837
1838    // ---- Bucket website ----
1839    async fn get_bucket_website(
1840        &self,
1841        req: S3Request<GetBucketWebsiteInput>,
1842    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
1843        self.backend.get_bucket_website(req).await
1844    }
1845    async fn put_bucket_website(
1846        &self,
1847        req: S3Request<PutBucketWebsiteInput>,
1848    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
1849        self.backend.put_bucket_website(req).await
1850    }
1851    async fn delete_bucket_website(
1852        &self,
1853        req: S3Request<DeleteBucketWebsiteInput>,
1854    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
1855        self.backend.delete_bucket_website(req).await
1856    }
1857
1858    // ---- Bucket replication ----
1859    async fn get_bucket_replication(
1860        &self,
1861        req: S3Request<GetBucketReplicationInput>,
1862    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
1863        self.backend.get_bucket_replication(req).await
1864    }
1865    async fn put_bucket_replication(
1866        &self,
1867        req: S3Request<PutBucketReplicationInput>,
1868    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
1869        self.backend.put_bucket_replication(req).await
1870    }
1871    async fn delete_bucket_replication(
1872        &self,
1873        req: S3Request<DeleteBucketReplicationInput>,
1874    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
1875        self.backend.delete_bucket_replication(req).await
1876    }
1877
1878    // ---- Bucket accelerate ----
1879    async fn get_bucket_accelerate_configuration(
1880        &self,
1881        req: S3Request<GetBucketAccelerateConfigurationInput>,
1882    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
1883        self.backend.get_bucket_accelerate_configuration(req).await
1884    }
1885    async fn put_bucket_accelerate_configuration(
1886        &self,
1887        req: S3Request<PutBucketAccelerateConfigurationInput>,
1888    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
1889        self.backend.put_bucket_accelerate_configuration(req).await
1890    }
1891
1892    // ---- Bucket ownership controls ----
1893    async fn get_bucket_ownership_controls(
1894        &self,
1895        req: S3Request<GetBucketOwnershipControlsInput>,
1896    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
1897        self.backend.get_bucket_ownership_controls(req).await
1898    }
1899    async fn put_bucket_ownership_controls(
1900        &self,
1901        req: S3Request<PutBucketOwnershipControlsInput>,
1902    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
1903        self.backend.put_bucket_ownership_controls(req).await
1904    }
1905    async fn delete_bucket_ownership_controls(
1906        &self,
1907        req: S3Request<DeleteBucketOwnershipControlsInput>,
1908    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
1909        self.backend.delete_bucket_ownership_controls(req).await
1910    }
1911
1912    // ---- Public access block ----
1913    async fn get_public_access_block(
1914        &self,
1915        req: S3Request<GetPublicAccessBlockInput>,
1916    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
1917        self.backend.get_public_access_block(req).await
1918    }
1919    async fn put_public_access_block(
1920        &self,
1921        req: S3Request<PutPublicAccessBlockInput>,
1922    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
1923        self.backend.put_public_access_block(req).await
1924    }
1925    async fn delete_public_access_block(
1926        &self,
1927        req: S3Request<DeletePublicAccessBlockInput>,
1928    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
1929        self.backend.delete_public_access_block(req).await
1930    }
1931}
1932
1933#[cfg(test)]
1934mod tests {
1935    use super::*;
1936
1937    #[test]
1938    fn manifest_roundtrip_via_metadata() {
1939        let original = ChunkManifest {
1940            codec: CodecKind::CpuZstd,
1941            original_size: 1234,
1942            compressed_size: 567,
1943            crc32c: 0xdead_beef,
1944        };
1945        let mut meta: Option<Metadata> = None;
1946        write_manifest(&mut meta, &original);
1947        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
1948        assert_eq!(extracted.codec, original.codec);
1949        assert_eq!(extracted.original_size, original.original_size);
1950        assert_eq!(extracted.compressed_size, original.compressed_size);
1951        assert_eq!(extracted.crc32c, original.crc32c);
1952    }
1953
1954    #[test]
1955    fn missing_metadata_yields_none() {
1956        let meta: Option<Metadata> = None;
1957        assert!(extract_manifest(&meta).is_none());
1958    }
1959
1960    #[test]
1961    fn partial_metadata_yields_none() {
1962        let mut meta = Metadata::new();
1963        meta.insert(META_CODEC.into(), "cpu-zstd".into());
1964        let opt = Some(meta);
1965        assert!(extract_manifest(&opt).is_none());
1966    }
1967
1968    #[test]
1969    fn parse_copy_source_range_basic() {
1970        let r = parse_copy_source_range("bytes=10-20").unwrap();
1971        match r {
1972            s3s::dto::Range::Int { first, last } => {
1973                assert_eq!(first, 10);
1974                assert_eq!(last, Some(20));
1975            }
1976            _ => panic!("expected Int range"),
1977        }
1978    }
1979
1980    #[test]
1981    fn parse_copy_source_range_rejects_inverted() {
1982        let err = parse_copy_source_range("bytes=20-10").unwrap_err();
1983        assert!(err.contains("last < first"));
1984    }
1985
1986    #[test]
1987    fn parse_copy_source_range_rejects_missing_prefix() {
1988        let err = parse_copy_source_range("10-20").unwrap_err();
1989        assert!(err.contains("must start with 'bytes='"));
1990    }
1991
1992    #[test]
1993    fn parse_copy_source_range_rejects_open_ended() {
1994        // S3 upload_part_copy spec requires N-M (closed); suffix and
1995        // open-ended forms are not allowed for this header.
1996        assert!(parse_copy_source_range("bytes=10-").is_err());
1997        assert!(parse_copy_source_range("bytes=-10").is_err());
1998    }
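
    #[test]
    fn parse_copy_source_range_accepts_equal_bounds() {
        // Added example: the rejection tested above is specifically
        // "last < first", so a one-byte range with last == first should
        // parse (S3 treats first == last as a valid single-byte range).
        let r = parse_copy_source_range("bytes=5-5").unwrap();
        match r {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 5);
                assert_eq!(last, Some(5));
            }
            _ => panic!("expected Int range"),
        }
    }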
1999}