http_encoding/
coder.rs

1use core::{
2    pin::Pin,
3    task::{ready, Context, Poll},
4};
5
6use std::io;
7
8use bytes::Bytes;
9use futures_core::stream::Stream;
10use pin_project_lite::pin_project;
11
12use super::error::CoderError;
13
14pin_project! {
15    /// A coder type that can be used for either encode or decode which determined by De type.
16    #[derive(Default)]
17    pub struct Coder<S, C = FeaturedCode>{
18        #[pin]
19        body: S,
20        coder: C,
21    }
22}
23
24impl<S, C, T, E> Coder<S, C>
25where
26    S: Stream<Item = Result<T, E>>,
27    C: Code<T>,
28    T: AsRef<[u8]>,
29{
30    /// Construct a new coder.
31    #[inline]
32    pub const fn new(body: S, coder: C) -> Self {
33        Self { body, coder }
34    }
35
36    #[inline]
37    pub fn into_inner(self) -> S {
38        self.body
39    }
40}
41
42impl<S, C, T, E> Stream for Coder<S, C>
43where
44    S: Stream<Item = Result<T, E>>,
45    CoderError: From<E>,
46    C: Code<T>,
47{
48    type Item = Result<C::Item, CoderError>;
49
50    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        let mut this = self.project();
52
53        while let Some(res) = ready!(this.body.as_mut().poll_next(cx)) {
54            let item = res?;
55            if let Some(item) = this.coder.code(item)? {
56                return Poll::Ready(Some(Ok(item)));
57            }
58        }
59
60        match this.coder.code_eof()? {
61            Some(res) => Poll::Ready(Some(Ok(res))),
62            None => Poll::Ready(None),
63        }
64    }
65
66    #[inline]
67    fn size_hint(&self) -> (usize, Option<usize>) {
68        // forward size_hint to coder as it determines the data length after (de)compress.
69        self.coder.size_hint(&self.body)
70    }
71}
72
73pub trait Code<T>: Sized {
74    type Item;
75
76    fn code(&mut self, item: T) -> io::Result<Option<Self::Item>>;
77
78    fn code_eof(&mut self) -> io::Result<Option<Self::Item>>;
79
80    /// A helper method for overriding associated input stream's size_hint.
81    /// by default it returns value the same as [Stream::size_hint]'s default value.
82    /// in other word the default prediction is (de)compress can not hint an exact size.
83    #[allow(unused_variables)]
84    #[inline]
85    fn size_hint(&self, stream: &impl Stream) -> (usize, Option<usize>) {
86        (0, None)
87    }
88}
89
90/// coder serve as pass through that just forward items.
91pub struct NoOpCode;
92
93impl<T> Code<T> for NoOpCode
94where
95    T: AsRef<[u8]> + 'static,
96{
97    type Item = Bytes;
98
99    fn code(&mut self, item: T) -> io::Result<Option<Self::Item>> {
100        Ok(Some(
101            try_downcast_to_bytes(item).unwrap_or_else(|item| Bytes::copy_from_slice(item.as_ref())),
102        ))
103    }
104
105    #[inline]
106    fn code_eof(&mut self) -> io::Result<Option<Self::Item>> {
107        Ok(None)
108    }
109
110    // noop coder can take advantage of not doing any de/encoding work and hint the output stream
111    // size. this would help downstream to infer the size of body and avoid going through
112    // transfer-encoding: chunked when possible.
113    #[inline]
114    fn size_hint(&self, stream: &impl Stream) -> (usize, Option<usize>) {
115        stream.size_hint()
116    }
117}
118
119pub enum FeaturedCode {
120    NoOp(NoOpCode),
121    #[cfg(feature = "br")]
122    DecodeBr(super::brotli::Decoder),
123    #[cfg(feature = "br")]
124    EncodeBr(super::brotli::Encoder),
125    #[cfg(feature = "gz")]
126    DecodeGz(super::gzip::Decoder),
127    #[cfg(feature = "gz")]
128    EncodeGz(super::gzip::Encoder),
129    #[cfg(feature = "de")]
130    DecodeDe(super::deflate::Decoder),
131    #[cfg(feature = "de")]
132    EncodeDe(super::deflate::Encoder),
133}
134
135impl Default for FeaturedCode {
136    fn default() -> Self {
137        Self::NoOp(NoOpCode)
138    }
139}
140
141impl<T> Code<T> for FeaturedCode
142where
143    T: AsRef<[u8]> + 'static,
144{
145    type Item = Bytes;
146
147    fn code(&mut self, item: T) -> io::Result<Option<Self::Item>> {
148        match self {
149            Self::NoOp(ref mut coder) => coder.code(item),
150            #[cfg(feature = "br")]
151            Self::DecodeBr(ref mut coder) => coder.code(item),
152            #[cfg(feature = "br")]
153            Self::EncodeBr(ref mut coder) => coder.code(item),
154            #[cfg(feature = "gz")]
155            Self::DecodeGz(ref mut coder) => coder.code(item),
156            #[cfg(feature = "gz")]
157            Self::EncodeGz(ref mut coder) => coder.code(item),
158            #[cfg(feature = "de")]
159            Self::DecodeDe(ref mut coder) => coder.code(item),
160            #[cfg(feature = "de")]
161            Self::EncodeDe(ref mut coder) => coder.code(item),
162        }
163    }
164
165    fn code_eof(&mut self) -> io::Result<Option<Self::Item>> {
166        match self {
167            Self::NoOp(ref mut coder) => <NoOpCode as Code<T>>::code_eof(coder),
168            #[cfg(feature = "br")]
169            Self::DecodeBr(ref mut coder) => <super::brotli::Decoder as Code<T>>::code_eof(coder),
170            #[cfg(feature = "br")]
171            Self::EncodeBr(ref mut coder) => <super::brotli::Encoder as Code<T>>::code_eof(coder),
172            #[cfg(feature = "gz")]
173            Self::DecodeGz(ref mut coder) => <super::gzip::Decoder as Code<T>>::code_eof(coder),
174            #[cfg(feature = "gz")]
175            Self::EncodeGz(ref mut coder) => <super::gzip::Encoder as Code<T>>::code_eof(coder),
176            #[cfg(feature = "de")]
177            Self::DecodeDe(ref mut coder) => <super::deflate::Decoder as Code<T>>::code_eof(coder),
178            #[cfg(feature = "de")]
179            Self::EncodeDe(ref mut coder) => <super::deflate::Encoder as Code<T>>::code_eof(coder),
180        }
181    }
182
183    fn size_hint(&self, stream: &impl Stream) -> (usize, Option<usize>) {
184        match self {
185            Self::NoOp(ref coder) => <NoOpCode as Code<T>>::size_hint(coder, stream),
186            #[cfg(feature = "br")]
187            Self::DecodeBr(ref coder) => <super::brotli::Decoder as Code<T>>::size_hint(coder, stream),
188            #[cfg(feature = "br")]
189            Self::EncodeBr(ref coder) => <super::brotli::Encoder as Code<T>>::size_hint(coder, stream),
190            #[cfg(feature = "gz")]
191            Self::DecodeGz(ref coder) => <super::gzip::Decoder as Code<T>>::size_hint(coder, stream),
192            #[cfg(feature = "gz")]
193            Self::EncodeGz(ref coder) => <super::gzip::Encoder as Code<T>>::size_hint(coder, stream),
194            #[cfg(feature = "de")]
195            Self::DecodeDe(ref coder) => <super::deflate::Decoder as Code<T>>::size_hint(coder, stream),
196            #[cfg(feature = "de")]
197            Self::EncodeDe(ref coder) => <super::deflate::Encoder as Code<T>>::size_hint(coder, stream),
198        }
199    }
200}
201
// implements `Code` for a writer style coder type that wraps a `BytesMutWriter`: input is
// written into the coder and whatever landed in the wrapped writer is taken out as the output
// chunk.
#[cfg(any(feature = "gz", feature = "de"))]
macro_rules! code_impl {
    ($coder:ident) => {
        impl<T> crate::Code<T> for $coder<crate::writer::BytesMutWriter>
        where
            T: AsRef<[u8]>,
        {
            type Item = ::bytes::Bytes;

            fn code(&mut self, item: T) -> ::std::io::Result<Option<Self::Item>> {
                use ::std::io::Write as _;

                self.write_all(item.as_ref())?;

                // an empty buffer means this input produced no output chunk.
                let out = self.get_mut().take();
                Ok(if out.is_empty() { None } else { Some(out) })
            }

            fn code_eof(&mut self) -> ::std::io::Result<Option<Self::Item>> {
                self.try_finish()?;

                // hand out whatever finishing wrote into the buffer, if anything.
                let out = self.get_mut().take();
                Ok(if out.is_empty() { None } else { Some(out) })
            }
        }
    };
}
235
236fn try_downcast_to_bytes<T: 'static>(item: T) -> Result<Bytes, T> {
237    use core::any::Any;
238
239    let item = &mut Some(item);
240    match (item as &mut dyn Any).downcast_mut::<Option<Bytes>>() {
241        Some(bytes) => Ok(bytes.take().unwrap()),
242        None => Err(item.take().unwrap()),
243    }
244}
245
#[cfg(test)]
mod test {
    use super::*;

    // `Bytes` must take the downcast path while any other byte container must be handed back.
    #[test]
    fn downcast_bytes() {
        assert!(try_downcast_to_bytes(Bytes::new()).is_ok());
        assert!(try_downcast_to_bytes(Vec::<u8>::new()).is_err());
    }
}
257}