// s4_server/service.rs
1//! `s3s::S3` 実装 — `s3s_aws::Proxy` への delegation を default にしつつ、
2//! `put_object` / `get_object` 経路で `s4_codec::CodecRegistry` を呼ぶ。
3//!
4//! ## カバー範囲 (Phase 1 月 2)
5//!
6//! - 圧縮 hook あり: `put_object`, `get_object`
7//! - 純 delegation (圧縮なし): `head_bucket`, `list_buckets`, `create_bucket`, `delete_bucket`,
8//!   `head_object`, `delete_object`, `delete_objects`, `copy_object`, `list_objects`,
9//!   `list_objects_v2`, `create_multipart_upload`, `upload_part`,
10//!   `complete_multipart_upload`, `abort_multipart_upload`, `list_multipart_uploads`,
11//!   `list_parts`
12//! - 未対応 (デフォルトで NotImplemented): その他 80+ ops (Tagging / ACL / Lifecycle 等は Phase 2)
13//!
14//! ## アーキテクチャ
15//!
16//! - `S4Service<B>` は backend (B: S3) と `Arc<CodecRegistry>` と `Arc<dyn CodecDispatcher>`
17//!   を保持する。`CodecRegistry` 経由で複数 codec を抱えられるので、ひとつの S4 インスタンスが
18//!   複数 codec で書かれた object を透過的に GET できる
19//! - PUT: dispatcher が body の先頭 sample から codec を選び、registry で compress、
20//!   manifest を S3 metadata に書いて backend に forward
21//! - GET: backend から取得 → metadata から manifest を復元 → registry.decompress で
22//!   manifest 指定の codec で解凍 → 元の bytes を return
23//!
24//! ## 既知の制限事項
25//!
26//! - **Multipart Upload は per-part 圧縮が未実装**: 現状は upload_part を素通し。
27//!   Phase 1 月 2 後半で per-part compress + complete_multipart_upload で manifest 集約。
28//! - **PUT body は memory に collect**: max_body_bytes 上限あり (default 5 GiB = S3 単発 PUT 上限)。
29//!   Streaming-aware 圧縮は Phase 2。
30
31use std::sync::Arc;
32
33use bytes::BytesMut;
34use s3s::dto::*;
35use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
36use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
37use s4_codec::multipart::{
38    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
39    write_frame,
40};
41use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
42use std::time::Instant;
43use tracing::{debug, info};
44
45use crate::blob::{
46    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
47};
48use crate::streaming::{
49    DEFAULT_S4F2_CHUNK_SIZE, cpu_zstd_decompress_stream, streaming_compress_to_frames,
50    supports_streaming_compress, supports_streaming_decompress,
51};
52
/// Maximum number of bytes from the head of a PUT body handed to the
/// dispatcher for codec sampling.
const SAMPLE_BYTES: usize = 4096;
55
pub struct S4Service<B: S3> {
    /// Downstream S3 implementation every request is ultimately forwarded to.
    backend: B,
    /// Holds the (de)compression codecs. Keeping a registry (rather than a
    /// single codec) lets one S4 instance transparently GET objects written
    /// with different codecs.
    registry: Arc<CodecRegistry>,
    /// Picks the codec for a PUT from a sample of the body's leading bytes.
    dispatcher: Arc<dyn CodecDispatcher>,
    /// Upper bound for bodies collected into memory on the buffered paths.
    max_body_bytes: usize,
    /// Optional bucket policy (v0.2 #7); `None` disables enforcement.
    policy: Option<crate::policy::SharedPolicy>,
    /// v0.3 #13: surfaced as the `aws:SecureTransport` Condition key. Set
    /// to `true` when the listener is wrapped in TLS (or ACME), so policies
    /// gating "deny if not over TLS" can do their job. Defaults to `false`
    /// (HTTP); set via [`S4Service::with_secure_transport`] at boot.
    secure_transport: bool,
}
68
impl<B: S3> S4Service<B> {
    /// AWS S3 single-request PUT API limit (5 GiB).
    pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;

    /// Build a service wrapping `backend`: codec selection goes through
    /// `dispatcher`, (de)compression through `registry`. Policy enforcement
    /// starts disabled and the body cap starts at
    /// [`Self::DEFAULT_MAX_BODY_BYTES`]; both are adjustable via the
    /// `with_*` builders below.
    pub fn new(
        backend: B,
        registry: Arc<CodecRegistry>,
        dispatcher: Arc<dyn CodecDispatcher>,
    ) -> Self {
        Self {
            backend,
            registry,
            dispatcher,
            max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
            policy: None,
            secure_transport: false,
        }
    }

    /// Tell the policy evaluator that the listener is reached over TLS
    /// (or ACME). When `true`, the `aws:SecureTransport` Condition key
    /// resolves to `true`. Defaults to `false`.
    #[must_use]
    pub fn with_secure_transport(mut self, on: bool) -> Self {
        self.secure_transport = on;
        self
    }

    /// Override the maximum number of body bytes the buffered paths may
    /// collect into memory (default: [`Self::DEFAULT_MAX_BODY_BYTES`]).
    #[must_use]
    pub fn with_max_body_bytes(mut self, n: usize) -> Self {
        self.max_body_bytes = n;
        self
    }

