// s4_server/service.rs
1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//!   `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//!   複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//!   manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//!   manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//!   Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use bytes::BytesMut;
34use s3s::dto::*;
35use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
36use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
37use s4_codec::multipart::{
38    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
39    write_frame,
40};
41use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
42use std::time::Instant;
43use tracing::{debug, info};
44
45use crate::blob::{
46    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
47};
48use crate::streaming::{
49    DEFAULT_S4F2_CHUNK_SIZE, cpu_zstd_decompress_stream, streaming_compress_to_frames,
50    supports_streaming_compress, supports_streaming_decompress,
51};
52
/// Maximum number of bytes from the head of a PUT body that is handed to the
/// dispatcher for codec sampling.
const SAMPLE_BYTES: usize = 4096;
55
/// The S4 proxy service: wraps an inner S3 backend and transparently
/// compresses bodies on PUT / decompresses them on GET via the codec registry.
pub struct S4Service<B: S3> {
    // Downstream S3 implementation every request is ultimately forwarded to.
    backend: B,
    // Holds every codec this instance can compress / decompress with.
    registry: Arc<CodecRegistry>,
    // Picks a codec for a PUT body from a leading sample (see SAMPLE_BYTES).
    dispatcher: Arc<dyn CodecDispatcher>,
    // Upper bound for bodies collected into memory on the buffered paths.
    max_body_bytes: usize,
    // Optional bucket policy; `None` disables policy enforcement entirely.
    policy: Option<crate::policy::SharedPolicy>,
}
63
64impl<B: S3> S4Service<B> {
    /// AWS S3 single-shot PUT API limit (5 GiB).
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    /// Build a service around `backend`, using `registry` for
    /// (de)compression and `dispatcher` for codec selection. Starts with the
    /// 5 GiB body cap and no policy attached; see `with_max_body_bytes` /
    /// `with_policy` for overrides.
    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend,
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
        }
    }
81
82    #[must_use]
83    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
84        self.max_body_bytes = n;
85        self
86    }
87
88    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
89    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
90    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
91    /// When `None` (the default), no policy enforcement happens.
92    #[must_use]
93    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
94        self.policy = Some(policy);
95        self
96    }
97
98    /// Pull the SigV4 access key id off the request's credentials, if any.
99    /// Used as the `principal_id` for policy evaluation.
100    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
101        req.credentials.as_ref().map(|c| c.access_key.as_str())
102    }
103
104    /// Helper used by request handlers to enforce the optional policy.
105    /// Returns `Ok(())` when allowed (or no policy is configured), or an
106    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
107    /// counter on deny.
108    fn enforce_policy(
109        &self,
110        action: &'static str,
111        bucket: &str,
112        key: Option<&str>,
113        principal_id: Option<&str>,
114    ) -> S3Result<()> {
115        let Some(policy) = self.policy.as_ref() else {
116            return Ok(());
117        };
118        let decision = policy.evaluate(action, bucket, key, principal_id);
119        if decision.allow {
120            Ok(())
121        } else {
122            crate::metrics::record_policy_denial(action, bucket);
123            tracing::info!(
124                action,
125                bucket,
126                key = ?key,
127                principal = ?principal_id,
128                matched_sid = ?decision.matched_sid,
129                effect = ?decision.matched_effect,
130                "S4 policy denied request"
131            );
132            Err(S3Error::with_message(
133                S3ErrorCode::AccessDenied,
134                format!("denied by S4 policy: {action} on bucket={bucket}"),
135            ))
136        }
137    }
138
    /// Test helper: take the backend back out (not used in production).
    pub fn into_backend(self) -> B {
        self.backend
    }
143
    /// Sidecar fast path for Range GET: fetch only the needed frames from the
    /// backend via a Range request, then frame-parse + decompress + slice the
    /// result. This is the **bandwidth-saving** variant of a Range request.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial GET against the backend for exactly the byte span the plan needs.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame parse + decompress. Each frame header carries its own codec,
        // so the registry is consulted per frame.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Assemble the 206 response. Checksums/ETag belong to the compressed
        // backend bytes, so they are cleared to avoid client-side mismatches.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }
252
253    /// `<key>.s4index` sidecar object を backend に書く。失敗しても本体 PUT は
254    /// 成功扱いにしたいので、err は warn ログのみ (Range GET の partial path が
255    /// 使えなくなるが、full read fallback で意味的には正しい結果を返す)。
256    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
257        let bytes = encode_index(index);
258        let len = bytes.len() as i64;
259        let put_input = PutObjectInput {
260            bucket: bucket.into(),
261            key: sidecar_key(key),
262            body: Some(bytes_to_blob(bytes)),
263            content_length: Some(len),
264            content_type: Some("application/x-s4-index".into()),
265            ..Default::default()
266        };
267        let put_req = S3Request {
268            input: put_input,
269            method: http::Method::PUT,
270            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
271            headers: http::HeaderMap::new(),
272            extensions: http::Extensions::new(),
273            credentials: None,
274            region: None,
275            service: None,
276            trailing_headers: None,
277        };
278        if let Err(e) = self.backend.put_object(put_req).await {
279            tracing::warn!(
280                bucket,
281                key,
282                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
283            );
284        }
285    }
286
287    /// `<key>.s4index` sidecar を backend から読み出す。なければ None。
288    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
289        let get_input = GetObjectInput {
290            bucket: bucket.into(),
291            key: sidecar_key(key),
292            ..Default::default()
293        };
294        let get_req = S3Request {
295            input: get_input,
296            method: http::Method::GET,
297            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
298            headers: http::HeaderMap::new(),
299            extensions: http::Extensions::new(),
300            credentials: None,
301            region: None,
302            service: None,
303            trailing_headers: None,
304        };
305        let resp = self.backend.get_object(get_req).await.ok()?;
306        let blob = resp.output.body?;
307        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
308        decode_index(bytes).ok()
309    }
310
    /// Decompress a multipart object (a sequence of frames) and rebuild the
    /// original bytes.
    ///
    /// **Per-frame codec dispatch**: every frame header carries a codec id,
    /// so the registry may invoke a different codec for each frame. Objects
    /// that mix codecs internally still decompress transparently
    /// (parquet-style mixed columns, etc.).
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            // Rebuild the per-chunk manifest from the frame header so the
            // registry can pick the right codec and verify crc32c.
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
340}
341
342/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
343/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
344/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
345/// reject the other variants for parity with AWS.
346fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
347    let rest = s
348        .strip_prefix("bytes=")
349        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
350    let (a, b) = rest
351        .split_once('-')
352        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
353    let first: u64 = a
354        .parse()
355        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
356    let last: u64 = b
357        .parse()
358        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
359    if last < first {
360        return Err(format!("CopySourceRange last < first: {s:?}"));
361    }
362    Ok(s3s::dto::Range::Int {
363        first,
364        last: Some(last),
365    })
366}
367
368fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
369    metadata
370        .as_ref()
371        .and_then(|m| m.get(META_MULTIPART))
372        .map(|v| v == "true")
373        .unwrap_or(false)
374}
375
/// Codec id the object body was compressed with.
const META_CODEC: &str = "s4-codec";
/// Uncompressed (original) body size in bytes.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Compressed body size in bytes.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// CRC32C of the original bytes, used for integrity checks on GET.
const META_CRC32C: &str = "s4-crc32c";
/// Marks an object written via multipart upload in the per-part frame format.
/// GET inspects this flag to start the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks a single-PUT object written in the S4F2 framed format.
/// Legacy v0.1 single-PUTs are raw compressed bytes (flag absent). GET routes
/// flagged objects through the framed path (same FrameIter parse as multipart).
const META_FRAMED: &str = "s4-framed";
387
388fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
389    metadata
390        .as_ref()
391        .and_then(|m| m.get(META_FRAMED))
392        .map(|v| v == "true")
393        .unwrap_or(false)
394}
395
396fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
397    let m = metadata.as_ref()?;
398    let codec = m
399        .get(META_CODEC)
400        .and_then(|s| s.parse::<CodecKind>().ok())?;
401    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
402    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
403    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
404    Some(ChunkManifest {
405        codec,
406        original_size,
407        compressed_size,
408        crc32c,
409    })
410}
411
412fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
413    let meta = metadata.get_or_insert_with(Default::default);
414    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
415    meta.insert(
416        META_ORIGINAL_SIZE.into(),
417        manifest.original_size.to_string(),
418    );
419    meta.insert(
420        META_COMPRESSED_SIZE.into(),
421        manifest.compressed_size.to_string(),
422    );
423    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
424}
425
/// Build a `map_err` adapter that wraps any displayable error into an
/// `InternalError` S3 error, prefixed with a short context string.
fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}
429
430/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
431/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
432/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
433pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
434    if total == 0 {
435        return Err("cannot range-get zero-length object".into());
436    }
437    match range {
438        s3s::dto::Range::Int { first, last } => {
439            let start = *first;
440            let end_inclusive = match last {
441                Some(l) => (*l).min(total - 1),
442                None => total - 1,
443            };
444            if start > end_inclusive || start >= total {
445                return Err(format!(
446                    "range bytes={start}-{:?} out of object size {total}",
447                    last
448                ));
449            }
450            Ok((start, end_inclusive + 1))
451        }
452        s3s::dto::Range::Suffix { length } => {
453            let len = (*length).min(total);
454            Ok((total - len, total))
455        }
456    }
457}
458
459#[async_trait::async_trait]
460impl<B: S3> S3 for S4Service<B> {
461    // === 圧縮を挟む path (PUT) ===
462    #[tracing::instrument(
463        name = "s4.put_object",
464        skip(self, req),
465        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
466    )]
467    async fn put_object(
468        &self,
469        mut req: S3Request<PutObjectInput>,
470    ) -> S3Result<S3Response<PutObjectOutput>> {
471        let put_start = Instant::now();
472        let put_bucket = req.input.bucket.clone();
473        let put_key = req.input.key.clone();
474        self.enforce_policy(
475            "s3:PutObject",
476            &put_bucket,
477            Some(&put_key),
478            Self::principal_of(&req),
479        )?;
480        if let Some(blob) = req.input.body.take() {
481            // Sample 4 KiB から codec を決定。streaming-aware codec なら streaming
482            // compress fast path、そうでなければ従来の collect-then-compress。
483            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
484                .await
485                .map_err(internal("peek put sample"))?;
486            let sample_len = sample.len().min(SAMPLE_BYTES);
487            let kind = self.dispatcher.pick(&sample[..sample_len]).await;
488
489            // Passthrough buys nothing from S4F2 wrapping (no compression =
490            // no per-chunk frame to skip past) and the +28-byte header
491            // overhead breaks size-sensitive callers that expect a true
492            // pass-through. So passthrough always uses the legacy raw-blob
493            // path; only compressing codecs go through the framed path.
494            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
495            let (compressed, manifest, is_framed) = if use_framed {
496                // streaming fast path: input は memory に collect しない
497                let chained = chain_sample_with_rest(sample, rest_stream);
498                debug!(
499                    bucket = ?req.input.bucket,
500                    key = ?req.input.key,
501                    codec = kind.as_str(),
502                    path = "streaming-framed",
503                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
504                );
505                let (body, manifest) = streaming_compress_to_frames(
506                    chained,
507                    Arc::clone(&self.registry),
508                    kind,
509                    DEFAULT_S4F2_CHUNK_SIZE,
510                )
511                .await
512                .map_err(internal("streaming framed compress"))?;
513                (body, manifest, true)
514            } else {
515                // GPU codec 等で streaming-aware でないものは bytes-buffered path
516                // (raw 圧縮 bytes、framed なし — back-compat 互換 path)
517                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
518                    .await
519                    .map_err(internal("collect put body (buffered path)"))?;
520                debug!(
521                    bucket = ?req.input.bucket,
522                    key = ?req.input.key,
523                    bytes = bytes.len(),
524                    codec = kind.as_str(),
525                    path = "buffered",
526                    "S4 put_object: compressing (buffered, raw blob)"
527                );
528                let (body, m) = self
529                    .registry
530                    .compress(bytes, kind)
531                    .await
532                    .map_err(internal("registry compress"))?;
533                (body, m, false)
534            };
535
536            write_manifest(&mut req.input.metadata, &manifest);
537            if is_framed {
538                // v0.2 #4: framed body であることを GET 側に伝える meta flag。
539                req.input
540                    .metadata
541                    .get_or_insert_with(Default::default)
542                    .insert(META_FRAMED.into(), "true".into());
543            }
544            // 重要: content_length を圧縮後サイズで更新する。
545            // これを忘れると下流 (aws-sdk-s3 → S3) が宣言サイズ分の bytes を
546            // 待ち続けて RequestTimeout で失敗する (S3 仕様)。
547            req.input.content_length = Some(compressed.len() as i64);
548            // body を書き換えたので、客側が送ってきた original body 用の
549            // checksum / MD5 ヘッダは無効化する (そのまま転送すると下流 S3 が
550            // XAmzContentChecksumMismatch を返す)。S4 自身の整合性は
551            // ChunkManifest.crc32c で担保している。
552            req.input.checksum_algorithm = None;
553            req.input.checksum_crc32 = None;
554            req.input.checksum_crc32c = None;
555            req.input.checksum_crc64nvme = None;
556            req.input.checksum_sha1 = None;
557            req.input.checksum_sha256 = None;
558            req.input.content_md5 = None;
559            let original_size = manifest.original_size;
560            let compressed_size = manifest.compressed_size;
561            let codec_label = manifest.codec.as_str();
562            // framed body は GET 側で sidecar partial-fetch を効かせるため
563            // build_index_from_body で sidecar を組み立てて backend に PUT する。
564            let sidecar_index = if is_framed {
565                s4_codec::index::build_index_from_body(&compressed).ok()
566            } else {
567                None
568            };
569            req.input.body = Some(bytes_to_blob(compressed));
570            let backend_resp = self.backend.put_object(req).await;
571            if let Some(idx) = sidecar_index
572                && backend_resp.is_ok()
573                && idx.entries.len() > 1
574            {
575                // 1 chunk しかない (small object) なら sidecar は意味がない (=
576                // partial fetch しても full body と同じ範囲) ので省略。
577                self.write_sidecar(&put_bucket, &put_key, &idx).await;
578            }
579            let _ = (original_size, compressed_size); // mute unused warnings
580            let elapsed = put_start.elapsed();
581            crate::metrics::record_put(
582                codec_label,
583                original_size,
584                compressed_size,
585                elapsed.as_secs_f64(),
586                backend_resp.is_ok(),
587            );
588            info!(
589                op = "put_object",
590                bucket = %put_bucket,
591                key = %put_key,
592                codec = codec_label,
593                bytes_in = original_size,
594                bytes_out = compressed_size,
595                ratio = format!(
596                    "{:.3}",
597                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
598                ),
599                latency_ms = elapsed.as_millis() as u64,
600                ok = backend_resp.is_ok(),
601                "S4 put completed"
602            );
603            return backend_resp;
604        }
605        self.backend.put_object(req).await
606    }
607
608    // === 圧縮を解く path (GET) ===
609    #[tracing::instrument(
610        name = "s4.get_object",
611        skip(self, req),
612        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
613    )]
614    async fn get_object(
615        &self,
616        mut req: S3Request<GetObjectInput>,
617    ) -> S3Result<S3Response<GetObjectOutput>> {
618        let get_start = Instant::now();
619        let get_bucket = req.input.bucket.clone();
620        let get_key = req.input.key.clone();
621        self.enforce_policy(
622            "s3:GetObject",
623            &get_bucket,
624            Some(&get_key),
625            Self::principal_of(&req),
626        )?;
627        // Range request の事前検出 (decompress 後 slice する path に使う)。
628        let range_request = req.input.range.take();
629
630        // ====== Range GET の partial-fetch fast path (sidecar index 利用) ======
631        // sidecar `<key>.s4index` が存在し、multipart-framed object であれば
632        // 必要 frame だけを backend に Range GET し帯域節約する。
633        if let Some(ref r) = range_request
634            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
635        {
636            let total = index.total_original_size();
637            let (start, end_exclusive) = match resolve_range(r, total) {
638                Ok(v) => v,
639                Err(e) => {
640                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
641                }
642            };
643            if let Some(plan) = index.lookup_range(start, end_exclusive) {
644                return self
645                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
646                    .await;
647            }
648        }
649        let mut resp = self.backend.get_object(req).await?;
650        let is_multipart = is_multipart_object(&resp.output.metadata);
651        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
652        // v0.2 #4: framed-v2 single-PUT は多 frame parse が必要なので
653        // multipart と同じ path に流す。
654        let needs_frame_parse = is_multipart || is_framed_v2;
655        let manifest_opt = extract_manifest(&resp.output.metadata);
656
657        if !needs_frame_parse && manifest_opt.is_none() {
658            // S4 が書いていないオブジェクトは透過 (raw bucket pre-existing object 等)
659            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
660            return Ok(resp);
661        }
662
663        if let Some(blob) = resp.output.body.take() {
664            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
665            // 大規模 object (e.g. 5 GB) を memory に collect すると OOM するので、
666            // codec が streaming-aware なら body を chunk-by-chunk で decompress して
667            // 即座に client に流す。
668            //
669            // ただし Range request 時は streaming できない (slice するため total bytes
670            // が必要) → buffered path に fall through。
671            if range_request.is_none()
672                && !needs_frame_parse
673                && let Some(ref m) = manifest_opt
674                && supports_streaming_decompress(m.codec)
675                && m.codec == CodecKind::CpuZstd
676            {
677                let decompressed_blob = cpu_zstd_decompress_stream(blob);
678                resp.output.content_length = Some(m.original_size as i64);
679                resp.output.checksum_crc32 = None;
680                resp.output.checksum_crc32c = None;
681                resp.output.checksum_crc64nvme = None;
682                resp.output.checksum_sha1 = None;
683                resp.output.checksum_sha256 = None;
684                resp.output.e_tag = None;
685                resp.output.body = Some(decompressed_blob);
686                let elapsed = get_start.elapsed();
687                crate::metrics::record_get(
688                    m.codec.as_str(),
689                    m.compressed_size,
690                    m.original_size,
691                    elapsed.as_secs_f64(),
692                    true,
693                );
694                info!(
695                    op = "get_object",
696                    bucket = %get_bucket,
697                    key = %get_key,
698                    codec = m.codec.as_str(),
699                    bytes_in = m.compressed_size,
700                    bytes_out = m.original_size,
701                    path = "streaming",
702                    setup_latency_ms = elapsed.as_millis() as u64,
703                    "S4 get started (streaming)"
704                );
705                return Ok(resp);
706            }
707            // Passthrough: そのまま流す (Range なしの場合のみ streaming)
708            if range_request.is_none()
709                && !needs_frame_parse
710                && let Some(ref m) = manifest_opt
711                && m.codec == CodecKind::Passthrough
712            {
713                resp.output.content_length = Some(m.original_size as i64);
714                resp.output.checksum_crc32 = None;
715                resp.output.checksum_crc32c = None;
716                resp.output.checksum_crc64nvme = None;
717                resp.output.checksum_sha1 = None;
718                resp.output.checksum_sha256 = None;
719                resp.output.e_tag = None;
720                resp.output.body = Some(blob);
721                debug!("S4 get_object: passthrough streaming");
722                return Ok(resp);
723            }
724
725            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
726            let bytes = collect_blob(blob, self.max_body_bytes)
727                .await
728                .map_err(internal("collect get body"))?;
729
730            let decompressed = if needs_frame_parse {
731                // multipart objects と framed-v2 single-PUT objects は同じ
732                // S4F2 frame 列なので decompress_multipart で統一処理
733                self.decompress_multipart(bytes).await?
734            } else {
735                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
736                self.registry
737                    .decompress(bytes, manifest)
738                    .await
739                    .map_err(internal("registry decompress"))?
740            };
741
742            // Range request があれば slice。なければ full body を返す。
743            let total_size = decompressed.len() as u64;
744            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
745                let (start, end) = resolve_range(r, total_size)
746                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
747                let sliced = decompressed.slice(start as usize..end as usize);
748                resp.output.content_range = Some(format!(
749                    "bytes {start}-{}/{total_size}",
750                    end.saturating_sub(1)
751                ));
752                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
753            } else {
754                (decompressed, None)
755            };
756            // 解凍後の真のサイズを返す (S3 client は content_length を信頼するので
757            // 圧縮 size のままだと downstream が body を途中で切ってしまう)
758            resp.output.content_length = Some(final_bytes.len() as i64);
759            // 圧縮済 bytes の checksum を返すと AWS SDK 側で StreamingError
760            // (ChecksumMismatch) になる。ETag も backend が返した「圧縮済 bytes の
761            // MD5/checksum」なので意味的にズレる — クリアして S4 自身の crc32c
762            // (manifest 内 / frame 内) で integrity を保証する設計にする。
763            resp.output.checksum_crc32 = None;
764            resp.output.checksum_crc32c = None;
765            resp.output.checksum_crc64nvme = None;
766            resp.output.checksum_sha1 = None;
767            resp.output.checksum_sha256 = None;
768            resp.output.e_tag = None;
769            let returned_size = final_bytes.len() as u64;
770            let codec_label = manifest_opt
771                .as_ref()
772                .map(|m| m.codec.as_str())
773                .unwrap_or("multipart");
774            resp.output.body = Some(bytes_to_blob(final_bytes));
775            if let Some(status) = status_override {
776                resp.status = Some(status);
777            }
778            let elapsed = get_start.elapsed();
779            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
780            info!(
781                op = "get_object",
782                bucket = %get_bucket,
783                key = %get_key,
784                codec = codec_label,
785                bytes_out = returned_size,
786                total_object_size = total_size,
787                range = range_request.is_some(),
788                path = "buffered",
789                latency_ms = elapsed.as_millis() as u64,
790                "S4 get completed (buffered)"
791            );
792        }
793        Ok(resp)
794    }
795
796    // === passthrough delegations ===
797    async fn head_bucket(
798        &self,
799        req: S3Request<HeadBucketInput>,
800    ) -> S3Result<S3Response<HeadBucketOutput>> {
801        self.backend.head_bucket(req).await
802    }
803    async fn list_buckets(
804        &self,
805        req: S3Request<ListBucketsInput>,
806    ) -> S3Result<S3Response<ListBucketsOutput>> {
807        self.backend.list_buckets(req).await
808    }
809    async fn create_bucket(
810        &self,
811        req: S3Request<CreateBucketInput>,
812    ) -> S3Result<S3Response<CreateBucketOutput>> {
813        self.backend.create_bucket(req).await
814    }
815    async fn delete_bucket(
816        &self,
817        req: S3Request<DeleteBucketInput>,
818    ) -> S3Result<S3Response<DeleteBucketOutput>> {
819        self.backend.delete_bucket(req).await
820    }
821    async fn head_object(
822        &self,
823        req: S3Request<HeadObjectInput>,
824    ) -> S3Result<S3Response<HeadObjectOutput>> {
825        let mut resp = self.backend.head_object(req).await?;
826        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
827            // 客側には decompress 後の意味のある content_length / checksum を返す。
828            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
829            // (S4 は manifest 内の crc32c で integrity を担保する)。
830            resp.output.content_length = Some(manifest.original_size as i64);
831            resp.output.checksum_crc32 = None;
832            resp.output.checksum_crc32c = None;
833            resp.output.checksum_crc64nvme = None;
834            resp.output.checksum_sha1 = None;
835            resp.output.checksum_sha256 = None;
836            resp.output.e_tag = None;
837        }
838        Ok(resp)
839    }
840    async fn delete_object(
841        &self,
842        req: S3Request<DeleteObjectInput>,
843    ) -> S3Result<S3Response<DeleteObjectOutput>> {
844        let bucket = req.input.bucket.clone();
845        let key = req.input.key.clone();
846        self.enforce_policy(
847            "s3:DeleteObject",
848            &bucket,
849            Some(&key),
850            Self::principal_of(&req),
851        )?;
852        // sidecar も best-effort で削除 (失敗は無視 — 存在しない場合や IAM 制限を許容)
853        let resp = self.backend.delete_object(req).await?;
854        let sidecar_input = DeleteObjectInput {
855            bucket: bucket.clone(),
856            key: sidecar_key(&key),
857            ..Default::default()
858        };
859        let sidecar_req = S3Request {
860            input: sidecar_input,
861            method: http::Method::DELETE,
862            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
863            headers: http::HeaderMap::new(),
864            extensions: http::Extensions::new(),
865            credentials: None,
866            region: None,
867            service: None,
868            trailing_headers: None,
869        };
870        let _ = self.backend.delete_object(sidecar_req).await;
871        Ok(resp)
872    }
873    async fn delete_objects(
874        &self,
875        req: S3Request<DeleteObjectsInput>,
876    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
877        self.backend.delete_objects(req).await
878    }
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(
            "s3:PutObject",
            &dst_bucket,
            Some(&dst_key),
            Self::principal_of(&req),
        )?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy("s3:GetObject", bucket, Some(key), Self::principal_of(&req))?;
        }
        // S4-aware copy: if the source object carries s4-* metadata, make
        // sure it is preserved on the destination.
        //
        // - MetadataDirective::COPY (default): the backend copies the source
        //   metadata as-is, so the s4-* fields travel automatically. No
        //   intervention needed.
        // - MetadataDirective::REPLACE: the client-supplied metadata
        //   overwrites the source's → losing the s4-* fields would leave the
        //   destination undecompressable (silent corruption). S4 therefore
        //   HEADs the source and force-merges the s4-* fields into
        //   input.metadata.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            // Best-effort: if the HEAD fails we fall through and let the
            // backend perform the plain copy.
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // If the client explicitly supplied the same key, we
                        // do not overwrite it (client wins); otherwise insert
                        // the source's value.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
