isal/
write.rs

1//! Encoder and Decoder implementing `std::io::Write`
2use crate::*;
3use std::io;
4use std::io::Write;
5
6/// Streaming compression for input streams implementing `std::io::Write`.
7///
8/// Notes
9/// -----
10/// One should consider using `crate::compress` or `crate::compress_into` if possible.
11/// In that context, we do not need to hold and maintain intermediate buffers for reading and writing.
12///
13/// Example
14/// -------
15/// ```
16/// use std::{io, io::Write};
17/// use isal::{write::Encoder, CompressionLevel, decompress, Codec};
18///
19/// let data = b"Hello, World!".to_vec();
20/// let mut compressed = vec![];
21///
22/// let mut encoder = Encoder::new(&mut compressed, CompressionLevel::Three, Codec::Gzip);
23///
24/// // Numbeer of compressed bytes written to `output`
25/// io::copy(&mut io::Cursor::new(&data), &mut encoder).unwrap();
26///
27/// // call .flush to finish the stream
28/// encoder.flush().unwrap();
29///
30/// let decompressed = decompress(io::Cursor::new(&compressed), Codec::Gzip).unwrap();
31/// assert_eq!(decompressed.as_slice(), data);
32///
33/// ```
34pub struct Encoder<W: io::Write> {
35    inner: W,
36    stream: ZStream,
37    out_buf: Vec<u8>,
38    dsts: usize,
39    dste: usize,
40    total_in: usize,
41    total_out: usize,
42    codec: Codec,
43}
44
45impl<W: io::Write> Encoder<W> {
46    /// Create a new `Encoder` which implements the `std::io::Read` trait.
47    pub fn new(writer: W, level: CompressionLevel, codec: Codec) -> Encoder<W> {
48        let out_buf = Vec::with_capacity(BUF_SIZE);
49
50        let mut zstream = ZStream::new(level, ZStreamKind::Stateful);
51
52        zstream.stream.end_of_stream = 0;
53        zstream.stream.flush = FlushFlags::NoFlush as _;
54        zstream.stream.gzip_flag = codec as _;
55
56        Self {
57            inner: writer,
58            stream: zstream,
59            out_buf,
60            dste: 0,
61            dsts: 0,
62            total_in: 0,
63            total_out: 0,
64            codec,
65        }
66    }
67
68    /// Mutable reference to underlying reader, not advisable to modify during reading.
69    pub fn get_ref_mut(&mut self) -> &mut W {
70        &mut self.inner
71    }
72
73    // Reference to underlying reader
74    pub fn get_ref(&self) -> &W {
75        &self.inner
76    }
77
78    /// Call flush and return the inner writer
79    pub fn finish(mut self) -> io::Result<W> {
80        self.flush()?;
81        Ok(self.inner)
82    }
83
84    /// total bytes written to the writer, inclusive of all streams if `flush` has been called before
85    pub fn total_out(&self) -> usize {
86        self.stream.stream.total_out as usize + self.total_out
87    }
88
89    /// total bytes processed, inclusive of all streams if `flush` has been called before
90    pub fn total_in(&self) -> usize {
91        self.stream.stream.total_in as usize + self.total_in
92    }
93
94    #[inline(always)]
95    fn write_from_out_buf(&mut self) -> io::Result<usize> {
96        let count = self.dste - self.dsts;
97        self.inner
98            .write_all(&mut self.out_buf[self.dsts..self.dste])?;
99        self.out_buf.truncate(0);
100        self.dsts = 0;
101        self.dste = 0;
102        Ok(count)
103    }
104}
105
106impl<W: io::Write> io::Write for Encoder<W> {
107    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
108        if buf.is_empty() {
109            return Ok(0);
110        }
111        self.stream.stream.avail_in = buf.len() as _;
112        self.stream.stream.next_in = buf.as_ptr() as *mut _;
113
114        while self.stream.stream.avail_in > 0 {
115            self.out_buf.resize(self.dste + BUF_SIZE, 0);
116
117            self.stream.stream.avail_out = BUF_SIZE as _;
118            self.stream.stream.next_out =
119                self.out_buf[self.dste..self.dste + BUF_SIZE].as_mut_ptr();
120
121            self.stream.deflate()?;
122
123            self.dste += BUF_SIZE - self.stream.stream.avail_out as usize;
124        }
125
126        self.write_from_out_buf()?;
127
128        Ok(buf.len())
129    }
130    fn flush(&mut self) -> io::Result<()> {
131        // Write footer and flush to inner
132        self.stream.stream.end_of_stream = 1;
133        self.stream.stream.flush = FlushFlags::FullFlush as _;
134        while self.stream.stream.internal_state.state != isal::isal_zstate_state_ZSTATE_END {
135            self.out_buf.resize(self.dste + BUF_SIZE, 0);
136            self.stream.stream.avail_out = BUF_SIZE as _;
137            self.stream.stream.next_out =
138                self.out_buf[self.dste..self.dste + BUF_SIZE].as_mut_ptr();
139            self.stream.deflate()?;
140            self.dste += BUF_SIZE - self.stream.stream.avail_out as usize;
141        }
142        self.write_from_out_buf()?;
143        self.inner.flush()?;
144
145        // Prep for next stream should user call 'write' again after flush.
146        // needs to store total_in/out separately as checksum is calculated
147        // from these values per stream
148        self.total_in += self.stream.stream.total_in as usize;
149        self.total_out += self.stream.stream.total_out as usize;
150        unsafe { isal::isal_deflate_reset(&mut self.stream.stream) };
151
152        self.stream.stream.flush = FlushFlags::NoFlush as _;
153        self.stream.stream.end_of_stream = 0;
154        self.stream.stream.gzip_flag = self.codec as _;
155        Ok(())
156    }
157}
158
159/// Streaming compression for input streams implementing `std::io::Write`.
160///
161/// Notes
162/// -----
163/// One should consider using `crate::decompress` or `crate::decompress_into` if possible.
164/// In that context, we do not need to hold and maintain intermediate buffers for reading and writing.
165///
166/// Example
167/// -------
168/// ```
169/// use std::{io, io::Write};
170/// use isal::{write::Decoder, CompressionLevel, compress, Codec};
171/// let data = b"Hello, World!".to_vec();
172///
173/// let compressed = compress(io::Cursor::new(data.as_slice()), CompressionLevel::Three, Codec::Gzip).unwrap();
174///
175/// let mut decompressed = vec![];
176/// let mut decoder = Decoder::new(&mut decompressed, Codec::Gzip);
177///
178/// // Numbeer of compressed bytes written to `output`
179/// let n = io::copy(&mut io::Cursor::new(&compressed), &mut decoder).unwrap();
180/// assert_eq!(n as usize, compressed.len());
181/// assert_eq!(decompressed.as_slice(), data);
182/// ```
183pub struct Decoder<W: io::Write> {
184    inner: W,
185    zst: InflateState,
186    out_buf: Vec<u8>,
187    total_out: usize,
188    total_in: usize,
189    #[allow(dead_code)]
190    codec: Codec,
191}
192
193impl<W: io::Write> Decoder<W> {
194    pub fn new(writer: W, codec: Codec) -> Decoder<W> {
195        let zst = InflateState::new(codec);
196
197        Self {
198            inner: writer,
199            zst,
200            out_buf: Vec::with_capacity(BUF_SIZE),
201            total_out: 0,
202            total_in: 0,
203            codec,
204        }
205    }
206
207    /// Mutable reference to underlying reader, not advisable to modify during reading.
208    pub fn get_ref_mut(&mut self) -> &mut W {
209        &mut self.inner
210    }
211
212    // Reference to underlying reader
213    pub fn get_ref(&self) -> &W {
214        &self.inner
215    }
216}
217
218impl<W: io::Write> io::Write for Decoder<W> {
219    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
220        self.zst.state.avail_in = buf.len() as _;
221        self.zst.state.next_in = buf.as_ptr() as *mut _;
222
223        self.total_in += buf.len();
224
225        let mut n_bytes = 0;
226        loop {
227            loop {
228                self.out_buf.resize(n_bytes + BUF_SIZE, 0);
229
230                self.zst.state.next_out = self.out_buf[n_bytes..n_bytes + BUF_SIZE].as_mut_ptr();
231                self.zst.state.avail_out = BUF_SIZE as _;
232
233                self.zst.step_inflate()?;
234
235                n_bytes += BUF_SIZE - self.zst.state.avail_out as usize;
236                self.total_out += n_bytes;
237
238                if self.zst.block_state() == isal::isal_block_state_ISAL_BLOCK_FINISH {
239                    break;
240                }
241            }
242            if self.zst.block_state() == isal::isal_block_state_ISAL_BLOCK_FINISH {
243                self.zst.reset();
244            }
245
246            if self.zst.state.avail_in == 0 {
247                break;
248            }
249        }
250        self.inner.write_all(&self.out_buf[..n_bytes])?;
251
252        let nbytes = buf.len() - self.zst.state.avail_in as usize;
253        Ok(nbytes)
254    }
255    fn flush(&mut self) -> io::Result<()> {
256        if self.total_out == 0 && self.total_in == 0 {
257            return Err(io::Error::new(
258                io::ErrorKind::InvalidData,
259                Error::DecompressionError(DecompCode::EndInput),
260            ));
261        }
262        self.inner.flush()
263    }
264}
265
266/// Deflate compression
267/// Basically a wrapper to `Encoder` which sets the codec for you.
268pub struct DeflateEncoder<R: io::Write> {
269    inner: Encoder<R>,
270}
271
272impl<W: io::Write> DeflateEncoder<W> {
273    pub fn new(writer: W, level: CompressionLevel) -> Self {
274        Self {
275            inner: Encoder::new(writer, level, Codec::Deflate),
276        }
277    }
278    /// Mutable reference to underlying reader, not advisable to modify during reading.
279    pub fn get_ref_mut(&mut self) -> &mut W {
280        &mut self.inner.inner
281    }
282
283    // Reference to underlying reader
284    pub fn get_ref(&self) -> &W {
285        &self.inner.inner
286    }
287
288    /// Call flush and return the inner writer
289    pub fn finish(mut self) -> io::Result<W> {
290        self.flush()?;
291        Ok(self.inner.inner)
292    }
293
294    /// total bytes written to the writer, inclusive of all streams if `flush` has been called before
295    pub fn total_out(&self) -> usize {
296        self.inner.stream.stream.total_out as usize + self.inner.total_out
297    }
298
299    /// total bytes processed, inclusive of all streams if `flush` has been called before
300    pub fn total_in(&self) -> usize {
301        self.inner.stream.stream.total_in as usize + self.inner.total_in
302    }
303}
304
305impl<W: io::Write> io::Write for DeflateEncoder<W> {
306    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
307        self.inner.write(buf)
308    }
309    fn flush(&mut self) -> io::Result<()> {
310        self.inner.flush()
311    }
312}
313
314/// Deflate decompression
315/// Basically a wrapper to `Decoder` which sets the codec for you.
316pub struct DeflateDecoder<W: io::Write> {
317    inner: Decoder<W>,
318}
319
320impl<W: io::Write> DeflateDecoder<W> {
321    pub fn new(writer: W) -> Self {
322        Self {
323            inner: Decoder::new(writer, Codec::Deflate),
324        }
325    }
326    /// Mutable reference to underlying reader, not advisable to modify during reading.
327    pub fn get_ref_mut(&mut self) -> &mut W {
328        &mut self.inner.inner
329    }
330
331    // Reference to underlying reader
332    pub fn get_ref(&self) -> &W {
333        &self.inner.inner
334    }
335}
336
337impl<W: io::Write> io::Write for DeflateDecoder<W> {
338    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
339        self.inner.write(buf)
340    }
341    fn flush(&mut self) -> io::Result<()> {
342        self.inner.flush()
343    }
344}
345
346/// Zlib compression
347/// Basically a wrapper to `Encoder` which sets the codec for you.
348pub struct ZlibEncoder<R: io::Write> {
349    inner: Encoder<R>,
350}
351
352impl<W: io::Write> ZlibEncoder<W> {
353    pub fn new(writer: W, level: CompressionLevel) -> Self {
354        Self {
355            inner: Encoder::new(writer, level, Codec::Zlib),
356        }
357    }
358    /// Mutable reference to underlying reader, not advisable to modify during reading.
359    pub fn get_ref_mut(&mut self) -> &mut W {
360        &mut self.inner.inner
361    }
362
363    // Reference to underlying reader
364    pub fn get_ref(&self) -> &W {
365        &self.inner.inner
366    }
367
368    /// Call flush and return the inner writer
369    pub fn finish(mut self) -> io::Result<W> {
370        self.flush()?;
371        Ok(self.inner.inner)
372    }
373
374    /// total bytes written to the writer, inclusive of all streams if `flush` has been called before
375    pub fn total_out(&self) -> usize {
376        self.inner.stream.stream.total_out as usize + self.inner.total_out
377    }
378
379    /// total bytes processed, inclusive of all streams if `flush` has been called before
380    pub fn total_in(&self) -> usize {
381        self.inner.stream.stream.total_in as usize + self.inner.total_in
382    }
383}
384
385impl<W: io::Write> io::Write for ZlibEncoder<W> {
386    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
387        self.inner.write(buf)
388    }
389    fn flush(&mut self) -> io::Result<()> {
390        self.inner.flush()
391    }
392}
393
394/// Zlib decompression
395/// Basically a wrapper to `Decoder` which sets the codec for you.
396pub struct ZlibDecoder<W: io::Write> {
397    inner: Decoder<W>,
398}
399
400impl<W: io::Write> ZlibDecoder<W> {
401    pub fn new(writer: W) -> Self {
402        Self {
403            inner: Decoder::new(writer, Codec::Zlib),
404        }
405    }
406    /// Mutable reference to underlying reader, not advisable to modify during reading.
407    pub fn get_ref_mut(&mut self) -> &mut W {
408        &mut self.inner.inner
409    }
410
411    // Reference to underlying reader
412    pub fn get_ref(&self) -> &W {
413        &self.inner.inner
414    }
415}
416
417impl<W: io::Write> io::Write for ZlibDecoder<W> {
418    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
419        self.inner.write(buf)
420    }
421    fn flush(&mut self) -> io::Result<()> {
422        self.inner.flush()
423    }
424}
425
426/// Gzip compression
427/// Basically a wrapper to `Encoder` which sets the codec for you.
428pub struct GzipEncoder<R: io::Write> {
429    inner: Encoder<R>,
430}
431
432impl<W: io::Write> GzipEncoder<W> {
433    pub fn new(writer: W, level: CompressionLevel) -> Self {
434        Self {
435            inner: Encoder::new(writer, level, Codec::Gzip),
436        }
437    }
438    /// Mutable reference to underlying reader, not advisable to modify during reading.
439    pub fn get_ref_mut(&mut self) -> &mut W {
440        &mut self.inner.inner
441    }
442
443    // Reference to underlying reader
444    pub fn get_ref(&self) -> &W {
445        &self.inner.inner
446    }
447
448    /// Call flush and return the inner writer
449    pub fn finish(mut self) -> io::Result<W> {
450        self.flush()?;
451        Ok(self.inner.inner)
452    }
453
454    /// total bytes written to the writer, inclusive of all streams if `flush` has been called before
455    pub fn total_out(&self) -> usize {
456        self.inner.stream.stream.total_out as usize + self.inner.total_out
457    }
458
459    /// total bytes processed, inclusive of all streams if `flush` has been called before
460    pub fn total_in(&self) -> usize {
461        self.inner.stream.stream.total_in as usize + self.inner.total_in
462    }
463}
464
465impl<W: io::Write> io::Write for GzipEncoder<W> {
466    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
467        self.inner.write(buf)
468    }
469    fn flush(&mut self) -> io::Result<()> {
470        self.inner.flush()
471    }
472}
473
474/// Gzip decompression
475/// Basically a wrapper to `Decoder` which sets the codec for you.
476pub struct GzipDecoder<W: io::Write> {
477    inner: Decoder<W>,
478}
479
480impl<W: io::Write> GzipDecoder<W> {
481    pub fn new(writer: W) -> Self {
482        Self {
483            inner: Decoder::new(writer, Codec::Gzip),
484        }
485    }
486    /// Mutable reference to underlying reader, not advisable to modify during reading.
487    pub fn get_ref_mut(&mut self) -> &mut W {
488        &mut self.inner.inner
489    }
490
491    // Reference to underlying reader
492    pub fn get_ref(&self) -> &W {
493        &self.inner.inner
494    }
495}
496
497impl<W: io::Write> io::Write for GzipDecoder<W> {
498    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
499        self.inner.write(buf)
500    }
501    fn flush(&mut self) -> io::Result<()> {
502        self.inner.flush()
503    }
504}