Skip to main content

s4_server/
service.rs

1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//!   `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//!   複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//!   manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//!   manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//!   Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use bytes::BytesMut;
34use s3s::dto::*;
35use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
36use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
37use s4_codec::multipart::{
38    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
39    write_frame,
40};
41use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
42use std::time::Instant;
43use tracing::{debug, info};
44
45use crate::blob::{
46    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
47};
48use crate::streaming::{
49    cpu_zstd_decompress_stream, streaming_compress_cpu_zstd, streaming_passthrough,
50    supports_streaming_compress, supports_streaming_decompress,
51};
52
53/// PUT body の先頭 sampling で渡す最大 byte 数。
54const SAMPLE_BYTES: usize = 4096;
55
56pub struct S4Service<B: S3> {
57    backend: B,
58    registry: Arc<CodecRegistry>,
59    dispatcher: Arc<dyn CodecDispatcher>,
60    max_body_bytes: usize,
61}
62
63impl<B: S3> S4Service<B> {
64    /// AWS S3 単発 PUT の API 上限 (5 GiB)
65    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
66
67    pub fn new(
68        backend: B,
69        registry: Arc<CodecRegistry>,
70        dispatcher: Arc<dyn CodecDispatcher>,
71    ) -> Self {
72        Self {
73            backend,
74            registry,
75            dispatcher,
76            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
77        }
78    }
79
80    #[must_use]
81    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
82        self.max_body_bytes = n;
83        self
84    }
85
86    /// テスト用: backend を取り戻す (test helper、production では使わない)
87    pub fn into_backend(self) -> B {
88        self.backend
89    }
90
91    /// 必要 frame だけを backend に Range GET し、frame parse + decompress + slice
92    /// した結果を返す sidecar fast path。Range request の **帯域節約版**。
93    async fn partial_range_get(
94        &self,
95        req: &S3Request<GetObjectInput>,
96        plan: s4_codec::index::RangePlan,
97        client_start: u64,
98        client_end_exclusive: u64,
99        total_original: u64,
100        get_start: Instant,
101    ) -> S3Result<S3Response<GetObjectOutput>> {
102        // 必要 byte 範囲だけを backend に partial GET
103        let backend_range = s3s::dto::Range::Int {
104            first: plan.byte_start,
105            last: Some(plan.byte_end_exclusive - 1),
106        };
107        let backend_input = GetObjectInput {
108            bucket: req.input.bucket.clone(),
109            key: req.input.key.clone(),
110            range: Some(backend_range),
111            ..Default::default()
112        };
113        let backend_req = S3Request {
114            input: backend_input,
115            method: req.method.clone(),
116            uri: req.uri.clone(),
117            headers: req.headers.clone(),
118            extensions: http::Extensions::new(),
119            credentials: req.credentials.clone(),
120            region: req.region.clone(),
121            service: req.service.clone(),
122            trailing_headers: None,
123        };
124        let mut backend_resp = self.backend.get_object(backend_req).await?;
125        let blob = backend_resp.output.body.take().ok_or_else(|| {
126            S3Error::with_message(
127                S3ErrorCode::InternalError,
128                "backend partial GET returned empty body",
129            )
130        })?;
131        let bytes = collect_blob(blob, self.max_body_bytes)
132            .await
133            .map_err(internal("collect partial body"))?;
134
135        // frame parse + decompress
136        let mut combined = BytesMut::new();
137        for frame in FrameIter::new(bytes) {
138            let (header, payload) = frame.map_err(|e| {
139                S3Error::with_message(
140                    S3ErrorCode::InternalError,
141                    format!("partial-range frame parse: {e}"),
142                )
143            })?;
144            let chunk_manifest = ChunkManifest {
145                codec: header.codec,
146                original_size: header.original_size,
147                compressed_size: header.compressed_size,
148                crc32c: header.crc32c,
149            };
150            let decompressed = self
151                .registry
152                .decompress(payload, &chunk_manifest)
153                .await
154                .map_err(internal("partial-range decompress"))?;
155            combined.extend_from_slice(&decompressed);
156        }
157        let combined = combined.freeze();
158        let sliced = combined
159            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
160
161        // response 組立て
162        let returned_size = sliced.len() as u64;
163        backend_resp.output.content_length = Some(returned_size as i64);
164        backend_resp.output.content_range = Some(format!(
165            "bytes {client_start}-{}/{total_original}",
166            client_end_exclusive - 1
167        ));
168        backend_resp.output.checksum_crc32 = None;
169        backend_resp.output.checksum_crc32c = None;
170        backend_resp.output.checksum_crc64nvme = None;
171        backend_resp.output.checksum_sha1 = None;
172        backend_resp.output.checksum_sha256 = None;
173        backend_resp.output.e_tag = None;
174        backend_resp.output.body = Some(bytes_to_blob(sliced));
175        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
176
177        let elapsed = get_start.elapsed();
178        crate::metrics::record_get(
179            "partial",
180            plan.byte_end_exclusive - plan.byte_start,
181            returned_size,
182            elapsed.as_secs_f64(),
183            true,
184        );
185        info!(
186            op = "get_object",
187            bucket = %req.input.bucket,
188            key = %req.input.key,
189            bytes_in = plan.byte_end_exclusive - plan.byte_start,
190            bytes_out = returned_size,
191            total_object_size = total_original,
192            range = true,
193            path = "sidecar-partial",
194            latency_ms = elapsed.as_millis() as u64,
195            "S4 partial Range GET via sidecar index"
196        );
197        Ok(backend_resp)
198    }
199
200    /// `<key>.s4index` sidecar object を backend に書く。失敗しても本体 PUT は
201    /// 成功扱いにしたいので、err は warn ログのみ (Range GET の partial path が
202    /// 使えなくなるが、full read fallback で意味的には正しい結果を返す)。
203    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
204        let bytes = encode_index(index);
205        let len = bytes.len() as i64;
206        let put_input = PutObjectInput {
207            bucket: bucket.into(),
208            key: sidecar_key(key),
209            body: Some(bytes_to_blob(bytes)),
210            content_length: Some(len),
211            content_type: Some("application/x-s4-index".into()),
212            ..Default::default()
213        };
214        let put_req = S3Request {
215            input: put_input,
216            method: http::Method::PUT,
217            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
218            headers: http::HeaderMap::new(),
219            extensions: http::Extensions::new(),
220            credentials: None,
221            region: None,
222            service: None,
223            trailing_headers: None,
224        };
225        if let Err(e) = self.backend.put_object(put_req).await {
226            tracing::warn!(
227                bucket,
228                key,
229                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
230            );
231        }
232    }
233
234    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
235    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
236        let get_input = GetObjectInput {
237            bucket: bucket.into(),
238            key: sidecar_key(key),
239            ..Default::default()
240        };
241        let get_req = S3Request {
242            input: get_input,
243            method: http::Method::GET,
244            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
245            headers: http::HeaderMap::new(),
246            extensions: http::Extensions::new(),
247            credentials: None,
248            region: None,
249            service: None,
250            trailing_headers: None,
251        };
252        let resp = self.backend.get_object(get_req).await.ok()?;
253        let blob = resp.output.body?;
254        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
255        decode_index(bytes).ok()
256    }
257
258    /// Multipart object (frame 列) を解凍 → 元 bytes を再構築。
259    ///
260    /// **per-frame codec dispatch**: 各 frame header に codec_id が入っているので、
261    /// frame ごとに registry が違う codec を呼ぶことができる。同一 object 内で
262    /// 異なる codec が混在していても透過的に解凍可能 (parquet 風 mixed columns 等)。
263    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
264        let mut out = BytesMut::new();
265        for frame in FrameIter::new(bytes) {
266            let (header, payload) = frame.map_err(|e| {
267                S3Error::with_message(
268                    S3ErrorCode::InternalError,
269                    format!("multipart frame parse: {e}"),
270                )
271            })?;
272            let chunk_manifest = ChunkManifest {
273                codec: header.codec,
274                original_size: header.original_size,
275                compressed_size: header.compressed_size,
276                crc32c: header.crc32c,
277            };
278            let decompressed = self
279                .registry
280                .decompress(payload, &chunk_manifest)
281                .await
282                .map_err(internal("multipart frame decompress"))?;
283            out.extend_from_slice(&decompressed);
284        }
285        Ok(out.freeze())
286    }
287}
288
289fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
290    metadata
291        .as_ref()
292        .and_then(|m| m.get(META_MULTIPART))
293        .map(|v| v == "true")
294        .unwrap_or(false)
295}
296
297const META_CODEC: &str = "s4-codec";
298const META_ORIGINAL_SIZE: &str = "s4-original-size";
299const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
300const META_CRC32C: &str = "s4-crc32c";
301/// Multipart upload で per-part frame format を使ったオブジェクトであることを示す。
302/// GET 時にこの flag を見て frame parser を起動する。
303const META_MULTIPART: &str = "s4-multipart";
304
305fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
306    let m = metadata.as_ref()?;
307    let codec = m
308        .get(META_CODEC)
309        .and_then(|s| s.parse::<CodecKind>().ok())?;
310    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
311    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
312    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
313    Some(ChunkManifest {
314        codec,
315        original_size,
316        compressed_size,
317        crc32c,
318    })
319}
320
321fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
322    let meta = metadata.get_or_insert_with(Default::default);
323    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
324    meta.insert(
325        META_ORIGINAL_SIZE.into(),
326        manifest.original_size.to_string(),
327    );
328    meta.insert(
329        META_COMPRESSED_SIZE.into(),
330        manifest.compressed_size.to_string(),
331    );
332    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
333}
334
335fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
336    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
337}
338
339/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
340/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
341/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
342pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
343    if total == 0 {
344        return Err("cannot range-get zero-length object".into());
345    }
346    match range {
347        s3s::dto::Range::Int { first, last } => {
348            let start = *first;
349            let end_inclusive = match last {
350                Some(l) => (*l).min(total - 1),
351                None => total - 1,
352            };
353            if start > end_inclusive || start >= total {
354                return Err(format!(
355                    "range bytes={start}-{:?} out of object size {total}",
356                    last
357                ));
358            }
359            Ok((start, end_inclusive + 1))
360        }
361        s3s::dto::Range::Suffix { length } => {
362            let len = (*length).min(total);
363            Ok((total - len, total))
364        }
365    }
366}
367
368#[async_trait::async_trait]
369impl<B: S3> S3 for S4Service<B> {
370    // === 圧縮を挟む path (PUT) ===
371    #[tracing::instrument(
372        name = "s4.put_object",
373        skip(self, req),
374        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
375    )]
376    async fn put_object(
377        &self,
378        mut req: S3Request<PutObjectInput>,
379    ) -> S3Result<S3Response<PutObjectOutput>> {
380        let put_start = Instant::now();
381        let put_bucket = req.input.bucket.clone();
382        let put_key = req.input.key.clone();
383        if let Some(blob) = req.input.body.take() {
384            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
385            // compress fast path、そうでなければ従来の collect-then-compress。
386            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
387                .await
388                .map_err(internal("peek put sample"))?;
389            let sample_len = sample.len().min(SAMPLE_BYTES);
390            let kind = self.dispatcher.pick(&sample[..sample_len]).await;
391
392            let (compressed, manifest) = if supports_streaming_compress(kind) {
393                // streaming fast path: input は memory に collect しない
394                let chained = chain_sample_with_rest(sample, rest_stream);
395                debug!(
396                    bucket = ?req.input.bucket,
397                    key = ?req.input.key,
398                    codec = kind.as_str(),
399                    path = "streaming",
400                    "S4 put_object: compressing (streaming)"
401                );
402                match kind {
403                    CodecKind::CpuZstd => {
404                        // dispatcher の default zstd level (3) を使う。CLI で
405                        // `--zstd-level` を変えても registry 内の codec を
406                        // 経由せず streaming に分岐するため反映されない。
407                        // Phase 2.1 で `streaming_compress` への level 注入を検討
408                        streaming_compress_cpu_zstd(chained, 3).await
409                    }
410                    CodecKind::Passthrough => streaming_passthrough(chained).await,
411                    other => unreachable!("supports_streaming_compress lied: {other:?}"),
412                }
413                .map_err(internal("streaming compress"))?
414            } else {
415                // GPU codec 等は batch API なので bytes-buffered path
416                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
417                    .await
418                    .map_err(internal("collect put body (gpu path)"))?;
419                debug!(
420                    bucket = ?req.input.bucket,
421                    key = ?req.input.key,
422                    bytes = bytes.len(),
423                    codec = kind.as_str(),
424                    path = "buffered",
425                    "S4 put_object: compressing (buffered)"
426                );
427                self.registry
428                    .compress(bytes, kind)
429                    .await
430                    .map_err(internal("registry compress"))?
431            };
432
433            write_manifest(&mut req.input.metadata, &manifest);
434            // 重要: content_length を圧縮後サイズで更新する。
435            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
436            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
437            req.input.content_length = Some(compressed.len() as i64);
438            // body を書き換えたので、客側が送ってきた original body 用の
439            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
440            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
441            // ChunkManifest.crc32c で担保している。
442            req.input.checksum_algorithm = None;
443            req.input.checksum_crc32 = None;
444            req.input.checksum_crc32c = None;
445            req.input.checksum_crc64nvme = None;
446            req.input.checksum_sha1 = None;
447            req.input.checksum_sha256 = None;
448            req.input.content_md5 = None;
449            let original_size = manifest.original_size;
450            let compressed_size = manifest.compressed_size;
451            let codec_label = manifest.codec.as_str();
452            req.input.body = Some(bytes_to_blob(compressed));
453            let backend_resp = self.backend.put_object(req).await;
454            // 単発 PUT は frame 化していない (raw 圧縮 bytes をそのまま流す) ので
455            // sidecar は書かない。書いても partial-range fast path で
456            // S4F2 magic が見つからず failure する。Phase 2.2 で non-multipart も
457            // 全て frame 化に統一すれば sidecar を書ける (今は multipart 限定)
458            let _ = (original_size, compressed_size); // mute unused warnings
459            let elapsed = put_start.elapsed();
460            crate::metrics::record_put(
461                codec_label,
462                original_size,
463                compressed_size,
464                elapsed.as_secs_f64(),
465                backend_resp.is_ok(),
466            );
467            info!(
468                op = "put_object",
469                bucket = %put_bucket,
470                key = %put_key,
471                codec = codec_label,
472                bytes_in = original_size,
473                bytes_out = compressed_size,
474                ratio = format!(
475                    "{:.3}",
476                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
477                ),
478                latency_ms = elapsed.as_millis() as u64,
479                ok = backend_resp.is_ok(),
480                "S4 put completed"
481            );
482            return backend_resp;
483        }
484        self.backend.put_object(req).await
485    }
486
487    // === 圧縮を解く path (GET) ===
488    #[tracing::instrument(
489        name = "s4.get_object",
490        skip(self, req),
491        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
492    )]
493    async fn get_object(
494        &self,
495        mut req: S3Request<GetObjectInput>,
496    ) -> S3Result<S3Response<GetObjectOutput>> {
497        let get_start = Instant::now();
498        let get_bucket = req.input.bucket.clone();
499        let get_key = req.input.key.clone();
500        // Range request の事前検出 (decompress 後 slice する path に使う)。
501        let range_request = req.input.range.take();
502
503        // ====== Range GET の partial-fetch fast path (sidecar index 利用) ======
504        // sidecar `<key>.s4index` が存在し、multipart-framed object であれば
505        // 必要 frame だけを backend に Range GET し帯域節約する。
506        if let Some(ref r) = range_request
507            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
508        {
509            let total = index.total_original_size();
510            let (start, end_exclusive) = match resolve_range(r, total) {
511                Ok(v) => v,
512                Err(e) => {
513                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
514                }
515            };
516            if let Some(plan) = index.lookup_range(start, end_exclusive) {
517                return self
518                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
519                    .await;
520            }
521        }
522        let mut resp = self.backend.get_object(req).await?;
523        let is_multipart = is_multipart_object(&resp.output.metadata);
524        let manifest_opt = extract_manifest(&resp.output.metadata);
525
526        if !is_multipart && manifest_opt.is_none() {
527            // S4 が書いていないオブジェクトは透過 (raw bucket pre-existing object 等)
528            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
529            return Ok(resp);
530        }
531
532        if let Some(blob) = resp.output.body.take() {
533            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
534            // 大規模 object (e.g. 5 GB) を memory に collect すると OOM するので、
535            // codec が streaming-aware なら body を chunk-by-chunk で decompress して
536            // 即座に client に流す。
537            //
538            // ただし Range request 時は streaming できない (slice するため total bytes
539            // が必要) → buffered path に fall through。
540            if range_request.is_none()
541                && !is_multipart
542                && let Some(ref m) = manifest_opt
543                && supports_streaming_decompress(m.codec)
544                && m.codec == CodecKind::CpuZstd
545            {
546                let decompressed_blob = cpu_zstd_decompress_stream(blob);
547                resp.output.content_length = Some(m.original_size as i64);
548                resp.output.checksum_crc32 = None;
549                resp.output.checksum_crc32c = None;
550                resp.output.checksum_crc64nvme = None;
551                resp.output.checksum_sha1 = None;
552                resp.output.checksum_sha256 = None;
553                resp.output.e_tag = None;
554                resp.output.body = Some(decompressed_blob);
555                let elapsed = get_start.elapsed();
556                crate::metrics::record_get(
557                    m.codec.as_str(),
558                    m.compressed_size,
559                    m.original_size,
560                    elapsed.as_secs_f64(),
561                    true,
562                );
563                info!(
564                    op = "get_object",
565                    bucket = %get_bucket,
566                    key = %get_key,
567                    codec = m.codec.as_str(),
568                    bytes_in = m.compressed_size,
569                    bytes_out = m.original_size,
570                    path = "streaming",
571                    setup_latency_ms = elapsed.as_millis() as u64,
572                    "S4 get started (streaming)"
573                );
574                return Ok(resp);
575            }
576            // Passthrough: そのまま流す (Range なしの場合のみ streaming)
577            if range_request.is_none()
578                && !is_multipart
579                && let Some(ref m) = manifest_opt
580                && m.codec == CodecKind::Passthrough
581            {
582                resp.output.content_length = Some(m.original_size as i64);
583                resp.output.checksum_crc32 = None;
584                resp.output.checksum_crc32c = None;
585                resp.output.checksum_crc64nvme = None;
586                resp.output.checksum_sha1 = None;
587                resp.output.checksum_sha256 = None;
588                resp.output.e_tag = None;
589                resp.output.body = Some(blob);
590                debug!("S4 get_object: passthrough streaming");
591                return Ok(resp);
592            }
593
594            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
595            let bytes = collect_blob(blob, self.max_body_bytes)
596                .await
597                .map_err(internal("collect get body"))?;
598
599            let decompressed = if is_multipart {
600                self.decompress_multipart(bytes).await?
601            } else {
602                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
603                self.registry
604                    .decompress(bytes, manifest)
605                    .await
606                    .map_err(internal("registry decompress"))?
607            };
608
609            // Range request があれば slice。なければ full body を返す。
610            let total_size = decompressed.len() as u64;
611            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
612                let (start, end) = resolve_range(r, total_size)
613                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
614                let sliced = decompressed.slice(start as usize..end as usize);
615                resp.output.content_range = Some(format!(
616                    "bytes {start}-{}/{total_size}",
617                    end.saturating_sub(1)
618                ));
619                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
620            } else {
621                (decompressed, None)
622            };
623            // 解凍後の真のサイズを返す (S3 client は content_length を信頼するので
624            // 圧縮 size のままだと downstream が body を途中で切ってしまう)
625            resp.output.content_length = Some(final_bytes.len() as i64);
626            // 圧縮済 bytes の checksum を返すと AWS SDK 側で StreamingError
627            // (ChecksumMismatch) になる。ETag も backend が返した「圧縮済 bytes の
628            // MD5/checksum」なので意味的にズレる — クリアして S4 自身の crc32c
629            // (manifest 内 / frame 内) で integrity を保証する設計にする。
630            resp.output.checksum_crc32 = None;
631            resp.output.checksum_crc32c = None;
632            resp.output.checksum_crc64nvme = None;
633            resp.output.checksum_sha1 = None;
634            resp.output.checksum_sha256 = None;
635            resp.output.e_tag = None;
636            let returned_size = final_bytes.len() as u64;
637            let codec_label = manifest_opt
638                .as_ref()
639                .map(|m| m.codec.as_str())
640                .unwrap_or("multipart");
641            resp.output.body = Some(bytes_to_blob(final_bytes));
642            if let Some(status) = status_override {
643                resp.status = Some(status);
644            }
645            let elapsed = get_start.elapsed();
646            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
647            info!(
648                op = "get_object",
649                bucket = %get_bucket,
650                key = %get_key,
651                codec = codec_label,
652                bytes_out = returned_size,
653                total_object_size = total_size,
654                range = range_request.is_some(),
655                path = "buffered",
656                latency_ms = elapsed.as_millis() as u64,
657                "S4 get completed (buffered)"
658            );
659        }
660        Ok(resp)
661    }
662
663    // === passthrough delegations ===
664    async fn head_bucket(
665        &self,
666        req: S3Request<HeadBucketInput>,
667    ) -> S3Result<S3Response<HeadBucketOutput>> {
668        self.backend.head_bucket(req).await
669    }
670    async fn list_buckets(
671        &self,
672        req: S3Request<ListBucketsInput>,
673    ) -> S3Result<S3Response<ListBucketsOutput>> {
674        self.backend.list_buckets(req).await
675    }
676    async fn create_bucket(
677        &self,
678        req: S3Request<CreateBucketInput>,
679    ) -> S3Result<S3Response<CreateBucketOutput>> {
680        self.backend.create_bucket(req).await
681    }
682    async fn delete_bucket(
683        &self,
684        req: S3Request<DeleteBucketInput>,
685    ) -> S3Result<S3Response<DeleteBucketOutput>> {
686        self.backend.delete_bucket(req).await
687    }
688    async fn head_object(
689        &self,
690        req: S3Request<HeadObjectInput>,
691    ) -> S3Result<S3Response<HeadObjectOutput>> {
692        let mut resp = self.backend.head_object(req).await?;
693        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
694            // 客側には decompress 後の意味のある content_length / checksum を返す。
695            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
696            // (S4 は manifest 内の crc32c で integrity を担保する)。
697            resp.output.content_length = Some(manifest.original_size as i64);
698            resp.output.checksum_crc32 = None;
699            resp.output.checksum_crc32c = None;
700            resp.output.checksum_crc64nvme = None;
701            resp.output.checksum_sha1 = None;
702            resp.output.checksum_sha256 = None;
703            resp.output.e_tag = None;
704        }
705        Ok(resp)
706    }
707    async fn delete_object(
708        &self,
709        req: S3Request<DeleteObjectInput>,
710    ) -> S3Result<S3Response<DeleteObjectOutput>> {
711        // sidecar も best-effort で削除 (失敗は無視 — 存在しない場合や IAM 制限を許容)
712        let bucket = req.input.bucket.clone();
713        let key = req.input.key.clone();
714        let resp = self.backend.delete_object(req).await?;
715        let sidecar_input = DeleteObjectInput {
716            bucket: bucket.clone(),
717            key: sidecar_key(&key),
718            ..Default::default()
719        };
720        let sidecar_req = S3Request {
721            input: sidecar_input,
722            method: http::Method::DELETE,
723            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
724            headers: http::HeaderMap::new(),
725            extensions: http::Extensions::new(),
726            credentials: None,
727            region: None,
728            service: None,
729            trailing_headers: None,
730        };
731        let _ = self.backend.delete_object(sidecar_req).await;
732        Ok(resp)
733    }
734    async fn delete_objects(
735        &self,
736        req: S3Request<DeleteObjectsInput>,
737    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
738        self.backend.delete_objects(req).await
739    }
740    async fn copy_object(
741        &self,
742        mut req: S3Request<CopyObjectInput>,
743    ) -> S3Result<S3Response<CopyObjectOutput>> {
744        // S4-aware copy: source object に s4-* metadata がある場合、それを
745        // destination に確実に preserve する。
746        //
747        // - MetadataDirective::COPY (default): backend が source metadata を
748        //   そのまま copy するので S4 metadata も自動で渡る。介入不要
749        // - MetadataDirective::REPLACE: 客が指定した metadata で source を
750        //   上書き → s4-* metadata が消えると destination は decompress 不能に
751        //   なる (silent corruption)。S4 が source metadata を HEAD で取得し、
752        //   s4-* fields を input.metadata に強制 merge する
753        let needs_merge = req
754            .input
755            .metadata_directive
756            .as_ref()
757            .map(|d| d.as_str() == MetadataDirective::REPLACE)
758            .unwrap_or(false);
759        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
760            let head_input = HeadObjectInput {
761                bucket: bucket.to_string(),
762                key: key.to_string(),
763                ..Default::default()
764            };
765            let head_req = S3Request {
766                input: head_input,
767                method: req.method.clone(),
768                uri: req.uri.clone(),
769                headers: req.headers.clone(),
770                extensions: http::Extensions::new(),
771                credentials: req.credentials.clone(),
772                region: req.region.clone(),
773                service: req.service.clone(),
774                trailing_headers: None,
775            };
776            if let Ok(head) = self.backend.head_object(head_req).await
777                && let Some(src_meta) = head.output.metadata.as_ref()
778            {
779                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
780                for key in [
781                    META_CODEC,
782                    META_ORIGINAL_SIZE,
783                    META_COMPRESSED_SIZE,
784                    META_CRC32C,
785                    META_MULTIPART,
786                ] {
787                    if let Some(v) = src_meta.get(key) {
788                        // 客が同じ key を指定していたら preserve しない (= 上書き許可)
789                        // していたら何もしない。指定していなければ insert
790                        dest_meta
791                            .entry(key.to_string())
792                            .or_insert_with(|| v.clone());
793                    }
794                }
795                debug!(
796                    src_bucket = %bucket,
797                    src_key = %key,
798                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
799                );
800            }
801        }
802        self.backend.copy_object(req).await
803    }
804    async fn list_objects(
805        &self,
806        req: S3Request<ListObjectsInput>,
807    ) -> S3Result<S3Response<ListObjectsOutput>> {
808        let mut resp = self.backend.list_objects(req).await?;
809        // S4 内部 object (`*.s4index` sidecar 等) を顧客から隠す
810        if let Some(contents) = resp.output.contents.as_mut() {
811            contents.retain(|o| {
812                o.key
813                    .as_ref()
814                    .map(|k| !k.ends_with(".s4index"))
815                    .unwrap_or(true)
816            });
817        }
818        Ok(resp)
819    }
820    async fn list_objects_v2(
821        &self,
822        req: S3Request<ListObjectsV2Input>,
823    ) -> S3Result<S3Response<ListObjectsV2Output>> {
824        let mut resp = self.backend.list_objects_v2(req).await?;
825        if let Some(contents) = resp.output.contents.as_mut() {
826            let before = contents.len();
827            contents.retain(|o| {
828                o.key
829                    .as_ref()
830                    .map(|k| !k.ends_with(".s4index"))
831                    .unwrap_or(true)
832            });
833            // key_count も補正 (S3 spec compliance)
834            if let Some(kc) = resp.output.key_count.as_mut() {
835                *kc -= (before - contents.len()) as i32;
836            }
837        }
838        Ok(resp)
839    }
840    async fn create_multipart_upload(
841        &self,
842        mut req: S3Request<CreateMultipartUploadInput>,
843    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
844        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
845        // frame parse を起動するため、object metadata に flag を立てる。
846        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
847        let codec_kind = self.registry.default_kind();
848        let meta = req.input.metadata.get_or_insert_with(Default::default);
849        meta.insert(META_MULTIPART.into(), "true".into());
850        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
851        debug!(
852            bucket = ?req.input.bucket,
853            key = ?req.input.key,
854            codec = codec_kind.as_str(),
855            "S4 create_multipart_upload: marking object for per-part compression"
856        );
857        self.backend.create_multipart_upload(req).await
858    }
859
860    async fn upload_part(
861        &self,
862        mut req: S3Request<UploadPartInput>,
863    ) -> S3Result<S3Response<UploadPartOutput>> {
864        // 各 part を圧縮して frame header 付きで forward。GET 時に
865        // `decompress_multipart` が frame iter で順に解凍する。
866        // **per-part codec dispatch**: dispatcher が body 先頭 sample から
867        // codec を選ぶので、parquet 風の mixed-content multipart で part ごとに
868        // 最適 codec を使える (整数列 part → Bitcomp、text 列 part → zstd 等)。
869        if let Some(blob) = req.input.body.take() {
870            let bytes = collect_blob(blob, self.max_body_bytes)
871                .await
872                .map_err(internal("collect upload_part body"))?;
873            let sample_len = bytes.len().min(SAMPLE_BYTES);
874            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
875            let original_size = bytes.len() as u64;
876            let (compressed, manifest) = self
877                .registry
878                .compress(bytes, codec_kind)
879                .await
880                .map_err(internal("registry compress part"))?;
881            let header = FrameHeader {
882                codec: codec_kind,
883                original_size,
884                compressed_size: compressed.len() as u64,
885                crc32c: manifest.crc32c,
886            };
887            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
888            write_frame(&mut framed, header, &compressed);
889            // S3 multipart の non-final part 最小サイズ (5 MiB) を満たすため
890            // padding frame を追加。FrameIter が S4P1 padding を skip する。
891            // 注: 最終 part も常に pad してしまっているが、最終 part だけ pad しない
892            // 最適化は S4Service が「最終 part か」を upload_part 時点で知れない
893            // ため Phase 2 (CompleteMultipartUpload で trim) で対応。
894            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
895            let framed_bytes = framed.freeze();
896            let new_len = framed_bytes.len() as i64;
897            // 同じ wire 互換問題が multipart にもある (content-length / checksum)
898            req.input.content_length = Some(new_len);
899            req.input.checksum_algorithm = None;
900            req.input.checksum_crc32 = None;
901            req.input.checksum_crc32c = None;
902            req.input.checksum_crc64nvme = None;
903            req.input.checksum_sha1 = None;
904            req.input.checksum_sha256 = None;
905            req.input.content_md5 = None;
906            req.input.body = Some(bytes_to_blob(framed_bytes));
907            debug!(
908                part_number = ?req.input.part_number,
909                upload_id = ?req.input.upload_id,
910                original_size,
911                framed_size = new_len,
912                "S4 upload_part: framed compressed payload"
913            );
914        }
915        self.backend.upload_part(req).await
916    }
917    async fn complete_multipart_upload(
918        &self,
919        req: S3Request<CompleteMultipartUploadInput>,
920    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
921        let bucket = req.input.bucket.clone();
922        let key = req.input.key.clone();
923        let resp = self.backend.complete_multipart_upload(req).await?;
924        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
925        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
926        // partial fetch path が利用可能になる (Range request の帯域節約)。
927        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
928        // できれば爆速になるので 1 回の cost は payback される
929        let bucket_clone = bucket.clone();
930        let key_clone = key.clone();
931        let get_input = GetObjectInput {
932            bucket: bucket_clone.clone(),
933            key: key_clone.clone(),
934            ..Default::default()
935        };
936        let get_req = S3Request {
937            input: get_input,
938            method: http::Method::GET,
939            uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
940            headers: http::HeaderMap::new(),
941            extensions: http::Extensions::new(),
942            credentials: None,
943            region: None,
944            service: None,
945            trailing_headers: None,
946        };
947        if let Ok(get_resp) = self.backend.get_object(get_req).await
948            && let Some(blob) = get_resp.output.body
949            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
950            && let Ok(index) = build_index_from_body(&body)
951        {
952            self.write_sidecar(&bucket, &key, &index).await;
953        }
954        Ok(resp)
955    }
956    async fn abort_multipart_upload(
957        &self,
958        req: S3Request<AbortMultipartUploadInput>,
959    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
960        self.backend.abort_multipart_upload(req).await
961    }
962    async fn list_multipart_uploads(
963        &self,
964        req: S3Request<ListMultipartUploadsInput>,
965    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
966        self.backend.list_multipart_uploads(req).await
967    }
968    async fn list_parts(
969        &self,
970        req: S3Request<ListPartsInput>,
971    ) -> S3Result<S3Response<ListPartsOutput>> {
972        self.backend.list_parts(req).await
973    }
974
975    // =========================================================================
976    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
977    // 持たないので、backend (= AWS S3) の動作と完全に同一。
978    //
979    // 既知の制限事項:
980    // - copy_object / upload_part_copy: source object が S4-compressed の場合、
981    //   backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
982    //   coppied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
983    //   経由で正しく decompress できる。MetadataDirective REPLACE で上書き
984    //   されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
985    // - list_object_versions: versioning enabled bucket では各 version も S4
986    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
987    // =========================================================================
988
989    // ---- Object ACL / tagging / attributes ----
990    async fn get_object_acl(
991        &self,
992        req: S3Request<GetObjectAclInput>,
993    ) -> S3Result<S3Response<GetObjectAclOutput>> {
994        self.backend.get_object_acl(req).await
995    }
996    async fn put_object_acl(
997        &self,
998        req: S3Request<PutObjectAclInput>,
999    ) -> S3Result<S3Response<PutObjectAclOutput>> {
1000        self.backend.put_object_acl(req).await
1001    }
1002    async fn get_object_tagging(
1003        &self,
1004        req: S3Request<GetObjectTaggingInput>,
1005    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
1006        self.backend.get_object_tagging(req).await
1007    }
1008    async fn put_object_tagging(
1009        &self,
1010        req: S3Request<PutObjectTaggingInput>,
1011    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
1012        self.backend.put_object_tagging(req).await
1013    }
1014    async fn delete_object_tagging(
1015        &self,
1016        req: S3Request<DeleteObjectTaggingInput>,
1017    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
1018        self.backend.delete_object_tagging(req).await
1019    }
1020    async fn get_object_attributes(
1021        &self,
1022        req: S3Request<GetObjectAttributesInput>,
1023    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
1024        self.backend.get_object_attributes(req).await
1025    }
1026    async fn restore_object(
1027        &self,
1028        req: S3Request<RestoreObjectInput>,
1029    ) -> S3Result<S3Response<RestoreObjectOutput>> {
1030        self.backend.restore_object(req).await
1031    }
1032    async fn upload_part_copy(
1033        &self,
1034        req: S3Request<UploadPartCopyInput>,
1035    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
1036        // 注: source が S4-compressed の場合、bytes の partial copy は壊れる。
1037        //     S3 spec の仕様上 byte-range で copy できるが、S4 の compress block
1038        //     boundary とは無関係。Phase 2 で per-part 圧縮を入れた後に再考。
1039        self.backend.upload_part_copy(req).await
1040    }
1041
1042    // ---- Object lock / retention / legal hold ----
1043    async fn get_object_lock_configuration(
1044        &self,
1045        req: S3Request<GetObjectLockConfigurationInput>,
1046    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
1047        self.backend.get_object_lock_configuration(req).await
1048    }
1049    async fn put_object_lock_configuration(
1050        &self,
1051        req: S3Request<PutObjectLockConfigurationInput>,
1052    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
1053        self.backend.put_object_lock_configuration(req).await
1054    }
1055    async fn get_object_legal_hold(
1056        &self,
1057        req: S3Request<GetObjectLegalHoldInput>,
1058    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
1059        self.backend.get_object_legal_hold(req).await
1060    }
1061    async fn put_object_legal_hold(
1062        &self,
1063        req: S3Request<PutObjectLegalHoldInput>,
1064    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
1065        self.backend.put_object_legal_hold(req).await
1066    }
1067    async fn get_object_retention(
1068        &self,
1069        req: S3Request<GetObjectRetentionInput>,
1070    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
1071        self.backend.get_object_retention(req).await
1072    }
1073    async fn put_object_retention(
1074        &self,
1075        req: S3Request<PutObjectRetentionInput>,
1076    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
1077        self.backend.put_object_retention(req).await
1078    }
1079
1080    // ---- Versioning ----
1081    async fn list_object_versions(
1082        &self,
1083        req: S3Request<ListObjectVersionsInput>,
1084    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
1085        self.backend.list_object_versions(req).await
1086    }
1087    async fn get_bucket_versioning(
1088        &self,
1089        req: S3Request<GetBucketVersioningInput>,
1090    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
1091        self.backend.get_bucket_versioning(req).await
1092    }
1093    async fn put_bucket_versioning(
1094        &self,
1095        req: S3Request<PutBucketVersioningInput>,
1096    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
1097        self.backend.put_bucket_versioning(req).await
1098    }
1099
1100    // ---- Bucket location ----
1101    async fn get_bucket_location(
1102        &self,
1103        req: S3Request<GetBucketLocationInput>,
1104    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
1105        self.backend.get_bucket_location(req).await
1106    }
1107
1108    // ---- Bucket policy ----
1109    async fn get_bucket_policy(
1110        &self,
1111        req: S3Request<GetBucketPolicyInput>,
1112    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
1113        self.backend.get_bucket_policy(req).await
1114    }
1115    async fn put_bucket_policy(
1116        &self,
1117        req: S3Request<PutBucketPolicyInput>,
1118    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
1119        self.backend.put_bucket_policy(req).await
1120    }
1121    async fn delete_bucket_policy(
1122        &self,
1123        req: S3Request<DeleteBucketPolicyInput>,
1124    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
1125        self.backend.delete_bucket_policy(req).await
1126    }
1127    async fn get_bucket_policy_status(
1128        &self,
1129        req: S3Request<GetBucketPolicyStatusInput>,
1130    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
1131        self.backend.get_bucket_policy_status(req).await
1132    }
1133
1134    // ---- Bucket ACL ----
1135    async fn get_bucket_acl(
1136        &self,
1137        req: S3Request<GetBucketAclInput>,
1138    ) -> S3Result<S3Response<GetBucketAclOutput>> {
1139        self.backend.get_bucket_acl(req).await
1140    }
1141    async fn put_bucket_acl(
1142        &self,
1143        req: S3Request<PutBucketAclInput>,
1144    ) -> S3Result<S3Response<PutBucketAclOutput>> {
1145        self.backend.put_bucket_acl(req).await
1146    }
1147
1148    // ---- Bucket CORS ----
1149    async fn get_bucket_cors(
1150        &self,
1151        req: S3Request<GetBucketCorsInput>,
1152    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
1153        self.backend.get_bucket_cors(req).await
1154    }
1155    async fn put_bucket_cors(
1156        &self,
1157        req: S3Request<PutBucketCorsInput>,
1158    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
1159        self.backend.put_bucket_cors(req).await
1160    }
1161    async fn delete_bucket_cors(
1162        &self,
1163        req: S3Request<DeleteBucketCorsInput>,
1164    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
1165        self.backend.delete_bucket_cors(req).await
1166    }
1167
1168    // ---- Bucket lifecycle ----
1169    async fn get_bucket_lifecycle_configuration(
1170        &self,
1171        req: S3Request<GetBucketLifecycleConfigurationInput>,
1172    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
1173        self.backend.get_bucket_lifecycle_configuration(req).await
1174    }
1175    async fn put_bucket_lifecycle_configuration(
1176        &self,
1177        req: S3Request<PutBucketLifecycleConfigurationInput>,
1178    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
1179        self.backend.put_bucket_lifecycle_configuration(req).await
1180    }
1181    async fn delete_bucket_lifecycle(
1182        &self,
1183        req: S3Request<DeleteBucketLifecycleInput>,
1184    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
1185        self.backend.delete_bucket_lifecycle(req).await
1186    }
1187
1188    // ---- Bucket tagging ----
1189    async fn get_bucket_tagging(
1190        &self,
1191        req: S3Request<GetBucketTaggingInput>,
1192    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
1193        self.backend.get_bucket_tagging(req).await
1194    }
1195    async fn put_bucket_tagging(
1196        &self,
1197        req: S3Request<PutBucketTaggingInput>,
1198    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
1199        self.backend.put_bucket_tagging(req).await
1200    }
1201    async fn delete_bucket_tagging(
1202        &self,
1203        req: S3Request<DeleteBucketTaggingInput>,
1204    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
1205        self.backend.delete_bucket_tagging(req).await
1206    }
1207
1208    // ---- Bucket encryption ----
1209    async fn get_bucket_encryption(
1210        &self,
1211        req: S3Request<GetBucketEncryptionInput>,
1212    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
1213        self.backend.get_bucket_encryption(req).await
1214    }
1215    async fn put_bucket_encryption(
1216        &self,
1217        req: S3Request<PutBucketEncryptionInput>,
1218    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
1219        self.backend.put_bucket_encryption(req).await
1220    }
1221    async fn delete_bucket_encryption(
1222        &self,
1223        req: S3Request<DeleteBucketEncryptionInput>,
1224    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
1225        self.backend.delete_bucket_encryption(req).await
1226    }
1227
1228    // ---- Bucket logging ----
1229    async fn get_bucket_logging(
1230        &self,
1231        req: S3Request<GetBucketLoggingInput>,
1232    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
1233        self.backend.get_bucket_logging(req).await
1234    }
1235    async fn put_bucket_logging(
1236        &self,
1237        req: S3Request<PutBucketLoggingInput>,
1238    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
1239        self.backend.put_bucket_logging(req).await
1240    }
1241
1242    // ---- Bucket notification ----
1243    async fn get_bucket_notification_configuration(
1244        &self,
1245        req: S3Request<GetBucketNotificationConfigurationInput>,
1246    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
1247        self.backend
1248            .get_bucket_notification_configuration(req)
1249            .await
1250    }
1251    async fn put_bucket_notification_configuration(
1252        &self,
1253        req: S3Request<PutBucketNotificationConfigurationInput>,
1254    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
1255        self.backend
1256            .put_bucket_notification_configuration(req)
1257            .await
1258    }
1259
1260    // ---- Bucket request payment ----
1261    async fn get_bucket_request_payment(
1262        &self,
1263        req: S3Request<GetBucketRequestPaymentInput>,
1264    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
1265        self.backend.get_bucket_request_payment(req).await
1266    }
1267    async fn put_bucket_request_payment(
1268        &self,
1269        req: S3Request<PutBucketRequestPaymentInput>,
1270    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
1271        self.backend.put_bucket_request_payment(req).await
1272    }
1273
1274    // ---- Bucket website ----
1275    async fn get_bucket_website(
1276        &self,
1277        req: S3Request<GetBucketWebsiteInput>,
1278    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
1279        self.backend.get_bucket_website(req).await
1280    }
1281    async fn put_bucket_website(
1282        &self,
1283        req: S3Request<PutBucketWebsiteInput>,
1284    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
1285        self.backend.put_bucket_website(req).await
1286    }
1287    async fn delete_bucket_website(
1288        &self,
1289        req: S3Request<DeleteBucketWebsiteInput>,
1290    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
1291        self.backend.delete_bucket_website(req).await
1292    }
1293
1294    // ---- Bucket replication ----
1295    async fn get_bucket_replication(
1296        &self,
1297        req: S3Request<GetBucketReplicationInput>,
1298    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
1299        self.backend.get_bucket_replication(req).await
1300    }
1301    async fn put_bucket_replication(
1302        &self,
1303        req: S3Request<PutBucketReplicationInput>,
1304    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
1305        self.backend.put_bucket_replication(req).await
1306    }
1307    async fn delete_bucket_replication(
1308        &self,
1309        req: S3Request<DeleteBucketReplicationInput>,
1310    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
1311        self.backend.delete_bucket_replication(req).await
1312    }
1313
1314    // ---- Bucket accelerate ----
1315    async fn get_bucket_accelerate_configuration(
1316        &self,
1317        req: S3Request<GetBucketAccelerateConfigurationInput>,
1318    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
1319        self.backend.get_bucket_accelerate_configuration(req).await
1320    }
1321    async fn put_bucket_accelerate_configuration(
1322        &self,
1323        req: S3Request<PutBucketAccelerateConfigurationInput>,
1324    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
1325        self.backend.put_bucket_accelerate_configuration(req).await
1326    }
1327
1328    // ---- Bucket ownership controls ----
1329    async fn get_bucket_ownership_controls(
1330        &self,
1331        req: S3Request<GetBucketOwnershipControlsInput>,
1332    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
1333        self.backend.get_bucket_ownership_controls(req).await
1334    }
1335    async fn put_bucket_ownership_controls(
1336        &self,
1337        req: S3Request<PutBucketOwnershipControlsInput>,
1338    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
1339        self.backend.put_bucket_ownership_controls(req).await
1340    }
1341    async fn delete_bucket_ownership_controls(
1342        &self,
1343        req: S3Request<DeleteBucketOwnershipControlsInput>,
1344    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
1345        self.backend.delete_bucket_ownership_controls(req).await
1346    }
1347
1348    // ---- Public access block ----
1349    async fn get_public_access_block(
1350        &self,
1351        req: S3Request<GetPublicAccessBlockInput>,
1352    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
1353        self.backend.get_public_access_block(req).await
1354    }
1355    async fn put_public_access_block(
1356        &self,
1357        req: S3Request<PutPublicAccessBlockInput>,
1358    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
1359        self.backend.put_public_access_block(req).await
1360    }
1361    async fn delete_public_access_block(
1362        &self,
1363        req: S3Request<DeletePublicAccessBlockInput>,
1364    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
1365        self.backend.delete_public_access_block(req).await
1366    }
1367}
1368
1369#[cfg(test)]
1370mod tests {
1371    use super::*;
1372
1373    #[test]
1374    fn manifest_roundtrip_via_metadata() {
1375        let original = ChunkManifest {
1376            codec: CodecKind::CpuZstd,
1377            original_size: 1234,
1378            compressed_size: 567,
1379            crc32c: 0xdead_beef,
1380        };
1381        let mut meta: Option<Metadata> = None;
1382        write_manifest(&mut meta, &original);
1383        let extracted = extract_manifest(&meta).expect("manifest must round-trip");
1384        assert_eq!(extracted.codec, original.codec);
1385        assert_eq!(extracted.original_size, original.original_size);
1386        assert_eq!(extracted.compressed_size, original.compressed_size);
1387        assert_eq!(extracted.crc32c, original.crc32c);
1388    }
1389
1390    #[test]
1391    fn missing_metadata_yields_none() {
1392        let meta: Option<Metadata> = None;
1393        assert!(extract_manifest(&meta).is_none());
1394    }
1395
1396    #[test]
1397    fn partial_metadata_yields_none() {
1398        let mut meta = Metadata::new();
1399        meta.insert(META_CODEC.into(), "cpu-zstd".into());
1400        let opt = Some(meta);
1401        assert!(extract_manifest(&opt).is_none());
1402    }
1403}