Skip to main content

s4_server/
streaming.rs

1//! Streaming compression / decompression helpers。
2//!
3//! `s4_codec::Codec` の bytes-in / bytes-out API は memory cap (5 GiB) を持つため
4//! 大規模オブジェクトで OOM 危険。本 module は `async-compression` 経由で zstd を
5//! AsyncRead/AsyncWrite に差し込み、`StreamingBlob` (= `futures::Stream<Bytes>`)
6//! ↔ AsyncRead を `tokio_util::io` で橋渡しする。
7//!
8//! ## 対応 codec
9//!
10//! - **CpuZstd**: `async_compression::tokio::bufread::ZstdDecoder` で完全 streaming
11//! - **Passthrough**: 入力 stream をそのまま返す (ゼロコスト streaming)
12//! - **NvcompZstd / NvcompBitcomp**: nvCOMP は batch API のため per-chunk batch 処理
13//!   (Phase 2.1 で追加予定、現状は default の bytes-based に fallback)
14//!
15//! ## 整合性検証
16//!
17//! Streaming GET では bytes 全体の CRC32C をオンザフライで計算しつつ stream を
18//! 流す `Crc32cVerifier` adapter を被せる。最後の chunk が yield された時点で
19//! manifest.crc32c と比較し、不一致なら error として伝播 (= client 側で
20//! body parse 失敗として現れる)。
21
22use std::io;
23use std::pin::Pin;
24use std::task::{Context, Poll};
25
26use async_compression::Level;
27use async_compression::tokio::bufread::ZstdDecoder;
28use async_compression::tokio::write::ZstdEncoder;
29use bytes::Bytes;
30use futures::{Stream, StreamExt};
31use s3s::StdError;
32use s3s::dto::StreamingBlob;
33use s3s::stream::{ByteStream, RemainingLength};
34use s4_codec::multipart::{FrameHeader, write_frame};
35use s4_codec::{ChunkManifest, CodecError, CodecKind, CodecRegistry};
36use std::sync::Arc;
37use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf};
38use tokio_util::io::{ReaderStream, StreamReader};
39
40/// `StreamingBlob` を AsyncRead として扱えるラッパ。
41///
42/// `s3s::dto::StreamingBlob` は `futures::Stream<Item = Result<Bytes, StdError>>`
43/// なので、`tokio_util::io::StreamReader` を使うと `tokio::io::AsyncRead` に変換できる。
44/// ただし StreamReader は `std::io::Error` を期待するので、StdError → io::Error への
45/// 変換層を挟む必要がある。
46pub fn blob_to_async_read(blob: StreamingBlob) -> impl AsyncRead + Unpin + Send + Sync + 'static {
47    let mapped = blob.map(|chunk| chunk.map_err(|e| io::Error::other(e.to_string())));
48    StreamReader::new(mapped)
49}
50
51/// `AsyncRead` を 1 chunk = 64 KiB の `StreamingBlob` に変換 (size 不明の chunked stream)。
52pub fn async_read_to_blob<R: AsyncRead + Unpin + Send + Sync + 'static>(
53    reader: R,
54) -> StreamingBlob {
55    let stream = ReaderStream::new(reader).map(|res| res.map_err(|e| Box::new(e) as StdError));
56    StreamingBlob::new(StreamWrapper { inner: stream })
57}
58
pin_project_lite::pin_project! {
    /// Wrapper that attaches a `ByteStream` impl to a
    /// `Stream<Item = Result<Bytes, StdError>>`.
    /// `remaining_length` reports unknown (streaming = size not known upfront).
    struct StreamWrapper<S> { #[pin] inner: S }
}
64
65impl<S> Stream for StreamWrapper<S>
66where
67    S: Stream<Item = Result<Bytes, StdError>>,
68{
69    type Item = Result<Bytes, StdError>;
70    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        self.project().inner.poll_next(cx)
72    }
73    fn size_hint(&self) -> (usize, Option<usize>) {
74        self.inner.size_hint()
75    }
76}
77
impl<S> ByteStream for StreamWrapper<S>
where
    S: Stream<Item = Result<Bytes, StdError>> + Send + Sync,
{
    /// Report the remaining body length to s3s.
    fn remaining_length(&self) -> RemainingLength {
        // streaming output: size unknown
        RemainingLength::unknown()
    }
}
87
88/// CpuZstd で `body` を **streaming** に decompress した `StreamingBlob` を返す。
89///
90/// memory peak は zstd window size + chunk size 程度 (typically <10 MiB)。
91/// TTFB は最初の chunk が decompress された時点で client に渡る。
92///
93/// **multi-frame 対応**: zstd 仕様で「複数 frame の連結 = 1 つの valid な zstd
94/// stream」と定義されているため、`streaming_compress_cpu_zstd` のような per-chunk
95/// 圧縮された連結出力もそのまま decode 可能。`async_compression` の default は
96/// single-frame のため、明示的に `multiple_members(true)` を有効化。
97pub fn cpu_zstd_decompress_stream(body: StreamingBlob) -> StreamingBlob {
98    let read = blob_to_async_read(body);
99    let mut decoder = ZstdDecoder::new(BufReader::new(read));
100    decoder.multiple_members(true);
101    async_read_to_blob(decoder)
102}
103
/// v0.8.4 #73 H-1: streaming GET integrity guard. Wraps an inner `AsyncRead`
/// (typically the output of [`cpu_zstd_decompress_stream`]) and folds every
/// byte that flows through into a rolling CRC32C. At EOF the rolling CRC and
/// the observed byte count are compared against the manifest-declared values;
/// a mismatch surfaces as `io::ErrorKind::InvalidData`, so the HTTP body
/// stream fails and the client sees a truncated / aborted response rather
/// than silent corruption.
///
/// The wrapper is **bytes-pass-through**: each chunk reaches the client as
/// soon as it is produced (no plaintext buffering), preserving the streaming
/// TTFB property of the unwrapped CpuZstd path. The integrity verdict lands
/// at EOF, which on a corrupted body shows up as a streaming error tail
/// (HTTP/1.1 chunked: an aborted final chunk; HTTP/2: RST_STREAM with
/// INTERNAL_ERROR).
///
/// A custom wrapper is used instead of a `tokio_util` adapter because the
/// rolling CRC needs both the per-chunk bytes (to fold into the running
/// checksum) and the EOF signal (to issue the final compare); the existing
/// wrappers (`StreamReader`, `InspectReader`) only expose pre-EOF byte hooks.
pub struct Crc32cVerifyingReader<R> {
    inner: R,
    expected_crc: u32,
    expected_size: u64,
    rolling_crc: u32,
    bytes_read: u64,
    /// Once a verify-failure has been emitted, subsequent polls return EOF so
    /// callers that keep polling past the error never observe a rolling CRC
    /// computed from a partial stream (meaningless after the failure).
    failed: bool,
}