    /// Attach an optional bucket policy (v0.2 #7). When `Some(...)`, every
    /// PUT / GET / DELETE / List handler runs `policy.evaluate(...)` before
    /// delegating to the backend; failures return `S3ErrorCode::AccessDenied`.
    /// When `None` (the default), no policy enforcement happens.
    #[must_use]
    pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
        self.policy = Some(policy);
        self
    }

    /// Pull the SigV4 access key id off the request's credentials, if any.
    /// Used as the `principal_id` for policy evaluation.
    fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
        req.credentials.as_ref().map(|c| c.access_key.as_str())
    }

    /// v0.3 #13: build the per-request policy context from the incoming
    /// `S3Request`. Pulls `aws:UserAgent` from the User-Agent header,
    /// `aws:SourceIp` from the standard `X-Forwarded-For` header (most
    /// production deployments are behind an LB / reverse proxy that sets
    /// this), `aws:CurrentTime` from the system clock, and
    /// `aws:SecureTransport` from the per-listener TLS flag.
    fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
        let user_agent = req
            .headers
            .get("user-agent")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        // X-Forwarded-For is `client, proxy1, proxy2`; the leftmost entry
        // is the original client. Trim and parse leniently.
        let source_ip = req
            .headers
            .get("x-forwarded-for")
            .and_then(|v| v.to_str().ok())
            .and_then(|raw| raw.split(',').next())
            .and_then(|s| s.trim().parse().ok());
        crate::policy::RequestContext {
            source_ip,
            user_agent,
            request_time: Some(std::time::SystemTime::now()),
            secure_transport: self.secure_transport,
            extra: Default::default(),
        }
    }

    /// Helper used by request handlers to enforce the optional policy.
    /// Returns `Ok(())` when allowed (or no policy is configured), or an
    /// `AccessDenied` S3Error otherwise. Bumps the policy denial Prometheus
    /// counter on deny.
    fn enforce_policy<I>(
        &self,
        req: &S3Request<I>,
        action: &'static str,
        bucket: &str,
        key: Option<&str>,
    ) -> S3Result<()> {
        let Some(policy) = self.policy.as_ref() else {
            return Ok(());
        };
        let principal_id = Self::principal_of(req);
        let ctx = self.request_context(req);
        let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
        if decision.allow {
            Ok(())
        } else {
            crate::metrics::record_policy_denial(action, bucket);
            tracing::info!(
                action,
                bucket,
                key = ?key,
                principal = ?principal_id,
                source_ip = ?ctx.source_ip,
                user_agent = ?ctx.user_agent,
                secure_transport = ctx.secure_transport,
                matched_sid = ?decision.matched_sid,
                effect = ?decision.matched_effect,
                "S4 policy denied request"
            );
            Err(S3Error::with_message(
                S3ErrorCode::AccessDenied,
                format!("denied by S4 policy: {action} on bucket={bucket}"),
            ))
        }
    }

    /// Test helper: take the backend back out (not used in production).
    pub fn into_backend(self) -> B {
        self.backend
    }

    /// Sidecar fast path for Range GET: fetch only the frames covering the
    /// requested span from the backend via a Range GET, then frame-parse,
    /// decompress, and slice the result. This is the **bandwidth-saving**
    /// variant of a Range request.
    async fn partial_range_get(
        &self,
        req: &S3Request<GetObjectInput>,
        plan: s4_codec::index::RangePlan,
        client_start: u64,
        client_end_exclusive: u64,
        total_original: u64,
        get_start: Instant,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        // Partial-GET only the required byte span from the backend.
        let backend_range = s3s::dto::Range::Int {
            first: plan.byte_start,
            last: Some(plan.byte_end_exclusive - 1),
        };
        let backend_input = GetObjectInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            range: Some(backend_range),
            ..Default::default()
        };
        let backend_req = S3Request {
            input: backend_input,
            method: req.method.clone(),
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let mut backend_resp = self.backend.get_object(backend_req).await?;
        let blob = backend_resp.output.body.take().ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "backend partial GET returned empty body",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect partial body"))?;

        // Frame-parse the fetched span and decompress each frame with the
        // codec named in its own header.
        let mut combined = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("partial-range frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("partial-range decompress"))?;
            combined.extend_from_slice(&decompressed);
        }
        let combined = combined.freeze();
        let sliced = combined
            .slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);

        // Assemble the 206 Partial Content response. The returned bytes
        // differ from what the backend stored, so the backend's checksums /
        // ETag no longer describe the body and are cleared.
        let returned_size = sliced.len() as u64;
        backend_resp.output.content_length = Some(returned_size as i64);
        backend_resp.output.content_range = Some(format!(
            "bytes {client_start}-{}/{total_original}",
            client_end_exclusive - 1
        ));
        backend_resp.output.checksum_crc32 = None;
        backend_resp.output.checksum_crc32c = None;
        backend_resp.output.checksum_crc64nvme = None;
        backend_resp.output.checksum_sha1 = None;
        backend_resp.output.checksum_sha256 = None;
        backend_resp.output.e_tag = None;
        backend_resp.output.body = Some(bytes_to_blob(sliced));
        backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);

        let elapsed = get_start.elapsed();
        crate::metrics::record_get(
            "partial",
            plan.byte_end_exclusive - plan.byte_start,
            returned_size,
            elapsed.as_secs_f64(),
            true,
        );
        info!(
            op = "get_object",
            bucket = %req.input.bucket,
            key = %req.input.key,
            bytes_in = plan.byte_end_exclusive - plan.byte_start,
            bytes_out = returned_size,
            total_object_size = total_original,
            range = true,
            path = "sidecar-partial",
            latency_ms = elapsed.as_millis() as u64,
            "S4 partial Range GET via sidecar index"
        );
        Ok(backend_resp)
    }

    /// Write the `<key>.s4index` sidecar object to the backend. The main PUT
    /// must still be treated as successful even if this fails, so errors
    /// only produce a warn log (Range GET loses the partial-fetch path, but
    /// the full-read fallback still returns semantically correct results).
    async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
        let bytes = encode_index(index);
        let len = bytes.len() as i64;
        let put_input = PutObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            body: Some(bytes_to_blob(bytes)),
            content_length: Some(len),
            content_type: Some("application/x-s4-index".into()),
            ..Default::default()
        };
        let put_req = S3Request {
            input: put_input,
            method: http::Method::PUT,
            // NOTE(review): `.parse().unwrap()` assumes `bucket` and the
            // sidecar key always form a valid URI — confirm keys are
            // validated upstream before relying on this.
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        if let Err(e) = self.backend.put_object(put_req).await {
            tracing::warn!(
                bucket,
                key,
                "S4 write_sidecar failed (Range GET will fall back to full read): {e}"
            );
        }
    }

    /// Read the `<key>.s4index` sidecar from the backend. `None` when the
    /// object is missing, unreadable, or fails to decode.
    async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
        let get_input = GetObjectInput {
            bucket: bucket.into(),
            key: sidecar_key(key),
            ..Default::default()
        };
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            // NOTE(review): same URI-validity assumption as write_sidecar.
            uri: format!("/{bucket}/{}", sidecar_key(key)).parse().unwrap(),
            headers: http::HeaderMap::new(),
            extensions: http::Extensions::new(),
            credentials: None,
            region: None,
            service: None,
            trailing_headers: None,
        };
        let resp = self.backend.get_object(get_req).await.ok()?;
        let blob = resp.output.body?;
        // Index objects are small; 64 MiB is a generous safety cap.
        let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
        decode_index(bytes).ok()
    }

    /// Decompress a multipart object (a sequence of frames) and rebuild the
    /// original bytes.
    ///
    /// **Per-frame codec dispatch**: each frame header carries a codec id,
    /// so the registry can invoke a different codec per frame. Objects
    /// mixing codecs are decompressed transparently (parquet-style mixed
    /// columns, etc.).
    async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
        let mut out = BytesMut::new();
        for frame in FrameIter::new(bytes) {
            let (header, payload) = frame.map_err(|e| {
                S3Error::with_message(
                    S3ErrorCode::InternalError,
                    format!("multipart frame parse: {e}"),
                )
            })?;
            let chunk_manifest = ChunkManifest {
                codec: header.codec,
                original_size: header.original_size,
                compressed_size: header.compressed_size,
                crc32c: header.crc32c,
            };
            let decompressed = self
                .registry
                .decompress(payload, &chunk_manifest)
                .await
                .map_err(internal("multipart frame decompress"))?;
            out.extend_from_slice(&decompressed);
        }
        Ok(out.freeze())
    }
}
390
391/// Parse a CopySourceRange header value (`bytes=N-M`, `bytes=N-`, `bytes=-N`)
392/// into the s3s::dto::Range used by the GetObject path. The S3 spec only
393/// allows `bytes=N-M` for upload_part_copy (no suffix or open-ended), so
394/// reject the other variants for parity with AWS.
395fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
396    let rest = s
397        .strip_prefix("bytes=")
398        .ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
399    let (a, b) = rest
400        .split_once('-')
401        .ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
402    let first: u64 = a
403        .parse()
404        .map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
405    let last: u64 = b
406        .parse()
407        .map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
408    if last < first {
409        return Err(format!("CopySourceRange last < first: {s:?}"));
410    }
411    Ok(s3s::dto::Range::Int {
412        first,
413        last: Some(last),
414    })
415}
416
417fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
418    metadata
419        .as_ref()
420        .and_then(|m| m.get(META_MULTIPART))
421        .map(|v| v == "true")
422        .unwrap_or(false)
423}
424
/// Metadata key: name of the codec used to compress the object.
const META_CODEC: &str = "s4-codec";
/// Metadata key: object size before compression, in bytes.
const META_ORIGINAL_SIZE: &str = "s4-original-size";
/// Metadata key: stored (compressed) payload size, in bytes.
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
/// Metadata key: CRC32C recorded in the chunk manifest.
const META_CRC32C: &str = "s4-crc32c";
/// Marks objects written through multipart upload with the per-part frame
/// format. GET checks this flag to engage the frame parser.
const META_MULTIPART: &str = "s4-multipart";
/// v0.2 #4: marks single-PUT objects written in the S4F2 framed format.
/// Legacy v0.1 single-PUTs are raw compressed bytes (flag absent). GET
/// checks this flag to route the body through the framed path (= the same
/// FrameIter parse as multipart).
const META_FRAMED: &str = "s4-framed";
436
437fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
438    metadata
439        .as_ref()
440        .and_then(|m| m.get(META_FRAMED))
441        .map(|v| v == "true")
442        .unwrap_or(false)
443}
444
445fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
446    let m = metadata.as_ref()?;
447    let codec = m
448        .get(META_CODEC)
449        .and_then(|s| s.parse::<CodecKind>().ok())?;
450    let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
451    let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
452    let crc32c = m.get(META_CRC32C)?.parse().ok()?;
453    Some(ChunkManifest {
454        codec,
455        original_size,
456        compressed_size,
457        crc32c,
458    })
459}
460
461fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
462    let meta = metadata.get_or_insert_with(Default::default);
463    meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
464    meta.insert(
465        META_ORIGINAL_SIZE.into(),
466        manifest.original_size.to_string(),
467    );
468    meta.insert(
469        META_COMPRESSED_SIZE.into(),
470        manifest.compressed_size.to_string(),
471    );
472    meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
473}
474
475fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
476    move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
477}
478
479/// `Range` request を decompressed object サイズ `total` に適用して `(start, end_exclusive)`
480/// を返す。`Range::Int { first, last }` は `bytes=first-last` (last は inclusive)、
481/// `Range::Suffix { length }` は末尾 `length` byte。S3 仕様に準拠。
482pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
483    if total == 0 {
484        return Err("cannot range-get zero-length object".into());
485    }
486    match range {
487        s3s::dto::Range::Int { first, last } => {
488            let start = *first;
489            let end_inclusive = match last {
490                Some(l) => (*l).min(total - 1),
491                None => total - 1,
492            };
493            if start > end_inclusive || start >= total {
494                return Err(format!(
495                    "range bytes={start}-{:?} out of object size {total}",
496                    last
497                ));
498            }
499            Ok((start, end_inclusive + 1))
500        }
501        s3s::dto::Range::Suffix { length } => {
502            let len = (*length).min(total);
503            Ok((total - len, total))
504        }
505    }
506}
507
508#[async_trait::async_trait]
509impl<B: S3> S3 for S4Service<B> {
510    // === 圧縮を挟む path (PUT) ===
    /// PUT with the compression hook: sample the body head, let the
    /// dispatcher pick a codec, compress (streaming-framed fast path or
    /// buffered legacy path), rewrite the request's metadata / length /
    /// checksums, then forward to the backend. Requests without a body
    /// delegate straight through.
    #[tracing::instrument(
        name = "s4.put_object",
        skip(self, req),
        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
    )]
    async fn put_object(
        &self,
        mut req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let put_start = Instant::now();
        let put_bucket = req.input.bucket.clone();
        let put_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &put_bucket, Some(&put_key))?;
        if let Some(blob) = req.input.body.take() {
            // Pick the codec from the leading 4 KiB sample. Streaming-aware
            // codecs take the streaming-compress fast path; others use the
            // traditional collect-then-compress.
            let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
                .await
                .map_err(internal("peek put sample"))?;
            let sample_len = sample.len().min(SAMPLE_BYTES);
            let kind = self.dispatcher.pick(&sample[..sample_len]).await;

            // Passthrough buys nothing from S4F2 wrapping (no compression =
            // no per-chunk frame to skip past) and the +28-byte header
            // overhead breaks size-sensitive callers that expect a true
            // pass-through. So passthrough always uses the legacy raw-blob
            // path; only compressing codecs go through the framed path.
            let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
            let (compressed, manifest, is_framed) = if use_framed {
                // Streaming fast path: the input is never collected into memory.
                let chained = chain_sample_with_rest(sample, rest_stream);
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    codec = kind.as_str(),
                    path = "streaming-framed",
                    "S4 put_object: compressing (streaming, S4F2 multi-frame)"
                );
                let (body, manifest) = streaming_compress_to_frames(
                    chained,
                    Arc::clone(&self.registry),
                    kind,
                    DEFAULT_S4F2_CHUNK_SIZE,
                )
                .await
                .map_err(internal("streaming framed compress"))?;
                (body, manifest, true)
            } else {
                // Codecs that are not streaming-aware (e.g. GPU codecs) take
                // the bytes-buffered path (raw compressed bytes, no framing —
                // kept for back-compat).
                let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
                    .await
                    .map_err(internal("collect put body (buffered path)"))?;
                debug!(
                    bucket = ?req.input.bucket,
                    key = ?req.input.key,
                    bytes = bytes.len(),
                    codec = kind.as_str(),
                    path = "buffered",
                    "S4 put_object: compressing (buffered, raw blob)"
                );
                let (body, m) = self
                    .registry
                    .compress(bytes, kind)
                    .await
                    .map_err(internal("registry compress"))?;
                (body, m, false)
            };

            write_manifest(&mut req.input.metadata, &manifest);
            if is_framed {
                // v0.2 #4: meta flag telling the GET side the body is framed.
                req.input
                    .metadata
                    .get_or_insert_with(Default::default)
                    .insert(META_FRAMED.into(), "true".into());
            }
            // Important: update content_length to the post-compression size.
            // Forgetting this makes the downstream (aws-sdk-s3 → S3) keep
            // waiting for the declared number of bytes and fail with
            // RequestTimeout (per the S3 spec).
            req.input.content_length = Some(compressed.len() as i64);
            // The body was rewritten, so the checksum / MD5 headers the
            // client sent for the original body must be invalidated
            // (forwarding them as-is makes the downstream S3 answer
            // XAmzContentChecksumMismatch). S4's own integrity is covered
            // by ChunkManifest.crc32c.
            req.input.checksum_algorithm = None;
            req.input.checksum_crc32 = None;
            req.input.checksum_crc32c = None;
            req.input.checksum_crc64nvme = None;
            req.input.checksum_sha1 = None;
            req.input.checksum_sha256 = None;
            req.input.content_md5 = None;
            let original_size = manifest.original_size;
            let compressed_size = manifest.compressed_size;
            let codec_label = manifest.codec.as_str();
            // For framed bodies, build the sidecar index from the body via
            // build_index_from_body and PUT it to the backend so the GET side
            // can use sidecar partial fetches.
            let sidecar_index = if is_framed {
                s4_codec::index::build_index_from_body(&compressed).ok()
            } else {
                None
            };
            req.input.body = Some(bytes_to_blob(compressed));
            let backend_resp = self.backend.put_object(req).await;
            if let Some(idx) = sidecar_index
                && backend_resp.is_ok()
                && idx.entries.len() > 1
            {
                // A single-chunk (small) object gains nothing from a sidecar
                // (a partial fetch would cover the same span as the full
                // body), so skip it.
                self.write_sidecar(&put_bucket, &put_key, &idx).await;
            }
            let _ = (original_size, compressed_size); // NOTE(review): both values are used below, so this silencer looks redundant — confirm before removing
            let elapsed = put_start.elapsed();
            crate::metrics::record_put(
                codec_label,
                original_size,
                compressed_size,
                elapsed.as_secs_f64(),
                backend_resp.is_ok(),
            );
            info!(
                op = "put_object",
                bucket = %put_bucket,
                key = %put_key,
                codec = codec_label,
                bytes_in = original_size,
                bytes_out = compressed_size,
                ratio = format!(
                    "{:.3}",
                    if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
                ),
                latency_ms = elapsed.as_millis() as u64,
                ok = backend_resp.is_ok(),
                "S4 put completed"
            );
            return backend_resp;
        }
        self.backend.put_object(req).await
    }
