lz4/
decoder.rs

1use super::liblz4::*;
2use super::size_t;
3use std::io::{Error, ErrorKind, Read, Result};
4use std::ptr;
5
/// Capacity, in bytes (32 KiB), of the internal buffer that stages compressed
/// input read from the wrapped reader before it is handed to the decompressor.
const BUFFER_SIZE: usize = 32 * 1024;
7
8// NOTE: unsafe to device Clone or Copy, otherwise
9// there can be multiple copies of the same inner LZ4 pointer
10#[derive(Debug)]
11struct DecoderContext {
12    c: LZ4FDecompressionContext,
13}
14
15// NOTE: unsafe to derive Clone or Copy
16#[derive(Debug)]
17pub struct Decoder<R> {
18    c: DecoderContext,
19    r: R,
20    buf: Box<[u8]>,
21    pos: usize,
22    len: usize,
23    next: usize,
24}
25
26// No interior mutability, so Decoder is Sync as long as R is Sync.
27unsafe impl<R: Read + Sync> Sync for Decoder<R> {}
28
29impl<R: Read> Decoder<R> {
30    /// Creates a new decoder which reads its input from the given
31    /// input stream. The input stream can be re-acquired by calling
32    /// `finish()`
33    pub fn new(r: R) -> Result<Decoder<R>> {
34        Ok(Decoder {
35            r,
36            c: DecoderContext::new()?,
37            buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
38            pos: BUFFER_SIZE,
39            len: BUFFER_SIZE,
40            // Minimal LZ4 stream size
41            next: 11,
42        })
43    }
44
45    /// Immutable reader reference.
46    pub fn reader(&self) -> &R {
47        &self.r
48    }
49
50    pub fn finish(self) -> (R, Result<()>) {
51        (
52            self.r,
53            match self.next {
54                0 => Ok(()),
55                _ => Err(Error::new(
56                    ErrorKind::Interrupted,
57                    "Finish runned before read end of compressed stream",
58                )),
59            },
60        )
61    }
62}
63
64impl<R: Read> Read for Decoder<R> {
65    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
66        if self.next == 0 || buf.is_empty() {
67            return Ok(0);
68        }
69        let mut dst_offset: usize = 0;
70        while dst_offset == 0 {
71            if self.pos >= self.len {
72                let need = if self.buf.len() < self.next {
73                    self.buf.len()
74                } else {
75                    self.next
76                };
77                self.len = self.r.read(&mut self.buf[0..need])?;
78                // NOTE: we do not exit here if there was nothing read
79                // The lz4 context may still have more bytes to emit.
80
81                self.pos = 0;
82                self.next -= self.len;
83            }
84            while (dst_offset < buf.len()) && ((self.pos < self.len) || self.len == 0) {
85                let mut src_size = (self.len - self.pos) as size_t;
86                let mut dst_size = (buf.len() - dst_offset) as size_t;
87                let len = check_error(unsafe {
88                    LZ4F_decompress(
89                        self.c.c,
90                        buf[dst_offset..].as_mut_ptr(),
91                        &mut dst_size,
92                        self.buf[self.pos..].as_ptr(),
93                        &mut src_size,
94                        ptr::null(),
95                    )
96                })?;
97                self.pos += src_size as usize;
98                dst_offset += dst_size as usize;
99
100                // We need to keep trying to read bytes from the decompressor
101                // until it is no longer emitting them, even after it
102                // has finished reading bytes.
103                if dst_size == 0 && src_size == 0 {
104                    return Ok(dst_offset);
105                }
106
107                if len == 0 {
108                    self.next = 0;
109                    return Ok(dst_offset);
110                } else if self.next < len {
111                    self.next = len;
112                }
113            }
114        }
115        Ok(dst_offset)
116    }
117}
118
119impl DecoderContext {
120    fn new() -> Result<DecoderContext> {
121        let mut context = LZ4FDecompressionContext(ptr::null_mut());
122        check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?;
123        Ok(DecoderContext { c: context })
124    }
125}
126
127impl Drop for DecoderContext {
128    fn drop(&mut self) {
129        unsafe { LZ4F_freeDecompressionContext(self.c) };
130    }
131}
132
#[cfg(test)]
mod test {
    extern crate rand;

    use self::rand::rngs::StdRng;
    use self::rand::Rng;
    use super::super::encoder::{Encoder, EncoderBuilder};
    use super::Decoder;
    use std::io::{Cursor, Error, ErrorKind, Read, Result, Write};

    /// Size of the scratch buffer the tests hand to `Decoder::read`.
    const BUFFER_SIZE: usize = 64 * 1024;
    /// Marker appended after the compressed stream. `finish_decode` checks that
    /// exactly these bytes are still unread in the inner reader.
    const END_MARK: [u8; 4] = [0x9f, 0x77, 0x22, 0x71];

    /// Reader that randomly fails: only about one call in four is forwarded
    /// to the wrapped reader, all others return an `ErrorKind::Other` error.
    struct ErrorWrapper<R: Read, Rn: Rng> {
        r: R,
        rng: Rn,
    }

    impl<R: Read, Rn: Rng> ErrorWrapper<R, Rn> {
        fn new(rng: Rn, read: R) -> Self {
            ErrorWrapper { r: read, rng }
        }
    }

