1use super::liblz4::*;
2use super::size_t;
3use std::io::{Error, ErrorKind, Read, Result};
4use std::ptr;
5
/// Capacity, in bytes, of the staging buffer that `Decoder` allocates to hold
/// compressed input read from the wrapped reader before it is decompressed.
const BUFFER_SIZE: usize = 32 * 1024;
7
/// Owning wrapper around the raw LZ4 frame decompression context handle.
///
/// The handle is created in `DecoderContext::new` and released by the `Drop`
/// implementation of this type.
#[derive(Debug)]
struct DecoderContext {
    /// Raw context handle passed to the `LZ4F_*` decompression functions.
    c: LZ4FDecompressionContext,
}
14
/// Streaming decoder that wraps a reader of compressed bytes and exposes the
/// decompressed bytes through its `Read` implementation.
#[derive(Debug)]
pub struct Decoder<R> {
    /// Decompression context handed to `LZ4F_decompress`.
    c: DecoderContext,
    /// Underlying source of compressed data.
    r: R,
    /// Staging buffer for compressed bytes read from `r`.
    buf: Box<[u8]>,
    /// Offset in `buf` of the first byte not yet consumed by the decompressor.
    pos: usize,
    /// Number of valid bytes in `buf` after the most recent read from `r`.
    len: usize,
    /// Upper bound on the number of bytes requested from `r` on the next
    /// refill. It is decremented by the bytes actually read and raised to the
    /// value returned by `LZ4F_decompress` when that is larger; `0` marks the
    /// end of the compressed stream.
    next: usize,
}
25
// SAFETY (review note): the only `&self` method this file defines on `Decoder`
// is `reader`, which just hands out `&R`; nothing reachable through a shared
// reference mutates the decompression context. Sharing `&Decoder<R>` between
// threads therefore appears sound whenever `R: Sync`. Re-check this if a
// `&self` method that touches `c` is ever added.
unsafe impl<R: Read + Sync> Sync for Decoder<R> {}
28
29impl<R: Read> Decoder<R> {
30 pub fn new(r: R) -> Result<Decoder<R>> {
34 Ok(Decoder {
35 r,
36 c: DecoderContext::new()?,
37 buf: vec![0; BUFFER_SIZE].into_boxed_slice(),
38 pos: BUFFER_SIZE,
39 len: BUFFER_SIZE,
40 next: 11,
42 })
43 }
44
45 pub fn reader(&self) -> &R {
47 &self.r
48 }
49
50 pub fn finish(self) -> (R, Result<()>) {
51 (
52 self.r,
53 match self.next {
54 0 => Ok(()),
55 _ => Err(Error::new(
56 ErrorKind::Interrupted,
57 "Finish runned before read end of compressed stream",
58 )),
59 },
60 )
61 }
62}
63
64impl<R: Read> Read for Decoder<R> {
65 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
66 if self.next == 0 || buf.is_empty() {
67 return Ok(0);
68 }
69 let mut dst_offset: usize = 0;
70 while dst_offset == 0 {
71 if self.pos >= self.len {
72 let need = if self.buf.len() < self.next {
73 self.buf.len()
74 } else {
75 self.next
76 };
77 self.len = self.r.read(&mut self.buf[0..need])?;
78 self.pos = 0;
82 self.next -= self.len;
83 }
84 while (dst_offset < buf.len()) && ((self.pos < self.len) || self.len == 0) {
85 let mut src_size = (self.len - self.pos) as size_t;
86 let mut dst_size = (buf.len() - dst_offset) as size_t;
87 let len = check_error(unsafe {
88 LZ4F_decompress(
89 self.c.c,
90 buf[dst_offset..].as_mut_ptr(),
91 &mut dst_size,
92 self.buf[self.pos..].as_ptr(),
93 &mut src_size,
94 ptr::null(),
95 )
96 })?;
97 self.pos += src_size as usize;
98 dst_offset += dst_size as usize;
99
100 if dst_size == 0 && src_size == 0 {
104 return Ok(dst_offset);
105 }
106
107 if len == 0 {
108 self.next = 0;
109 return Ok(dst_offset);
110 } else if self.next < len {
111 self.next = len;
112 }
113 }
114 }
115 Ok(dst_offset)
116 }
117}
118
119impl DecoderContext {
120 fn new() -> Result<DecoderContext> {
121 let mut context = LZ4FDecompressionContext(ptr::null_mut());
122 check_error(unsafe { LZ4F_createDecompressionContext(&mut context, LZ4F_VERSION) })?;
123 Ok(DecoderContext { c: context })
124 }
125}
126
127impl Drop for DecoderContext {
128 fn drop(&mut self) {
129 unsafe { LZ4F_freeDecompressionContext(self.c) };
130 }
131}
132
#[cfg(test)]
mod test {
    extern crate rand;