651
652    // === 圧縮を解く path (GET) ===
653    #[tracing::instrument(
654        name = "s4.get_object",
655        skip(self, req),
656        fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
657    )]
658    async fn get_object(
659        &self,
660        mut req: S3Request<GetObjectInput>,
661    ) -> S3Result<S3Response<GetObjectOutput>> {
662        let get_start = Instant::now();
663        let get_bucket = req.input.bucket.clone();
664        let get_key = req.input.key.clone();
665        self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
666        // Range request の事前検出 (decompress 後 slice する path に使う)。
667        let range_request = req.input.range.take();
668
669        // ====== Range GET の partial-fetch fast path (sidecar index 利用) ======
670        // sidecar `<key>.s4index` が存在し、multipart-framed object であれば
671        // 必要 frame だけを backend に Range GET し帯域節約する。
672        if let Some(ref r) = range_request
673            && let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
674        {
675            let total = index.total_original_size();
676            let (start, end_exclusive) = match resolve_range(r, total) {
677                Ok(v) => v,
678                Err(e) => {
679                    return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
680                }
681            };
682            if let Some(plan) = index.lookup_range(start, end_exclusive) {
683                return self
684                    .partial_range_get(&req, plan, start, end_exclusive, total, get_start)
685                    .await;
686            }
687        }
688        let mut resp = self.backend.get_object(req).await?;
689        let is_multipart = is_multipart_object(&resp.output.metadata);
690        let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
691        // v0.2 #4: framed-v2 single-PUT は多 frame parse が必要なので
692        // multipart と同じ path に流す。
693        let needs_frame_parse = is_multipart || is_framed_v2;
694        let manifest_opt = extract_manifest(&resp.output.metadata);
695
696        if !needs_frame_parse && manifest_opt.is_none() {
697            // S4 が書いていないオブジェクトは透過 (raw bucket pre-existing object 等)
698            debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
699            return Ok(resp);
700        }
701
702        if let Some(blob) = resp.output.body.take() {
703            // ====== Streaming fast path (CpuZstd, non-multipart, codec supports it) ======
704            // 大規模 object (e.g. 5 GB) を memory に collect すると OOM するので、
705            // codec が streaming-aware なら body を chunk-by-chunk で decompress して
706            // 即座に client に流す。
707            //
708            // ただし Range request 時は streaming できない (slice するため total bytes
709            // が必要) → buffered path に fall through。
710            if range_request.is_none()
711                && !needs_frame_parse
712                && let Some(ref m) = manifest_opt
713                && supports_streaming_decompress(m.codec)
714                && m.codec == CodecKind::CpuZstd
715            {
716                let decompressed_blob = cpu_zstd_decompress_stream(blob);
717                resp.output.content_length = Some(m.original_size as i64);
718                resp.output.checksum_crc32 = None;
719                resp.output.checksum_crc32c = None;
720                resp.output.checksum_crc64nvme = None;
721                resp.output.checksum_sha1 = None;
722                resp.output.checksum_sha256 = None;
723                resp.output.e_tag = None;
724                resp.output.body = Some(decompressed_blob);
725                let elapsed = get_start.elapsed();
726                crate::metrics::record_get(
727                    m.codec.as_str(),
728                    m.compressed_size,
729                    m.original_size,
730                    elapsed.as_secs_f64(),
731                    true,
732                );
733                info!(
734                    op = "get_object",
735                    bucket = %get_bucket,
736                    key = %get_key,
737                    codec = m.codec.as_str(),
738                    bytes_in = m.compressed_size,
739                    bytes_out = m.original_size,
740                    path = "streaming",
741                    setup_latency_ms = elapsed.as_millis() as u64,
742                    "S4 get started (streaming)"
743                );
744                return Ok(resp);
745            }
746            // Passthrough: そのまま流す (Range なしの場合のみ streaming)
747            if range_request.is_none()
748                && !needs_frame_parse
749                && let Some(ref m) = manifest_opt
750                && m.codec == CodecKind::Passthrough
751            {
752                resp.output.content_length = Some(m.original_size as i64);
753                resp.output.checksum_crc32 = None;
754                resp.output.checksum_crc32c = None;
755                resp.output.checksum_crc64nvme = None;
756                resp.output.checksum_sha1 = None;
757                resp.output.checksum_sha256 = None;
758                resp.output.e_tag = None;
759                resp.output.body = Some(blob);
760                debug!("S4 get_object: passthrough streaming");
761                return Ok(resp);
762            }
763
764            // ====== Buffered slow path (multipart frame parser, GPU codecs) ======
765            let bytes = collect_blob(blob, self.max_body_bytes)
766                .await
767                .map_err(internal("collect get body"))?;
768
769            let decompressed = if needs_frame_parse {
770                // multipart objects と framed-v2 single-PUT objects は同じ
771                // S4F2 frame 列なので decompress_multipart で統一処理
772                self.decompress_multipart(bytes).await?
773            } else {
774                let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
775                self.registry
776                    .decompress(bytes, manifest)
777                    .await
778                    .map_err(internal("registry decompress"))?
779            };
780
781            // Range request があれば slice。なければ full body を返す。
782            let total_size = decompressed.len() as u64;
783            let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
784                let (start, end) = resolve_range(r, total_size)
785                    .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
786                let sliced = decompressed.slice(start as usize..end as usize);
787                resp.output.content_range = Some(format!(
788                    "bytes {start}-{}/{total_size}",
789                    end.saturating_sub(1)
790                ));
791                (sliced, Some(http::StatusCode::PARTIAL_CONTENT))
792            } else {
793                (decompressed, None)
794            };
795            // 解凍後の真のサイズを返す (S3 client は content_length を信頼するので
796            // 圧縮 size のままだと downstream が body を途中で切ってしまう)
797            resp.output.content_length = Some(final_bytes.len() as i64);
798            // 圧縮済 bytes の checksum を返すと AWS SDK 側で StreamingError
799            // (ChecksumMismatch) になる。ETag も backend が返した「圧縮済 bytes の
800            // MD5/checksum」なので意味的にズレる — クリアして S4 自身の crc32c
801            // (manifest 内 / frame 内) で integrity を保証する設計にする。
802            resp.output.checksum_crc32 = None;
803            resp.output.checksum_crc32c = None;
804            resp.output.checksum_crc64nvme = None;
805            resp.output.checksum_sha1 = None;
806            resp.output.checksum_sha256 = None;
807            resp.output.e_tag = None;
808            let returned_size = final_bytes.len() as u64;
809            let codec_label = manifest_opt
810                .as_ref()
811                .map(|m| m.codec.as_str())
812                .unwrap_or("multipart");
813            resp.output.body = Some(bytes_to_blob(final_bytes));
814            if let Some(status) = status_override {
815                resp.status = Some(status);
816            }
817            let elapsed = get_start.elapsed();
818            crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
819            info!(
820                op = "get_object",
821                bucket = %get_bucket,
822                key = %get_key,
823                codec = codec_label,
824                bytes_out = returned_size,
825                total_object_size = total_size,
826                range = range_request.is_some(),
827                path = "buffered",
828                latency_ms = elapsed.as_millis() as u64,
829                "S4 get completed (buffered)"
830            );
831        }
832        Ok(resp)
833    }
834
835    // === passthrough delegations ===
    /// Pure delegation: `HeadBucket` has no body to (de)compress, so the
    /// request is forwarded to the backend unchanged.
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        self.backend.head_bucket(req).await
    }
    /// Pure delegation: bucket listing is unaffected by S4 compression.
    async fn list_buckets(
        &self,
        req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        self.backend.list_buckets(req).await
    }
    /// Pure delegation: bucket creation needs no S4 intervention.
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        self.backend.create_bucket(req).await
    }
    /// Pure delegation: bucket deletion needs no S4 intervention.
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        self.backend.delete_bucket(req).await
    }
