serde_avro/de/
read.rs

1use super::util;
2use byteorder;
3use crc;
4use error::{self, ErrorKind};
5use flate2;
6use snap;
7use std::io;
8
/// Gatekeeper for how many values may still be read from a source.
///
/// Each call either grants one more value (`Ok(true)`, consuming one unit of
/// the allowance) or reports that nothing is left (`Ok(false)`).
pub trait Limit {
    /// Tries to take one unit of the allowance.
    ///
    /// # Errors
    ///
    /// Implementations may read from their input to refill the allowance and
    /// propagate any I/O error encountered while doing so.
    fn take_limit(&mut self) -> io::Result<bool>;
}
12
/// Reader that passes its input through unchanged and permits a fixed number
/// of values to be taken through [`Limit`].
pub struct Direct<R> {
    /// Stream every read is forwarded to.
    input: R,
    /// How many more `take_limit` calls will succeed.
    remaining: usize,
}
17
/// Compression codec applied to the payload of each data block.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Codec {
    /// Payloads are stored as-is.
    Null,
    /// Payloads are raw deflate streams.
    Deflate,
    /// Payloads are snappy-compressed and followed by a 4-byte big-endian
    /// CRC32 of the uncompressed data.
    Snappy,
}
23
24pub struct Blocks<R>
25    where R: io::Read
26{
27    input: R,
28    codec: Codec,
29    sync_marker: Vec<u8>,
30    current_block: io::Cursor<Vec<u8>>,
31    remaining: usize,
32}
33
34impl Codec {
35    pub fn parse(codec: Option<&[u8]>) -> error::Result<Codec> {
36        match codec {
37            None | Some(b"null") => Ok(Codec::Null),
38            Some(b"deflate") => Ok(Codec::Deflate),
39            Some(b"snappy") => Ok(Codec::Snappy),
40            Some(codec) => {
41                Err(ErrorKind::UnsupportedCodec(String::from_utf8_lossy(codec).into_owned()).into())
42            },
43        }
44    }
45}
46
47impl<R> Direct<R>
48    where R: io::Read
49{
50    pub fn new(input: R, remaining: usize) -> Direct<R> {
51        Direct {
52            input: input,
53            remaining: remaining,
54        }
55    }
56}
57
58impl<R> io::Read for Direct<R>
59    where R: io::Read
60{
61    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
62        self.input.read(buf)
63    }
64}
65
66impl<R> Limit for Direct<R>
67    where R: io::Read
68{
69    fn take_limit(&mut self) -> io::Result<bool> {
70        if self.remaining == 0 {
71            Ok(false)
72        } else {
73            self.remaining -= 1;
74            Ok(true)
75        }
76    }
77}
78
79impl<R> Blocks<R>
80    where R: io::Read
81{
82    pub fn new(input: R, codec: Codec, sync_marker: Vec<u8>) -> Blocks<R> {
83        Blocks {
84            input: input,
85            codec: codec,
86            sync_marker: sync_marker,
87            current_block: io::Cursor::new(Vec::new()),
88            remaining: 0,
89        }
90    }
91
92    fn read_next_block(&mut self) -> io::Result<()> {
93        use std::io::Read;
94
95        // Make sure the current block is empty
96        assert_eq!(0, self.remaining);
97        let pos = self.current_block.position() as usize;
98        let len = self.current_block.get_ref().len();
99        assert_eq!(pos, len);
100
101        self.current_block.set_position(0);
102        let mut buffer = self.current_block.get_mut();
103        buffer.clear();
104
105        let obj_count = match util::read_long(&mut self.input) {
106            Ok(v) => v,
107            Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
108            Err(e) => return Err(e),
109        };
110
111        let compressed_size = util::read_long(&mut self.input)?;
112        debug!("Loading block with compressed size {} containing {} objects",
113               compressed_size,
114               obj_count);
115
116        match self.codec {
117            Codec::Null => {
118                debug!("Copying block data with null codec");
119                let mut limited = (&mut self.input).take(compressed_size as u64);
120                buffer.reserve(compressed_size as usize);
121                limited.read_to_end(buffer)?;
122            },
123            Codec::Deflate => {
124                debug!("Copying block data with deflate codec");
125                let limited = (&mut self.input).take(compressed_size as u64);
126                let mut reader = flate2::read::DeflateDecoder::new(limited);
127                reader.read_to_end(buffer)?;
128            },
129            Codec::Snappy => {
130                use byteorder::ByteOrder;
131
132                debug!("Copying block data with snappy codec");
133                let mut compressed = vec![0; compressed_size as usize - 4];
134                self.input.read_exact(&mut compressed)?;
135                let decompressed_size = snap::decompress_len(&compressed)?;
136                debug!("Decompressed block is expected to be {} bytes",
137                       decompressed_size);
138                buffer.resize(decompressed_size, 0);
139                snap::Decoder::new().decompress(&compressed, buffer)?;
140
141                let mut crc_buffer = [0; 4];
142                self.input.read_exact(&mut crc_buffer)?;
143                let expected_crc = byteorder::BigEndian::read_u32(&crc_buffer);
144                let actual_crc = crc::crc32::checksum_ieee(&buffer);
145
146                if expected_crc != actual_crc {
147                    let m = format!("bad CRC32; expected {:x} but got {:x}",
148                                    expected_crc,
149                                    actual_crc);
150                    return Err(io::Error::new(io::ErrorKind::InvalidInput, m));
151                }
152            },
153        }
154        debug!("Uncompressed block contains {} bytes", buffer.len());
155
156        let mut sync_marker = vec![0; 16];
157        self.input.read_exact(&mut sync_marker)?;
158
159        self.remaining = obj_count as usize;
160
161        if self.sync_marker != sync_marker {
162            Err(io::Error::new(io::ErrorKind::InvalidInput, "bad sync marker"))
163        } else {
164            Ok(())
165        }
166    }
167}
168
169impl<R> io::Read for Blocks<R>
170    where R: io::Read
171{
172    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
173        self.current_block.read(buf)
174    }
175}
176
177impl<R> Limit for Blocks<R>
178    where R: io::Read
179{
180    fn take_limit(&mut self) -> io::Result<bool> {
181        if self.remaining == 0 {
182            self.read_next_block()?;
183        }
184
185        if self.remaining == 0 {
186            Ok(false)
187        } else {
188            self.remaining -= 1;
189            Ok(true)
190        }
191    }
192}