actori_http/encoding/
decoder.rs

1use std::future::Future;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use actori_threadpool::{run, CpuFuture};
7use brotli2::write::BrotliDecoder;
8use bytes::Bytes;
9use flate2::write::{GzDecoder, ZlibDecoder};
10use futures_core::{ready, Stream};
11
12use super::Writer;
13use crate::error::PayloadError;
14use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
15
16const INPLACE: usize = 2049;
17
18pub struct Decoder<S> {
19    decoder: Option<ContentDecoder>,
20    stream: S,
21    eof: bool,
22    fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
23}
24
25impl<S> Decoder<S>
26where
27    S: Stream<Item = Result<Bytes, PayloadError>>,
28{
29    /// Construct a decoder.
30    #[inline]
31    pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
32        let decoder = match encoding {
33            ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
34                BrotliDecoder::new(Writer::new()),
35            ))),
36            ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
37                ZlibDecoder::new(Writer::new()),
38            ))),
39            ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(
40                GzDecoder::new(Writer::new()),
41            ))),
42            _ => None,
43        };
44        Decoder {
45            decoder,
46            stream,
47            fut: None,
48            eof: false,
49        }
50    }
51
52    /// Construct decoder based on headers.
53    #[inline]
54    pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder<S> {
55        // check content-encoding
56        let encoding = if let Some(enc) = headers.get(&CONTENT_ENCODING) {
57            if let Ok(enc) = enc.to_str() {
58                ContentEncoding::from(enc)
59            } else {
60                ContentEncoding::Identity
61            }
62        } else {
63            ContentEncoding::Identity
64        };
65
66        Self::new(stream, encoding)
67    }
68}
69
70impl<S> Stream for Decoder<S>
71where
72    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
73{
74    type Item = Result<Bytes, PayloadError>;
75
76    fn poll_next(
77        mut self: Pin<&mut Self>,
78        cx: &mut Context<'_>,
79    ) -> Poll<Option<Self::Item>> {
80        loop {
81            if let Some(ref mut fut) = self.fut {
82                let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
83                    Ok(item) => item,
84                    Err(e) => return Poll::Ready(Some(Err(e.into()))),
85                };
86                self.decoder = Some(decoder);
87                self.fut.take();
88                if let Some(chunk) = chunk {
89                    return Poll::Ready(Some(Ok(chunk)));
90                }
91            }
92
93            if self.eof {
94                return Poll::Ready(None);
95            }
96
97            match Pin::new(&mut self.stream).poll_next(cx) {
98                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
99                Poll::Ready(Some(Ok(chunk))) => {
100                    if let Some(mut decoder) = self.decoder.take() {
101                        if chunk.len() < INPLACE {
102                            let chunk = decoder.feed_data(chunk)?;
103                            self.decoder = Some(decoder);
104                            if let Some(chunk) = chunk {
105                                return Poll::Ready(Some(Ok(chunk)));
106                            }
107                        } else {
108                            self.fut = Some(run(move || {
109                                let chunk = decoder.feed_data(chunk)?;
110                                Ok((chunk, decoder))
111                            }));
112                        }
113                        continue;
114                    } else {
115                        return Poll::Ready(Some(Ok(chunk)));
116                    }
117                }
118                Poll::Ready(None) => {
119                    self.eof = true;
120                    return if let Some(mut decoder) = self.decoder.take() {
121                        match decoder.feed_eof() {
122                            Ok(Some(res)) => Poll::Ready(Some(Ok(res))),
123                            Ok(None) => Poll::Ready(None),
124                            Err(err) => Poll::Ready(Some(Err(err.into()))),
125                        }
126                    } else {
127                        Poll::Ready(None)
128                    };
129                }
130                Poll::Pending => break,
131            }
132        }
133        Poll::Pending
134    }
135}
136
137enum ContentDecoder {
138    Deflate(Box<ZlibDecoder<Writer>>),
139    Gzip(Box<GzDecoder<Writer>>),
140    Br(Box<BrotliDecoder<Writer>>),
141}
142
143impl ContentDecoder {
144    fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
145        match self {
146            ContentDecoder::Br(ref mut decoder) => match decoder.flush() {
147                Ok(()) => {
148                    let b = decoder.get_mut().take();
149                    if !b.is_empty() {
150                        Ok(Some(b))
151                    } else {
152                        Ok(None)
153                    }
154                }
155                Err(e) => Err(e),
156            },
157            ContentDecoder::Gzip(ref mut decoder) => match decoder.try_finish() {
158                Ok(_) => {
159                    let b = decoder.get_mut().take();
160                    if !b.is_empty() {
161                        Ok(Some(b))
162                    } else {
163                        Ok(None)
164                    }
165                }
166                Err(e) => Err(e),
167            },
168            ContentDecoder::Deflate(ref mut decoder) => match decoder.try_finish() {
169                Ok(_) => {
170                    let b = decoder.get_mut().take();
171                    if !b.is_empty() {
172                        Ok(Some(b))
173                    } else {
174                        Ok(None)
175                    }
176                }
177                Err(e) => Err(e),
178            },
179        }
180    }
181
182    fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
183        match self {
184            ContentDecoder::Br(ref mut decoder) => match decoder.write_all(&data) {
185                Ok(_) => {
186                    decoder.flush()?;
187                    let b = decoder.get_mut().take();
188                    if !b.is_empty() {
189                        Ok(Some(b))
190                    } else {
191                        Ok(None)
192                    }
193                }
194                Err(e) => Err(e),
195            },
196            ContentDecoder::Gzip(ref mut decoder) => match decoder.write_all(&data) {
197                Ok(_) => {
198                    decoder.flush()?;
199                    let b = decoder.get_mut().take();
200                    if !b.is_empty() {
201                        Ok(Some(b))
202                    } else {
203                        Ok(None)
204                    }
205                }
206                Err(e) => Err(e),
207            },
208            ContentDecoder::Deflate(ref mut decoder) => match decoder.write_all(&data) {
209                Ok(_) => {
210                    decoder.flush()?;
211                    let b = decoder.get_mut().take();
212                    if !b.is_empty() {
213                        Ok(Some(b))
214                    } else {
215                        Ok(None)
216                    }
217                }
218                Err(e) => Err(e),
219            },
220        }
221    }
222}