860    async fn head_object(
861        &self,
862        req: S3Request<HeadObjectInput>,
863    ) -> S3Result<S3Response<HeadObjectOutput>> {
864        let mut resp = self.backend.head_object(req).await?;
865        if let Some(manifest) = extract_manifest(&resp.output.metadata) {
866            // 客側には decompress 後の意味のある content_length / checksum を返す。
867            // backend が返す圧縮済 bytes の checksum / e_tag は意味が違うため除去
868            // (S4 は manifest 内の crc32c で integrity を担保する)。
869            resp.output.content_length = Some(manifest.original_size as i64);
870            resp.output.checksum_crc32 = None;
871            resp.output.checksum_crc32c = None;
872            resp.output.checksum_crc64nvme = None;
873            resp.output.checksum_sha1 = None;
874            resp.output.checksum_sha256 = None;
875            resp.output.e_tag = None;
876        }
877        Ok(resp)
878    }
879    async fn delete_object(
880        &self,
881        req: S3Request<DeleteObjectInput>,
882    ) -> S3Result<S3Response<DeleteObjectOutput>> {
883        let bucket = req.input.bucket.clone();
884        let key = req.input.key.clone();
885        self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
886        // sidecar も best-effort で削除 (失敗は無視 — 存在しない場合や IAM 制限を許容)
887        let resp = self.backend.delete_object(req).await?;
888        let sidecar_input = DeleteObjectInput {
889            bucket: bucket.clone(),
890            key: sidecar_key(&key),
891            ..Default::default()
892        };
893        let sidecar_req = S3Request {
894            input: sidecar_input,
895            method: http::Method::DELETE,
896            uri: format!("/{bucket}/{}", sidecar_key(&key)).parse().unwrap(),
897            headers: http::HeaderMap::new(),
898            extensions: http::Extensions::new(),
899            credentials: None,
900            region: None,
901            service: None,
902            trailing_headers: None,
903        };
904        let _ = self.backend.delete_object(sidecar_req).await;
905        Ok(resp)
906    }
    /// Pure delegation for batch delete.
    ///
    /// NOTE(review): unlike `delete_object`, this batch path neither calls
    /// `enforce_policy` per key nor deletes `<key>.s4index` sidecars, so
    /// batch-deleting framed objects leaves orphaned sidecars — confirm
    /// whether this asymmetry is intended.
    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        self.backend.delete_objects(req).await
    }
    /// `CopyObject` with policy enforcement and S4 metadata preservation.
    async fn copy_object(
        &self,
        mut req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        // copy is conceptually "GetObject src + PutObject dst" — enforce both.
        let dst_bucket = req.input.bucket.clone();
        let dst_key = req.input.key.clone();
        self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
        if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
        }
        // S4-aware copy: when the source object carries s4-* metadata, make
        // sure it is preserved on the destination.
        //
        // - MetadataDirective::COPY (default): the backend copies the source
        //   metadata as-is, so the s4-* entries come along automatically —
        //   no intervention needed.
        // - MetadataDirective::REPLACE: the client-supplied metadata replaces
        //   the source's → losing the s4-* entries would leave the
        //   destination impossible to decompress (silent corruption). S4
        //   therefore HEADs the source and force-merges its s4-* fields into
        //   input.metadata.
        let needs_merge = req
            .input
            .metadata_directive
            .as_ref()
            .map(|d| d.as_str() == MetadataDirective::REPLACE)
            .unwrap_or(false);
        if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
            let head_input = HeadObjectInput {
                bucket: bucket.to_string(),
                key: key.to_string(),
                ..Default::default()
            };
            let head_req = S3Request {
                input: head_input,
                method: req.method.clone(),
                uri: req.uri.clone(),
                headers: req.headers.clone(),
                extensions: http::Extensions::new(),
                credentials: req.credentials.clone(),
                region: req.region.clone(),
                service: req.service.clone(),
                trailing_headers: None,
            };
            // HEAD failures are silently ignored: the copy still proceeds,
            // just without the metadata merge.
            if let Ok(head) = self.backend.head_object(head_req).await
                && let Some(src_meta) = head.output.metadata.as_ref()
            {
                let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
                for key in [
                    META_CODEC,
                    META_ORIGINAL_SIZE,
                    META_COMPRESSED_SIZE,
                    META_CRC32C,
                    META_MULTIPART,
                    META_FRAMED,
                ] {
                    if let Some(v) = src_meta.get(key) {
                        // If the client supplied this key themselves, keep
                        // the client's value (explicit overwrite allowed);
                        // otherwise insert the source's value.
                        dest_meta
                            .entry(key.to_string())
                            .or_insert_with(|| v.clone());
                    }
                }
                debug!(
                    src_bucket = %bucket,
                    src_key = %key,
                    "S4 copy_object: preserved s4-* metadata across REPLACE directive"
                );
            }
        }
        self.backend.copy_object(req).await
    }
