tower_web/util/buf_stream/
deflate.rs

//! Deflate compression for `BufStream` values.
2
3use super::{BufStream, SizeHint};
4
5use bytes::{BytesMut, Bytes, Buf, BufMut};
6use flate2::{Compress, Compression, FlushCompress, Status};
7use futures::Poll;
8use futures::Async::*;
9
10use std::cmp;
11use std::io;
12
13/// Compress a buf stream using zlib deflate.
14#[derive(Debug)]
15pub struct CompressStream<T> {
16    // The inner BufStream
17    inner: T,
18
19    // `true` when the inner buffer returned `None`
20    inner_eof: bool,
21
22    // `true` when the deflated stream is complete
23    eof: bool,
24
25    // Buffers input
26    src_buf: BytesMut,
27
28    // Buffers output
29    dst_buf: BytesMut,
30
31    // Compression
32    compress: Compress,
33}
34
/// The error type produced by `CompressStream`.
#[derive(Debug)]
pub struct Error<T> {
    /// The underlying error, if there is one; `None` stands for a failure of
    /// the deflate step itself.
    inner: Option<T>,
}
41
/// Smallest amount of spare output capacity, in bytes (1 KiB), reserved
/// before each call into the compressor.
const MIN_BUF: usize = 1 << 10;
43
44impl<T> CompressStream<T>
45where T: BufStream,
46{
47    /// Create a new `CompressStream` which returns the compressed data from
48    /// `inner`.
49    ///
50    /// `level` specifies the compression level.
51    pub fn new(inner: T, level: Compression) -> CompressStream<T> {
52        CompressStream {
53            inner,
54            inner_eof: false,
55            eof: false,
56            src_buf: BytesMut::new(),
57            dst_buf: BytesMut::new(),
58            compress: Compress::new(level, false),
59        }
60    }
61}
62
63impl<T> BufStream for CompressStream<T>
64where T: BufStream,
65{
66    type Item = io::Cursor<Bytes>;
67    type Error = Error<T::Error>;
68
69    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
70        loop {
71            if self.eof {
72                return Ok(Ready(None));
73            }
74
75            // First, if needed, try filling the buffer
76            if !self.inner_eof {
77                let res = self.inner.poll()
78                    .map_err(|e| {
79                        Error { inner: Some(e) }
80                    });
81
82                match try_ready!(res) {
83                    Some(buf) => {
84                        self.src_buf.reserve(buf.remaining());
85                        self.src_buf.put(buf);
86                    }
87                    None => {
88                        self.inner_eof = true;
89                    }
90                }
91            }
92
93            let before_out = self.compress.total_out();
94            let before_in = self.compress.total_in();
95
96            let flush = if self.inner_eof {
97                FlushCompress::Finish
98            } else {
99                FlushCompress::None
100            };
101
102            // Ensure the destination buffer has capacity
103            let amt = cmp::max(self.src_buf.len() / 2, MIN_BUF);
104            self.dst_buf.reserve(amt);
105
106            let ret = unsafe {
107                self.compress.compress(
108                    &self.src_buf,
109                    self.dst_buf.bytes_mut(),
110                    flush)
111            };
112
113            let written = (self.compress.total_out() - before_out) as usize;
114            let consumed = (self.compress.total_in() - before_in) as usize;
115
116            unsafe { self.dst_buf.advance_mut(written); }
117            self.src_buf.split_to(consumed);
118
119            match ret {
120                // If we haven't ready any data and we haven't hit EOF yet,
121                // then we need to keep asking for more data because if we
122                // return that 0 bytes of data have been read then it will
123                // be interpreted as EOF.
124                Ok(Status::Ok) | Ok(Status::BufError) if written == 0 && !self.inner_eof => {
125                    continue
126                }
127                Ok(Status::Ok) | Ok(Status::BufError) => {
128                    break;
129                }
130                Ok(Status::StreamEnd) => {
131                    self.eof = true;
132                    break;
133                }
134                Err(..) => {
135                    return Err(Error { inner: None });
136                }
137            }
138        }
139
140        let buf = io::Cursor::new(self.dst_buf.take().freeze());
141        return Ok(Ready(Some(buf)));
142    }
143
144    fn size_hint(&self) -> SizeHint {
145        // TODO: How should this work?
146        self.inner.size_hint()
147    }
148}