impl<R> Crc32cVerifyingReader<R> {
    /// Wrap `inner`, expecting exactly `expected_size` bytes whose CRC32C
    /// equals `expected_crc` (both taken from the object manifest).
    pub fn new(inner: R, expected_crc: u32, expected_size: u64) -> Self {
        Self {
            rolling_crc: 0,
            bytes_read: 0,
            failed: false,
            inner,
            expected_crc,
            expected_size,
        }
    }

    /// Test-only: the rolling CRC accumulated up to the current point in the
    /// stream. Useful from unit tests that drive the reader manually.
    #[cfg(test)]
    pub fn rolling_crc(&self) -> u32 {
        self.rolling_crc
    }

    /// Test-only: number of bytes observed so far.
    #[cfg(test)]
    pub fn bytes_read(&self) -> u64 {
        self.bytes_read
    }
}
161
impl<R> AsyncRead for Crc32cVerifyingReader<R>
where
    R: AsyncRead + Unpin,
{
    // Pass-through read: forward to `inner`, fold any newly filled bytes into
    // the rolling CRC, and run the size/CRC comparison when the inner reader
    // signals EOF (a ready poll that fills zero bytes).
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        if self.failed {
            // Once we've reported the corruption, behave like a closed
            // stream — no further bytes, no further error. (Re-issuing
            // the error on every poll would also be defensible; we pick
            // "EOF after error" so callers that loop on `Ok(0)` cleanly
            // exit instead of spinning on `Err`.)
            return Poll::Ready(Ok(()));
        }
        // `buf` may already be partially filled by the caller; remember the
        // watermark so only the bytes produced by this poll are checksummed.
        let pre_filled = buf.filled().len();
        match Pin::new(&mut self.inner).poll_read(cx, buf) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Ready(Ok(())) => {
                let new_filled = buf.filled().len();
                if new_filled > pre_filled {
                    // Data chunk: fold it into the rolling CRC / byte count
                    // and pass it through untouched.
                    let chunk = &buf.filled()[pre_filled..new_filled];
                    self.rolling_crc = crc32c::crc32c_append(self.rolling_crc, chunk);
                    self.bytes_read = self.bytes_read.saturating_add(chunk.len() as u64);
                    Poll::Ready(Ok(()))
                } else {
                    // EOF — verify both invariants. Size mismatch comes
                    // first because a short stream often signals the same
                    // root cause (truncation) more clearly than the CRC
                    // mismatch derived from partial bytes.
                    if self.bytes_read != self.expected_size {
                        self.failed = true;
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!(
                                "S4 streaming GET size mismatch: \
                                 expected {} bytes, got {}",
                                self.expected_size, self.bytes_read
                            ),
                        )));
                    }
                    if self.rolling_crc != self.expected_crc {
                        self.failed = true;
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!(
                                "S4 streaming GET crc32c mismatch: \
                                 expected {:#010x}, got {:#010x}",
                                self.expected_crc, self.rolling_crc
                            ),
                        )));
                    }
                    Poll::Ready(Ok(()))
                }
            }
        }
    }
}
223
224/// codec が streaming-aware かを判定 (S4Service 側で fast path 分岐に使う)。
225pub fn supports_streaming_decompress(codec: CodecKind) -> bool {
226    // NvcompZstd の出力は zstd frame の連結 (zstd 仕様で valid な single stream) なので
227    // CPU zstd decoder で stream decompress 可能。NvcompBitcomp は per-chunk metadata が
228    // 別形式なので未対応 (HLIF self-describing 化を待つ)。
229    matches!(
230        codec,
231        CodecKind::Passthrough | CodecKind::CpuZstd | CodecKind::NvcompZstd
232    )
233}
234
235pub fn supports_streaming_compress(codec: CodecKind) -> bool {
236    #[cfg(feature = "nvcomp-gpu")]
237    {
238        matches!(
239            codec,
240            CodecKind::Passthrough | CodecKind::CpuZstd | CodecKind::NvcompZstd
241        )
242    }
243    #[cfg(not(feature = "nvcomp-gpu"))]
244    {
245        matches!(codec, CodecKind::Passthrough | CodecKind::CpuZstd)
246    }
247}
248
249/// `body` を CPU zstd で **input-streaming** 圧縮し、(compressed bytes, manifest)
250/// を返す。memory peak は zstd window + 64 KiB read buffer + 圧縮済 output ≈
251/// `compressed_size + ~100 MB`。**入力 5 GB を 100 MB に圧縮するケースで peak は
252/// ~200 MB** (vs. naive bytes-buffered だと peak 5 GB)。
253///
254/// 圧縮済 output 全体を Bytes として保持する理由は backend (aws-sdk-s3) が
255/// chunked-without-content-length を SigV4 chunked encoding interceptor で reject
256/// するため。Phase 2.1 で SigV4 streaming mode に対応すれば true streaming PUT
257/// (compressed output も streaming) に拡張可。
258pub async fn streaming_compress_cpu_zstd(
259    body: StreamingBlob,
260    level: i32,
261) -> Result<(Bytes, ChunkManifest), CodecError> {
262    let mut read = blob_to_async_read(body);
263    let mut compressed_buf: Vec<u8> = Vec::with_capacity(256 * 1024);
264    let mut crc: u32 = 0;
265    let mut total_in: u64 = 0;
266    let mut in_buf = vec![0u8; 64 * 1024];
267
268    {
269        let mut encoder = ZstdEncoder::with_quality(&mut compressed_buf, Level::Precise(level));
270        loop {
271            let n = read.read(&mut in_buf).await.map_err(CodecError::Io)?;
272            if n == 0 {
273                break;
274            }
275            crc = crc32c::crc32c_append(crc, &in_buf[..n]);
276            total_in += n as u64;
277            encoder
278                .write_all(&in_buf[..n])
279                .await
280                .map_err(CodecError::Io)?;
281        }
282        encoder.shutdown().await.map_err(CodecError::Io)?;
283    }
284
285    let compressed_len = compressed_buf.len() as u64;
286    Ok((
287        Bytes::from(compressed_buf),
288        ChunkManifest {
289            codec: CodecKind::CpuZstd,
290            original_size: total_in,
291            compressed_size: compressed_len,
292            crc32c: crc,
293        },
294    ))
295}
296
/// Default chunk size for `streaming_compress_to_frames`.
///
/// Why 4 MiB:
/// - a single Range GET's minimum fetch ≈ compressed_size_per_chunk
///   (~hundreds of KB to 1 MB)
/// - a realistic frame count (1 GiB object → 256 frames, sidecar < 10 KB)
/// - per-call CPU/GPU codec overhead is amortized
pub const DEFAULT_S4F2_CHUNK_SIZE: usize = 4 * 1024 * 1024;