985    async fn list_objects(
986        &self,
987        req: S3Request<ListObjectsInput>,
988    ) -> S3Result<S3Response<ListObjectsOutput>> {
989        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
990        let mut resp = self.backend.list_objects(req).await?;
991        // S4 内部 object (`*.s4index` sidecar 等) を顧客から隠す
992        if let Some(contents) = resp.output.contents.as_mut() {
993            contents.retain(|o| {
994                o.key
995                    .as_ref()
996                    .map(|k| !k.ends_with(".s4index"))
997                    .unwrap_or(true)
998            });
999        }
1000        Ok(resp)
1001    }
1002    async fn list_objects_v2(
1003        &self,
1004        req: S3Request<ListObjectsV2Input>,
1005    ) -> S3Result<S3Response<ListObjectsV2Output>> {
1006        self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
1007        let mut resp = self.backend.list_objects_v2(req).await?;
1008        if let Some(contents) = resp.output.contents.as_mut() {
1009            let before = contents.len();
1010            contents.retain(|o| {
1011                o.key
1012                    .as_ref()
1013                    .map(|k| !k.ends_with(".s4index"))
1014                    .unwrap_or(true)
1015            });
1016            // key_count も補正 (S3 spec compliance)
1017            if let Some(kc) = resp.output.key_count.as_mut() {
1018                *kc -= (before - contents.len()) as i32;
1019            }
1020        }
1021        Ok(resp)
1022    }
1023    async fn create_multipart_upload(
1024        &self,
1025        mut req: S3Request<CreateMultipartUploadInput>,
1026    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
1027        // Multipart object は per-part 圧縮 + frame 形式で書く。GET 時に
1028        // frame parse を起動するため、object metadata に flag を立てる。
1029        // codec は dispatcher の default kind を採用 (per-part 別 codec は Phase 2)。
1030        let codec_kind = self.registry.default_kind();
1031        let meta = req.input.metadata.get_or_insert_with(Default::default);
1032        meta.insert(META_MULTIPART.into(), "true".into());
1033        meta.insert(META_CODEC.into(), codec_kind.as_str().into());
1034        debug!(
1035            bucket = ?req.input.bucket,
1036            key = ?req.input.key,
1037            codec = codec_kind.as_str(),
1038            "S4 create_multipart_upload: marking object for per-part compression"
1039        );
1040        self.backend.create_multipart_upload(req).await
1041    }
1042
1043    async fn upload_part(
1044        &self,
1045        mut req: S3Request<UploadPartInput>,
1046    ) -> S3Result<S3Response<UploadPartOutput>> {
1047        // 各 part を圧縮して frame header 付きで forward。GET 時に
1048        // `decompress_multipart` が frame iter で順に解凍する。
1049        // **per-part codec dispatch**: dispatcher が body 先頭 sample から
1050        // codec を選ぶので、parquet 風の mixed-content multipart で part ごとに
1051        // 最適 codec を使える (整数列 part → Bitcomp、text 列 part → zstd 等)。
1052        if let Some(blob) = req.input.body.take() {
1053            let bytes = collect_blob(blob, self.max_body_bytes)
1054                .await
1055                .map_err(internal("collect upload_part body"))?;
1056            let sample_len = bytes.len().min(SAMPLE_BYTES);
1057            let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
1058            let original_size = bytes.len() as u64;
1059            let (compressed, manifest) = self
1060                .registry
1061                .compress(bytes, codec_kind)
1062                .await
1063                .map_err(internal("registry compress part"))?;
1064            let header = FrameHeader {
1065                codec: codec_kind,
1066                original_size,
1067                compressed_size: compressed.len() as u64,
1068                crc32c: manifest.crc32c,
1069            };
1070            let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
1071            write_frame(&mut framed, header, &compressed);
1072            // v0.2 #5: heuristic-based padding skip for likely-final parts.
1073            //
1074            // AWS SDK / aws-cli / boto3 always send the final (and only the
1075            // final) part below the configured part_size. So if the raw user
1076            // part is already smaller than S3's 5 MiB multipart minimum, this
1077            // is overwhelmingly likely to be the final part — and the final
1078            // part is exempt from S3's size constraint. Skipping padding here
1079            // saves up to ~5 MiB per object on highly compressible workloads.
1080            //
1081            // If a misbehaving client sends a tiny **non-final** part, S3
1082            // itself rejects with EntityTooSmall at CompleteMultipartUpload —
1083            // identical outcome to a vanilla S3 PUT, just earlier than
1084            // padding-then-complete would catch it.
1085            let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
1086            if !likely_final {
1087                pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
1088            }
1089            let framed_bytes = framed.freeze();
1090            let new_len = framed_bytes.len() as i64;
1091            // 同じ wire 互換問題が multipart にもある (content-length / checksum)
1092            req.input.content_length = Some(new_len);
1093            req.input.checksum_algorithm = None;
1094            req.input.checksum_crc32 = None;
1095            req.input.checksum_crc32c = None;
1096            req.input.checksum_crc64nvme = None;
1097            req.input.checksum_sha1 = None;
1098            req.input.checksum_sha256 = None;
1099            req.input.content_md5 = None;
1100            req.input.body = Some(bytes_to_blob(framed_bytes));
1101            debug!(
1102                part_number = ?req.input.part_number,
1103                upload_id = ?req.input.upload_id,
1104                original_size,
1105                framed_size = new_len,
1106                "S4 upload_part: framed compressed payload"
1107            );
1108        }
1109        self.backend.upload_part(req).await
1110    }
1111    async fn complete_multipart_upload(
1112        &self,
1113        req: S3Request<CompleteMultipartUploadInput>,
1114    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
1115        let bucket = req.input.bucket.clone();
1116        let key = req.input.key.clone();
1117        let resp = self.backend.complete_multipart_upload(req).await?;
1118        // CompleteMultipartUpload 成功 → 完成した object を full fetch して frame
1119        // index を build、`<key>.s4index` sidecar として保存。これで Range GET の
1120        // partial fetch path が利用可能になる (Range request の帯域節約)。
1121        // 注: 巨大 object の場合この pass は重いが、Range query は一度 sidecar が
1122        // できれば爆速になるので 1 回の cost は payback される
1123        let bucket_clone = bucket.clone();
1124        let key_clone = key.clone();
1125        let get_input = GetObjectInput {
1126            bucket: bucket_clone.clone(),
1127            key: key_clone.clone(),
1128            ..Default::default()
1129        };
1130        let get_req = S3Request {
1131            input: get_input,
1132            method: http::Method::GET,
1133            uri: format!("/{bucket_clone}/{key_clone}").parse().unwrap(),
1134            headers: http::HeaderMap::new(),
1135            extensions: http::Extensions::new(),
1136            credentials: None,
1137            region: None,
1138            service: None,
1139            trailing_headers: None,
1140        };
1141        if let Ok(get_resp) = self.backend.get_object(get_req).await
1142            && let Some(blob) = get_resp.output.body
1143            && let Ok(body) = collect_blob(blob, self.max_body_bytes).await
1144            && let Ok(index) = build_index_from_body(&body)
1145        {
1146            self.write_sidecar(&bucket, &key, &index).await;
1147        }
1148        Ok(resp)
1149    }
    /// Pure delegation: aborting an upload discards the (compressed) parts;
    /// no S4 state needs cleaning up.
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        self.backend.abort_multipart_upload(req).await
    }
    /// Pure delegation: in-progress upload listing is unaffected by S4.
    async fn list_multipart_uploads(
        &self,
        req: S3Request<ListMultipartUploadsInput>,
    ) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
        self.backend.list_multipart_uploads(req).await
    }
    /// Pure delegation.
    ///
    /// NOTE(review): reported part sizes are the framed/compressed sizes as
    /// stored by the backend, not the raw sizes the client uploaded —
    /// confirm callers tolerate this.
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        self.backend.list_parts(req).await
    }