956    async fn list_objects(
957        &self,
958        req: S3Request<ListObjectsInput>,
959    ) -> S3Result<S3Response<ListObjectsOutput>> {
960        self.enforce_policy(
961            "s3:ListBucket",
962            &req.input.bucket,
963            None,
964            Self::principal_of(&req),
965        )?;
966        let mut resp = self.backend.list_objects(req).await?;
967        // S4 内部 object (`*.s4index` sidecar 等) を顧客から隠す
968        if let Some(contents) = resp.output.contents.as_mut() {
969            contents.retain(|o| {
970                o.key
971                    .as_ref()
972                    .map(|k| !k.ends_with(".s4index"))
973                    .unwrap_or(true)
974            });
975        }
976        Ok(resp)
977    }
978    async fn list_objects_v2(
979        &self,
980        req: S3Request<ListObjectsV2Input>,
981    ) -> S3Result<S3Response<ListObjectsV2Output>> {
982        self.enforce_policy(
983            "s3:ListBucket",
984            &req.input.bucket,
985            None,
986            Self::principal_of(&req),
987        )?;
988        let mut resp = self.backend.list_objects_v2(req).await?;
989        if let Some(contents) = resp.output.contents.as_mut() {
990            let before = contents.len();
991            contents.retain(|o| {
992                o.key
993                    .as_ref()
994                    .map(|k| !k.ends_with(".s4index"))
995                    .unwrap_or(true)
996            });
997            // key_count も補正 (S3 spec compliance)
998            if let Some(kc) = resp.output.key_count.as_mut() {
999                *kc -= (before - contents.len()) as i32;
1000            }
1001        }
1002        Ok(resp)
1003    }
1004    async fn create_multipart_upload(
1005        &self,
1006        mut req: S3Request<CreateMultipartUploadInput>,
1007    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
1008        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
1009        // frame parse を起動するため、object metadata に flag を立てる。
1010        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
1011        let codec_kind = self.registry.default_kind();
1012        let meta = req.input.metadata.get_or_insert_with(Default::default);
1013        meta.insert(META_MULTIPART.into(), "true".into());
1014        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
1015        debug!(
1016            bucket = ?req.input.bucket,
1017            key = ?req.input.key,
1018            codec = codec_kind.as_str(),
1019            "S4 create_multipart_upload: marking object for per-part compression"
1020        );
1021        self.backend.create_multipart_upload(req).await
1022    }
1023
    async fn upload_part(
        &self,
        mut req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        // Compress each part and forward it with a frame header prepended; at
        // GET time `decompress_multipart` walks the frames and decompresses
        // them in order.
        //
        // **Per-part codec dispatch**: the dispatcher picks a codec from a
        // sample of the part's leading bytes, so mixed-content multipart
        // uploads (parquet-like) can use the optimal codec per part
        // (integer-column part → Bitcomp, text-column part → zstd, etc.).
        if let Some(blob) = req.input.body.take() {
            // Buffer the whole part in memory (bounded by max_body_bytes).
            let bytes = collect_blob(blob, self.max_body_bytes)
                .await
                .map_err(internal("collect upload_part body"))?;
            let sample_len = bytes.len().min(SAMPLE_BYTES);
            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
            let original_size = bytes.len() as u64;
            let (compressed, manifest) = self
                .registry
                .compress(bytes, codec_kind)
                .await
                .map_err(internal("registry compress part"))?;
            // The frame header carries everything GET needs to decode this
            // part independently: codec, sizes, and a crc32c for integrity.
            let header = FrameHeader {
                codec: codec_kind,
                original_size,
                compressed_size: compressed.len() as u64,
                crc32c: manifest.crc32c,
            };
            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
            write_frame(&mut framed, header, &compressed);
            // v0.2 #5: heuristic-based padding skip for likely-final parts.
            //
            // AWS SDK / aws-cli / boto3 always send the final (and only the
            // final) part below the configured part_size. So if the raw user
            // part is already smaller than S3's 5 MiB multipart minimum, this
            // is overwhelmingly likely to be the final part — and the final
            // part is exempt from S3's size constraint. Skipping padding here
            // saves up to ~5 MiB per object on highly compressible workloads.
            //
            // If a misbehaving client sends a tiny **non-final** part, S3
            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
            // identical outcome to a vanilla S3 PUT, just earlier than
            // padding-then-complete would catch it.
            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
            if !likely_final {
                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
            }
            let framed_bytes = framed.freeze();
            let new_len = framed_bytes.len() as i64;
            // Same wire-compatibility issue as the single-PUT path: the
            // client-declared content-length / checksums / MD5 describe the
            // *uncompressed* bytes, so they must be rewritten or cleared to
            // match the framed payload we actually send.
            req.input.content_length = Some(new_len);
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            req.input.body = Some(bytes_to_blob(framed_bytes));
            debug!(
                part_number = ?req.input.part_number,
                upload_id = ?req.input.upload_id,
                original_size,
                framed_size = new_len,
                "S4 upload_part: framed compressed payload"
            );
        }
        self.backend.upload_part(req).await
    }