/// v0.4 #16: pick a chunk size for `streaming_compress_to_frames` based on
/// the request's `Content-Length` (when known). Smaller objects get smaller
/// chunks (avoids carrying multi-MiB framing infrastructure on a 64 KiB
/// upload); large objects get larger chunks (amortises GPU launch overhead
/// and keeps the sidecar small).
///
/// Thresholds:
/// - `None` (chunked transfer-encoding, size unknown): default 4 MiB
/// - `<= 1 MiB`:           1 MiB (single chunk for small uploads)
/// - `1 MiB ..= 100 MiB`:  4 MiB (the v0.2 #4 default; balanced)
/// - `> 100 MiB`:          16 MiB (fewer frames → less sidecar / GPU overhead)
pub fn pick_chunk_size(content_length: Option<u64>) -> usize {
    const MIB: u64 = 1024 * 1024;
    let Some(len) = content_length else {
        // Chunked transfer-encoding: size genuinely unknown.
        return DEFAULT_S4F2_CHUNK_SIZE;
    };
    if len <= MIB {
        MIB as usize
    } else if len <= 100 * MIB {
        DEFAULT_S4F2_CHUNK_SIZE
    } else {
        16 * (MIB as usize)
    }
}
324
/// Default in-flight depth for `streaming_compress_to_frames` (v0.3 #12).
///
/// Number of per-chunk compress tasks allowed to run concurrently. While
/// chunk K-1 is compressing, chunk K's host-side read + CRC computation +
/// spawn proceed, and completed frames are written out in order — a pipeline
/// that improves total throughput 2-4× on large inputs for both CPU and GPU
/// codecs (issue #12 acceptance).
///
/// Why 3:
/// - clearly faster than 1 (= sequential); at 4+ the reader / writer become
///   the bottleneck and the improvement diminishes
/// - host RAM peak stays predictable at `N * chunk_size + accumulating
///   output` (3 × 4 MiB = 12 MiB of input buffering vs 4 MiB for one chunk)
pub const DEFAULT_S4F2_INFLIGHT: usize = 3;
338
339/// 入力 `body` を **chunked + framed + pipelined** に圧縮した output を返す
340/// (v0.2 #4 + v0.3 #12)。
341///
342/// 各 chunk を `registry.compress(chunk_kind)` に投げ、最大
343/// [`DEFAULT_S4F2_INFLIGHT`] 件まで in-flight に保つ。 結果は元の chunk 順を
344/// 保持して S4F2 frame として連結。
345///
346/// **wire format**: `[S4F2 frame][S4F2 frame]...[S4F2 frame]` の連結。
347/// 各 frame は self-describing なので reader 側は `multipart::FrameIter` で
348/// そのまま parse 可能 (= 既存 multipart decompress 経路と同じ機構)。
349///
350/// **why chunked single-PUT**:
351/// - Range GET partial-fetch を sidecar で活用可能 (issue #4)
352/// - per-frame CRC で局所 corruption を検出可能
353/// - per-frame codec dispatch (将来 mixed-codec 対応)
354///
355/// **memory peak**: `inflight × chunk_size` (in-flight chunks の input/output) +
356/// `compressed_size` (output buffer accumulating)。
357/// 入力 5 GB を 200 MB に圧縮する case で peak ≈ 12 MiB + 200 MB = ~212 MB
358/// (vs sequential `chunk_size + compressed_size` = ~204 MB)。
359/// `expected_size`: v0.8.4 #73 M2 — when the caller knows how many input
360/// bytes the body is supposed to deliver (e.g. an HTTP `Content-Length`),
361/// pass `Some(n)` and the function fails fast with [`CodecError::TruncatedStream`]
362/// if the input stream returns EOF before `n` bytes were consumed. Without
363/// this guard, a mid-chunk client disconnect would silently turn into a
364/// "successful" PUT of a truncated payload — the rolling CRC would be
365/// computed against the partial input and the GET would happily return the
366/// truncated body. Pass `None` for chunked Transfer-Encoding requests where
367/// the size is genuinely unknown (the upstream backend will still reject a
368/// short chunked body via its own framing).
369pub async fn streaming_compress_to_frames(
370    body: StreamingBlob,
371    registry: Arc<CodecRegistry>,
372    codec_kind: CodecKind,
373    chunk_size: usize,
374    expected_size: Option<u64>,
375) -> Result<(Bytes, ChunkManifest), CodecError> {
376    streaming_compress_to_frames_with(
377        body,
378        registry,
379        codec_kind,
380        chunk_size,
381        DEFAULT_S4F2_INFLIGHT,
382        expected_size,
383    )
384    .await
385}
386
/// Like [`streaming_compress_to_frames`] but lets callers tune the in-flight
/// depth — useful in the bench harness, and as the building block any
/// `streaming_compress_to_frames` callers extend if their workload needs a
/// non-default pipelining depth.
pub async fn streaming_compress_to_frames_with(
    body: StreamingBlob,
    registry: Arc<CodecRegistry>,
    codec_kind: CodecKind,
    chunk_size: usize,
    inflight: usize,
    expected_size: Option<u64>,
) -> Result<(Bytes, ChunkManifest), CodecError> {
    use bytes::BytesMut;
    use futures::StreamExt as _;
    use futures::stream::FuturesOrdered;

    // Clamp to at least 1: a depth of 0 would make the refill loop a no-op
    // and the pipeline would emit nothing.
    let inflight = inflight.max(1);
    let mut read = blob_to_async_read(body);
    let mut framed = BytesMut::with_capacity(chunk_size);
    let mut rolling_crc: u32 = 0;
    let mut total_in: u64 = 0;
    let mut chunk_buf = vec![0u8; chunk_size];

    // Each in-flight task carries the per-chunk frame header (computed
    // synchronously when the chunk was read) and a JoinHandle that resolves
    // to the codec output. Ordering is preserved by FuturesOrdered.
    type InFlight = futures::future::BoxFuture<'static, Result<(FrameHeader, Bytes), CodecError>>;
    let mut queue: FuturesOrdered<InFlight> = FuturesOrdered::new();
    let mut eof = false;

    loop {
        // Refill the in-flight queue.
        while !eof && queue.len() < inflight {
            // Read until the chunk buffer is full or the input ends; the
            // final chunk may be shorter than `chunk_size`.
            let mut filled = 0;
            while filled < chunk_size {
                let n = read
                    .read(&mut chunk_buf[filled..])
                    .await
                    .map_err(CodecError::Io)?;
                if n == 0 {
                    break;
                }
                filled += n;
            }
            if filled == 0 {
                eof = true;
                break;
            }

            let chunk_slice = &chunk_buf[..filled];
            let chunk_crc = crc32c::crc32c(chunk_slice);
            rolling_crc = crc32c::crc32c_append(rolling_crc, chunk_slice);
            total_in += filled as u64;

            let header = FrameHeader {
                codec: codec_kind,
                original_size: filled as u64,
                compressed_size: 0, // patched after compress completes
                crc32c: chunk_crc,
            };
            // `chunk_buf` is reused for the next read, so the spawned task
            // needs an owned copy of this chunk's bytes.
            let original_chunk = Bytes::copy_from_slice(chunk_slice);
            let registry = Arc::clone(&registry);
            queue.push_back(Box::pin(async move {
                let (compressed_chunk, _per_chunk_manifest) =
                    registry.compress(original_chunk, codec_kind).await?;
                let mut header = header;
                header.compressed_size = compressed_chunk.len() as u64;
                Ok::<_, CodecError>((header, compressed_chunk))
            }));
        }

        // Drain the next ready frame in chunk order.
        match queue.next().await {
            Some(Ok((header, compressed_chunk))) => {
                write_frame(&mut framed, header, &compressed_chunk);
            }
            Some(Err(e)) => return Err(e),
            // Queue empty and EOF reached: all frames have been written.
            None => break,
        }
    }

    // v0.8.4 #73 M2: truncation guard. We're about to declare the produced
    // bytes as the canonical compressed object — if the caller advertised a
    // Content-Length and we got fewer bytes (mid-chunk client disconnect,
    // half-uploaded body, etc.), surface the truncation NOW so the caller
    // can return 400 to the client. Without this branch, the rolling CRC
    // would be computed against the partial input, the manifest would
    // look internally consistent, and a future GET would happily return
    // the truncated body — silent data loss.
    if let Some(expected) = expected_size
        && total_in < expected
    {
        return Err(CodecError::TruncatedStream {
            expected,
            got: total_in,
        });
    }

    let total_framed = framed.len() as u64;
    Ok((
        framed.freeze(),
        ChunkManifest {
            codec: codec_kind,
            original_size: total_in,
            compressed_size: total_framed,
            crc32c: rolling_crc,
        },
    ))
}
496
497/// `body` を passthrough で集めるだけ。CRC32C も計算する。
498pub async fn streaming_passthrough(
499    body: StreamingBlob,
500) -> Result<(Bytes, ChunkManifest), CodecError> {
501    let mut read = blob_to_async_read(body);
502    let mut buf: Vec<u8> = Vec::with_capacity(256 * 1024);
503    let mut crc: u32 = 0;
504    let mut total: u64 = 0;
505    let mut chunk = vec![0u8; 64 * 1024];
506    loop {
507        let n = read.read(&mut chunk).await.map_err(CodecError::Io)?;
508        if n == 0 {
509            break;
510        }
511        crc = crc32c::crc32c_append(crc, &chunk[..n]);
512        total += n as u64;
513        buf.extend_from_slice(&chunk[..n]);
514    }
515    let len = buf.len() as u64;
516    Ok((
517        Bytes::from(buf),
518        ChunkManifest {
519            codec: CodecKind::Passthrough,
520            original_size: total,
521            compressed_size: len,
522            crc32c: crc,
523        },
524    ))
525}
526
527#[cfg(test)]
528mod tests {
529    use super::*;
530    use bytes::BytesMut;
531    use futures::stream;
532    use futures::stream::StreamExt;
533
534    /// v0.4 #16: pick_chunk_size threshold table.
535    #[test]
536    fn pick_chunk_size_thresholds() {
537        // None (chunked transfer-encoding) → default 4 MiB
538        assert_eq!(pick_chunk_size(None), DEFAULT_S4F2_CHUNK_SIZE);
539        // <= 1 MiB → 1 MiB
540        assert_eq!(pick_chunk_size(Some(0)), 1024 * 1024);
541        assert_eq!(pick_chunk_size(Some(64 * 1024)), 1024 * 1024);
542        assert_eq!(pick_chunk_size(Some(1024 * 1024)), 1024 * 1024);
543        // 1 MiB ..= 100 MiB → 4 MiB (default)
544        assert_eq!(
545            pick_chunk_size(Some(1024 * 1024 + 1)),
546            DEFAULT_S4F2_CHUNK_SIZE
547        );
548        assert_eq!(
549            pick_chunk_size(Some(50 * 1024 * 1024)),
550            DEFAULT_S4F2_CHUNK_SIZE
551        );
552        assert_eq!(
553            pick_chunk_size(Some(100 * 1024 * 1024)),
554            DEFAULT_S4F2_CHUNK_SIZE
555        );
556        // > 100 MiB → 16 MiB
557        assert_eq!(
558            pick_chunk_size(Some(100 * 1024 * 1024 + 1)),
559            16 * 1024 * 1024
560        );
561        assert_eq!(
562            pick_chunk_size(Some(10 * 1024 * 1024 * 1024)),
563            16 * 1024 * 1024
564        );
565    }
566
567    async fn collect(blob: StreamingBlob) -> Bytes {
568        let mut buf = BytesMut::new();
569        let mut s = blob;
570        while let Some(chunk) = s.next().await {
571            buf.extend_from_slice(&chunk.unwrap());
572        }
573        buf.freeze()
574    }
575
576    fn make_blob(b: Bytes) -> StreamingBlob {
577        let stream = stream::once(async move { Ok::<_, std::io::Error>(b) });
578        StreamingBlob::wrap(stream)
579    }
580
581    #[tokio::test]
582    async fn cpu_zstd_streaming_roundtrip_small() {
583        let original = Bytes::from("the quick brown fox jumps over the lazy dog. ".repeat(100));
584        let compressed = zstd::stream::encode_all(original.as_ref(), 3).unwrap();
585        let blob = make_blob(Bytes::from(compressed));
586        let out_blob = cpu_zstd_decompress_stream(blob);
587        let out = collect(out_blob).await;
588        assert_eq!(out, original);
589    }
590
591    #[tokio::test]
592    async fn cpu_zstd_streaming_handles_chunked_input() {
593        let original = Bytes::from(vec![b'x'; 1_000_000]);
594        let compressed = zstd::stream::encode_all(original.as_ref(), 3).unwrap();
595        // Split compressed into many small chunks to stress the streaming decoder.
596        let mut chunks = Vec::new();
597        for chunk in compressed.chunks(1024) {
598            chunks.push(Ok::<_, std::io::Error>(Bytes::copy_from_slice(chunk)));
599        }
600        let in_stream = stream::iter(chunks);
601        let blob = StreamingBlob::wrap(in_stream);
602        let out_blob = cpu_zstd_decompress_stream(blob);
603        let out = collect(out_blob).await;
604        assert_eq!(out, original);
605    }
606
607    #[tokio::test]
608    async fn streaming_passes_through_for_passthrough() {
609        let original = Bytes::from_static(b"hello");
610        let blob = make_blob(original.clone());
611        let out_blob = async_read_to_blob(blob_to_async_read(blob));
612        let out = collect(out_blob).await;
613        assert_eq!(out, original);
614    }
615
616    #[tokio::test]
617    async fn streaming_compress_then_decompress_roundtrip() {
618        let original = Bytes::from(vec![b'q'; 200_000]);
619        let blob = make_blob(original.clone());
620        let (compressed, manifest) = streaming_compress_cpu_zstd(blob, 3).await.unwrap();
621        assert!(
622            compressed.len() < original.len() / 100,
623            "should be highly compressible"
624        );
625        assert_eq!(manifest.codec, CodecKind::CpuZstd);
626        assert_eq!(manifest.original_size, original.len() as u64);
627        assert_eq!(manifest.compressed_size, compressed.len() as u64);
628        // crc32c は all-in-one と一致する
629        assert_eq!(manifest.crc32c, crc32c::crc32c(&original));
630
631        // Decompress 経路で完全に元に戻る
632        let decompressed_blob = cpu_zstd_decompress_stream(make_blob(compressed));
633        let out = collect(decompressed_blob).await;
634        assert_eq!(out, original);
635    }
636
637    /// Verifies that `cpu_zstd_decompress_stream` correctly handles
638    /// multi-frame zstd streams (multi-call CPU encoder output produces
639    /// concatenated valid zstd frames per RFC 8478). `multiple_members(true)`
640    /// on the async_compression decoder is what makes this work.
641    #[tokio::test]
642    async fn concatenated_zstd_frames_are_a_single_valid_stream() {
643        let chunk_a = Bytes::from(vec![b'a'; 50_000]);
644        let chunk_b = Bytes::from(vec![b'b'; 50_000]);
645        let chunk_c = Bytes::from(vec![b'c'; 50_000]);
646
647        let frame_a = zstd::stream::encode_all(chunk_a.as_ref(), 3).unwrap();
648        let frame_b = zstd::stream::encode_all(chunk_b.as_ref(), 3).unwrap();
649        let frame_c = zstd::stream::encode_all(chunk_c.as_ref(), 3).unwrap();
650
651        let mut concatenated: Vec<u8> = Vec::new();
652        concatenated.extend_from_slice(&frame_a);
653        concatenated.extend_from_slice(&frame_b);
654        concatenated.extend_from_slice(&frame_c);
655
656        let expected: Vec<u8> = chunk_a
657            .iter()
658            .chain(chunk_b.iter())
659            .chain(chunk_c.iter())
660            .copied()
661            .collect();
662
663        let blob = make_blob(Bytes::from(concatenated));
664        let out_blob = cpu_zstd_decompress_stream(blob);
665        let out = collect(out_blob).await;
666        assert_eq!(out, Bytes::from(expected));
667    }
668
    /// Validates the chunked pipeline shape (chunk size, CRC accumulation,
    /// manifest aggregation, roundtrip via streaming CPU zstd decoder) used
    /// by both `streaming_compress_cpu_zstd` and the GPU codec paths in
    /// `streaming_compress_to_frames`.
    #[tokio::test]
    async fn streaming_chunked_compress_pipeline_roundtrip() {
        // Use cpu zstd as a stand-in for the GPU codec to exercise the same
        // chunking / CRC / output-concat pipeline that the nvcomp path uses.
        // The nvcomp variant differs only in which codec processes each chunk.
        async fn streaming_compress_chunked_cpu_zstd(
            body: StreamingBlob,
            chunk_size: usize,
        ) -> Result<(Bytes, ChunkManifest), CodecError> {
            let mut read = blob_to_async_read(body);
            let mut compressed_buf: Vec<u8> = Vec::with_capacity(chunk_size / 2);
            let mut crc: u32 = 0;
            let mut total_in: u64 = 0;
            let mut chunk_buf = vec![0u8; chunk_size];
            loop {
                // Read until the chunk buffer is full or the input hits EOF;
                // the last chunk may be short.
                let mut filled = 0;
                while filled < chunk_size {
                    let n = read
                        .read(&mut chunk_buf[filled..])
                        .await
                        .map_err(CodecError::Io)?;
                    if n == 0 {
                        break;
                    }
                    filled += n;
                }
                if filled == 0 {
                    break;
                }
                // Fold the chunk into the rolling CRC, then compress it as an
                // independent zstd frame (frames concatenate into one stream).
                crc = crc32c::crc32c_append(crc, &chunk_buf[..filled]);
                total_in += filled as u64;
                let compressed_chunk =
                    zstd::stream::encode_all(&chunk_buf[..filled], 3).map_err(CodecError::Io)?;
                compressed_buf.extend_from_slice(&compressed_chunk);
            }
            let compressed_len = compressed_buf.len() as u64;
            Ok((
                Bytes::from(compressed_buf),
                ChunkManifest {
                    codec: CodecKind::CpuZstd,
                    original_size: total_in,
                    compressed_size: compressed_len,
                    crc32c: crc,
                },
            ))
        }

        // 256 KiB input split into 8 chunks of 32 KiB.
        let original = Bytes::from(
            (0u32..65_536)
                .flat_map(|n| n.to_le_bytes())
                .collect::<Vec<u8>>(),
        );
        assert_eq!(original.len(), 262_144);

        let blob = make_blob(original.clone());
        let (compressed, manifest) = streaming_compress_chunked_cpu_zstd(blob, 32 * 1024)
            .await
            .unwrap();

        assert_eq!(manifest.original_size, original.len() as u64);
        assert_eq!(manifest.compressed_size, compressed.len() as u64);
        assert_eq!(manifest.crc32c, crc32c::crc32c(&original));

        // Decompress via the streaming CPU decoder (same path the GET handler uses).
        let decompressed_blob = cpu_zstd_decompress_stream(make_blob(compressed));
        let out = collect(decompressed_blob).await;
        assert_eq!(out, original);
    }
742
743    #[tokio::test]
744    async fn streaming_passthrough_yields_input_unchanged() {
745        let original = Bytes::from_static(b"hello world");
746        let (out, manifest) = streaming_passthrough(make_blob(original.clone()))
747            .await
748            .unwrap();
749        assert_eq!(out, original);
750        assert_eq!(manifest.codec, CodecKind::Passthrough);
751        assert_eq!(manifest.original_size, original.len() as u64);
752        assert_eq!(manifest.compressed_size, original.len() as u64);
753        assert_eq!(manifest.crc32c, crc32c::crc32c(&original));
754    }
755
756    // =================================================================
757    // v0.8.4 #73 H-1 + M2 unit coverage.
758    // =================================================================
759
760    /// v0.8.4 #73 H-1: a verifier wrapped around a clean stream must
761    /// emit exactly the inner bytes and report success at EOF.
762    #[tokio::test]
763    async fn crc32c_verifying_reader_passes_correct_crc() {
764        use tokio::io::AsyncReadExt as _;
765        let original = Bytes::from(vec![0xa3u8; 17_000]);
766        let crc = crc32c::crc32c(&original);
767        let inner = blob_to_async_read(make_blob(original.clone()));
768        let mut verifier = Crc32cVerifyingReader::new(inner, crc, original.len() as u64);
769        let mut out = Vec::new();
770        verifier
771            .read_to_end(&mut out)
772            .await
773            .expect("clean stream must read cleanly");
774        assert_eq!(out, original.as_ref());
775        // Post-condition: rolling CRC + bytes_read agree with manifest.
776        assert_eq!(verifier.rolling_crc(), crc);
777        assert_eq!(verifier.bytes_read(), original.len() as u64);
778    }
779
780    /// v0.8.4 #73 H-1: a verifier that sees corruption (rolling CRC
781    /// differs from the manifest's CRC at EOF) must surface an
782    /// `InvalidData` io error to the consumer instead of returning the
783    /// bytes silently. This is the streaming-GET integrity guarantee.
784    #[tokio::test]
785    async fn crc32c_verifying_reader_detects_corruption() {
786        use tokio::io::AsyncReadExt as _;
787        let original = Bytes::from_static(b"clean payload bytes");
788        let real_crc = crc32c::crc32c(&original);
789        // Wrap the *same* bytes but tell the verifier to expect a
790        // *different* CRC — equivalent to the upstream having tampered
791        // with the body (or a back-end corruption that the zstd decoder
792        // happened to silently decode into different bytes).
793        let bogus_expected_crc = real_crc.wrapping_add(1);
794        let inner = blob_to_async_read(make_blob(original.clone()));
795        let mut verifier =
796            Crc32cVerifyingReader::new(inner, bogus_expected_crc, original.len() as u64);
797        let mut out = Vec::new();
798        let err = verifier
799            .read_to_end(&mut out)
800            .await
801            .expect_err("CRC mismatch must surface as io::Error");
802        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
803        let msg = err.to_string();
804        assert!(
805            msg.contains("crc32c mismatch"),
806            "error must mention CRC mismatch, got `{msg}`"
807        );
808        // The bytes ARE delivered before the EOF verify (streaming is
809        // pass-through); the integrity decision lands at EOF.
810        assert_eq!(out, original.as_ref());
811    }
812
813    /// v0.8.4 #73 M2: `streaming_compress_to_frames` must reject a body
814    /// whose stream produces fewer bytes than `expected_size` (mid-PUT
815    /// truncation / client disconnect) with `TruncatedStream`.
816    #[tokio::test]
817    async fn streaming_compress_truncated_input_returns_truncated_stream_error() {
818        use s4_codec::cpu_zstd::CpuZstd;
819        let registry =
820            Arc::new(CodecRegistry::new(CodecKind::CpuZstd).with(Arc::new(CpuZstd::default())));
821        // The synthetic body yields exactly 4 KiB but the caller
822        // *advertises* 16 KiB — the same shape as a client that
823        // disconnected after 25% of the upload.
824        let actual = Bytes::from(vec![b'z'; 4096]);
825        let advertised: u64 = 16 * 1024;
826        let blob = make_blob(actual.clone());
827        let err = streaming_compress_to_frames(
828            blob,
829            registry,
830            CodecKind::CpuZstd,
831            1024,
832            Some(advertised),
833        )
834        .await
835        .expect_err("truncated stream must error");
836        match err {
837            CodecError::TruncatedStream { expected, got } => {
838                assert_eq!(expected, advertised);
839                assert_eq!(got, actual.len() as u64);
840            }
841            other => panic!("expected TruncatedStream, got {other:?}"),
842        }
843    }
844}