1168
1169    // =========================================================================
1170    // Phase 2 — pure passthrough delegations。S4 はこれらに対して圧縮 hook を
1171    // 持たないので、backend (= AWS S3) の動作と完全に同一。
1172    //
1173    // 既知の制限事項:
1174    // - copy_object / upload_part_copy: source object が S4-compressed の場合、
1175    //   backend が bytes を copy するだけなので metadata (s4-codec etc) も一緒に
1176    //   coppied される (AWS S3 default = MetadataDirective COPY)。GET は manifest
1177    //   経由で正しく decompress できる。MetadataDirective REPLACE で上書き
1178    //   されると圧縮 metadata が消えて壊れる — 顧客側の運用で注意
1179    // - list_object_versions: versioning enabled bucket では各 version も S4
1180    //   metadata を維持する。古い version も S4 経由で正しく GET できる。
1181    // =========================================================================
1182
1183    // ---- Object ACL / tagging / attributes ----
    /// Pure delegation — no compression hook; behavior matches the backend.
    async fn get_object_acl(
        &self,
        req: S3Request<GetObjectAclInput>,
    ) -> S3Result<S3Response<GetObjectAclOutput>> {
        self.backend.get_object_acl(req).await
    }
    /// Pure delegation — no compression hook; behavior matches the backend.
    async fn put_object_acl(
        &self,
        req: S3Request<PutObjectAclInput>,
    ) -> S3Result<S3Response<PutObjectAclOutput>> {
        self.backend.put_object_acl(req).await
    }
    /// Pure delegation — tags live outside the object body, so S4 never
    /// needs to touch them.
    async fn get_object_tagging(
        &self,
        req: S3Request<GetObjectTaggingInput>,
    ) -> S3Result<S3Response<GetObjectTaggingOutput>> {
        self.backend.get_object_tagging(req).await
    }
    /// Pure delegation — tag writes do not affect the stored (compressed)
    /// body or its s4-* metadata.
    async fn put_object_tagging(
        &self,
        req: S3Request<PutObjectTaggingInput>,
    ) -> S3Result<S3Response<PutObjectTaggingOutput>> {
        self.backend.put_object_tagging(req).await
    }
    /// Pure delegation — see `put_object_tagging`.
    async fn delete_object_tagging(
        &self,
        req: S3Request<DeleteObjectTaggingInput>,
    ) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
        self.backend.delete_object_tagging(req).await
    }
    /// Pure delegation.
    ///
    /// NOTE(review): attributes come straight from the backend, so for
    /// S4-compressed objects size/checksum attributes describe the
    /// *compressed* stored bytes — unlike `head_object`, which rewrites
    /// `content_length` to the decompressed size. Confirm whether this
    /// should be normalized too.
    async fn get_object_attributes(
        &self,
        req: S3Request<GetObjectAttributesInput>,
    ) -> S3Result<S3Response<GetObjectAttributesOutput>> {
        self.backend.get_object_attributes(req).await
    }
    /// Pure delegation — restore operates on the stored bytes; decompression
    /// still happens at GET time via the object's s4-* metadata.
    async fn restore_object(
        &self,
        req: S3Request<RestoreObjectInput>,
    ) -> S3Result<S3Response<RestoreObjectOutput>> {
        self.backend.restore_object(req).await
    }
    /// `UploadPartCopy` with S4-frame awareness (v0.2 #6).
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        // v0.2 #6: byte-range aware copy when the source is S4-framed.
        //
        // For a framed source (multipart upload OR single-PUT framed-v2),
        // a naive byte-range passthrough would copy compressed bytes that
        // don't align with S4 frame boundaries — silently corrupting the
        // result. Instead we GET the source through S4 (which handles
        // decompression + Range), re-compress + re-frame as a new part,
        // and forward as upload_part. For non-framed sources (S4-untouched
        // raw objects), passthrough is correct and we keep the original
        // (cheaper) code path.
        let CopySource::Bucket {
            bucket: src_bucket,
            key: src_key,
            ..
        } = &req.input.copy_source
        else {
            // Non-bucket copy sources: nothing S4-specific to do.
            return self.backend.upload_part_copy(req).await;
        };
        let src_bucket = src_bucket.to_string();
        let src_key = src_key.to_string();

        // Probe metadata to decide whether the source needs S4-aware copy.
        let head_input = HeadObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        let head_req = S3Request {
            input: head_input,
            method: http::Method::HEAD,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let needs_s4_copy = match self.backend.head_object(head_req).await {
            Ok(h) => {
                is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
            }
            // NOTE(review): a failed HEAD falls back to passthrough; if the
            // source actually is framed, the passthrough copy could
            // mis-slice — confirm this trade-off is acceptable.
            Err(_) => false,
        };
        if !needs_s4_copy {
            return self.backend.upload_part_copy(req).await;
        }

        // Resolve the optional source byte range to pass to GET.
        let source_range = req
            .input
            .copy_source_range
            .as_ref()
            .map(|r| parse_copy_source_range(r))
            .transpose()
            .map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;

        // GET source via S4 (handles decompression + sidecar partial fetch
        // when range is present). The result is the requested user-visible
        // byte range, fully decompressed.
        let mut get_input = GetObjectInput {
            bucket: src_bucket.clone(),
            key: src_key.clone(),
            ..Default::default()
        };
        get_input.range = source_range;
        let get_req = S3Request {
            input: get_input,
            method: http::Method::GET,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        // Note: goes through `self.get_object` (S4 path), not the backend.
        let get_resp = self.get_object(get_req).await?;
        let blob = get_resp.output.body.ok_or_else(|| {
            S3Error::with_message(
                S3ErrorCode::InternalError,
                "upload_part_copy: empty body from source GET",
            )
        })?;
        let bytes = collect_blob(blob, self.max_body_bytes)
            .await
            .map_err(internal("collect upload_part_copy source body"))?;

        // Compress + frame as a fresh part (mirrors upload_part path).
        let sample_len = bytes.len().min(SAMPLE_BYTES);
        let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
        let original_size = bytes.len() as u64;
        let (compressed, manifest) = self
            .registry
            .compress(bytes, codec_kind)
            .await
            .map_err(internal("registry compress upload_part_copy"))?;
        let header = FrameHeader {
            codec: codec_kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: manifest.crc32c,
        };
        let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
        write_frame(&mut framed, header, &compressed);
        // Same likely-final padding heuristic as upload_part: a part below
        // the S3 multipart minimum is presumed final and left unpadded.
        let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
        if !likely_final {
            pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
        }
        let framed_bytes = framed.freeze();
        let framed_len = framed_bytes.len() as i64;

        // Forward as upload_part to the destination multipart upload.
        let part_input = UploadPartInput {
            bucket: req.input.bucket.clone(),
            key: req.input.key.clone(),
            part_number: req.input.part_number,
            upload_id: req.input.upload_id.clone(),
            body: Some(bytes_to_blob(framed_bytes)),
            content_length: Some(framed_len),
            ..Default::default()
        };
        let part_req = S3Request {
            input: part_input,
            method: http::Method::PUT,
            uri: req.uri.clone(),
            headers: req.headers.clone(),
            extensions: http::Extensions::new(),
            credentials: req.credentials.clone(),
            region: req.region.clone(),
            service: req.service.clone(),
            trailing_headers: None,
        };
        let upload_resp = self.backend.upload_part(part_req).await?;

        // NOTE(review): only e_tag is propagated into the synthesized
        // response; checksum fields and copy-source version info from the
        // backend response are dropped — confirm clients don't rely on them.
        let copy_output = UploadPartCopyOutput {
            copy_part_result: Some(CopyPartResult {
                e_tag: upload_resp.output.e_tag.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        Ok(S3Response::new(copy_output))
    }
1374
1375    // ---- Object lock / retention / legal hold ----
    /// Pure delegation — object-lock config is bucket-level state, untouched
    /// by S4.
    async fn get_object_lock_configuration(
        &self,
        req: S3Request<GetObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
        self.backend.get_object_lock_configuration(req).await
    }
    /// Pure delegation — see `get_object_lock_configuration`.
    async fn put_object_lock_configuration(
        &self,
        req: S3Request<PutObjectLockConfigurationInput>,
    ) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
        self.backend.put_object_lock_configuration(req).await
    }
    /// Pure delegation — legal-hold state is metadata-only, untouched by S4.
    async fn get_object_legal_hold(
        &self,
        req: S3Request<GetObjectLegalHoldInput>,
    ) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
        self.backend.get_object_legal_hold(req).await
    }
    /// Pure delegation — see `get_object_legal_hold`.
    async fn put_object_legal_hold(
        &self,
        req: S3Request<PutObjectLegalHoldInput>,
    ) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
        self.backend.put_object_legal_hold(req).await
    }
    /// Pure delegation — retention settings are metadata-only, untouched by
    /// S4.
    async fn get_object_retention(
        &self,
        req: S3Request<GetObjectRetentionInput>,
    ) -> S3Result<S3Response<GetObjectRetentionOutput>> {
        self.backend.get_object_retention(req).await
    }
    /// Pure delegation — see `get_object_retention`.
    async fn put_object_retention(
        &self,
        req: S3Request<PutObjectRetentionInput>,
    ) -> S3Result<S3Response<PutObjectRetentionOutput>> {
        self.backend.put_object_retention(req).await
    }
1412
1413    // ---- Versioning ----
    /// Pure delegation.
    ///
    /// NOTE(review): unlike `list_objects` / `list_objects_v2`, no
    /// `*.s4index` filtering is applied here, so sidecar objects appear in
    /// version listings — confirm whether they should be hidden too.
    async fn list_object_versions(
        &self,
        req: S3Request<ListObjectVersionsInput>,
    ) -> S3Result<S3Response<ListObjectVersionsOutput>> {
        self.backend.list_object_versions(req).await
    }
    /// Pure delegation — bucket versioning config is untouched by S4.
    async fn get_bucket_versioning(
        &self,
        req: S3Request<GetBucketVersioningInput>,
    ) -> S3Result<S3Response<GetBucketVersioningOutput>> {
        self.backend.get_bucket_versioning(req).await
    }
    /// Pure delegation — see `get_bucket_versioning`.
    async fn put_bucket_versioning(
        &self,
        req: S3Request<PutBucketVersioningInput>,
    ) -> S3Result<S3Response<PutBucketVersioningOutput>> {
        self.backend.put_bucket_versioning(req).await
    }
1432
1433    // ---- Bucket location ----
    /// Pure delegation — bucket location is backend state, untouched by S4.
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.backend.get_bucket_location(req).await
    }
