1use super::liblz4::*;
15use super::size_t;
16use std::io::{Error, ErrorKind, Read, Result};
17use std::ptr;
18
/// Capacity, in bytes, of the staging buffer that holds compressed input
/// between the wrapped reader and the decompressor: 4 MiB + 64 KiB.
///
/// `Decoder::new` allocates the buffer with this size and also uses the value
/// as the initial `pos`/`len`, which marks the buffer as fully consumed.
// NOTE(review): 4 MiB presumably mirrors the largest LZ4 frame block size,
// with 64 KiB of slack - confirm against the frame format limits.
const BUFFER_SIZE: usize = (4 << 20) + (64 << 10);
26
27#[derive(Debug)]
31struct DecoderContext {
32 c: LZ4FDecompressionContext,
33}
34
35#[derive(Debug)]
44pub struct Decoder<R> {
45 c: DecoderContext,
46 r: R,
47 buf: Box<[u8]>,
48 pos: usize,
49 len: usize,
50 next: usize,
51 drain_pending: bool,
52}
53
54unsafe impl<R: Read + Sync> Sync for Decoder<R> {}
56
57impl<R: Read> Decoder<R> {
58 pub fn new(r: R) -> Result<Decoder<R>> {
62 Ok(Decoder {
63 r,
64 c: DecoderContext::new()?,
65 buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
66 pos: BUFFER_SIZE,
67 len: BUFFER_SIZE,
68 next: 11,
70 drain_pending: false,
71 })
72 }
73
74 pub fn reader(&self) -> &R {
76 &self.r
77 }
78
79 pub fn finish(self) -> (R, Result<()>) {
86 (
87 self.r,
88 match self.next {
89 0 => Ok(()),
90 _ => Err(Error::new(
91 ErrorKind::Interrupted,
92 "Finish runned before read end of compressed stream",
93 )),
94 },
95 )
96 }
97}
98
99impl<R: Read> Read for Decoder<R> {
100 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
101 if self.next == 0 || buf.is_empty() {
102 return Ok(0);
103 }
104 let mut dst_offset: usize = 0;
105 while dst_offset == 0 {
106 if self.pos >= self.len {
107 if self.drain_pending {
108 self.len = 0;
109 self.drain_pending = false;
110 } else {
111 let need = if self.buf.len() < self.next {
112 self.buf.len()
113 } else {
114 self.next
115 };
116 self.len = self.r.read(&mut self.buf[0..need])?;
117 }
120 self.pos = 0;
121 self.next -= self.len;
122 }
123 while (dst_offset < buf.len()) && ((self.pos < self.len) || self.len == 0) {
124 let mut src_size = (self.len - self.pos) as size_t;
125 let mut dst_size = (buf.len() - dst_offset) as size_t;
126 let len = check_error(unsafe {
127 LZ4F_decompress(
128 self.c.c,
129 buf[dst_offset..].as_mut_ptr(),
130 &mut dst_size,
131 self.buf[self.pos..].as_ptr(),
132 &mut src_size,
133 ptr::null(),
134 )
135 })?;
136 self.pos += src_size as usize;
137 dst_offset += dst_size as usize;
138 self.drain_pending = len == 1 && dst_size > 0;
139
140 if dst_size == 0 && src_size == 0 {
144 if dst_offset > 0 {
145 return Ok(dst_offset);
146 }
147 if self.len == 0 && len != 0 {
148 return Err(Error::new(
149 ErrorKind::UnexpectedEof,
150 "unexpected end of compressed stream",
151 ));
152 }
153 return Ok(dst_offset);
154 }
155
156 if len == 0 {
157 self.next = 0;
158 return Ok(dst_offset);
159 } else if self.next < len {
160 self.next = len;
161 }
162 }
163 }
164 Ok(dst_offset)
165 }
166}
167
168impl DecoderContext {
169 fn new() -> Result<DecoderContext> {
170 let mut context = LZ4FDecompressionContext(ptr::null_mut());
171 check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?;
172 Ok(DecoderContext { c: context })
173 }
174}
175
176impl Drop for DecoderContext {
177 fn drop(&mut self) {
178 unsafe { LZ4F_freeDecompressionContext(self.c) };
179 }
180}
181
#[cfg(test)]
mod test {
    extern crate rand;