    impl<R: Read, Rn: Rng> Read for ErrorWrapper<R, Rn> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            if self.rng.next_u32() & 0x03 == 0 {
                self.r.read(buf)
            } else {
                Err(Error::new(ErrorKind::Other, "Opss..."))
            }
        }
    }

    /// Reader that transparently retries whenever the wrapped reader fails
    /// with `ErrorKind::Other`; every other error is passed through.
    struct RetryWrapper<R: Read> {
        r: R,
    }

    impl<R: Read> RetryWrapper<R> {
        fn new(read: R) -> Self {
            RetryWrapper { r: read }
        }
    }

    impl<R: Read> Read for RetryWrapper<R> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            loop {
                match self.r.read(buf) {
                    Err(e) if e.kind() == ErrorKind::Other => continue,
                    other => return other,
                }
            }
        }
    }

    /// Finishes the encoder and appends `END_MARK` after the compressed data.
    fn finish_encode<W: Write>(encoder: Encoder<W>) -> W {
        let (mut buffer, result) = encoder.finish();
        result.unwrap();
        // `write_all` rather than `write`: the whole marker must be written.
        buffer.write_all(&END_MARK).unwrap();
        buffer
    }

    /// Finishes the decoder and asserts that the bytes left unread in the
    /// inner reader are exactly `END_MARK`.
    fn finish_decode<R: Read>(decoder: Decoder<R>) {
        let (buffer, result) = decoder.finish();
        result.unwrap();

        let mut data = Vec::new();
        RetryWrapper::new(buffer).read_to_end(&mut data).unwrap();
        assert_eq!(END_MARK.to_vec(), data);
    }

    #[test]
    fn test_decoder_empty() {
        let expected: Vec<u8> = Vec::new();
        let buffer = finish_encode(EncoderBuilder::new().level(1).build(Vec::new()).unwrap());

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smallest() {
        let expected: Vec<u8> = Vec::new();
        let mut buffer = b"\x04\x22\x4d\x18\x40\x40\xc0\x00\x00\x00\x00".to_vec();
        buffer.extend_from_slice(&END_MARK);

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smoke() {
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        let expected = b"Some data".to_vec();
        // Feed the input in two pieces.
        encoder.write_all(&expected[..4]).unwrap();
        encoder.write_all(&expected[4..]).unwrap();
        let buffer = finish_encode(encoder);

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_random() {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);

        let mut decoder = Decoder::new(Cursor::new(encoded)).unwrap();
        let mut actual = Vec::new();
        let mut buffer = [0; BUFFER_SIZE];
        loop {
            let size = decoder.read(&mut buffer).unwrap();
            if size == 0 {
                break;
            }
            actual.extend_from_slice(&buffer[..size]);
        }
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_retry_read() {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);

        let mut decoder =
            Decoder::new(ErrorWrapper::new(rnd.clone(), Cursor::new(encoded))).unwrap();
        let mut actual = Vec::new();
        let mut buffer = [0; BUFFER_SIZE];
        loop {
            match decoder.read(&mut buffer) {
                Ok(0) => break,
                Ok(size) => actual.extend_from_slice(&buffer[..size]),
                // Injected failure from `ErrorWrapper`: simply call `read` again.
                Err(_) => {}
            }
        }

        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    /// Ensure that we emit the full decompressed stream even if we're
    /// using a very small output buffer.
    #[test]
    fn issue_45() {
        // create an encoder
        let mut enc = crate::EncoderBuilder::new().build(Vec::new()).unwrap();

        // write 'a' 100 times to the encoder
        let text: Vec<u8> = vec![b'a'; 100];
        enc.write_all(&text[..]).unwrap();

        // flush the encoder
        enc.flush().unwrap();

        // read from the decoder, buf_size bytes at a time
        for buf_size in [5, 10, 15, 20, 25] {
            let mut buf = vec![0; buf_size];

            let mut total_bytes_read = 0;

            // create a decoder wrapping the backing buffer
            let mut dec = crate::Decoder::new(&enc.writer()[..]).unwrap();
            while let Ok(n) = dec.read(&mut buf[..]) {
                if n == 0 {
                    break;
                }

                total_bytes_read += n;
            }

            assert_eq!(total_bytes_read, text.len());
        }
    }

    /// Deterministically seeded RNG so the random tests are reproducible.
    fn random() -> StdRng {
        let seed: [u8; 32] = [
            157, 164, 190, 237, 231, 103, 60, 22, 197, 108, 51, 176, 30, 170, 155, 21, 163, 249,
            56, 192, 57, 112, 142, 240, 233, 46, 51, 122, 222, 137, 225, 243,
        ];

        rand::SeedableRng::from_seed(seed)
    }

    /// Produces `size` random bytes drawn from `rng`.
    fn random_stream<R: Rng>(rng: &mut R, size: usize) -> Vec<u8> {
        (0..size).map(|_| rng.gen()).collect()
    }

    #[test]
    fn test_decoder_send() {
        fn check_send<S: Send>(_: &S) {}
        let dec = Decoder::new(Cursor::new(Vec::new())).unwrap();
        check_send(&dec);
    }
}