actori_http/encoding/
encoder.rs

1//! Stream encoder
2use std::future::Future;
3use std::io::{self, Write};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use actori_threadpool::{run, CpuFuture};
8use brotli2::write::BrotliEncoder;
9use bytes::Bytes;
10use flate2::write::{GzEncoder, ZlibEncoder};
11use futures_core::ready;
12
13use crate::body::{Body, BodySize, MessageBody, ResponseBody};
14use crate::http::header::{ContentEncoding, CONTENT_ENCODING};
15use crate::http::{HeaderValue, StatusCode};
16use crate::{Error, ResponseHead};
17
18use super::Writer;
19
20const INPLACE: usize = 1024;
21
22pub struct Encoder<B> {
23    eof: bool,
24    body: EncoderBody<B>,
25    encoder: Option<ContentEncoder>,
26    fut: Option<CpuFuture<ContentEncoder, io::Error>>,
27}
28
29impl<B: MessageBody> Encoder<B> {
30    pub fn response(
31        encoding: ContentEncoding,
32        head: &mut ResponseHead,
33        body: ResponseBody<B>,
34    ) -> ResponseBody<Encoder<B>> {
35        let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
36            || head.status == StatusCode::SWITCHING_PROTOCOLS
37            || head.status == StatusCode::NO_CONTENT
38            || encoding == ContentEncoding::Identity
39            || encoding == ContentEncoding::Auto);
40
41        let body = match body {
42            ResponseBody::Other(b) => match b {
43                Body::None => return ResponseBody::Other(Body::None),
44                Body::Empty => return ResponseBody::Other(Body::Empty),
45                Body::Bytes(buf) => {
46                    if can_encode {
47                        EncoderBody::Bytes(buf)
48                    } else {
49                        return ResponseBody::Other(Body::Bytes(buf));
50                    }
51                }
52                Body::Message(stream) => EncoderBody::BoxedStream(stream),
53            },
54            ResponseBody::Body(stream) => EncoderBody::Stream(stream),
55        };
56
57        if can_encode {
58            // Modify response body only if encoder is not None
59            if let Some(enc) = ContentEncoder::encoder(encoding) {
60                update_head(encoding, head);
61                head.no_chunking(false);
62                return ResponseBody::Body(Encoder {
63                    body,
64                    eof: false,
65                    fut: None,
66                    encoder: Some(enc),
67                });
68            }
69        }
70        ResponseBody::Body(Encoder {
71            body,
72            eof: false,
73            fut: None,
74            encoder: None,
75        })
76    }
77}
78
79enum EncoderBody<B> {
80    Bytes(Bytes),
81    Stream(B),
82    BoxedStream(Box<dyn MessageBody>),
83}
84
85impl<B: MessageBody> MessageBody for Encoder<B> {
86    fn size(&self) -> BodySize {
87        if self.encoder.is_none() {
88            match self.body {
89                EncoderBody::Bytes(ref b) => b.size(),
90                EncoderBody::Stream(ref b) => b.size(),
91                EncoderBody::BoxedStream(ref b) => b.size(),
92            }
93        } else {
94            BodySize::Stream
95        }
96    }
97
98    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
99        loop {
100            if self.eof {
101                return Poll::Ready(None);
102            }
103
104            if let Some(ref mut fut) = self.fut {
105                let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
106                    Ok(item) => item,
107                    Err(e) => return Poll::Ready(Some(Err(e.into()))),
108                };
109                let chunk = encoder.take();
110                self.encoder = Some(encoder);
111                self.fut.take();
112                if !chunk.is_empty() {
113                    return Poll::Ready(Some(Ok(chunk)));
114                }
115            }
116
117            let result = match self.body {
118                EncoderBody::Bytes(ref mut b) => {
119                    if b.is_empty() {
120                        Poll::Ready(None)
121                    } else {
122                        Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new()))))
123                    }
124                }
125                EncoderBody::Stream(ref mut b) => b.poll_next(cx),
126                EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx),
127            };
128            match result {
129                Poll::Ready(Some(Ok(chunk))) => {
130                    if let Some(mut encoder) = self.encoder.take() {
131                        if chunk.len() < INPLACE {
132                            encoder.write(&chunk)?;
133                            let chunk = encoder.take();
134                            self.encoder = Some(encoder);
135                            if !chunk.is_empty() {
136                                return Poll::Ready(Some(Ok(chunk)));
137                            }
138                        } else {
139                            self.fut = Some(run(move || {
140                                encoder.write(&chunk)?;
141                                Ok(encoder)
142                            }));
143                        }
144                    } else {
145                        return Poll::Ready(Some(Ok(chunk)));
146                    }
147                }
148                Poll::Ready(None) => {
149                    if let Some(encoder) = self.encoder.take() {
150                        let chunk = encoder.finish()?;
151                        if chunk.is_empty() {
152                            return Poll::Ready(None);
153                        } else {
154                            self.eof = true;
155                            return Poll::Ready(Some(Ok(chunk)));
156                        }
157                    } else {
158                        return Poll::Ready(None);
159                    }
160                }
161                val => return val,
162            }
163        }
164    }
165}
166
167fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
168    head.headers_mut().insert(
169        CONTENT_ENCODING,
170        HeaderValue::from_static(encoding.as_str()),
171    );
172}
173
174enum ContentEncoder {
175    Deflate(ZlibEncoder<Writer>),
176    Gzip(GzEncoder<Writer>),
177    Br(BrotliEncoder<Writer>),
178}
179
180impl ContentEncoder {
181    fn encoder(encoding: ContentEncoding) -> Option<Self> {
182        match encoding {
183            ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new(
184                Writer::new(),
185                flate2::Compression::fast(),
186            ))),
187            ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new(
188                Writer::new(),
189                flate2::Compression::fast(),
190            ))),
191            ContentEncoding::Br => {
192                Some(ContentEncoder::Br(BrotliEncoder::new(Writer::new(), 3)))
193            }
194            _ => None,
195        }
196    }
197
198    #[inline]
199    pub(crate) fn take(&mut self) -> Bytes {
200        match *self {
201            ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(),
202            ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
203            ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
204        }
205    }
206
207    fn finish(self) -> Result<Bytes, io::Error> {
208        match self {
209            ContentEncoder::Br(encoder) => match encoder.finish() {
210                Ok(writer) => Ok(writer.buf.freeze()),
211                Err(err) => Err(err),
212            },
213            ContentEncoder::Gzip(encoder) => match encoder.finish() {
214                Ok(writer) => Ok(writer.buf.freeze()),
215                Err(err) => Err(err),
216            },
217            ContentEncoder::Deflate(encoder) => match encoder.finish() {
218                Ok(writer) => Ok(writer.buf.freeze()),
219                Err(err) => Err(err),
220            },
221        }
222    }
223
224    fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
225        match *self {
226            ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) {
227                Ok(_) => Ok(()),
228                Err(err) => {
229                    trace!("Error decoding br encoding: {}", err);
230                    Err(err)
231                }
232            },
233            ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
234                Ok(_) => Ok(()),
235                Err(err) => {
236                    trace!("Error decoding gzip encoding: {}", err);
237                    Err(err)
238                }
239            },
240            ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
241                Ok(_) => Ok(()),
242                Err(err) => {
243                    trace!("Error decoding deflate encoding: {}", err);
244                    Err(err)
245                }
246            },
247        }
248    }
249}