// s4_server/streaming.rs
//! Streaming compression / decompression helpers.
//!
//! The bytes-in / bytes-out API of `s4_codec::Codec` carries a memory cap (5 GiB),
//! so large objects risk OOM. This module plugs zstd into AsyncRead/AsyncWrite via
//! `async-compression` and bridges `StreamingBlob` (= `futures::Stream<Bytes>`)
//! ↔ AsyncRead with `tokio_util::io`.
//!
//! ## Supported codecs
//!
//! - **CpuZstd**: fully streaming via `async_compression::tokio::bufread::ZstdDecoder`
//! - **Passthrough**: returns the input stream as-is (zero-cost streaming)
//! - **NvcompZstd / NvcompBitcomp**: nvCOMP exposes a batch API, so these need
//!   per-chunk batch processing (planned for Phase 2.1; for now they fall back
//!   to the default bytes-based path)
//!
//! ## Integrity verification
//!
//! A streaming GET computes the CRC32C of the full byte stream on the fly via a
//! `Crc32cVerifier` adapter layered over the stream. Once the last chunk has
//! been yielded it is compared against manifest.crc32c; on mismatch an error is
//! propagated (surfacing client-side as a body parse failure).
22use std::io;
23use std::pin::Pin;
24use std::task::{Context, Poll};
25
26use async_compression::Level;
27use async_compression::tokio::bufread::ZstdDecoder;
28use async_compression::tokio::write::ZstdEncoder;
29use bytes::Bytes;
30use futures::{Stream, StreamExt};
31use s3s::StdError;
32use s3s::dto::StreamingBlob;
33use s3s::stream::{ByteStream, RemainingLength};
34use s4_codec::multipart::{FrameHeader, write_frame};
35use s4_codec::{ChunkManifest, CodecError, CodecKind, CodecRegistry};
36use std::sync::Arc;
37use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
38use tokio_util::io::{ReaderStream, StreamReader};
39
40/// `StreamingBlob` を AsyncRead として扱えるラッパ。
41///
42/// `s3s::dto::StreamingBlob` は `futures::Stream<Item = Result<Bytes, StdError>>`
43/// なので、`tokio_util::io::StreamReader` を使うと `tokio::io::AsyncRead` に変換できる。
44/// ただし StreamReader は `std::io::Error` を期待するので、StdError → io::Error への
45/// 変換層を挟む必要がある。
46pub fn blob_to_async_read(blob: StreamingBlob) -> impl AsyncRead + Unpin + Send + Sync {
47    let mapped = blob.map(|chunk| chunk.map_err(|e| io::Error::other(e.to_string())));
48    StreamReader::new(mapped)
49}
50
51/// `AsyncRead` を 1 chunk = 64 KiB の `StreamingBlob` に変換 (size 不明の chunked stream)。
52pub fn async_read_to_blob<R: AsyncRead + Unpin + Send + Sync + 'static>(
53    reader: R,
54) -> StreamingBlob {
55    let stream = ReaderStream::new(reader).map(|res| res.map_err(|e| Box::new(e) as StdError));
56    StreamingBlob::new(StreamWrapper { inner: stream })
57}
58
pin_project_lite::pin_project! {
    /// Wrapper that grafts a `ByteStream` impl onto a
    /// `Stream<Item = Result<Bytes, StdError>>`.
    /// `remaining_length` reports unknown (streaming = size not known up front).
    struct StreamWrapper<S> { #[pin] inner: S }
}
64
65impl<S> Stream for StreamWrapper<S>
66where
67    S: Stream<Item = Result<Bytes, StdError>>,
68{
69    type Item = Result<Bytes, StdError>;
70    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        self.project().inner.poll_next(cx)
72    }
73    fn size_hint(&self) -> (usize, Option<usize>) {
74        self.inner.size_hint()
75    }
76}
77
impl<S> ByteStream for StreamWrapper<S>
where
    S: Stream<Item = Result<Bytes, StdError>> + Send + Sync,
{
    /// A chunked streaming body never knows its total length up front.
    fn remaining_length(&self) -> RemainingLength {
        // streaming output: size unknown
        RemainingLength::unknown()
    }
}
87
88/// CpuZstd で `body` を **streaming** に decompress した `StreamingBlob` を返す。
89///
90/// memory peak は zstd window size + chunk size 程度 (typically <10 MiB)。
91/// TTFB は最初の chunk が decompress された時点で client に渡る。
92///
93/// **multi-frame 対応**: zstd 仕様で「複数 frame の連結 = 1 つの valid な zstd
94/// stream」と定義されているため、`streaming_compress_cpu_zstd` のような per-chunk
95/// 圧縮された連結出力もそのまま decode 可能。`async_compression` の default は
96/// single-frame のため、明示的に `multiple_members(true)` を有効化。
97pub fn cpu_zstd_decompress_stream(body: StreamingBlob) -> StreamingBlob {
98    let read = blob_to_async_read(body);
99    let mut decoder = ZstdDecoder::new(BufReader::new(read));
100    decoder.multiple_members(true);
101    async_read_to_blob(decoder)
102}
103
104/// codec が streaming-aware かを判定 (S4Service 側で fast path 分岐に使う)。
105pub fn supports_streaming_decompress(codec: CodecKind) -> bool {
106    // NvcompZstd の出力は zstd frame の連結 (zstd 仕様で valid な single stream) なので
107    // CPU zstd decoder で stream decompress 可能。NvcompBitcomp は per-chunk metadata が
108    // 別形式なので未対応 (HLIF self-describing 化を待つ)。
109    matches!(
110        codec,
111        CodecKind::Passthrough | CodecKind::CpuZstd | CodecKind::NvcompZstd
112    )
113}
114
/// Whether `codec` can be compressed streamingly.
///
/// NvcompZstd qualifies only when the `nvcomp-gpu` feature is compiled in;
/// otherwise just Passthrough and CpuZstd.
pub fn supports_streaming_compress(codec: CodecKind) -> bool {
    #[cfg(feature = "nvcomp-gpu")]
    {
        matches!(
            codec,
            CodecKind::Passthrough | CodecKind::CpuZstd | CodecKind::NvcompZstd
        )
    }
    #[cfg(not(feature = "nvcomp-gpu"))]
    {
        matches!(codec, CodecKind::Passthrough | CodecKind::CpuZstd)
    }
}
128
129/// `body` を CPU zstd で **input-streaming** 圧縮し、(compressed bytes, manifest)
130/// を返す。memory peak は zstd window + 64 KiB read buffer + 圧縮済 output ≈
131/// `compressed_size + ~100 MB`。**入力 5 GB を 100 MB に圧縮するケースで peak は
132/// ~200 MB** (vs. naive bytes-buffered だと peak 5 GB)。
133///
134/// 圧縮済 output 全体を Bytes として保持する理由は backend (aws-sdk-s3) が
135/// chunked-without-content-length を SigV4 chunked encoding interceptor で reject
136/// するため。Phase 2.1 で SigV4 streaming mode に対応すれば true streaming PUT
137/// (compressed output も streaming) に拡張可。
138pub async fn streaming_compress_cpu_zstd(
139    body: StreamingBlob,
140    level: i32,
141) -> Result<(Bytes, ChunkManifest), CodecError> {
142    let mut read = blob_to_async_read(body);
143    let mut compressed_buf: Vec<u8> = Vec::with_capacity(256 * 1024);
144    let mut crc: u32 = 0;
145    let mut total_in: u64 = 0;
146    let mut in_buf = vec![0u8; 64 * 1024];
147
148    {
149        let mut encoder = ZstdEncoder::with_quality(&mut compressed_buf, Level::Precise(level));
150        loop {
151            let n = read.read(&mut in_buf).await.map_err(CodecError::Io)?;
152            if n == 0 {
153                break;
154            }
155            crc = crc32c::crc32c_append(crc, &in_buf[..n]);
156            total_in += n as u64;
157            encoder
158                .write_all(&in_buf[..n])
159                .await
160                .map_err(CodecError::Io)?;
161        }
162        encoder.shutdown().await.map_err(CodecError::Io)?;
163    }
164
165    let compressed_len = compressed_buf.len() as u64;
166    Ok((
167        Bytes::from(compressed_buf),
168        ChunkManifest {
169            codec: CodecKind::CpuZstd,
170            original_size: total_in,
171            compressed_size: compressed_len,
172            crc32c: crc,
173        },
174    ))
175}
176
/// Default chunk size for `streaming_compress_to_frames`.
///
/// Why 4 MiB:
/// - minimum bandwidth for a single Range GET ≈ compressed_size_per_chunk
///   (~a few hundred KB to 1 MB)
/// - realistic chunk count (1 GiB object → 256 frames, sidecar < 10 KB)
/// - amortizes the per-call overhead of the CPU/GPU codec
pub const DEFAULT_S4F2_CHUNK_SIZE: usize = 4 * 1024 * 1024;
184
185/// v0.4 #16: pick a chunk size for `streaming_compress_to_frames` based on
186/// the request's `Content-Length` (when known). Smaller objects get smaller
187/// chunks (avoids carrying multi-MiB framing infrastructure on a 64 KiB
188/// upload); large objects get larger chunks (amortises GPU launch overhead
189/// and keeps the sidecar small).
190///
191/// Thresholds:
192/// - `None` (chunked transfer-encoding, size unknown): default 4 MiB
193/// - `<= 1 MiB`:           1 MiB (single chunk for small uploads)
194/// - `1 MiB ..= 100 MiB`:  4 MiB (the v0.2 #4 default; balanced)
195/// - `> 100 MiB`:          16 MiB (fewer frames → less sidecar / GPU overhead)
196pub fn pick_chunk_size(content_length: Option<u64>) -> usize {
197    match content_length {
198        None => DEFAULT_S4F2_CHUNK_SIZE,
199        Some(len) if len <= 1024 * 1024 => 1024 * 1024,
200        Some(len) if len <= 100 * 1024 * 1024 => DEFAULT_S4F2_CHUNK_SIZE,
201        Some(_) => 16 * 1024 * 1024,
202    }
203}
204
/// Default in-flight depth for `streaming_compress_to_frames` (v0.3 #12).
///
/// Number of per-chunk compress tasks allowed to run concurrently. While
/// chunk K-1 is compressing, chunk K's host-side read + CRC computation +
/// spawn proceed, and completed frames are written out in order — a pipeline
/// that improves total throughput 2-4× for large inputs with both CPU and
/// GPU codecs (issue #12 acceptance).
///
/// Why 3:
/// - clearly faster than 1 (= sequential); at 4+ the reader / writer become
///   the bottleneck and improvement diminishes
/// - host RAM peak stays predictable at `N * chunk_size + accumulating output`
///   (3 × 4 MiB = 12 MiB of input buffering vs 1 chunk = 4 MiB)
pub const DEFAULT_S4F2_INFLIGHT: usize = 3;
218
219/// 入力 `body` を **chunked + framed + pipelined** に圧縮した output を返す
220/// (v0.2 #4 + v0.3 #12)。
221///
222/// 各 chunk を `registry.compress(chunk_kind)` に投げ、最大
223/// [`DEFAULT_S4F2_INFLIGHT`] 件まで in-flight に保つ。 結果は元の chunk 順を
224/// 保持して S4F2 frame として連結。
225///
226/// **wire format**: `[S4F2 frame][S4F2 frame]...[S4F2 frame]` の連結。
227/// 各 frame は self-describing なので reader 側は `multipart::FrameIter` で
228/// そのまま parse 可能 (= 既存 multipart decompress 経路と同じ機構)。
229///
230/// **why chunked single-PUT**:
231/// - Range GET partial-fetch を sidecar で活用可能 (issue #4)
232/// - per-frame CRC で局所 corruption を検出可能
233/// - per-frame codec dispatch (将来 mixed-codec 対応)
234///
235/// **memory peak**: `inflight × chunk_size` (in-flight chunks の input/output) +
236/// `compressed_size` (output buffer accumulating)。
237/// 入力 5 GB を 200 MB に圧縮する case で peak ≈ 12 MiB + 200 MB = ~212 MB
238/// (vs sequential `chunk_size + compressed_size` = ~204 MB)。
239pub async fn streaming_compress_to_frames(
240    body: StreamingBlob,
241    registry: Arc<CodecRegistry>,
242    codec_kind: CodecKind,
243    chunk_size: usize,
244) -> Result<(Bytes, ChunkManifest), CodecError> {
245    streaming_compress_to_frames_with(
246        body,
247        registry,
248        codec_kind,
249        chunk_size,
250        DEFAULT_S4F2_INFLIGHT,
251    )
252    .await
253}
254
/// Like [`streaming_compress_to_frames`] but lets callers tune the in-flight
/// depth — useful in the bench harness, and as the building block any
/// `streaming_compress_to_frames` callers extend if their workload needs a
/// non-default pipelining depth.
///
/// Returns the concatenated S4F2 frames plus an aggregate manifest whose
/// `original_size` / `crc32c` cover the *uncompressed* input and whose
/// `compressed_size` is the total framed output length.
pub async fn streaming_compress_to_frames_with(
    body: StreamingBlob,
    registry: Arc<CodecRegistry>,
    codec_kind: CodecKind,
    chunk_size: usize,
    inflight: usize,
) -> Result<(Bytes, ChunkManifest), CodecError> {
    use bytes::BytesMut;
    use futures::StreamExt as _;
    use futures::stream::FuturesOrdered;

    // Guard against a zero depth: at least one chunk must be in flight.
    let inflight = inflight.max(1);
    let mut read = blob_to_async_read(body);
    let mut framed = BytesMut::with_capacity(chunk_size);
    let mut rolling_crc: u32 = 0;
    let mut total_in: u64 = 0;
    // Single reusable read buffer; each accepted chunk is copied out of it.
    let mut chunk_buf = vec![0u8; chunk_size];

    // Each in-flight task carries the per-chunk frame header (computed
    // synchronously when the chunk was read) and a boxed future that resolves
    // to the codec output. Ordering is preserved by FuturesOrdered.
    type InFlight = futures::future::BoxFuture<'static, Result<(FrameHeader, Bytes), CodecError>>;
    let mut queue: FuturesOrdered<InFlight> = FuturesOrdered::new();
    let mut eof = false;

    loop {
        // Refill the in-flight queue.
        while !eof && queue.len() < inflight {
            // Read until the chunk buffer is full or the input ends (short
            // reads from the underlying stream are accumulated).
            let mut filled = 0;
            while filled < chunk_size {
                let n = read
                    .read(&mut chunk_buf[filled..])
                    .await
                    .map_err(CodecError::Io)?;
                if n == 0 {
                    break;
                }
                filled += n;
            }
            if filled == 0 {
                eof = true;
                break;
            }

            let chunk_slice = &chunk_buf[..filled];
            // Per-frame CRC (stored in the frame header) and the rolling CRC
            // over the whole input (stored in the aggregate manifest).
            let chunk_crc = crc32c::crc32c(chunk_slice);
            rolling_crc = crc32c::crc32c_append(rolling_crc, chunk_slice);
            total_in += filled as u64;

            let header = FrameHeader {
                codec: codec_kind,
                original_size: filled as u64,
                compressed_size: 0, // patched after compress completes
                crc32c: chunk_crc,
            };
            // Copy out of the reusable buffer so the task owns its input.
            let original_chunk = Bytes::copy_from_slice(chunk_slice);
            let registry = Arc::clone(&registry);
            queue.push_back(Box::pin(async move {
                let (compressed_chunk, _per_chunk_manifest) =
                    registry.compress(original_chunk, codec_kind).await?;
                let mut header = header;
                header.compressed_size = compressed_chunk.len() as u64;
                Ok::<_, CodecError>((header, compressed_chunk))
            }));
        }

        // Drain the next ready frame in chunk order.
        match queue.next().await {
            Some(Ok((header, compressed_chunk))) => {
                write_frame(&mut framed, header, &compressed_chunk);
            }
            Some(Err(e)) => return Err(e),
            None => break,
        }
    }

    let total_framed = framed.len() as u64;
    Ok((
        framed.freeze(),
        ChunkManifest {
            codec: codec_kind,
            original_size: total_in,
            compressed_size: total_framed,
            crc32c: rolling_crc,
        },
    ))
}
346
347/// `body` を passthrough で集めるだけ。CRC32C も計算する。
348pub async fn streaming_passthrough(
349    body: StreamingBlob,
350) -> Result<(Bytes, ChunkManifest), CodecError> {
351    let mut read = blob_to_async_read(body);
352    let mut buf: Vec<u8> = Vec::with_capacity(256 * 1024);
353    let mut crc: u32 = 0;
354    let mut total: u64 = 0;
355    let mut chunk = vec![0u8; 64 * 1024];
356    loop {
357        let n = read.read(&mut chunk).await.map_err(CodecError::Io)?;
358        if n == 0 {
359            break;
360        }
361        crc = crc32c::crc32c_append(crc, &chunk[..n]);
362        total += n as u64;
363        buf.extend_from_slice(&chunk[..n]);
364    }
365    let len = buf.len() as u64;
366    Ok((
367        Bytes::from(buf),
368        ChunkManifest {
369            codec: CodecKind::Passthrough,
370            original_size: total,
371            compressed_size: len,
372            crc32c: crc,
373        },
374    ))
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use futures::stream;
    use futures::stream::StreamExt;

    /// v0.4 #16: pick_chunk_size threshold table.
    #[test]
    fn pick_chunk_size_thresholds() {
        // None (chunked transfer-encoding) → default 4 MiB
        assert_eq!(pick_chunk_size(None), DEFAULT_S4F2_CHUNK_SIZE);
        // <= 1 MiB → 1 MiB
        assert_eq!(pick_chunk_size(Some(0)), 1024 * 1024);
        assert_eq!(pick_chunk_size(Some(64 * 1024)), 1024 * 1024);
        assert_eq!(pick_chunk_size(Some(1024 * 1024)), 1024 * 1024);
        // 1 MiB ..= 100 MiB → 4 MiB (default)
        assert_eq!(
            pick_chunk_size(Some(1024 * 1024 + 1)),
            DEFAULT_S4F2_CHUNK_SIZE
        );
        assert_eq!(
            pick_chunk_size(Some(50 * 1024 * 1024)),
            DEFAULT_S4F2_CHUNK_SIZE
        );
        assert_eq!(
            pick_chunk_size(Some(100 * 1024 * 1024)),
            DEFAULT_S4F2_CHUNK_SIZE
        );
        // > 100 MiB → 16 MiB
        assert_eq!(
            pick_chunk_size(Some(100 * 1024 * 1024 + 1)),
            16 * 1024 * 1024
        );
        assert_eq!(
            pick_chunk_size(Some(10 * 1024 * 1024 * 1024)),
            16 * 1024 * 1024
        );
    }

    /// Drain a blob into one contiguous `Bytes` buffer (panics on stream error).
    async fn collect(blob: StreamingBlob) -> Bytes {
        let mut buf = BytesMut::new();
        let mut s = blob;
        while let Some(chunk) = s.next().await {
            buf.extend_from_slice(&chunk.unwrap());
        }
        buf.freeze()
    }

    /// Wrap a single `Bytes` value as a one-chunk `StreamingBlob`.
    fn make_blob(b: Bytes) -> StreamingBlob {
        let stream = stream::once(async move { Ok::<_, std::io::Error>(b) });
        StreamingBlob::wrap(stream)
    }

    #[tokio::test]
    async fn cpu_zstd_streaming_roundtrip_small() {
        let original = Bytes::from("the quick brown fox jumps over the lazy dog. ".repeat(100));
        let compressed = zstd::stream::encode_all(original.as_ref(), 3).unwrap();
        let blob = make_blob(Bytes::from(compressed));
        let out_blob = cpu_zstd_decompress_stream(blob);
        let out = collect(out_blob).await;
        assert_eq!(out, original);
    }

    #[tokio::test]
    async fn cpu_zstd_streaming_handles_chunked_input() {
        let original = Bytes::from(vec![b'x'; 1_000_000]);
        let compressed = zstd::stream::encode_all(original.as_ref(), 3).unwrap();
        // Split compressed into many small chunks to stress the streaming decoder.
        let mut chunks = Vec::new();
        for chunk in compressed.chunks(1024) {
            chunks.push(Ok::<_, std::io::Error>(Bytes::copy_from_slice(chunk)));
        }
        let in_stream = stream::iter(chunks);
        let blob = StreamingBlob::wrap(in_stream);
        let out_blob = cpu_zstd_decompress_stream(blob);
        let out = collect(out_blob).await;
        assert_eq!(out, original);
    }

    #[tokio::test]
    async fn streaming_passes_through_for_passthrough() {
        let original = Bytes::from_static(b"hello");
        let blob = make_blob(original.clone());
        let out_blob = async_read_to_blob(blob_to_async_read(blob));
        let out = collect(out_blob).await;
        assert_eq!(out, original);
    }

    #[tokio::test]
    async fn streaming_compress_then_decompress_roundtrip() {
        let original = Bytes::from(vec![b'q'; 200_000]);
        let blob = make_blob(original.clone());
        let (compressed, manifest) = streaming_compress_cpu_zstd(blob, 3).await.unwrap();
        assert!(
            compressed.len() < original.len() / 100,
            "should be highly compressible"
        );
        assert_eq!(manifest.codec, CodecKind::CpuZstd);
        assert_eq!(manifest.original_size, original.len() as u64);
        assert_eq!(manifest.compressed_size, compressed.len() as u64);
        // crc32c matches the all-at-once computation
        assert_eq!(manifest.crc32c, crc32c::crc32c(&original));

        // The decompress path restores the original exactly
        let decompressed_blob = cpu_zstd_decompress_stream(make_blob(compressed));
        let out = collect(decompressed_blob).await;
        assert_eq!(out, original);
    }

    /// Verifies that `cpu_zstd_decompress_stream` correctly handles
    /// multi-frame zstd streams (multi-call CPU encoder output produces
    /// concatenated valid zstd frames per RFC 8478). `multiple_members(true)`
    /// on the async_compression decoder is what makes this work.
    #[tokio::test]
    async fn concatenated_zstd_frames_are_a_single_valid_stream() {
        let chunk_a = Bytes::from(vec![b'a'; 50_000]);
        let chunk_b = Bytes::from(vec![b'b'; 50_000]);
        let chunk_c = Bytes::from(vec![b'c'; 50_000]);

        let frame_a = zstd::stream::encode_all(chunk_a.as_ref(), 3).unwrap();
        let frame_b = zstd::stream::encode_all(chunk_b.as_ref(), 3).unwrap();
        let frame_c = zstd::stream::encode_all(chunk_c.as_ref(), 3).unwrap();

        let mut concatenated: Vec<u8> = Vec::new();
        concatenated.extend_from_slice(&frame_a);
        concatenated.extend_from_slice(&frame_b);
        concatenated.extend_from_slice(&frame_c);

        let expected: Vec<u8> = chunk_a
            .iter()
            .chain(chunk_b.iter())
            .chain(chunk_c.iter())
            .copied()
            .collect();

        let blob = make_blob(Bytes::from(concatenated));
        let out_blob = cpu_zstd_decompress_stream(blob);
        let out = collect(out_blob).await;
        assert_eq!(out, Bytes::from(expected));
    }

    /// Validates the chunked pipeline shape (chunk size, CRC accumulation,
    /// manifest aggregation, roundtrip via streaming CPU zstd decoder) used
    /// by both `streaming_compress_cpu_zstd` and the GPU codec paths in
    /// `streaming_compress_to_frames`.
    #[tokio::test]
    async fn streaming_chunked_compress_pipeline_roundtrip() {
        // Use cpu zstd as a stand-in for the GPU codec to exercise the same
        // chunking / CRC / output-concat pipeline that the nvcomp path uses.
        // The nvcomp variant differs only in which codec processes each chunk.
        async fn streaming_compress_chunked_cpu_zstd(
            body: StreamingBlob,
            chunk_size: usize,
        ) -> Result<(Bytes, ChunkManifest), CodecError> {
            let mut read = blob_to_async_read(body);
            let mut compressed_buf: Vec<u8> = Vec::with_capacity(chunk_size / 2);
            let mut crc: u32 = 0;
            let mut total_in: u64 = 0;
            let mut chunk_buf = vec![0u8; chunk_size];
            loop {
                // Fill one full chunk (accumulate short reads) before encoding.
                let mut filled = 0;
                while filled < chunk_size {
                    let n = read
                        .read(&mut chunk_buf[filled..])
                        .await
                        .map_err(CodecError::Io)?;
                    if n == 0 {
                        break;
                    }
                    filled += n;
                }
                if filled == 0 {
                    break;
                }
                crc = crc32c::crc32c_append(crc, &chunk_buf[..filled]);
                total_in += filled as u64;
                let compressed_chunk =
                    zstd::stream::encode_all(&chunk_buf[..filled], 3).map_err(CodecError::Io)?;
                compressed_buf.extend_from_slice(&compressed_chunk);
            }
            let compressed_len = compressed_buf.len() as u64;
            Ok((
                Bytes::from(compressed_buf),
                ChunkManifest {
                    codec: CodecKind::CpuZstd,
                    original_size: total_in,
                    compressed_size: compressed_len,
                    crc32c: crc,
                },
            ))
        }

        // 256 KiB input split into 8 chunks of 32 KiB.
        let original = Bytes::from(
            (0u32..65_536)
                .flat_map(|n| n.to_le_bytes())
                .collect::<Vec<u8>>(),
        );
        assert_eq!(original.len(), 262_144);

        let blob = make_blob(original.clone());
        let (compressed, manifest) = streaming_compress_chunked_cpu_zstd(blob, 32 * 1024)
            .await
            .unwrap();

        assert_eq!(manifest.original_size, original.len() as u64);
        assert_eq!(manifest.compressed_size, compressed.len() as u64);
        assert_eq!(manifest.crc32c, crc32c::crc32c(&original));

        // Decompress via the streaming CPU decoder (same path the GET handler uses).
        let decompressed_blob = cpu_zstd_decompress_stream(make_blob(compressed));
        let out = collect(decompressed_blob).await;
        assert_eq!(out, original);
    }

    #[tokio::test]
    async fn streaming_passthrough_yields_input_unchanged() {
        let original = Bytes::from_static(b"hello world");
        let (out, manifest) = streaming_passthrough(make_blob(original.clone()))
            .await
            .unwrap();
        assert_eq!(out, original);
        assert_eq!(manifest.codec, CodecKind::Passthrough);
        assert_eq!(manifest.original_size, original.len() as u64);
        assert_eq!(manifest.compressed_size, original.len() as u64);
        assert_eq!(manifest.crc32c, crc32c::crc32c(&original));
    }
}
605}