1440
1441    // ---- Bucket policy ----
    /// Pure delegation — backend-side bucket policy, untouched by S4.
    async fn get_bucket_policy(
        &self,
        req: S3Request<GetBucketPolicyInput>,
    ) -> S3Result<S3Response<GetBucketPolicyOutput>> {
        self.backend.get_bucket_policy(req).await
    }
    /// Pure delegation — see `get_bucket_policy`.
    async fn put_bucket_policy(
        &self,
        req: S3Request<PutBucketPolicyInput>,
    ) -> S3Result<S3Response<PutBucketPolicyOutput>> {
        self.backend.put_bucket_policy(req).await
    }
1454    async fn delete_bucket_policy(
1455        &self,
1456        req: S3Request<DeleteBucketPolicyInput>,
1457    ) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
1458        self.backend.delete_bucket_policy(req).await
1459    }
1460    async fn get_bucket_policy_status(
1461        &self,
1462        req: S3Request<GetBucketPolicyStatusInput>,
1463    ) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
1464        self.backend.get_bucket_policy_status(req).await
1465    }
1466
1467    // ---- Bucket ACL ----
1468    async fn get_bucket_acl(
1469        &self,
1470        req: S3Request<GetBucketAclInput>,
1471    ) -> S3Result<S3Response<GetBucketAclOutput>> {
1472        self.backend.get_bucket_acl(req).await
1473    }
1474    async fn put_bucket_acl(
1475        &self,
1476        req: S3Request<PutBucketAclInput>,
1477    ) -> S3Result<S3Response<PutBucketAclOutput>> {
1478        self.backend.put_bucket_acl(req).await
1479    }
1480
1481    // ---- Bucket CORS ----
1482    async fn get_bucket_cors(
1483        &self,
1484        req: S3Request<GetBucketCorsInput>,
1485    ) -> S3Result<S3Response<GetBucketCorsOutput>> {
1486        self.backend.get_bucket_cors(req).await
1487    }
1488    async fn put_bucket_cors(
1489        &self,
1490        req: S3Request<PutBucketCorsInput>,
1491    ) -> S3Result<S3Response<PutBucketCorsOutput>> {
1492        self.backend.put_bucket_cors(req).await
1493    }
1494    async fn delete_bucket_cors(
1495        &self,
1496        req: S3Request<DeleteBucketCorsInput>,
1497    ) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
1498        self.backend.delete_bucket_cors(req).await
1499    }
1500
1501    // ---- Bucket lifecycle ----
1502    async fn get_bucket_lifecycle_configuration(
1503        &self,
1504        req: S3Request<GetBucketLifecycleConfigurationInput>,
1505    ) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
1506        self.backend.get_bucket_lifecycle_configuration(req).await
1507    }
1508    async fn put_bucket_lifecycle_configuration(
1509        &self,
1510        req: S3Request<PutBucketLifecycleConfigurationInput>,
1511    ) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
1512        self.backend.put_bucket_lifecycle_configuration(req).await
1513    }
1514    async fn delete_bucket_lifecycle(
1515        &self,
1516        req: S3Request<DeleteBucketLifecycleInput>,
1517    ) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
1518        self.backend.delete_bucket_lifecycle(req).await
1519    }
1520
1521    // ---- Bucket tagging ----
1522    async fn get_bucket_tagging(
1523        &self,
1524        req: S3Request<GetBucketTaggingInput>,
1525    ) -> S3Result<S3Response<GetBucketTaggingOutput>> {
1526        self.backend.get_bucket_tagging(req).await
1527    }
1528    async fn put_bucket_tagging(
1529        &self,
1530        req: S3Request<PutBucketTaggingInput>,
1531    ) -> S3Result<S3Response<PutBucketTaggingOutput>> {
1532        self.backend.put_bucket_tagging(req).await
1533    }
1534    async fn delete_bucket_tagging(
1535        &self,
1536        req: S3Request<DeleteBucketTaggingInput>,
1537    ) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
1538        self.backend.delete_bucket_tagging(req).await
1539    }
1540
1541    // ---- Bucket encryption ----
1542    async fn get_bucket_encryption(
1543        &self,
1544        req: S3Request<GetBucketEncryptionInput>,
1545    ) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
1546        self.backend.get_bucket_encryption(req).await
1547    }
1548    async fn put_bucket_encryption(
1549        &self,
1550        req: S3Request<PutBucketEncryptionInput>,
1551    ) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
1552        self.backend.put_bucket_encryption(req).await
1553    }
1554    async fn delete_bucket_encryption(
1555        &self,
1556        req: S3Request<DeleteBucketEncryptionInput>,
1557    ) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
1558        self.backend.delete_bucket_encryption(req).await
1559    }
1560
1561    // ---- Bucket logging ----
1562    async fn get_bucket_logging(
1563        &self,
1564        req: S3Request<GetBucketLoggingInput>,
1565    ) -> S3Result<S3Response<GetBucketLoggingOutput>> {
1566        self.backend.get_bucket_logging(req).await
1567    }
1568    async fn put_bucket_logging(
1569        &self,
1570        req: S3Request<PutBucketLoggingInput>,
1571    ) -> S3Result<S3Response<PutBucketLoggingOutput>> {
1572        self.backend.put_bucket_logging(req).await
1573    }
1574
1575    // ---- Bucket notification ----
1576    async fn get_bucket_notification_configuration(
1577        &self,
1578        req: S3Request<GetBucketNotificationConfigurationInput>,
1579    ) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
1580        self.backend
1581            .get_bucket_notification_configuration(req)
1582            .await
1583    }
1584    async fn put_bucket_notification_configuration(
1585        &self,
1586        req: S3Request<PutBucketNotificationConfigurationInput>,
1587    ) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
1588        self.backend
1589            .put_bucket_notification_configuration(req)
1590            .await
1591    }
1592
1593    // ---- Bucket request payment ----
1594    async fn get_bucket_request_payment(
1595        &self,
1596        req: S3Request<GetBucketRequestPaymentInput>,
1597    ) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
1598        self.backend.get_bucket_request_payment(req).await
1599    }
1600    async fn put_bucket_request_payment(
1601        &self,
1602        req: S3Request<PutBucketRequestPaymentInput>,
1603    ) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
1604        self.backend.put_bucket_request_payment(req).await
1605    }
1606
1607    // ---- Bucket website ----
1608    async fn get_bucket_website(
1609        &self,
1610        req: S3Request<GetBucketWebsiteInput>,
1611    ) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
1612        self.backend.get_bucket_website(req).await
1613    }
1614    async fn put_bucket_website(
1615        &self,
1616        req: S3Request<PutBucketWebsiteInput>,
1617    ) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
1618        self.backend.put_bucket_website(req).await
1619    }
1620    async fn delete_bucket_website(
1621        &self,
1622        req: S3Request<DeleteBucketWebsiteInput>,
1623    ) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
1624        self.backend.delete_bucket_website(req).await
1625    }
1626
1627    // ---- Bucket replication ----
1628    async fn get_bucket_replication(
1629        &self,
1630        req: S3Request<GetBucketReplicationInput>,
1631    ) -> S3Result<S3Response<GetBucketReplicationOutput>> {
1632        self.backend.get_bucket_replication(req).await
1633    }
1634    async fn put_bucket_replication(
1635        &self,
1636        req: S3Request<PutBucketReplicationInput>,
1637    ) -> S3Result<S3Response<PutBucketReplicationOutput>> {
1638        self.backend.put_bucket_replication(req).await
1639    }
1640    async fn delete_bucket_replication(
1641        &self,
1642        req: S3Request<DeleteBucketReplicationInput>,
1643    ) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
1644        self.backend.delete_bucket_replication(req).await
1645    }
1646
1647    // ---- Bucket accelerate ----
1648    async fn get_bucket_accelerate_configuration(
1649        &self,
1650        req: S3Request<GetBucketAccelerateConfigurationInput>,
1651    ) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
1652        self.backend.get_bucket_accelerate_configuration(req).await
1653    }
1654    async fn put_bucket_accelerate_configuration(
1655        &self,
1656        req: S3Request<PutBucketAccelerateConfigurationInput>,
1657    ) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
1658        self.backend.put_bucket_accelerate_configuration(req).await
1659    }
1660
1661    // ---- Bucket ownership controls ----
1662    async fn get_bucket_ownership_controls(
1663        &self,
1664        req: S3Request<GetBucketOwnershipControlsInput>,
1665    ) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
1666        self.backend.get_bucket_ownership_controls(req).await
1667    }
1668    async fn put_bucket_ownership_controls(
1669        &self,
1670        req: S3Request<PutBucketOwnershipControlsInput>,
1671    ) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
1672        self.backend.put_bucket_ownership_controls(req).await
1673    }
1674    async fn delete_bucket_ownership_controls(
1675        &self,
1676        req: S3Request<DeleteBucketOwnershipControlsInput>,
1677    ) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
1678        self.backend.delete_bucket_ownership_controls(req).await
1679    }
1680
1681    // ---- Public access block ----
1682    async fn get_public_access_block(
1683        &self,
1684        req: S3Request<GetPublicAccessBlockInput>,
1685    ) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
1686        self.backend.get_public_access_block(req).await
1687    }
1688    async fn put_public_access_block(
1689        &self,
1690        req: S3Request<PutPublicAccessBlockInput>,
1691    ) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
1692        self.backend.put_public_access_block(req).await
1693    }
1694    async fn delete_public_access_block(
1695        &self,
1696        req: S3Request<DeletePublicAccessBlockInput>,
1697    ) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
1698        self.backend.delete_public_access_block(req).await
1699    }
1700}
1701
#[cfg(test)]
mod tests {
    use super::*;

