tako/plugins/compression/
zstd_stream.rs

1#![cfg(feature = "zstd")]
2
3//! Zstandard compression streaming utilities for high-performance HTTP response compression.
4//!
5//! This module provides streaming Zstandard (zstd) compression for HTTP response bodies,
6//! offering excellent compression ratios with fast decompression speeds. Zstandard is
7//! particularly well-suited for modern web applications that require both high compression
8//! efficiency and low latency. The streaming implementation enables memory-efficient
9//! compression of large responses without buffering entire content in memory.
10//!
11//! # Examples
12//!
13//! ```rust
14//! # #[cfg(feature = "zstd")]
15//! use tako::plugins::compression::zstd_stream::stream_zstd;
16//! # #[cfg(feature = "zstd")]
17//! use http_body_util::Full;
18//! # #[cfg(feature = "zstd")]
19//! use bytes::Bytes;
20//!
21//! # #[cfg(feature = "zstd")]
22//! # fn example() {
23//! // Compress a response body with Zstandard level 3
24//! let body = Full::from(Bytes::from("Hello, World! This is test content."));
25//! let compressed = stream_zstd(body, 3);
26//!
27//! // High compression for static assets
28//! let static_content = Full::from(Bytes::from("Large static file content..."));
29//! let high_compressed = stream_zstd(static_content, 19);
30//! # }
31//! ```
32
33use std::{
34  io::Write,
35  pin::Pin,
36  task::{Context, Poll},
37};
38
39use bytes::Bytes;
40use futures_util::{Stream, TryStreamExt};
41use http_body::{Body, Frame};
42use http_body_util::BodyExt;
43use pin_project_lite::pin_project;
44use zstd::stream::Encoder;
45
46use crate::{body::TakoBody, types::BoxError};
47
48/// Compresses an HTTP body stream using Zstandard compression algorithm.
49pub fn stream_zstd<B>(body: B, level: i32) -> TakoBody
50where
51  B: Body<Data = Bytes, Error = BoxError> + Send + 'static,
52{
53  let upstream = body.into_data_stream();
54  let zstd_stream = ZstdStream::new(upstream, level).map_ok(Frame::data);
55  TakoBody::from_try_stream(zstd_stream)
56}
57
58pin_project! {
59    /// Streaming Zstandard compressor that wraps an inner data stream.
60    pub struct ZstdStream<S> {
61        #[pin] inner: S,
62        encoder: Option<Encoder<'static, Vec<u8>>>,
63        buffer: Vec<u8>,
64        pos: usize,
65        done: bool,
66    }
67}
68
69impl<S> ZstdStream<S> {
70  /// Creates a new Zstandard compression stream with the specified compression level.
71  fn new(stream: S, level: i32) -> Self {
72    Self {
73      inner: stream,
74      encoder: Some(Encoder::new(Vec::new(), level).expect("zstd encoder")),
75      buffer: Vec::new(),
76      pos: 0,
77      done: false,
78    }
79  }
80}
81
82impl<S> Stream for ZstdStream<S>
83where
84  S: Stream<Item = Result<Bytes, BoxError>>,
85{
86  type Item = Result<Bytes, BoxError>;
87
88  /// Polls the stream for the next compressed data chunk.
89  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90    let mut this = self.project();
91
92    loop {
93      // 1) Drain the buffer first, if there is unread output.
94      if *this.pos < this.buffer.len() {
95        let chunk = &this.buffer[*this.pos..];
96        *this.pos = this.buffer.len();
97        return Poll::Ready(Some(Ok(Bytes::copy_from_slice(chunk))));
98      }
99      // 2) If we are done and the encoder is already consumed,
100      //    the stream is finished.
101      if *this.done && this.encoder.is_none() {
102        return Poll::Ready(None);
103      }
104      // 3) Poll the inner stream for more input data.
105      match this.inner.as_mut().poll_next(cx) {
106        // — New chunk arrived: compress it, then loop to drain.
107        Poll::Ready(Some(Ok(data))) => {
108          if let Some(enc) = this.encoder.as_mut() {
109            if let Err(e) = enc.write_all(&data).and_then(|_| enc.flush()) {
110              return Poll::Ready(Some(Err(e.into())));
111            }
112            // Copy freshly compressed bytes into our buffer.
113            let out = enc.get_ref();
114            if !out.is_empty() {
115              this.buffer.clear();
116              this.buffer.extend_from_slice(out);
117              *this.pos = 0;
118            }
119          }
120          continue; // go back to step 1
121        }
122        // — Propagate an error from the inner stream.
123        Poll::Ready(Some(Err(e))) => {
124          return Poll::Ready(Some(Err(e)));
125        }
126        // — Inner stream ended: finalise the encoder,
127        //   then loop to emit the remaining bytes.
128        Poll::Ready(None) => {
129          *this.done = true;
130          if let Some(enc) = this.encoder.take() {
131            match enc.finish() {
132              Ok(mut vec) => {
133                this.buffer.clear();
134                this.buffer.append(&mut vec);
135                *this.pos = 0;
136                continue; // step 1 will send the tail bytes
137              }
138              Err(e) => {
139                return Poll::Ready(Some(Err(e.into())));
140              }
141            }
142          } else {
143            return Poll::Ready(None);
144          }
145        }
146        // — No new input and nothing buffered.
147        Poll::Pending => {
148          return Poll::Pending;
149        }
150      }
151    }
152  }
153}