tako/plugins/compression/
deflate_stream.rs1use std::{
26 io::Write,
27 pin::Pin,
28 task::{Context, Poll},
29};
30
31use anyhow::Result;
32use bytes::Bytes;
33use flate2::{Compression, write::DeflateEncoder};
34use futures_util::{Stream, TryStreamExt};
35use http_body::{Body, Frame};
36use http_body_util::BodyExt;
37use pin_project_lite::pin_project;
38
39use crate::{body::TakoBody, types::BoxError};
40
41pub fn stream_deflate<B>(body: B, level: u32) -> TakoBody
43where
44 B: Body<Data = Bytes, Error = BoxError> + Send + 'static,
45{
46 let upstream = body.into_data_stream();
47 let deflate = DeflateStream::new(upstream, level).map_ok(Frame::data);
48 TakoBody::from_try_stream(deflate)
49}
50
51pin_project! {
52 pub struct DeflateStream<S> {
54 #[pin] inner: S,
55 encoder: DeflateEncoder<Vec<u8>>,
56 pos: usize,
57 done: bool,
58 }
59}
60
61impl<S> DeflateStream<S> {
62 pub fn new(inner: S, level: u32) -> Self {
64 Self {
65 inner,
66 encoder: DeflateEncoder::new(Vec::new(), Compression::new(level)),
67 pos: 0,
68 done: false,
69 }
70 }
71}
72
73impl<S> Stream for DeflateStream<S>
74where
75 S: Stream<Item = Result<Bytes, BoxError>>,
76{
77 type Item = Result<Bytes, BoxError>;
78
79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81 let mut this = self.project();
82
83 loop {
84 if *this.pos < this.encoder.get_ref().len() {
86 let buf = &this.encoder.get_ref()[*this.pos..];
87 *this.pos = this.encoder.get_ref().len();
88 return Poll::Ready(Some(Ok(Bytes::copy_from_slice(buf))));
89 }
90
91 if *this.done {
93 return Poll::Ready(None);
94 }
95
96 match this.inner.as_mut().poll_next(cx) {
98 Poll::Ready(Some(Ok(chunk))) => {
99 if let Err(e) = this
101 .encoder
102 .write_all(&chunk)
103 .and_then(|_| this.encoder.flush())
104 {
105 return Poll::Ready(Some(Err(e.into())));
106 }
107 continue;
108 }
109 Poll::Ready(Some(Err(e))) => {
110 return Poll::Ready(Some(Err(e)));
112 }
113 Poll::Ready(None) => {
114 *this.done = true;
116 if let Err(e) = this.encoder.try_finish() {
117 return Poll::Ready(Some(Err(e.into())));
118 }
119 continue;
120 }
121 Poll::Pending => {
122 return Poll::Pending;
124 }
125 }
126 }
127 }
128}