1092    async fn complete_multipart_upload(
1093        &self,
1094        req: S3Request<CompleteMultipartUploadInput>,
1095    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1096        let bucket = req.input.bucket.clone();
1097        let key = req.input.key.clone();
1098        let resp = self.backend.complete_multipart_upload(req).await?;
1099        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
1100        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
1101        // partial fetch path が利用可能になる (Range request の帯域節約)。
1102        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
1103        // できれば爆速になるので 1 回の cost は payback される
1104        let bucket_clone = bucket.clone();
1105        let key_clone = key.clone();
1106        let get_input = GetObjectInput {
1107            bucket: bucket_clone.clone(),
1108            key: key_clone.clone(),
1109            ..Default::default()
1110        };
1111        let get_req = S3Request {
1112            input: get_input,
1113            method: http::Method::GET,
1114            uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
1115            headers: http::HeaderMap::new(),
1116            extensions: http::Extensions::new(),
1117            credentials: None,
1118            region: None,
1119            service: None,
1120            trailing_headers: None,
1121        };
1122        if let Ok(get_resp) = self.backend.get_object(get_req).await
1123            && let Some(blob) = get_resp.output.body
1124            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
1125            && let Ok(index) = build_index_from_body(&body)
1126        {
1127            self.write_sidecar(&bucket, &key, &index).await;
1128        }
1129        Ok(resp)
1130    }
1131    async fn abort_multipart_upload(
1132        &self,
1133        req: S3Request<AbortMultipartUploadInput>,
1134    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
1135        self.backend.abort_multipart_upload(req).await
1136    }
1137    async fn list_multipart_uploads(
1138        &self,
1139        req: S3Request<ListMultipartUploadsInput>,
1140    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
1141        self.backend.list_multipart_uploads(req).await
1142    }
1143    async fn list_parts(
1144        &self,
1145        req: S3Request<ListPartsInput>,
1146    ) -> S3Result<S3Response<ListPartsOutput>> {
1147        self.backend.list_parts(req).await
1148    }
1149
1150    // =========================================================================
1151    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
1152    // 持たないので、backend (= AWS S3) の動作と完全に同一。
1153    //
1154    // 既知の制限事項:
1155    // - copy_object / upload_part_copy: source object が S4-compressed の場合、
1156    //   backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
1157    //   coppied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
1158    //   経由で正しく decompress できる。MetadataDirective REPLACE で上書き
1159    //   されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
1160    // - list_object_versions: versioning enabled bucket では各 version も S4
1161    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
1162    // =========================================================================
1163
1164    // ---- Object ACL / tagging / attributes ----
1165    async fn get_object_acl(
1166        &self,
1167        req: S3Request<GetObjectAclInput>,
1168    ) -> S3Result<S3Response<GetObjectAclOutput>> {
1169        self.backend.get_object_acl(req).await
1170    }
1171    async fn put_object_acl(
1172        &self,
1173        req: S3Request<PutObjectAclInput>,
1174    ) -> S3Result<S3Response<PutObjectAclOutput>> {
1175        self.backend.put_object_acl(req).await
1176    }
1177    async fn get_object_tagging(
1178        &self,
1179        req: S3Request<GetObjectTaggingInput>,
1180    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
1181        self.backend.get_object_tagging(req).await
1182    }
1183    async fn put_object_tagging(
1184        &self,
1185        req: S3Request<PutObjectTaggingInput>,
1186    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
1187        self.backend.put_object_tagging(req).await
1188    }
1189    async fn delete_object_tagging(
1190        &self,
1191        req: S3Request<DeleteObjectTaggingInput>,
1192    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
1193        self.backend.delete_object_tagging(req).await
1194    }
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        // Pure delegation.
        // NOTE(review): for S4-compressed objects the returned ObjectSize /
        // Checksum / ETag describe the *compressed* backend bytes — this is
        // inconsistent with head_object, which rewrites content_length from
        // the manifest. Confirm whether clients of this op need the same
        // rewrite treatment.
        self.backend.get_object_attributes(req).await
    }
