Skip to main content

lz4/
decoder.rs

1//! `Read`-based streaming decoder wrapping `LZ4F_decompress`.
2//!
3//! [`Decoder`] consumes a single LZ4 frame from an inner [`Read`] source
4//! and exposes the decompressed payload as a byte stream. The decoder owns
5//! a 4 MiB + 64 KiB scratch buffer used both to stage compressed input and
6//! to satisfy `LZ4F_decompress`'s "consume some src, produce some dst"
7//! contract. A single [`Decoder`] handles exactly one frame — to decode
8//! concatenated frames, instantiate a new decoder per frame (see the CLI
9//! in `src/bin/lz4.rs` for an example).
10//!
//! The API mirrors the `Decoder` type of the `lz4-rs` crate so that this
//! crate is a drop-in replacement for it.
13
14use super::liblz4::*;
15use super::size_t;
16use std::io::{Error, ErrorKind, Read, Result};
17use std::ptr;
18
// Scratch-buffer capacity. It has to hold one maximum-size (4 MiB) block
// payload together with its 4-byte block header and 4-byte block checksum,
// the frame header (max ~19 bytes) and the 4-byte content checksum trailer.
// The remaining slack lets the slice-to-dst fast path in `LZ4F_decompress`
// fire even for incompressible blocks that are exactly 4 MiB; without it
// the fast path never fires for raw 4 MiB blocks and the decoder always
// pays the buffered-input copy overhead.
const BUFFER_SIZE: usize = (4 << 20) + (64 << 10);
26
// NOTE: unsafe to derive Clone or Copy, otherwise
// there can be multiple copies of the same inner LZ4 pointer
// (and the `Drop` impl below would then free it once per copy).
/// RAII wrapper around an `LZ4F_dctx*` that frees the context on drop.
#[derive(Debug)]
struct DecoderContext {
    /// Context handle created in `DecoderContext::new`, passed to
    /// `LZ4F_decompress` by the decoder and released in `Drop`.
    c: LZ4FDecompressionContext,
}
34
// NOTE: unsafe to derive Clone or Copy
/// Streaming LZ4 frame decoder wrapping an inner [`Read`] source.
///
/// Pull bytes via the [`Read`] impl; the decoder transparently reads from
/// the source, feeds compressed input into `LZ4F_decompress`, and emits
/// decompressed bytes into the caller's buffer. When the frame ends,
/// further reads return `Ok(0)`. Call [`finish`](Decoder::finish) to
/// recover the inner reader and confirm the frame was fully consumed.
#[derive(Debug)]
pub struct Decoder<R> {
    /// Decompression context handed to every `LZ4F_decompress` call.
    c: DecoderContext,
    /// Inner source of compressed bytes; returned by `finish`.
    r: R,
    /// Staging buffer (`BUFFER_SIZE` bytes) that compressed input is read
    /// into before it is passed to `LZ4F_decompress`.
    buf: Box<[u8]>,
    /// Offset in `buf` of the first staged byte that `LZ4F_decompress` has
    /// not consumed yet. `pos >= len` means the staging buffer is exhausted.
    pos: usize,
    /// Number of valid bytes currently staged in `buf`; set to 0 both when
    /// the source returns no data and on a drain pass.
    len: usize,
    /// Upper bound on how many bytes the next refill requests from `r`.
    /// Starts at 11, is reduced by each refill and raised to the hint
    /// returned by `LZ4F_decompress`; 0 marks the frame as finished.
    next: usize,
    /// Set when the last `LZ4F_decompress` call returned a hint of 1 and
    /// produced output. The next refill then skips reading from `r` and
    /// calls `LZ4F_decompress` with empty input instead.
    drain_pending: bool,
}
53
// No interior mutability, so Decoder is Sync as long as R is Sync.
// SAFETY: the inherent `&self` method `reader` only hands out `&R`, which is
// covered by the `R: Sync` bound; `read` takes `&mut self` and `finish`
// consumes `self`, so a shared reference never reaches the LZ4 context
// through these methods.
// NOTE(review): this also relies on `LZ4FDecompressionContext` (defined in
// `liblz4`, not visible here) having no `&self` operations that mutate the
// context — confirm against that module.
unsafe impl<R: Read + Sync> Sync for Decoder<R> {}
56
57impl<R: Read> Decoder<R> {
58    /// Creates a new decoder which reads its input from the given
59    /// input stream. The input stream can be re-acquired by calling
60    /// `finish()`
61    pub fn new(r: R) -> Result<Decoder<R>> {
62        Ok(Decoder {
63            r,
64            c: DecoderContext::new()?,
65            buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
66            pos: BUFFER_SIZE,
67            len: BUFFER_SIZE,
68            // Minimal LZ4 stream size
69            next: 11,
70            drain_pending: false,
71        })
72    }
73
74    /// Immutable reader reference.
75    pub fn reader(&self) -> &R {
76        &self.r
77    }
78
79    /// Returns the wrapped reader together with a status result.
80    ///
81    /// The result is `Ok(())` if the LZ4 frame was fully consumed (the
82    /// frame footer and any content checksum were read and accepted) and
83    /// `Err(ErrorKind::Interrupted)` if `finish` was called before the end
84    /// of the compressed stream.
85    pub fn finish(self) -> (R, Result<()>) {
86        (
87            self.r,
88            match self.next {
89                0 => Ok(()),
90                _ => Err(Error::new(
91                    ErrorKind::Interrupted,
92                    "Finish runned before read end of compressed stream",
93                )),
94            },
95        )
96    }
97}
98
impl<R: Read> Read for Decoder<R> {
    /// Decompresses into `buf` and returns the number of bytes written.
    ///
    /// Returns `Ok(0)` once the frame has ended (`self.next == 0`) or when
    /// `buf` is empty. Otherwise the method alternates between refilling the
    /// staging buffer from the inner reader and calling `LZ4F_decompress`
    /// until at least one byte has been produced, the frame ends, or a call
    /// makes no progress at all.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inner reader and from `check_error`. An
    /// inner-reader error leaves `pos`/`len` untouched, so a later call
    /// retries the refill. Returns `ErrorKind::UnexpectedEof` when a call
    /// made with no staged input neither consumes nor produces anything
    /// while `LZ4F_decompress` still reports a non-zero hint.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        // Frame already finished, or nothing can be written: report 0 bytes.
        if self.next == 0 || buf.is_empty() {
            return Ok(0);
        }
        // Number of bytes written into `buf` so far during this call.
        let mut dst_offset: usize = 0;
        while dst_offset == 0 {
            // Staging buffer exhausted: either start a drain pass or refill.
            if self.pos >= self.len {
                if self.drain_pending {
                    // Drain pass: skip the source and let the inner loop
                    // call `LZ4F_decompress` with empty input.
                    self.len = 0;
                    self.drain_pending = false;
                } else {
                    // Request at most `next` bytes, capped by the staging
                    // buffer's capacity.
                    let need = if self.buf.len() < self.next {
                        self.buf.len()
                    } else {
                        self.next
                    };
                    self.len = self.r.read(&mut self.buf[0..need])?;
                    // NOTE: we do not exit here if there was nothing read
                    // The lz4 context may still have more bytes to emit.
                }
                self.pos = 0;
                // `len <= need <= next` after a source read (assuming the
                // reader honours the `Read` contract) and `len == 0` on a
                // drain pass, so this subtraction does not underflow.
                self.next -= self.len;
            }
            // Keep decompressing while there is room in `buf` and either
            // staged input is left or no input is staged at all (`len == 0`).
            while (dst_offset < buf.len()) && ((self.pos < self.len) || self.len == 0) {
                // Both sizes are in/out: available bytes going in, bytes
                // actually consumed/produced coming out (they are used
                // below to advance `pos` and `dst_offset`).
                let mut src_size = (self.len - self.pos) as size_t;
                let mut dst_size = (buf.len() - dst_offset) as size_t;
                // `len` is the value `check_error` yields for the call's
                // return code; it is used as the size hint for the next
                // refill, with 0 meaning the frame is complete.
                let len = check_error(unsafe {
                    LZ4F_decompress(
                        self.c.c,
                        buf[dst_offset..].as_mut_ptr(),
                        &mut dst_size,
                        self.buf[self.pos..].as_ptr(),
                        &mut src_size,
                        ptr::null(),
                    )
                })?;
                self.pos += src_size as usize;
                dst_offset += dst_size as usize;
                // A hint of 1 together with produced output schedules a
                // drain pass for the next refill.
                self.drain_pending = len == 1 && dst_size > 0;

                // We need to keep trying to read bytes from the decompressor
                // until it is no longer emitting them, even after it
                // has finished reading bytes.
                if dst_size == 0 && src_size == 0 {
                    // No progress in this call: hand back what we have.
                    if dst_offset > 0 {
                        return Ok(dst_offset);
                    }
                    // Nothing staged, nothing produced, frame not finished.
                    // NOTE(review): `self.len == 0` also holds on a drain
                    // pass, where the source was not read at all, so this
                    // branch cannot tell a real EOF from a drain pass that
                    // produced nothing — confirm that case cannot occur.
                    if self.len == 0 && len != 0 {
                        return Err(Error::new(
                            ErrorKind::UnexpectedEof,
                            "unexpected end of compressed stream",
                        ));
                    }
                    return Ok(dst_offset);
                }

                if len == 0 {
                    // End of frame: later calls return `Ok(0)` immediately.
                    self.next = 0;
                    return Ok(dst_offset);
                } else if self.next < len {
                    // Raise the refill budget to the reported hint.
                    self.next = len;
                }
            }
        }
        Ok(dst_offset)
    }
}
167
168impl DecoderContext {
169    fn new() -> Result<DecoderContext> {
170        let mut context = LZ4FDecompressionContext(ptr::null_mut());
171        check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?;
172        Ok(DecoderContext { c: context })
173    }
174}
175
176impl Drop for DecoderContext {
177    fn drop(&mut self) {
178        unsafe { LZ4F_freeDecompressionContext(self.c) };
179    }
180}
181
182#[cfg(test)]
183mod test {
184    extern crate rand;
185
186    use self::rand::rngs::StdRng;
187    use self::rand::Rng;
188    use super::super::encoder::{Encoder, EncoderBuilder};
189    use super::Decoder;
190    use std::io::{Cursor, Error, ErrorKind, Read, Result, Write};
191
    // Size of the output chunk the streaming tests pass to `Decoder::read`.
    const BUFFER_SIZE: usize = 64 * 1024;
    // Sentinel bytes that `finish_encode` appends after the encoded frame;
    // `finish_decode` checks that exactly these bytes are still unread in
    // the source once decoding is done.
    const END_MARK: [u8; 4] = [0x9f, 0x77, 0x22, 0x71];
194
195    struct ErrorWrapper<R: Read, Rn: Rng> {
196        r: R,
197        rng: Rn,
198    }
199
200    impl<R: Read, Rn: Rng> ErrorWrapper<R, Rn> {
201        fn new(rng: Rn, read: R) -> Self {
202            ErrorWrapper { r: read, rng }
203        }
204    }
205
206    impl<R: Read, Rn: Rng> Read for ErrorWrapper<R, Rn> {
207        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
208            if self.rng.next_u32() & 0x03 == 0 {
209                self.r.read(buf)
210            } else {
211                Err(Error::other("Opss..."))
212            }
213        }
214    }
215
216    struct RetryWrapper<R: Read> {
217        r: R,
218    }
219
220    impl<R: Read> RetryWrapper<R> {
221        fn new(read: R) -> Self {
222            RetryWrapper { r: read }
223        }
224    }
225
226    impl<R: Read> Read for RetryWrapper<R> {
227        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
228            loop {
229                match self.r.read(buf) {
230                    Ok(v) => {
231                        return Ok(v);
232                    }
233                    Err(e) => {
234                        if e.kind() == ErrorKind::Other {
235                            continue;
236                        }
237                        return Err(e);
238                    }
239                }
240            }
241        }
242    }
243
244    fn finish_encode<W: Write>(encoder: Encoder<W>) -> W {
245        let (mut buffer, result) = encoder.finish();
246        result.unwrap();
247        buffer.write_all(&END_MARK).unwrap();
248        buffer
249    }
250
251    fn finish_decode<R: Read>(decoder: Decoder<R>) {
252        let (buffer, result) = decoder.finish();
253        result.unwrap();
254
255        let mut mark = Vec::new();
256        let mut data = Vec::new();
257        mark.write_all(&END_MARK).unwrap();
258        RetryWrapper::new(buffer).read_to_end(&mut data).unwrap();
259        assert_eq!(mark, data);
260    }
261
262    #[test]
263    fn test_decoder_empty() {
264        let expected: Vec<u8> = Vec::new();
265        let buffer = finish_encode(EncoderBuilder::new().level(1).build(Vec::new()).unwrap());
266
267        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
268        let mut actual = Vec::new();
269
270        decoder.read_to_end(&mut actual).unwrap();
271        assert_eq!(expected, actual);
272        finish_decode(decoder);
273    }
274
275    #[test]
276    fn test_decoder_smallest() {
277        let expected: Vec<u8> = Vec::new();
278        let mut buffer = b"\x04\x22\x4d\x18\x40\x40\xc0\x00\x00\x00\x00".to_vec();
279        buffer.write_all(&END_MARK).unwrap();
280
281        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
282        let mut actual = Vec::new();
283
284        decoder.read_to_end(&mut actual).unwrap();
285        assert_eq!(expected, actual);
286        finish_decode(decoder);
287    }
288
289    #[test]
290    fn test_decoder_smoke() {
291        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
292        let mut expected = Vec::new();
293        expected.write_all(b"Some data").unwrap();
294        encoder.write_all(&expected[..4]).unwrap();
295        encoder.write_all(&expected[4..]).unwrap();
296        let buffer = finish_encode(encoder);
297
298        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
299        let mut actual = Vec::new();
300
301        decoder.read_to_end(&mut actual).unwrap();
302        assert_eq!(expected, actual);
303        finish_decode(decoder);
304    }
305
306    #[test]
307    fn test_decoder_random() {
308        let mut rnd = random();
309        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
310        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
311        encoder.write_all(&expected).unwrap();
312        let encoded = finish_encode(encoder);
313
314        let mut decoder = Decoder::new(Cursor::new(encoded)).unwrap();
315        let mut actual = Vec::new();
316        loop {
317            let mut buffer = [0; BUFFER_SIZE];
318            let size = decoder.read(&mut buffer).unwrap();
319            if size == 0 {
320                break;
321            }
322            actual.write_all(&buffer[0..size]).unwrap();
323        }
324        assert_eq!(expected, actual);
325        finish_decode(decoder);
326    }
327
328    #[test]
329    fn test_retry_read() {
330        let mut rnd = random();
331        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
332        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
333        encoder.write_all(&expected).unwrap();
334        let encoded = finish_encode(encoder);
335
336        let mut decoder =
337            Decoder::new(ErrorWrapper::new(rnd.clone(), Cursor::new(encoded))).unwrap();
338        let mut actual = Vec::new();
339        loop {
340            let mut buffer = [0; BUFFER_SIZE];
341            if let Ok(size) = decoder.read(&mut buffer) {
342                if size == 0 {
343                    break;
344                }
345                actual.write_all(&buffer[0..size]).unwrap();
346            }
347        }
348
349        assert_eq!(expected, actual);
350        finish_decode(decoder);
351    }
352
353    /// Ensure that we emit the full decompressed stream even if we're
354    /// using a very small output buffer.
355    #[test]
356    fn issue_45() {
357        // create an encoder
358        let mut enc = crate::EncoderBuilder::new().build(Vec::new()).unwrap();
359
360        // write 'a' 100 times to the encoder
361        let text: Vec<u8> = vec![b'a'; 100];
362        enc.write_all(&text[..]).unwrap();
363
364        let encoded = finish_encode(enc);
365
366        // read from the decoder, buf_size bytes at a time
367        for buf_size in [5, 10, 15, 20, 25] {
368            let mut buf = vec![0; buf_size];
369
370            let mut total_bytes_read = 0;
371
372            // create a decoder wrapping the backing buffer
373            let mut dec = crate::Decoder::new(&encoded[..]).unwrap();
374            while let Ok(n) = dec.read(&mut buf[..]) {
375                if n == 0 {
376                    break;
377                }
378
379                total_bytes_read += n;
380            }
381
382            assert_eq!(total_bytes_read, text.len());
383        }
384    }
385
386    #[test]
387    fn truncated_frame_reports_unexpected_eof() {
388        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
389        encoder.write_all(b"truncated frame").unwrap();
390        let (encoded, result) = encoder.finish();
391        result.unwrap();
392        for missing in [1, 4] {
393            let mut truncated = encoded.clone();
394            truncated.truncate(truncated.len() - missing);
395
396            let mut decoder = Decoder::new(Cursor::new(truncated)).unwrap();
397            let mut actual = Vec::new();
398            let err = decoder.read_to_end(&mut actual).unwrap_err();
399            assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
400        }
401    }
402
403    fn random() -> StdRng {
404        let seed: [u8; 32] = [
405            157, 164, 190, 237, 231, 103, 60, 22, 197, 108, 51, 176, 30, 170, 155, 21, 163, 249,
406            56, 192, 57, 112, 142, 240, 233, 46, 51, 122, 222, 137, 225, 243,
407        ];
408
409        rand::SeedableRng::from_seed(seed)
410    }
411
412    fn random_stream<R: Rng>(rng: &mut R, size: usize) -> Vec<u8> {
413        (0..size).map(|_| rng.gen()).collect()
414    }
415
416    #[test]
417    fn test_decoder_send() {
418        fn check_send<S: Send>(_: &S) {}
419        let dec = Decoder::new(Cursor::new(Vec::new())).unwrap();
420        check_send(&dec);
421    }
422}