actori_http/encoding/
decoder.rs1use std::future::Future;
2use std::io::{self, Write};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use actori_threadpool::{run, CpuFuture};
7use brotli2::write::BrotliDecoder;
8use bytes::Bytes;
9use flate2::write::{GzDecoder, ZlibDecoder};
10use futures_core::{ready, Stream};
11
12use super::Writer;
13use crate::error::PayloadError;
14use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
15
/// Size threshold, in bytes, that selects how an incoming chunk is decoded.
///
/// `Decoder::poll_next` decodes chunks shorter than this value directly inside
/// the poll call; chunks of this size or larger are handed to
/// `actori_threadpool::run` and awaited through a `CpuFuture`.
const INPLACE: usize = 2049;
17
18pub struct Decoder<S> {
19 decoder: Option<ContentDecoder>,
20 stream: S,
21 eof: bool,
22 fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
23}
24
25impl<S> Decoder<S>
26where
27 S: Stream<Item = Result<Bytes, PayloadError>>,
28{
29 #[inline]
31 pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
32 let decoder = match encoding {
33 ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
34 BrotliDecoder::new(Writer::new()),
35 ))),
36 ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
37 ZlibDecoder::new(Writer::new()),
38 ))),
39 ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(
40 GzDecoder::new(Writer::new()),
41 ))),
42 _ => None,
43 };
44 Decoder {
45 decoder,
46 stream,
47 fut: None,
48 eof: false,
49 }
50 }
51
52 #[inline]
54 pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder<S> {
55 let encoding = if let Some(enc) = headers.get(&CONTENT_ENCODING) {
57 if let Ok(enc) = enc.to_str() {
58 ContentEncoding::from(enc)
59 } else {
60 ContentEncoding::Identity
61 }
62 } else {
63 ContentEncoding::Identity
64 };
65
66 Self::new(stream, encoding)
67 }
68}
69
70impl<S> Stream for Decoder<S>
71where
72 S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
73{
74 type Item = Result<Bytes, PayloadError>;
75
76 fn poll_next(
77 mut self: Pin<&mut Self>,
78 cx: &mut Context<'_>,
79 ) -> Poll<Option<Self::Item>> {
80 loop {
81 if let Some(ref mut fut) = self.fut {
82 let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
83 Ok(item) => item,
84 Err(e) => return Poll::Ready(Some(Err(e.into()))),
85 };
86 self.decoder = Some(decoder);
87 self.fut.take();
88 if let Some(chunk) = chunk {
89 return Poll::Ready(Some(Ok(chunk)));
90 }
91 }
92
93 if self.eof {
94 return Poll::Ready(None);
95 }
96
97 match Pin::new(&mut self.stream).poll_next(cx) {
98 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
99 Poll::Ready(Some(Ok(chunk))) => {
100 if let Some(mut decoder) = self.decoder.take() {
101 if chunk.len() < INPLACE {
102 let chunk = decoder.feed_data(chunk)?;
103 self.decoder = Some(decoder);
104 if let Some(chunk) = chunk {
105 return Poll::Ready(Some(Ok(chunk)));
106 }
107 } else {
108 self.fut = Some(run(move || {
109 let chunk = decoder.feed_data(chunk)?;
110 Ok((chunk, decoder))
111 }));
112 }
113 continue;
114 } else {
115 return Poll::Ready(Some(Ok(chunk)));
116 }
117 }
118 Poll::Ready(None) => {
119 self.eof = true;
120 return if let Some(mut decoder) = self.decoder.take() {
121 match decoder.feed_eof() {
122 Ok(Some(res)) => Poll::Ready(Some(Ok(res))),
123 Ok(None) => Poll::Ready(None),
124 Err(err) => Poll::Ready(Some(Err(err.into()))),
125 }
126 } else {
127 Poll::Ready(None)
128 };
129 }
130 Poll::Pending => break,
131 }
132 }
133 Poll::Pending
134 }
135}
136
137enum ContentDecoder {
138 Deflate(Box<ZlibDecoder<Writer>>),
139 Gzip(Box<GzDecoder<Writer>>),
140 Br(Box<BrotliDecoder<Writer>>),
141}
142
143impl ContentDecoder {
144 fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
145 match self {
146 ContentDecoder::Br(ref mut decoder) => match decoder.flush() {
147 Ok(()) => {
148 let b = decoder.get_mut().take();
149 if !b.is_empty() {
150 Ok(Some(b))
151 } else {
152 Ok(None)
153 }
154 }
155 Err(e) => Err(e),
156 },
157 ContentDecoder::Gzip(ref mut decoder) => match decoder.try_finish() {
158 Ok(_) => {
159 let b = decoder.get_mut().take();
160 if !b.is_empty() {
161 Ok(Some(b))
162 } else {
163 Ok(None)
164 }
165 }
166 Err(e) => Err(e),
167 },
168 ContentDecoder::Deflate(ref mut decoder) => match decoder.try_finish() {
169 Ok(_) => {
170 let b = decoder.get_mut().take();
171 if !b.is_empty() {
172 Ok(Some(b))
173 } else {
174 Ok(None)
175 }
176 }
177 Err(e) => Err(e),
178 },
179 }
180 }
181
182 fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
183 match self {
184 ContentDecoder::Br(ref mut decoder) => match decoder.write_all(&data) {
185 Ok(_) => {
186 decoder.flush()?;
187 let b = decoder.get_mut().take();
188 if !b.is_empty() {
189 Ok(Some(b))
190 } else {
191 Ok(None)
192 }
193 }
194 Err(e) => Err(e),
195 },
196 ContentDecoder::Gzip(ref mut decoder) => match decoder.write_all(&data) {
197 Ok(_) => {
198 decoder.flush()?;
199 let b = decoder.get_mut().take();
200 if !b.is_empty() {
201 Ok(Some(b))
202 } else {
203 Ok(None)
204 }
205 }
206 Err(e) => Err(e),
207 },
208 ContentDecoder::Deflate(ref mut decoder) => match decoder.write_all(&data) {
209 Ok(_) => {
210 decoder.flush()?;
211 let b = decoder.get_mut().take();
212 if !b.is_empty() {
213 Ok(Some(b))
214 } else {
215 Ok(None)
216 }
217 }
218 Err(e) => Err(e),
219 },
220 }
221 }
222}