1use super::util;
2use byteorder;
3use crc;
4use error::{self, ErrorKind};
5use flate2;
6use snap;
7use std::io;
8
/// A source that hands out a bounded number of items, one permit at a time.
///
/// The implementations in this file return `Ok(true)` while items remain
/// (consuming one permit per call) and `Ok(false)` once the supply is exhausted.
pub trait Limit {
    /// Tries to claim one more item from the source.
    ///
    /// Returns `Ok(true)` if an item was claimed and `Ok(false)` if none are left.
    /// An `Err` is returned when the implementation has to perform I/O to find
    /// out and that I/O fails.
    fn take_limit(&mut self) -> io::Result<bool>;
}
12
/// An input paired with a fixed budget of items that may be taken from it.
///
/// Reads are forwarded to `input` unchanged; the budget is only consulted by
/// the `Limit` implementation.
pub struct Direct<R> {
    /// The wrapped input that all reads are delegated to.
    input: R,
    /// How many more times `take_limit` will report `true`.
    remaining: usize,
}
17
/// The compression scheme applied to the payload of each block.
///
/// Values are normally obtained from a codec name via `Codec::parse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
    /// Block data is stored as-is (also used when no codec name is given).
    Null,
    /// Block data is decoded with `flate2::read::DeflateDecoder`.
    Deflate,
    /// Block data is Snappy-compressed and followed by a 4-byte CRC32.
    Snappy,
}
23
24pub struct Blocks<R>
25 where R: io::Read
26{
27 input: R,
28 codec: Codec,
29 sync_marker: Vec<u8>,
30 current_block: io::Cursor<Vec<u8>>,
31 remaining: usize,
32}
33
34impl Codec {
35 pub fn parse(codec: Option<&[u8]>) -> error::Result<Codec> {
36 match codec {
37 None | Some(b"null") => Ok(Codec::Null),
38 Some(b"deflate") => Ok(Codec::Deflate),
39 Some(b"snappy") => Ok(Codec::Snappy),
40 Some(codec) => {
41 Err(ErrorKind::UnsupportedCodec(String::from_utf8_lossy(codec).into_owned()).into())
42 },
43 }
44 }
45}
46
47impl<R> Direct<R>
48 where R: io::Read
49{
50 pub fn new(input: R, remaining: usize) -> Direct<R> {
51 Direct {
52 input: input,
53 remaining: remaining,
54 }
55 }
56}
57
58impl<R> io::Read for Direct<R>
59 where R: io::Read
60{
61 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
62 self.input.read(buf)
63 }
64}
65
66impl<R> Limit for Direct<R>
67 where R: io::Read
68{
69 fn take_limit(&mut self) -> io::Result<bool> {
70 if self.remaining == 0 {
71 Ok(false)
72 } else {
73 self.remaining -= 1;
74 Ok(true)
75 }
76 }
77}
78
79impl<R> Blocks<R>
80 where R: io::Read
81{
82 pub fn new(input: R, codec: Codec, sync_marker: Vec<u8>) -> Blocks<R> {
83 Blocks {
84 input: input,
85 codec: codec,
86 sync_marker: sync_marker,
87 current_block: io::Cursor::new(Vec::new()),
88 remaining: 0,
89 }
90 }
91
92 fn read_next_block(&mut self) -> io::Result<()> {
93 use std::io::Read;
94
95 assert_eq!(0, self.remaining);
97 let pos = self.current_block.position() as usize;
98 let len = self.current_block.get_ref().len();
99 assert_eq!(pos, len);
100
101 self.current_block.set_position(0);
102 let mut buffer = self.current_block.get_mut();
103 buffer.clear();
104
105 let obj_count = match util::read_long(&mut self.input) {
106 Ok(v) => v,
107 Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
108 Err(e) => return Err(e),
109 };
110
111 let compressed_size = util::read_long(&mut self.input)?;
112 debug!("Loading block with compressed size {} containing {} objects",
113 compressed_size,
114 obj_count);
115
116 match self.codec {
117 Codec::Null => {
118 debug!("Copying block data with null codec");
119 let mut limited = (&mut self.input).take(compressed_size as u64);
120 buffer.reserve(compressed_size as usize);
121 limited.read_to_end(buffer)?;
122 },
123 Codec::Deflate => {
124 debug!("Copying block data with deflate codec");
125 let limited = (&mut self.input).take(compressed_size as u64);
126 let mut reader = flate2::read::DeflateDecoder::new(limited);
127 reader.read_to_end(buffer)?;
128 },
129 Codec::Snappy => {
130 use byteorder::ByteOrder;
131
132 debug!("Copying block data with snappy codec");
133 let mut compressed = vec![0; compressed_size as usize - 4];
134 self.input.read_exact(&mut compressed)?;
135 let decompressed_size = snap::decompress_len(&compressed)?;
136 debug!("Decompressed block is expected to be {} bytes",
137 decompressed_size);
138 buffer.resize(decompressed_size, 0);
139 snap::Decoder::new().decompress(&compressed, buffer)?;
140
141 let mut crc_buffer = [0; 4];
142 self.input.read_exact(&mut crc_buffer)?;
143 let expected_crc = byteorder::BigEndian::read_u32(&crc_buffer);
144 let actual_crc = crc::crc32::checksum_ieee(&buffer);
145
146 if expected_crc != actual_crc {
147 let m = format!("bad CRC32; expected {:x} but got {:x}",
148 expected_crc,
149 actual_crc);
150 return Err(io::Error::new(io::ErrorKind::InvalidInput, m));
151 }
152 },
153 }
154 debug!("Uncompressed block contains {} bytes", buffer.len());
155
156 let mut sync_marker = vec![0; 16];
157 self.input.read_exact(&mut sync_marker)?;
158
159 self.remaining = obj_count as usize;
160
161 if self.sync_marker != sync_marker {
162 Err(io::Error::new(io::ErrorKind::InvalidInput, "bad sync marker"))
163 } else {
164 Ok(())
165 }
166 }
167}
168
169impl<R> io::Read for Blocks<R>
170 where R: io::Read
171{
172 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
173 self.current_block.read(buf)
174 }
175}
176
177impl<R> Limit for Blocks<R>
178 where R: io::Read
179{
180 fn take_limit(&mut self) -> io::Result<bool> {
181 if self.remaining == 0 {
182 self.read_next_block()?;
183 }
184
185 if self.remaining == 0 {
186 Ok(false)
187 } else {
188 self.remaining -= 1;
189 Ok(true)
190 }
191 }
192}