    use self::rand::rngs::StdRng;
    use self::rand::Rng;
    use super::super::encoder::{Encoder, EncoderBuilder};
    use super::Decoder;
    use std::io::{Cursor, Error, ErrorKind, Read, Result, Write};

    /// Size of the output chunks used by the chunked-read tests.
    const BUFFER_SIZE: usize = 64 * 1024;
    /// Sentinel appended after every encoded frame; it must still be unread
    /// in the underlying reader once the decoder has finished.
    const END_MARK: [u8; 4] = [0x9f, 0x77, 0x22, 0x71];

    /// Reader that fails most calls, driven by a random generator.
    struct ErrorWrapper<R: Read, Rn: Rng> {
        r: R,
        rng: Rn,
    }

    impl<R: Read, Rn: Rng> ErrorWrapper<R, Rn> {
        fn new(rng: Rn, read: R) -> Self {
            ErrorWrapper { r: read, rng }
        }
    }

    impl<R: Read, Rn: Rng> Read for ErrorWrapper<R, Rn> {
        /// Draws one `u32` per call and forwards the read only when its two
        /// low bits are zero; every other call fails.
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            let pass_through = self.rng.next_u32() & 0x03 == 0;
            if !pass_through {
                return Err(Error::other("Opss..."));
            }
            self.r.read(buf)
        }
    }

    /// Reader that transparently retries `ErrorKind::Other` failures.
    struct RetryWrapper<R: Read> {
        r: R,
    }

    impl<R: Read> RetryWrapper<R> {
        fn new(read: R) -> Self {
            RetryWrapper { r: read }
        }
    }

    impl<R: Read> Read for RetryWrapper<R> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            loop {
                match self.r.read(buf) {
                    Err(e) if e.kind() == ErrorKind::Other => continue,
                    outcome => return outcome,
                }
            }
        }
    }

    /// Finishes the encoder and appends `END_MARK` after the frame.
    fn finish_encode<W: Write>(encoder: Encoder<W>) -> W {
        let (mut sink, status) = encoder.finish();
        status.unwrap();
        sink.write_all(&END_MARK).unwrap();
        sink
    }

    /// Finishes the decoder and checks that exactly `END_MARK` is left in the
    /// underlying reader.
    fn finish_decode<R: Read>(decoder: Decoder<R>) {
        let (rest, status) = decoder.finish();
        status.unwrap();

        let mut trailing = Vec::new();
        RetryWrapper::new(rest).read_to_end(&mut trailing).unwrap();
        assert_eq!(END_MARK.to_vec(), trailing);
    }

    /// Decodes `encoded` with `read_to_end`, returning the output and the
    /// spent decoder.
    fn decode_fully(encoded: Vec<u8>) -> (Vec<u8>, Decoder<Cursor<Vec<u8>>>) {
        let mut decoder = Decoder::new(Cursor::new(encoded)).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        (decoded, decoder)
    }

    /// Builds the shared random fixture: the generator (after the payload has
    /// been drawn from it), the payload, and its encoded form.
    fn random_fixture() -> (StdRng, Vec<u8>, Vec<u8>) {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);
        (rnd, expected, encoded)
    }

    #[test]
    fn test_decoder_empty() {
        let encoded = finish_encode(EncoderBuilder::new().level(1).build(Vec::new()).unwrap());

        let (actual, decoder) = decode_fully(encoded);
        assert_eq!(Vec::<u8>::new(), actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smallest() {
        let mut encoded = b"\x04\x22\x4d\x18\x40\x40\xc0\x00\x00\x00\x00".to_vec();
        encoded.extend_from_slice(&END_MARK);

        let (actual, decoder) = decode_fully(encoded);
        assert_eq!(Vec::<u8>::new(), actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smoke() {
        let expected = b"Some data".to_vec();
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        // Feed the payload in two writes.
        let (head, tail) = expected.split_at(4);
        encoder.write_all(head).unwrap();
        encoder.write_all(tail).unwrap();
        let encoded = finish_encode(encoder);

        let (actual, decoder) = decode_fully(encoded);
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_random() {
        let (_rnd, expected, encoded) = random_fixture();

        let mut decoder = Decoder::new(Cursor::new(encoded)).unwrap();
        let mut actual = Vec::new();
        let mut chunk = [0u8; BUFFER_SIZE];
        loop {
            match decoder.read(&mut chunk).unwrap() {
                0 => break,
                n => actual.extend_from_slice(&chunk[..n]),
            }
        }
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_retry_read() {
        let (rnd, expected, encoded) = random_fixture();

        // The failing reader continues from the generator state left after
        // the payload was drawn.
        let mut decoder = Decoder::new(ErrorWrapper::new(rnd, Cursor::new(encoded))).unwrap();
        let mut actual = Vec::new();
        let mut chunk = [0u8; BUFFER_SIZE];
        loop {
            match decoder.read(&mut chunk) {
                Ok(0) => break,
                Ok(n) => actual.extend_from_slice(&chunk[..n]),
                // Failed reads are simply retried.
                Err(_) => {}
            }
        }

        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    /// Reading through output buffers smaller than the payload must still
    /// yield every decoded byte.
    #[test]
    fn issue_45() {
        let mut enc = crate::EncoderBuilder::new().build(Vec::new()).unwrap();

        let text = vec![b'a'; 100];
        enc.write_all(&text).unwrap();

        let encoded = finish_encode(enc);

        for buf_size in [5, 10, 15, 20, 25] {
            let mut buf = vec![0u8; buf_size];
            let mut dec = crate::Decoder::new(&encoded[..]).unwrap();

            let mut total_bytes_read = 0;
            loop {
                match dec.read(&mut buf[..]) {
                    Ok(n) if n > 0 => total_bytes_read += n,
                    // Stop on end of stream and on errors alike.
                    _ => break,
                }
            }

            assert_eq!(total_bytes_read, text.len());
        }
    }

    #[test]
    fn truncated_frame_reports_unexpected_eof() {
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(b"truncated frame").unwrap();
        let (encoded, status) = encoder.finish();
        status.unwrap();

        for missing in [1, 4] {
            let keep = encoded.len() - missing;
            let truncated = encoded[..keep].to_vec();

            let mut decoder = Decoder::new(Cursor::new(truncated)).unwrap();
            let mut sink = Vec::new();
            let err = decoder.read_to_end(&mut sink).unwrap_err();
            assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
        }
    }

    /// Deterministically seeded generator so every run sees the same data.
    fn random() -> StdRng {
        const SEED: [u8; 32] = [
            157, 164, 190, 237, 231, 103, 60, 22, 197, 108, 51, 176, 30, 170, 155, 21, 163, 249,
            56, 192, 57, 112, 142, 240, 233, 46, 51, 122, 222, 137, 225, 243,
        ];

        rand::SeedableRng::from_seed(SEED)
    }

    /// Draws `size` bytes from `rng`, one value per byte.
    fn random_stream<R: Rng>(rng: &mut R, size: usize) -> Vec<u8> {
        let mut stream = Vec::with_capacity(size);
        for _ in 0..size {
            stream.push(rng.gen::<u8>());
        }
        stream
    }

    #[test]
    fn test_decoder_send() {
        fn assert_send<T: Send>(_: &T) {}
        let decoder = Decoder::new(Cursor::new(Vec::new())).unwrap();
        assert_send(&decoder);
    }
}