zff/
io.rs

1// - STD
2use std::io::{Read, copy as io_copy};
3
4// - internal
5use crate::{
6    Result,
7    ZffError,
8    ZffErrorKind,
9    CompressionAlgorithm,
10    header::{CompressionHeader},
11
12};
13
14// - external
15use crc32fast::{Hasher as CRC32Hasher};
16
17// returns the buffer with the read bytes and the number of bytes which was read.
18pub(crate) fn buffer_chunk<R>(
19	input: &mut R,
20	chunk_size: usize,
21	) -> Result<(Vec<u8>, u64)> 
22where
23	R: Read
24{
25	let mut buf = vec![0u8; chunk_size];
26    let mut bytes_read = 0;
27
28    while bytes_read < chunk_size {
29        let r = match input.read(&mut buf[bytes_read..]) {
30        	Ok(r) => r,
31        	Err(e) => match e.kind() {
32        		std::io::ErrorKind::Interrupted => return Err(ZffError::new(ZffErrorKind::InterruptedInputStream, "")),
33        		_ => return Err(ZffError::from(e)),
34        	},
35        };
36        if r == 0 {
37            break;
38        }
39        bytes_read += r;
40    }
41
42    let buf = if bytes_read == chunk_size {
43        buf
44    } else {
45        buf[..bytes_read].to_vec()
46    };
47    Ok((buf, bytes_read as u64))
48}
49
50/// calculates a crc32 hash for the given bytes.
51pub fn calculate_crc32(buffer: &[u8]) -> u32 {
52    let mut crc32_hasher = CRC32Hasher::new();
53    crc32_hasher.update(buffer);
54    
55    crc32_hasher.finalize()
56}
57
58/// This function takes the buffered bytes and tries to compress them. If the compression rate is greater than the threshold value of the given
59/// [CompressionHeader], the function returns a tuple of compressed bytes and the flag, if the bytes was compressed or not.
60pub fn compress_buffer(buf: Vec<u8>, chunk_size: usize, compression_header: &CompressionHeader) -> Result<(Vec<u8>, bool)> {
61    let mut compression_flag = false;
62    let compression_threshold = compression_header.threshold();
63
64    match compression_header.algorithm() {
65        CompressionAlgorithm::None => Ok((buf, compression_flag)),
66        CompressionAlgorithm::Zstd => {
67            let compression_level = *compression_header.level() as i32;
68            let mut stream = zstd::stream::read::Encoder::new(buf.as_slice(), compression_level)?;
69            let (compressed_data, _) = buffer_chunk(&mut stream, chunk_size * *compression_header.level() as usize)?;
70            if (buf.len() as f32 / compressed_data.len() as f32) < compression_threshold {
71                Ok((buf, compression_flag))
72            } else {
73                compression_flag = true;
74                Ok((compressed_data, compression_flag))
75            }
76        },
77        CompressionAlgorithm::Lz4 => {
78            let buffer = Vec::new();
79            let mut compressor = lz4_flex::frame::FrameEncoder::new(buffer);
80            io_copy(&mut buf.as_slice(), &mut compressor)?;
81            let compressed_data = compressor.finish()?;
82            if (buf.len() as f32 / compressed_data.len() as f32) < compression_threshold {
83                Ok((buf, compression_flag))
84            } else {
85                compression_flag = true;
86                Ok((compressed_data, compression_flag))
87            }
88        }
89    }
90}