Skip to main content

flate2_expose/gz/
write.rs

1use std::cmp;
2use std::io;
3use std::io::prelude::*;
4
5use super::bufread::{corrupt, read_gz_header};
6use super::{GzBuilder, GzHeader};
7use crate::crc::{Crc, CrcWriter};
8use crate::zio;
9use crate::{Compress, Compression, Decompress, Status};
10
/// A gzip streaming encoder
///
/// This structure exposes a [`Write`] interface that will emit compressed data
/// to the underlying writer `W`.
///
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// # Examples
///
/// ```
/// use std::io::prelude::*;
/// use flate2_expose::Compression;
/// use flate2_expose::write::GzEncoder;
///
/// // Vec<u8> implements Write to print the compressed bytes of sample string
/// # fn main() {
///
/// let mut e = GzEncoder::new(Vec::new(), Compression::default());
/// e.write_all(b"Hello World").unwrap();
/// println!("{:?}", e.finish().unwrap());
/// # }
/// ```
#[derive(Debug)]
pub struct GzEncoder<W: Write> {
    /// Compressing writer that wraps the underlying stream `W`.
    inner: zio::Writer<W, Compress>,
    /// Running checksum, updated with every byte accepted by `write`.
    crc: Crc,
    /// How many of the 8 trailer bytes `try_finish` has already written;
    /// non-zero means the stream is finished and `write`/`flush` will panic.
    crc_bytes_written: usize,
    /// Header bytes that have not yet been written to the underlying stream;
    /// drained by `write_header` as they are accepted.
    header: Vec<u8>,
}
40
41pub fn gz_encoder<W: Write>(header: Vec<u8>, w: W, lvl: Compression) -> GzEncoder<W> {
42    GzEncoder {
43        inner: zio::Writer::new(w, Compress::new(lvl, false)),
44        crc: Crc::new(),
45        header,
46        crc_bytes_written: 0,
47    }
48}
49
50impl<W: Write> GzEncoder<W> {
51    /// Creates a new encoder which will use the given compression level.
52    ///
53    /// The encoder is not configured specially for the emitted header. For
54    /// header configuration, see the `GzBuilder` type.
55    ///
56    /// The data written to the returned encoder will be compressed and then
57    /// written to the stream `w`.
58    pub fn new(w: W, level: Compression) -> GzEncoder<W> {
59        GzBuilder::new().write(w, level)
60    }
61
62    /// Acquires a reference to the underlying writer.
63    pub fn get_ref(&self) -> &W {
64        self.inner.get_ref()
65    }
66
67    /// Acquires a mutable reference to the underlying writer.
68    ///
69    /// Note that mutation of the writer may result in surprising results if
70    /// this encoder is continued to be used.
71    pub fn get_mut(&mut self) -> &mut W {
72        self.inner.get_mut()
73    }
74
75    /// Attempt to finish this output stream, writing out final chunks of data.
76    ///
77    /// Note that this function can only be used once data has finished being
78    /// written to the output stream. After this function is called then further
79    /// calls to `write` may result in a panic.
80    ///
81    /// # Panics
82    ///
83    /// Attempts to write data to this stream may result in a panic after this
84    /// function is called.
85    ///
86    /// # Errors
87    ///
88    /// This function will perform I/O to complete this stream, and any I/O
89    /// errors which occur will be returned from this function.
90    pub fn try_finish(&mut self) -> io::Result<()> {
91        self.write_header()?;
92        self.inner.finish()?;
93
94        while self.crc_bytes_written < 8 {
95            let (sum, amt) = (self.crc.sum() as u32, self.crc.amount());
96            let buf = [
97                (sum >> 0) as u8,
98                (sum >> 8) as u8,
99                (sum >> 16) as u8,
100                (sum >> 24) as u8,
101                (amt >> 0) as u8,
102                (amt >> 8) as u8,
103                (amt >> 16) as u8,
104                (amt >> 24) as u8,
105            ];
106            let inner = self.inner.get_mut();
107            let n = inner.write(&buf[self.crc_bytes_written..])?;
108            self.crc_bytes_written += n;
109        }
110        Ok(())
111    }
112
113    /// Finish encoding this stream, returning the underlying writer once the
114    /// encoding is done.
115    ///
116    /// Note that this function may not be suitable to call in a situation where
117    /// the underlying stream is an asynchronous I/O stream. To finish a stream
118    /// the `try_finish` (or `shutdown`) method should be used instead. To
119    /// re-acquire ownership of a stream it is safe to call this method after
120    /// `try_finish` or `shutdown` has returned `Ok`.
121    ///
122    /// # Errors
123    ///
124    /// This function will perform I/O to complete this stream, and any I/O
125    /// errors which occur will be returned from this function.
126    pub fn finish(mut self) -> io::Result<W> {
127        self.try_finish()?;
128        Ok(self.inner.take_inner())
129    }
130
131    fn write_header(&mut self) -> io::Result<()> {
132        while !self.header.is_empty() {
133            let n = self.inner.get_mut().write(&self.header)?;
134            self.header.drain(..n);
135        }
136        Ok(())
137    }
138}
139
140impl<W: Write> Write for GzEncoder<W> {
141    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
142        assert_eq!(self.crc_bytes_written, 0);
143        self.write_header()?;
144        let n = self.inner.write(buf)?;
145        self.crc.update(&buf[..n]);
146        Ok(n)
147    }
148
149    fn flush(&mut self) -> io::Result<()> {
150        assert_eq!(self.crc_bytes_written, 0);
151        self.write_header()?;
152        self.inner.flush()
153    }
154}
155
156impl<R: Read + Write> Read for GzEncoder<R> {
157    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
158        self.get_mut().read(buf)
159    }
160}
161
162impl<W: Write> Drop for GzEncoder<W> {
163    fn drop(&mut self) {
164        if self.inner.is_present() {
165            let _ = self.try_finish();
166        }
167    }
168}
169
/// A gzip streaming decoder
///
/// This structure exposes a [`Write`] interface that accepts gzip-compressed
/// data and emits the decompressed bytes to the underlying writer `W`.
///
/// [`Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
///
/// # Examples
///
/// ```
/// use std::io::prelude::*;
/// use std::io;
/// use flate2_expose::Compression;
/// use flate2_expose::write::{GzEncoder, GzDecoder};
///
/// # fn main() {
/// #    let mut e = GzEncoder::new(Vec::new(), Compression::default());
/// #    e.write(b"Hello World").unwrap();
/// #    let bytes = e.finish().unwrap();
/// #    assert_eq!("Hello World", decode_writer(bytes).unwrap());
/// # }
/// // Uncompresses a gzip encoded vector of bytes and returns a string or error
/// // Here Vec<u8> implements Write
/// fn decode_writer(bytes: Vec<u8>) -> io::Result<String> {
///    let mut writer = Vec::new();
///    let mut decoder = GzDecoder::new(writer);
///    decoder.write_all(&bytes[..])?;
///    writer = decoder.finish()?;
///    let return_string = String::from_utf8(writer).expect("String parsing error");
///    Ok(return_string)
/// }
/// ```
#[derive(Debug)]
pub struct GzDecoder<W: Write> {
    /// Decompressing writer; its output passes through a `CrcWriter` so the
    /// checksum of the decompressed bytes can be compared with the trailer.
    inner: zio::Writer<CrcWriter<W>, Decompress>,
    /// Trailer bytes collected after the compressed stream reported its end;
    /// exactly 8 are required before the stream can be finished.
    crc_bytes: Vec<u8>,
    /// Parsed gzip header, `None` until enough input has arrived to parse it.
    header: Option<GzHeader>,
    /// Input buffered while the header is still incomplete.
    header_buf: Vec<u8>,
}
209
/// Number of trailer bytes the decoder collects once the compressed stream
/// has ended: a little-endian `u32` checksum followed by a little-endian
/// `u32` byte count, as decoded by `finish_and_check_crc`.
const CRC_BYTES_LEN: usize = 8;
211
212impl<W: Write> GzDecoder<W> {
213    /// Creates a new decoder which will write uncompressed data to the stream.
214    ///
215    /// When this encoder is dropped or unwrapped the final pieces of data will
216    /// be flushed.
217    pub fn new(w: W) -> GzDecoder<W> {
218        GzDecoder {
219            inner: zio::Writer::new(CrcWriter::new(w), Decompress::new(false)),
220            crc_bytes: Vec::with_capacity(CRC_BYTES_LEN),
221            header: None,
222            header_buf: Vec::new(),
223        }
224    }
225
226    /// Returns the header associated with this stream.
227    pub fn header(&self) -> Option<&GzHeader> {
228        self.header.as_ref()
229    }
230
231    /// Acquires a reference to the underlying writer.
232    pub fn get_ref(&self) -> &W {
233        self.inner.get_ref().get_ref()
234    }
235
236    /// Acquires a mutable reference to the underlying writer.
237    ///
238    /// Note that mutating the output/input state of the stream may corrupt this
239    /// object, so care must be taken when using this method.
240    pub fn get_mut(&mut self) -> &mut W {
241        self.inner.get_mut().get_mut()
242    }
243
244    /// Attempt to finish this output stream, writing out final chunks of data.
245    ///
246    /// Note that this function can only be used once data has finished being
247    /// written to the output stream. After this function is called then further
248    /// calls to `write` may result in a panic.
249    ///
250    /// # Panics
251    ///
252    /// Attempts to write data to this stream may result in a panic after this
253    /// function is called.
254    ///
255    /// # Errors
256    ///
257    /// This function will perform I/O to finish the stream, returning any
258    /// errors which happen.
259    pub fn try_finish(&mut self) -> io::Result<()> {
260        self.finish_and_check_crc()?;
261        Ok(())
262    }
263
264    /// Consumes this decoder, flushing the output stream.
265    ///
266    /// This will flush the underlying data stream and then return the contained
267    /// writer if the flush succeeded.
268    ///
269    /// Note that this function may not be suitable to call in a situation where
270    /// the underlying stream is an asynchronous I/O stream. To finish a stream
271    /// the `try_finish` (or `shutdown`) method should be used instead. To
272    /// re-acquire ownership of a stream it is safe to call this method after
273    /// `try_finish` or `shutdown` has returned `Ok`.
274    ///
275    /// # Errors
276    ///
277    /// This function will perform I/O to complete this stream, and any I/O
278    /// errors which occur will be returned from this function.
279    pub fn finish(mut self) -> io::Result<W> {
280        self.finish_and_check_crc()?;
281        Ok(self.inner.take_inner().into_inner())
282    }
283
284    fn finish_and_check_crc(&mut self) -> io::Result<()> {
285        self.inner.finish()?;
286
287        if self.crc_bytes.len() != 8 {
288            return Err(corrupt());
289        }
290
291        let crc = ((self.crc_bytes[0] as u32) << 0)
292            | ((self.crc_bytes[1] as u32) << 8)
293            | ((self.crc_bytes[2] as u32) << 16)
294            | ((self.crc_bytes[3] as u32) << 24);
295        let amt = ((self.crc_bytes[4] as u32) << 0)
296            | ((self.crc_bytes[5] as u32) << 8)
297            | ((self.crc_bytes[6] as u32) << 16)
298            | ((self.crc_bytes[7] as u32) << 24);
299        if crc != self.inner.get_ref().crc().sum() as u32 {
300            return Err(corrupt());
301        }
302        if amt != self.inner.get_ref().crc().amount() {
303            return Err(corrupt());
304        }
305        Ok(())
306    }
307}
308
/// Reader adapter that keeps a running total of the bytes read through it.
struct Counter<T: Read> {
    /// The wrapped reader.
    inner: T,
    /// Total number of bytes returned by `inner` so far.
    pos: usize,
}