    use self::rand::rngs::StdRng;
    use self::rand::Rng;
    use super::super::encoder::{Encoder, EncoderBuilder};
    use super::Decoder;
    use std::io::{Cursor, Error, ErrorKind, Read, Result, Write};

    /// Size of the scratch buffer the tests hand to `Decoder::read`.
    const BUFFER_SIZE: usize = 64 * 1024;
    /// Trailer written after the compressed stream. `finish_decode` expects to
    /// find exactly these bytes left in the reader, which shows the decoder
    /// did not read past the end of the stream.
    const END_MARK: [u8; 4] = [0x9f, 0x77, 0x22, 0x71];

    /// Reader that fails with `ErrorKind::Other` whenever the low two bits of
    /// the next random number are non-zero, and forwards the call otherwise.
    struct ErrorWrapper<R: Read, Rn: Rng> {
        r: R,
        rng: Rn,
    }

    impl<R: Read, Rn: Rng> ErrorWrapper<R, Rn> {
        fn new(rng: Rn, read: R) -> Self {
            ErrorWrapper { r: read, rng }
        }
    }

    impl<R: Read, Rn: Rng> Read for ErrorWrapper<R, Rn> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            if self.rng.next_u32() & 0x03 == 0 {
                self.r.read(buf)
            } else {
                Err(Error::new(ErrorKind::Other, "Opss..."))
            }
        }
    }

    /// Reader that retries the inner read for as long as it fails with
    /// `ErrorKind::Other`; any other outcome is returned unchanged.
    struct RetryWrapper<R: Read> {
        r: R,
    }

    impl<R: Read> RetryWrapper<R> {
        fn new(read: R) -> Self {
            RetryWrapper { r: read }
        }
    }

    impl<R: Read> Read for RetryWrapper<R> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            loop {
                match self.r.read(buf) {
                    Err(e) if e.kind() == ErrorKind::Other => continue,
                    outcome => return outcome,
                }
            }
        }
    }

    /// Finishes the encoder and appends `END_MARK` after the compressed data.
    fn finish_encode<W: Write>(encoder: Encoder<W>) -> W {
        let (mut buffer, result) = encoder.finish();
        result.unwrap();
        // `write_all`: a bare `write` is allowed to write only part of the mark.
        buffer.write_all(&END_MARK).unwrap();
        buffer
    }

    /// Finishes the decoder and checks that exactly `END_MARK` is left unread
    /// in the underlying reader.
    fn finish_decode<R: Read>(decoder: Decoder<R>) {
        let (buffer, result) = decoder.finish();
        result.unwrap();

        let mut data = Vec::new();
        RetryWrapper::new(buffer).read_to_end(&mut data).unwrap();
        assert_eq!(END_MARK.to_vec(), data);
    }

    #[test]
    fn test_decoder_empty() {
        let expected: Vec<u8> = Vec::new();
        let buffer = finish_encode(EncoderBuilder::new().level(1).build(Vec::new()).unwrap());

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smallest() {
        let expected: Vec<u8> = Vec::new();
        // Hand-written 11-byte stream that decodes to no data.
        let mut buffer = b"\x04\x22\x4d\x18\x40\x40\xc0\x00\x00\x00\x00".to_vec();
        buffer.extend_from_slice(&END_MARK);

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_smoke() {
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        let expected = b"Some data".to_vec();
        // Feed the input in two pieces.
        encoder.write_all(&expected[..4]).unwrap();
        encoder.write_all(&expected[4..]).unwrap();
        let buffer = finish_encode(encoder);

        let mut decoder = Decoder::new(Cursor::new(buffer)).unwrap();
        let mut actual = Vec::new();

        decoder.read_to_end(&mut actual).unwrap();
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_decoder_random() {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);

        let mut decoder = Decoder::new(Cursor::new(encoded)).unwrap();
        let mut actual = Vec::new();
        let mut buffer = [0u8; BUFFER_SIZE];
        loop {
            let size = decoder.read(&mut buffer).unwrap();
            if size == 0 {
                break;
            }
            actual.extend_from_slice(&buffer[..size]);
        }
        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    #[test]
    fn test_retry_read() {
        let mut rnd = random();
        let expected = random_stream(&mut rnd, 1027 * 1023 * 7);
        let mut encoder = EncoderBuilder::new().level(1).build(Vec::new()).unwrap();
        encoder.write_all(&expected).unwrap();
        let encoded = finish_encode(encoder);

        let mut decoder =
            Decoder::new(ErrorWrapper::new(rnd.clone(), Cursor::new(encoded))).unwrap();
        let mut actual = Vec::new();
        let mut buffer = [0u8; BUFFER_SIZE];
        loop {
            match decoder.read(&mut buffer) {
                Ok(0) => break,
                Ok(size) => actual.extend_from_slice(&buffer[..size]),
                // Injected failure from `ErrorWrapper`: call `read` again. The
                // decoder must not lose or duplicate data across failed reads.
                Err(_) => {}
            }
        }

        assert_eq!(expected, actual);
        finish_decode(decoder);
    }

    /// Decodes a flushed (but not finished) stream through small output
    /// buffers and checks that every input byte comes back before `read`
    /// returns 0.
    #[test]
    fn issue_45() {
        let mut enc = crate::EncoderBuilder::new().build(Vec::new()).unwrap();

        let text: Vec<u8> = vec![b'a'; 100];
        enc.write_all(&text[..]).unwrap();

        enc.flush().unwrap();

        for buf_size in [5, 10, 15, 20, 25] {
            let mut buf = vec![0; buf_size];

            let mut total_bytes_read = 0;

            let mut dec = crate::Decoder::new(&enc.writer()[..]).unwrap();
            while let Ok(n) = dec.read(&mut buf[..]) {
                if n == 0 {
                    break;
                }

                total_bytes_read += n;
            }

            assert_eq!(total_bytes_read, text.len());
        }
    }

    /// Deterministically seeded generator so the random tests are reproducible.
    fn random() -> StdRng {
        let seed: [u8; 32] = [
            157, 164, 190, 237, 231, 103, 60, 22, 197, 108, 51, 176, 30, 170, 155, 21, 163, 249,
            56, 192, 57, 112, 142, 240, 233, 46, 51, 122, 222, 137, 225, 243,
        ];

        rand::SeedableRng::from_seed(seed)
    }

    /// Produces `size` random bytes drawn from `rng`.
    fn random_stream<R: Rng>(rng: &mut R, size: usize) -> Vec<u8> {
        (0..size).map(|_| rng.gen()).collect()
    }

    /// Compile-time check that `Decoder` is `Send` for a `Send` reader.
    #[test]
    fn test_decoder_send() {
        fn check_send<S: Send>(_: &S) {}
        let dec = Decoder::new(Cursor::new(Vec::new())).unwrap();
        check_send(&dec);
    }
}