1201    async fn restore_object(
1202        &self,
1203        req: S3Request<RestoreObjectInput>,
1204    ) -> S3Result<S3Response<RestoreObjectOutput>> {
1205        self.backend.restore_object(req).await
1206    }
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // v0.2 #6: byte-range aware copy when the source is S4-framed.
        //
        // For a framed source (multipart upload OR single-PUT framed-v2),
        // a naive byte-range passthrough would copy compressed bytes that
        // don't align with S4 frame boundaries — silently corrupting the
        // result. Instead we GET the source through S4 (which handles
        // decompression + Range), re-compress + re-frame as a new part,
        // and forward as upload_part. For non-framed sources (S4-untouched
        // raw objects), passthrough is correct and we keep the original
        // (cheaper) code path.
        //
        // NOTE(review): copy_source_if_match / if_modified_since etc. are not
        // evaluated on the S4-aware path — confirm whether callers rely on
        // those preconditions here.
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            // Non-bucket copy sources (e.g. access points) take the cheap
            // passthrough.
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        // Probe metadata to decide whether the source needs S4-aware copy.
        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        // A failed HEAD is treated as "not framed" — the passthrough branch
        // then surfaces any real error from the backend itself.
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        // Resolve the optional source byte range to pass to GET.
        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        // GET source via S4 (handles decompression + sidecar partial fetch
        // when range is present). The result is the requested user-visible
        // byte range, fully decompressed.
        let mut get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        get_input.range = source_range;
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        // Compress + frame as a fresh part (mirrors upload_part path).
        let sample_len = bytes.len().min(SAMPLE_BYTES);
        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
        let original_size = bytes.len() as u64;
        let (compressed, manifest) = self
            .registry
            .compress(bytes, codec_kind)
            .await
            .map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        // Same likely-final-part padding heuristic as upload_part.
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        // Forward as upload_part to the destination multipart upload.
        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        // Synthesize the copy response from the upload_part result.
        // NOTE(review): only e_tag is populated; checksum fields and
        // copy_source_version_id are left at their defaults.
        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }
