tako/plugins/compression/zstd_stream.rs
1#![cfg(feature = "zstd")]
2
3//! Zstandard compression streaming utilities for high-performance HTTP response compression.
4//!
5//! This module provides streaming Zstandard (zstd) compression for HTTP response bodies,
6//! offering excellent compression ratios with fast decompression speeds. Zstandard is
7//! particularly well-suited for modern web applications that require both high compression
8//! efficiency and low latency. The streaming implementation enables memory-efficient
9//! compression of large responses without buffering entire content in memory.
10//!
11//! # Examples
12//!
13//! ```rust
14//! # #[cfg(feature = "zstd")]
15//! use tako::plugins::compression::zstd_stream::stream_zstd;
16//! # #[cfg(feature = "zstd")]
17//! use http_body_util::Full;
18//! # #[cfg(feature = "zstd")]
19//! use bytes::Bytes;
20//!
21//! # #[cfg(feature = "zstd")]
22//! # fn example() {
23//! // Compress a response body with Zstandard level 3
24//! let body = Full::from(Bytes::from("Hello, World! This is test content."));
25//! let compressed = stream_zstd(body, 3);
26//!
27//! // High compression for static assets
28//! let static_content = Full::from(Bytes::from("Large static file content..."));
29//! let high_compressed = stream_zstd(static_content, 19);
30//! # }
31//! ```
32
33use std::{
34 io::Write,
35 pin::Pin,
36 task::{Context, Poll},
37};
38
39use bytes::Bytes;
40use futures_util::{Stream, TryStreamExt};
41use http_body::{Body, Frame};
42use http_body_util::BodyExt;
43use pin_project_lite::pin_project;
44use zstd::stream::Encoder;
45
46use crate::{body::TakoBody, types::BoxError};
47
48/// Compresses an HTTP body stream using Zstandard compression algorithm.
49pub fn stream_zstd<B>(body: B, level: i32) -> TakoBody
50where
51 B: Body<Data = Bytes, Error = BoxError> + Send + 'static,
52{
53 let upstream = body.into_data_stream();
54 let zstd_stream = ZstdStream::new(upstream, level).map_ok(Frame::data);
55 TakoBody::from_try_stream(zstd_stream)
56}
57
58pin_project! {
59 /// Streaming Zstandard compressor that wraps an inner data stream.
60 pub struct ZstdStream<S> {
61 #[pin] inner: S,
62 encoder: Option<Encoder<'static, Vec<u8>>>,
63 buffer: Vec<u8>,
64 pos: usize,
65 done: bool,
66 }
67}
68
69impl<S> ZstdStream<S> {
70 /// Creates a new Zstandard compression stream with the specified compression level.
71 fn new(stream: S, level: i32) -> Self {
72 Self {
73 inner: stream,
74 encoder: Some(Encoder::new(Vec::new(), level).expect("zstd encoder")),
75 buffer: Vec::new(),
76 pos: 0,
77 done: false,
78 }
79 }
80}
81
82impl<S> Stream for ZstdStream<S>
83where
84 S: Stream<Item = Result<Bytes, BoxError>>,
85{
86 type Item = Result<Bytes, BoxError>;
87
88 /// Polls the stream for the next compressed data chunk.
89 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90 let mut this = self.project();
91
92 loop {
93 // 1) Drain the buffer first, if there is unread output.
94 if *this.pos < this.buffer.len() {
95 let chunk = &this.buffer[*this.pos..];
96 *this.pos = this.buffer.len();
97 return Poll::Ready(Some(Ok(Bytes::copy_from_slice(chunk))));
98 }
99 // 2) If we are done and the encoder is already consumed,
100 // the stream is finished.
101 if *this.done && this.encoder.is_none() {
102 return Poll::Ready(None);
103 }
104 // 3) Poll the inner stream for more input data.
105 match this.inner.as_mut().poll_next(cx) {
106 // — New chunk arrived: compress it, then loop to drain.
107 Poll::Ready(Some(Ok(data))) => {
108 if let Some(enc) = this.encoder.as_mut() {
109 if let Err(e) = enc.write_all(&data).and_then(|_| enc.flush()) {
110 return Poll::Ready(Some(Err(e.into())));
111 }
112 // Copy freshly compressed bytes into our buffer.
113 let out = enc.get_ref();
114 if !out.is_empty() {
115 this.buffer.clear();
116 this.buffer.extend_from_slice(out);
117 *this.pos = 0;
118 }
119 }
120 continue; // go back to step 1
121 }
122 // — Propagate an error from the inner stream.
123 Poll::Ready(Some(Err(e))) => {
124 return Poll::Ready(Some(Err(e)));
125 }
126 // — Inner stream ended: finalise the encoder,
127 // then loop to emit the remaining bytes.
128 Poll::Ready(None) => {
129 *this.done = true;
130 if let Some(enc) = this.encoder.take() {
131 match enc.finish() {
132 Ok(mut vec) => {
133 this.buffer.clear();
134 this.buffer.append(&mut vec);
135 *this.pos = 0;
136 continue; // step 1 will send the tail bytes
137 }
138 Err(e) => {
139 return Poll::Ready(Some(Err(e.into())));
140 }
141 }
142 } else {
143 return Poll::Ready(None);
144 }
145 }
146 // — No new input and nothing buffered.
147 Poll::Pending => {
148 return Poll::Pending;
149 }
150 }
151 }
152 }
153}