impl<T: Read> Read for Counter<T> {
    /// Delegates to the wrapped reader and adds the number of bytes it
    /// returned to `pos`; errors are passed through without touching `pos`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).map(|count| {
            self.pos += count;
            count
        })
    }
}
321
322impl<W: Write> Write for GzDecoder<W> {
323    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
324        if self.header.is_none() {
325            // trying to avoid buffer usage
326            let (res, pos) = {
327                let mut counter = Counter {
328                    inner: self.header_buf.chain(buf),
329                    pos: 0,
330                };
331                let res = read_gz_header(&mut counter);
332                (res, counter.pos)
333            };
334
335            match res {
336                Err(err) => {
337                    if err.kind() == io::ErrorKind::UnexpectedEof {
338                        // not enough data for header, save to the buffer
339                        self.header_buf.extend(buf);
340                        Ok(buf.len())
341                    } else {
342                        Err(err)
343                    }
344                }
345                Ok(header) => {
346                    self.header = Some(header);
347                    let pos = pos - self.header_buf.len();
348                    self.header_buf.truncate(0);
349                    Ok(pos)
350                }
351            }
352        } else {
353            let (n, status) = self.inner.write_with_status(buf)?;
354
355            if status == Status::StreamEnd && n < buf.len() && self.crc_bytes.len() < 8 {
356                let remaining = buf.len() - n;
357                let crc_bytes = cmp::min(remaining, CRC_BYTES_LEN - self.crc_bytes.len());
358                self.crc_bytes.extend(&buf[n..n + crc_bytes]);
359                return Ok(n + crc_bytes);
360            }
361            Ok(n)
362        }
363    }
364
365    fn flush(&mut self) -> io::Result<()> {
366        self.inner.flush()
367    }
368}
369
370impl<W: Read + Write> Read for GzDecoder<W> {
371    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
372        self.inner.get_mut().get_mut().read(buf)
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World \
                       Hello World Hello World Hello World Hello World Hello World";

    /// Gzip-compresses `STR` with the default compression level.
    ///
    /// Uses `write_all` so a partial write cannot silently truncate the input.
    fn compressed_fixture() -> Vec<u8> {
        let mut e = GzEncoder::new(Vec::new(), Compression::default());
        e.write_all(STR.as_bytes()).unwrap();
        e.finish().unwrap()
    }

    /// Finishes `decoder` and asserts that its output equals `STR`.
    fn assert_decodes_to_fixture(decoder: GzDecoder<Vec<u8>>) {
        let writer = decoder.finish().unwrap();
        let return_string = String::from_utf8(writer).expect("String parsing error");
        assert_eq!(return_string, STR);
    }

    #[test]
    fn decode_writer_one_chunk() {
        let bytes = compressed_fixture();

        let mut decoder = GzDecoder::new(Vec::new());
        decoder.write_all(&bytes[..]).unwrap();
        // `finish` must still succeed after an explicit `try_finish`.
        decoder.try_finish().unwrap();
        assert_decodes_to_fixture(decoder);
    }

    #[test]
    fn decode_writer_partial_header() {
        let bytes = compressed_fixture();

        let mut decoder = GzDecoder::new(Vec::new());
        // Fewer bytes than a full header: everything is buffered and accepted.
        assert_eq!(decoder.write(&bytes[..5]).unwrap(), 5);
        decoder.write_all(&bytes[5..]).unwrap();
        assert_decodes_to_fixture(decoder);
    }

    #[test]
    fn decode_writer_exact_header() {
        let bytes = compressed_fixture();

        let mut decoder = GzDecoder::new(Vec::new());
        assert_eq!(decoder.write(&bytes[..10]).unwrap(), 10);
        decoder.write_all(&bytes[10..]).unwrap();
        assert_decodes_to_fixture(decoder);
    }

    #[test]
    fn decode_writer_partial_crc() {
        let bytes = compressed_fixture();

        let mut decoder = GzDecoder::new(Vec::new());
        let l = bytes.len() - 5;
        // NOTE(review): this first call only consumes the header, so the
        // following write still receives the whole trailer in one piece and
        // the split-trailer path is not actually exercised - confirm whether
        // writing `bytes[n..l]` and `bytes[l..]` separately was intended.
        let n = decoder.write(&bytes[..l]).unwrap();
        decoder.write_all(&bytes[n..]).unwrap();
        assert_decodes_to_fixture(decoder);
    }
}