1355
1356    // ---- Object lock / retention / legal hold ----
1357    async fn get_object_lock_configuration(
1358        &self,
1359        req: S3Request<GetObjectLockConfigurationInput>,
1360    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
1361        self.backend.get_object_lock_configuration(req).await
1362    }
1363    async fn put_object_lock_configuration(
1364        &self,
1365        req: S3Request<PutObjectLockConfigurationInput>,
1366    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
1367        self.backend.put_object_lock_configuration(req).await
1368    }
1369    async fn get_object_legal_hold(
1370        &self,
1371        req: S3Request<GetObjectLegalHoldInput>,
1372    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
1373        self.backend.get_object_legal_hold(req).await
1374    }
1375    async fn put_object_legal_hold(
1376        &self,
1377        req: S3Request<PutObjectLegalHoldInput>,
1378    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
1379        self.backend.put_object_legal_hold(req).await
1380    }
1381    async fn get_object_retention(
1382        &self,
1383        req: S3Request<GetObjectRetentionInput>,
1384    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
1385        self.backend.get_object_retention(req).await
1386    }
1387    async fn put_object_retention(
1388        &self,
1389        req: S3Request<PutObjectRetentionInput>,
1390    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
1391        self.backend.put_object_retention(req).await
1392    }
1393
1394    // ---- Versioning ----
1395    async fn list_object_versions(
1396        &self,
1397        req: S3Request<ListObjectVersionsInput>,
1398    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
1399        self.backend.list_object_versions(req).await
1400    }
1401    async fn get_bucket_versioning(
1402        &self,
1403        req: S3Request<GetBucketVersioningInput>,
1404    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
1405        self.backend.get_bucket_versioning(req).await
1406    }
1407    async fn put_bucket_versioning(
1408        &self,
1409        req: S3Request<PutBucketVersioningInput>,
1410    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
1411        self.backend.put_bucket_versioning(req).await
1412    }
1413
1414    // ---- Bucket location ----
1415    async fn get_bucket_location(
1416        &self,
1417        req: S3Request<GetBucketLocationInput>,
1418    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
1419        self.backend.get_bucket_location(req).await
1420    }
1421
1422    // ---- Bucket policy ----
1423    async fn get_bucket_policy(
1424        &self,
1425        req: S3Request<GetBucketPolicyInput>,
1426    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
1427        self.backend.get_bucket_policy(req).await
1428    }
1429    async fn put_bucket_policy(
1430        &self,
1431        req: S3Request<PutBucketPolicyInput>,
1432    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
1433        self.backend.put_bucket_policy(req).await
1434    }
1435    async fn delete_bucket_policy(
1436        &self,
1437        req: S3Request<DeleteBucketPolicyInput>,
1438    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
1439        self.backend.delete_bucket_policy(req).await
1440    }
1441    async fn get_bucket_policy_status(
1442        &self,
1443        req: S3Request<GetBucketPolicyStatusInput>,
1444    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
1445        self.backend.get_bucket_policy_status(req).await
1446    }
1447
1448    // ---- Bucket ACL ----
1449    async fn get_bucket_acl(
1450        &self,
1451        req: S3Request<GetBucketAclInput>,
1452    ) -> S3Result<S3Response<GetBucketAclOutput>> {
1453        self.backend.get_bucket_acl(req).await
1454    }
1455    async fn put_bucket_acl(
1456        &self,
1457        req: S3Request<PutBucketAclInput>,
1458    ) -> S3Result<S3Response<PutBucketAclOutput>> {
1459        self.backend.put_bucket_acl(req).await
1460    }
1461
1462    // ---- Bucket CORS ----
1463    async fn get_bucket_cors(
1464        &self,
1465        req: S3Request<GetBucketCorsInput>,
1466    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
1467        self.backend.get_bucket_cors(req).await
1468    }
1469    async fn put_bucket_cors(
1470        &self,
1471        req: S3Request<PutBucketCorsInput>,
1472    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
1473        self.backend.put_bucket_cors(req).await
1474    }
1475    async fn delete_bucket_cors(
1476        &self,
1477        req: S3Request<DeleteBucketCorsInput>,
1478    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
1479        self.backend.delete_bucket_cors(req).await
1480    }
1481
1482    // ---- Bucket lifecycle ----
    /// Pure delegation: lifecycle config read, forwarded to the backend unchanged.
    async fn get_bucket_lifecycle_configuration(
        &self,
        req: S3Request<GetBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
        self.backend.get_bucket_lifecycle_configuration(req).await
    }
    /// Pure delegation: lifecycle config write, forwarded to the backend unchanged.
    async fn put_bucket_lifecycle_configuration(
        &self,
        req: S3Request<PutBucketLifecycleConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
        self.backend.put_bucket_lifecycle_configuration(req).await
    }
    /// Pure delegation: lifecycle config delete, forwarded to the backend unchanged.
    async fn delete_bucket_lifecycle(
        &self,
        req: S3Request<DeleteBucketLifecycleInput>,
    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
        self.backend.delete_bucket_lifecycle(req).await
    }