    /// Writing a manifest into S3 metadata and reading it back must
    /// preserve every field exactly.
    #[test]
    fn manifest_roundtrip_via_metadata() {
        let want = ChunkManifest {
            codec: CodecKind::CpuZstd,
            original_size: 1234,
            compressed_size: 567,
            crc32c: 0xdead_beef,
        };
        let mut slot: Option<Metadata> = None;
        write_manifest(&mut slot, &want);
        let got = extract_manifest(&slot).expect("manifest must round-trip");
        assert_eq!(got.codec, want.codec);
        assert_eq!(got.original_size, want.original_size);
        assert_eq!(got.compressed_size, want.compressed_size);
        assert_eq!(got.crc32c, want.crc32c);
    }

    /// No metadata at all means no manifest.
    #[test]
    fn missing_metadata_yields_none() {
        let slot: Option<Metadata> = None;
        assert!(extract_manifest(&slot).is_none());
    }

    /// A metadata map holding only the codec key (no sizes / crc) is
    /// treated as "not an S4 object" rather than an error.
    #[test]
    fn partial_metadata_yields_none() {
        let mut map = Metadata::new();
        map.insert(META_CODEC.into(), "cpu-zstd".into());
        assert!(extract_manifest(&Some(map)).is_none());
    }

    /// A well-formed closed range parses into `Range::Int`.
    #[test]
    fn parse_copy_source_range_basic() {
        match parse_copy_source_range("bytes=10-20").unwrap() {
            s3s::dto::Range::Int { first, last } => {
                assert_eq!(first, 10);
                assert_eq!(last, Some(20));
            }
            _ => panic!("expected Int range"),
        }
    }

    /// `last < first` is rejected with an explanatory message.
    #[test]
    fn parse_copy_source_range_rejects_inverted() {
        let msg = parse_copy_source_range("bytes=20-10").unwrap_err();
        assert!(msg.contains("last < first"));
    }

    /// The mandatory `bytes=` prefix must be present.
    #[test]
    fn parse_copy_source_range_rejects_missing_prefix() {
        let msg = parse_copy_source_range("10-20").unwrap_err();
        assert!(msg.contains("must start with 'bytes='"));
    }

    /// The upload_part_copy header demands a closed N-M range; both
    /// open-ended (`N-`) and suffix (`-N`) forms must be refused.
    #[test]
    fn parse_copy_source_range_rejects_open_ended() {
        for bad in ["bytes=10-", "bytes=-10"] {
            assert!(parse_copy_source_range(bad).is_err());
        }
    }
}