avro_schema/write/
compression.rs

//! APIs to compress Avro blocks before they are written.
2
3use crate::error::Error;
4
5use crate::file::{Block, CompressedBlock, Compression};
6
/// CRC-32 (ISO-HDLC polynomial) calculator; used by the Snappy branch of `compress`
/// to checksum the uncompressed block data. Only compiled with feature `compression`.
7#[cfg(feature = "compression")]
8const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
9
10/// Compresses a [`Block`] to a [`CompressedBlock`].
11pub fn compress(
12    block: &mut Block,
13    compressed: &mut CompressedBlock,
14    compression: Option<Compression>,
15) -> Result<bool, Error> {
16    compressed.number_of_rows = block.number_of_rows;
17    let block = &mut block.data;
18    let compressed = &mut compressed.data;
19
20    match compression {
21        None => {
22            std::mem::swap(block, compressed);
23            Ok(true)
24        }
25        #[cfg(feature = "compression")]
26        Some(Compression::Deflate) => {
27            use std::io::Write;
28            compressed.clear();
29            let mut encoder = libflate::deflate::Encoder::new(compressed);
30            encoder.write_all(block)?;
31            encoder.finish();
32            Ok(false)
33        }
34        #[cfg(feature = "compression")]
35        Some(Compression::Snappy) => {
36            use snap::raw::{max_compress_len, Encoder};
37
38            compressed.clear();
39
40            let required_len = max_compress_len(block.len());
41            compressed.resize(required_len, 0);
42            let compressed_bytes = Encoder::new()
43                .compress(block, compressed)
44                .map_err(|_| Error::OutOfSpec)?;
45            compressed.truncate(compressed_bytes);
46
47            compressed.extend(CRC_TABLE.checksum(block).to_be_bytes());
48            Ok(false)
49        }
50        #[cfg(not(feature = "compression"))]
51        Some(Compression::Deflate) => Err(Error::RequiresCompression),
52        #[cfg(not(feature = "compression"))]
53        Some(Compression::Snappy) => Err(Error::RequiresCompression),
54    }
55}