1501
1502    // ---- Bucket tagging ----
    /// Pure delegation: bucket tagging read, forwarded to the backend unchanged.
    async fn get_bucket_tagging(
        &self,
        req: S3Request<GetBucketTaggingInput>,
    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
        self.backend.get_bucket_tagging(req).await
    }
    /// Pure delegation: bucket tagging write, forwarded to the backend unchanged.
    async fn put_bucket_tagging(
        &self,
        req: S3Request<PutBucketTaggingInput>,
    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
        self.backend.put_bucket_tagging(req).await
    }
    /// Pure delegation: bucket tagging delete, forwarded to the backend unchanged.
    async fn delete_bucket_tagging(
        &self,
        req: S3Request<DeleteBucketTaggingInput>,
    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
        self.backend.delete_bucket_tagging(req).await
    }
1521
1522    // ---- Bucket encryption ----
    /// Pure delegation: encryption config read, forwarded to the backend unchanged.
    async fn get_bucket_encryption(
        &self,
        req: S3Request<GetBucketEncryptionInput>,
    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
        self.backend.get_bucket_encryption(req).await
    }
    /// Pure delegation: encryption config write, forwarded to the backend unchanged.
    async fn put_bucket_encryption(
        &self,
        req: S3Request<PutBucketEncryptionInput>,
    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
        self.backend.put_bucket_encryption(req).await
    }
    /// Pure delegation: encryption config delete, forwarded to the backend unchanged.
    async fn delete_bucket_encryption(
        &self,
        req: S3Request<DeleteBucketEncryptionInput>,
    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
        self.backend.delete_bucket_encryption(req).await
    }
1541
1542    // ---- Bucket logging ----
    /// Pure delegation: logging config read, forwarded to the backend unchanged.
    async fn get_bucket_logging(
        &self,
        req: S3Request<GetBucketLoggingInput>,
    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
        self.backend.get_bucket_logging(req).await
    }
    /// Pure delegation: logging config write, forwarded to the backend unchanged.
    async fn put_bucket_logging(
        &self,
        req: S3Request<PutBucketLoggingInput>,
    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
        self.backend.put_bucket_logging(req).await
    }
1555
1556    // ---- Bucket notification ----
    /// Pure delegation: notification config read, forwarded to the backend unchanged.
    async fn get_bucket_notification_configuration(
        &self,
        req: S3Request<GetBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
        self.backend
            .get_bucket_notification_configuration(req)
            .await
    }
    /// Pure delegation: notification config write, forwarded to the backend unchanged.
    async fn put_bucket_notification_configuration(
        &self,
        req: S3Request<PutBucketNotificationConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
        self.backend
            .put_bucket_notification_configuration(req)
            .await
    }
