avro_schema/read/
decompress.rs1use std::io::Read;
3
4use fallible_streaming_iterator::FallibleStreamingIterator;
5
6use crate::error::Error;
7
8use crate::file::Compression;
9use crate::file::{Block, CompressedBlock};
10
11use super::block::CompressedBlockStreamingIterator;
12
/// CRC-32 (`CRC_32_ISO_HDLC` from the `crc` crate) used to verify the 4-byte checksum
/// trailer of snappy-compressed blocks in `decompress_block`.
#[cfg(feature = "compression")]
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
15
16pub fn decompress_block(
19 block: &mut CompressedBlock,
20 decompressed: &mut Block,
21 compression: Option<Compression>,
22) -> Result<bool, Error> {
23 decompressed.number_of_rows = block.number_of_rows;
24 let block = &mut block.data;
25 let decompressed = &mut decompressed.data;
26
27 match compression {
28 None => {
29 std::mem::swap(block, decompressed);
30 Ok(true)
31 }
32 #[cfg(feature = "compression")]
33 Some(Compression::Deflate) => {
34 decompressed.clear();
35 let mut decoder = libflate::deflate::Decoder::new(&block[..]);
36 decoder.read_to_end(decompressed)?;
37 Ok(false)
38 }
39 #[cfg(feature = "compression")]
40 Some(Compression::Snappy) => {
41 let crc = &block[block.len() - 4..];
42 let block = &block[..block.len() - 4];
43
44 let len = snap::raw::decompress_len(block).map_err(|_| Error::OutOfSpec)?;
45 decompressed.clear();
46 decompressed.resize(len, 0);
47 snap::raw::Decoder::new()
48 .decompress(block, decompressed)
49 .map_err(|_| Error::OutOfSpec)?;
50
51 let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]);
52
53 let actual_crc = CRC_TABLE.checksum(decompressed);
54 if expected_crc != actual_crc {
55 return Err(Error::OutOfSpec);
56 }
57 Ok(false)
58 }
59 #[cfg(not(feature = "compression"))]
60 Some(Compression::Deflate) => Err(Error::RequiresCompression),
61 #[cfg(not(feature = "compression"))]
62 Some(Compression::Snappy) => Err(Error::RequiresCompression),
63 }
64}
65
66pub struct BlockStreamingIterator<R: Read> {
68 blocks: CompressedBlockStreamingIterator<R>,
69 compression: Option<Compression>,
70 buf: Block,
71 was_swapped: bool,
72}
73
74pub fn block_iterator<R: Read>(
76 reader: R,
77 compression: Option<Compression>,
78 marker: [u8; 16],
79) -> BlockStreamingIterator<R> {
80 BlockStreamingIterator::<R>::new(reader, compression, marker)
81}
82
83impl<R: Read> BlockStreamingIterator<R> {
84 pub fn new(reader: R, compression: Option<Compression>, marker: [u8; 16]) -> Self {
86 Self {
87 blocks: CompressedBlockStreamingIterator::new(reader, marker, vec![]),
88 compression,
89 buf: Block::new(0, vec![]),
90 was_swapped: false,
91 }
92 }
93
94 #[inline]
96 pub fn into_inner(self) -> R {
97 self.blocks.into_inner().0
98 }
99}
100
101impl<R: Read> FallibleStreamingIterator for BlockStreamingIterator<R> {
102 type Error = Error;
103 type Item = Block;
104
105 #[inline]
106 fn advance(&mut self) -> Result<(), Error> {
107 if self.was_swapped {
108 std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data);
109 }
110 self.blocks.advance()?;
111 self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.compression)?;
112 Ok(())
113 }
114
115 #[inline]
116 fn get(&self) -> Option<&Self::Item> {
117 if self.buf.number_of_rows > 0 {
118 Some(&self.buf)
119 } else {
120 None
121 }
122 }
123}