// tower_web/util/buf_stream/deflate.rs
use super::{BufStream, SizeHint};
4
5use bytes::{BytesMut, Bytes, Buf, BufMut};
6use flate2::{Compress, Compression, FlushCompress, Status};
7use futures::Poll;
8use futures::Async::*;
9
10use std::cmp;
11use std::io;
12
13#[derive(Debug)]
15pub struct CompressStream<T> {
16 inner: T,
18
19 inner_eof: bool,
21
22 eof: bool,
24
25 src_buf: BytesMut,
27
28 dst_buf: BytesMut,
30
31 compress: Compress,
33}
34
/// Error produced while polling a `CompressStream`.
///
/// `inner` is `Some(err)` when the wrapped stream returned `err`, and `None`
/// when the failure came from the compressor itself.
#[derive(Debug)]
pub struct Error<T> {
    /// The wrapped stream's error, if that is where the failure originated.
    inner: Option<T>,
}
41
/// Minimum number of bytes of spare capacity reserved in the destination
/// buffer before each call into the compressor.
const MIN_BUF: usize = 1024;
43
44impl<T> CompressStream<T>
45where T: BufStream,
46{
47 pub fn new(inner: T, level: Compression) -> CompressStream<T> {
52 CompressStream {
53 inner,
54 inner_eof: false,
55 eof: false,
56 src_buf: BytesMut::new(),
57 dst_buf: BytesMut::new(),
58 compress: Compress::new(level, false),
59 }
60 }
61}
62
63impl<T> BufStream for CompressStream<T>
64where T: BufStream,
65{
66 type Item = io::Cursor<Bytes>;
67 type Error = Error<T::Error>;
68
69 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
70 loop {
71 if self.eof {
72 return Ok(Ready(None));
73 }
74
75 if !self.inner_eof {
77 let res = self.inner.poll()
78 .map_err(|e| {
79 Error { inner: Some(e) }
80 });
81
82 match try_ready!(res) {
83 Some(buf) => {
84 self.src_buf.reserve(buf.remaining());
85 self.src_buf.put(buf);
86 }
87 None => {
88 self.inner_eof = true;
89 }
90 }
91 }
92
93 let before_out = self.compress.total_out();
94 let before_in = self.compress.total_in();
95
96 let flush = if self.inner_eof {
97 FlushCompress::Finish
98 } else {
99 FlushCompress::None
100 };
101
102 let amt = cmp::max(self.src_buf.len() / 2, MIN_BUF);
104 self.dst_buf.reserve(amt);
105
106 let ret = unsafe {
107 self.compress.compress(
108 &self.src_buf,
109 self.dst_buf.bytes_mut(),
110 flush)
111 };
112
113 let written = (self.compress.total_out() - before_out) as usize;
114 let consumed = (self.compress.total_in() - before_in) as usize;
115
116 unsafe { self.dst_buf.advance_mut(written); }
117 self.src_buf.split_to(consumed);
118
119 match ret {
120 Ok(Status::Ok) | Ok(Status::BufError) if written == 0 && !self.inner_eof => {
125 continue
126 }
127 Ok(Status::Ok) | Ok(Status::BufError) => {
128 break;
129 }
130 Ok(Status::StreamEnd) => {
131 self.eof = true;
132 break;
133 }
134 Err(..) => {
135 return Err(Error { inner: None });
136 }
137 }
138 }
139
140 let buf = io::Cursor::new(self.dst_buf.take().freeze());
141 return Ok(Ready(Some(buf)));
142 }
143
144 fn size_hint(&self) -> SizeHint {
145 self.inner.size_hint()
147 }
148}