avro_schema/read/
decompress.rs

//! APIs to decompress Avro [`CompressedBlock`]s into [`Block`]s.
2use std::io::Read;
3
4use fallible_streaming_iterator::FallibleStreamingIterator;
5
6use crate::error::Error;
7
8use crate::file::Compression;
9use crate::file::{Block, CompressedBlock};
10
11use super::block::CompressedBlockStreamingIterator;
12
/// CRC-32 (`CRC_32_ISO_HDLC`) instance used by `decompress_block` to checksum the decompressed
/// contents of Snappy blocks. Only compiled in with the `compression` feature.
#[cfg(feature = "compression")]
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
15
16/// Decompresses a [`CompressedBlock`] into [`Block`]
17/// Returns whether the buffers where swapped.
18pub fn decompress_block(
19    block: &mut CompressedBlock,
20    decompressed: &mut Block,
21    compression: Option<Compression>,
22) -> Result<bool, Error> {
23    decompressed.number_of_rows = block.number_of_rows;
24    let block = &mut block.data;
25    let decompressed = &mut decompressed.data;
26
27    match compression {
28        None => {
29            std::mem::swap(block, decompressed);
30            Ok(true)
31        }
32        #[cfg(feature = "compression")]
33        Some(Compression::Deflate) => {
34            decompressed.clear();
35            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
36            decoder.read_to_end(decompressed)?;
37            Ok(false)
38        }
39        #[cfg(feature = "compression")]
40        Some(Compression::Snappy) => {
41            let crc = &block[block.len() - 4..];
42            let block = &block[..block.len() - 4];
43
44            let len = snap::raw::decompress_len(block).map_err(|_| Error::OutOfSpec)?;
45            decompressed.clear();
46            decompressed.resize(len, 0);
47            snap::raw::Decoder::new()
48                .decompress(block, decompressed)
49                .map_err(|_| Error::OutOfSpec)?;
50
51            let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]);
52
53            let actual_crc = CRC_TABLE.checksum(decompressed);
54            if expected_crc != actual_crc {
55                return Err(Error::OutOfSpec);
56            }
57            Ok(false)
58        }
59        #[cfg(not(feature = "compression"))]
60        Some(Compression::Deflate) => Err(Error::RequiresCompression),
61        #[cfg(not(feature = "compression"))]
62        Some(Compression::Snappy) => Err(Error::RequiresCompression),
63    }
64}
65
66/// [`FallibleStreamingIterator`] of decompressed [`Block`]
67pub struct BlockStreamingIterator<R: Read> {
68    blocks: CompressedBlockStreamingIterator<R>,
69    compression: Option<Compression>,
70    buf: Block,
71    was_swapped: bool,
72}
73
74/// Returns a [`FallibleStreamingIterator`] of [`Block`].
75pub fn block_iterator<R: Read>(
76    reader: R,
77    compression: Option<Compression>,
78    marker: [u8; 16],
79) -> BlockStreamingIterator<R> {
80    BlockStreamingIterator::<R>::new(reader, compression, marker)
81}
82
83impl<R: Read> BlockStreamingIterator<R> {
84    /// Returns a new [`BlockStreamingIterator`].
85    pub fn new(reader: R, compression: Option<Compression>, marker: [u8; 16]) -> Self {
86        Self {
87            blocks: CompressedBlockStreamingIterator::new(reader, marker, vec![]),
88            compression,
89            buf: Block::new(0, vec![]),
90            was_swapped: false,
91        }
92    }
93
94    /// Deconstructs itself into its internal reader
95    #[inline]
96    pub fn into_inner(self) -> R {
97        self.blocks.into_inner().0
98    }
99}
100
101impl<R: Read> FallibleStreamingIterator for BlockStreamingIterator<R> {
102    type Error = Error;
103    type Item = Block;
104
105    #[inline]
106    fn advance(&mut self) -> Result<(), Error> {
107        if self.was_swapped {
108            std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data);
109        }
110        self.blocks.advance()?;
111        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.compression)?;
112        Ok(())
113    }
114
115    #[inline]
116    fn get(&self) -> Option<&Self::Item> {
117        if self.buf.number_of_rows > 0 {
118            Some(&self.buf)
119        } else {
120            None
121        }
122    }
123}