1573
1574    // ---- Bucket request payment ----
    /// Pure delegation: request-payment config read, forwarded to the backend unchanged.
    async fn get_bucket_request_payment(
        &self,
        req: S3Request<GetBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
        self.backend.get_bucket_request_payment(req).await
    }
    /// Pure delegation: request-payment config write, forwarded to the backend unchanged.
    async fn put_bucket_request_payment(
        &self,
        req: S3Request<PutBucketRequestPaymentInput>,
    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
        self.backend.put_bucket_request_payment(req).await
    }
1587
1588    // ---- Bucket website ----
    /// Pure delegation: website config read, forwarded to the backend unchanged.
    async fn get_bucket_website(
        &self,
        req: S3Request<GetBucketWebsiteInput>,
    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
        self.backend.get_bucket_website(req).await
    }
    /// Pure delegation: website config write, forwarded to the backend unchanged.
    async fn put_bucket_website(
        &self,
        req: S3Request<PutBucketWebsiteInput>,
    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
        self.backend.put_bucket_website(req).await
    }
    /// Pure delegation: website config delete, forwarded to the backend unchanged.
    async fn delete_bucket_website(
        &self,
        req: S3Request<DeleteBucketWebsiteInput>,
    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
        self.backend.delete_bucket_website(req).await
    }
1607
1608    // ---- Bucket replication ----
    /// Pure delegation: replication config read, forwarded to the backend unchanged.
    async fn get_bucket_replication(
        &self,
        req: S3Request<GetBucketReplicationInput>,
    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
        self.backend.get_bucket_replication(req).await
    }
    /// Pure delegation: replication config write, forwarded to the backend unchanged.
    /// NOTE(review): replicated objects keep their S4 manifest metadata only if the
    /// backend copies user metadata verbatim — presumed true, verify in Phase 2.
    async fn put_bucket_replication(
        &self,
        req: S3Request<PutBucketReplicationInput>,
    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
        self.backend.put_bucket_replication(req).await
    }
    /// Pure delegation: replication config delete, forwarded to the backend unchanged.
    async fn delete_bucket_replication(
        &self,
        req: S3Request<DeleteBucketReplicationInput>,
    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
        self.backend.delete_bucket_replication(req).await
    }
1627
1628    // ---- Bucket accelerate ----
    /// Pure delegation: accelerate config read, forwarded to the backend unchanged.
    async fn get_bucket_accelerate_configuration(
        &self,
        req: S3Request<GetBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
        self.backend.get_bucket_accelerate_configuration(req).await
    }
    /// Pure delegation: accelerate config write, forwarded to the backend unchanged.
    async fn put_bucket_accelerate_configuration(
        &self,
        req: S3Request<PutBucketAccelerateConfigurationInput>,
    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
        self.backend.put_bucket_accelerate_configuration(req).await
    }
1641
1642    // ---- Bucket ownership controls ----
    /// Pure delegation: ownership controls read, forwarded to the backend unchanged.
    async fn get_bucket_ownership_controls(
        &self,
        req: S3Request<GetBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
        self.backend.get_bucket_ownership_controls(req).await
    }
    /// Pure delegation: ownership controls write, forwarded to the backend unchanged.
    async fn put_bucket_ownership_controls(
        &self,
        req: S3Request<PutBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
        self.backend.put_bucket_ownership_controls(req).await
    }
    /// Pure delegation: ownership controls delete, forwarded to the backend unchanged.
    async fn delete_bucket_ownership_controls(
        &self,
        req: S3Request<DeleteBucketOwnershipControlsInput>,
    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
        self.backend.delete_bucket_ownership_controls(req).await
    }
1661
1662    // ---- Public access block ----
    /// Pure delegation: public-access-block read, forwarded to the backend unchanged.
    async fn get_public_access_block(
        &self,
        req: S3Request<GetPublicAccessBlockInput>,
    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
        self.backend.get_public_access_block(req).await
    }
    /// Pure delegation: public-access-block write, forwarded to the backend unchanged.
    async fn put_public_access_block(
        &self,
        req: S3Request<PutPublicAccessBlockInput>,
    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
        self.backend.put_public_access_block(req).await
    }
    /// Pure delegation: public-access-block delete, forwarded to the backend unchanged.
    async fn delete_public_access_block(
        &self,
        req: S3Request<DeletePublicAccessBlockInput>,
    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
        self.backend.delete_public_access_block(req).await
    }
1681}
1682
#[cfg(test)]
mod tests {
    use super::*;

    /// A manifest written into request metadata must come back intact.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let want = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };

        let mut metadata: Option<Metadata> = None;
        write_manifest(&mut metadata, &want);

        let got = extract_manifest(&metadata).expect("manifest must round-trip");
        assert_eq!(got.codec, want.codec);
        assert_eq!(got.original_size, want.original_size);
        assert_eq!(got.compressed_size, want.compressed_size);
        assert_eq!(got.crc32c, want.crc32c);
    }

    /// No metadata at all → no manifest.
    #[test]
    fn missing_metadata_yields_none() {
        let metadata: Option<Metadata> = None;
        assert!(extract_manifest(&metadata).is_none());
    }

    /// A metadata map carrying only the codec key (not the full manifest)
    /// must be treated as "no manifest", not as a partial one.
    #[test]
    fn partial_metadata_yields_none() {
        let mut partial = Metadata::new();
        partial.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(partial)).is_none());
    }

    /// Happy path: `bytes=N-M` parses into a closed integer range.
    #[test]
    fn parse_copy_source_range_basic() {
        let parsed = parse_copy_source_range("bytes=10-20").unwrap();
        let s3s::dto::Range::Int { first, last } = parsed else {
            panic!("expected Int range");
        };
        assert_eq!(first, 10);
        assert_eq!(last, Some(20));
    }

    /// `last < first` is a malformed range and must be rejected.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let msg = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(msg.contains("last < first"));
    }

    /// The header value must carry the `bytes=` prefix.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let msg = parse_copy_source_range("10-20").unwrap_err();
        assert!(msg.contains("must start with 'bytes='"));
    }

    /// S3 upload_part_copy spec requires the closed N-M form; suffix and
    /// open-ended forms are not allowed for this header.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        for bad in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(bad).is_err(), "{bad} must be rejected");
        }
    }
}