use crate::core::dictionary::Dictionary;
use crate::features::compression::CompressionAlgorithm;
use crate::features::hashing::HashAlgorithm;
use std::io::{Read, Write};
use super::hasher::{HasherWriter, create_hasher_writer};
/// Buffer size (in bytes) used for each streaming read. Chunked mode
/// rounds this down to a whole number of encoding groups before use.
const CHUNK_SIZE: usize = 4096;
/// Streaming encoder that reads raw bytes from any [`Read`] source and
/// writes dictionary-encoded text to any [`Write`] sink, processing the
/// input in fixed-size chunks instead of buffering it whole (except in
/// Radix mode, which requires the full input).
///
/// Optional compression (applied before dictionary encoding) and hashing
/// (computed over the raw, pre-compression input) are configured via the
/// builder-style `with_*` methods.
pub struct StreamingEncoder<'a, W: Write> {
    /// Dictionary defining the target alphabet and encoding mode.
    dictionary: &'a Dictionary,
    /// Destination for the encoded output.
    writer: W,
    /// Compression algorithm applied before encoding, if any.
    compress_algo: Option<CompressionAlgorithm>,
    /// Compression level (defaults to 6 in `new`); meaning is algorithm-specific.
    compress_level: u32,
    /// Hash algorithm used to checksum the raw input, if any.
    hash_algo: Option<HashAlgorithm>,
    /// Extra configuration (e.g. seed) for xxHash-based hash algorithms.
    xxhash_config: crate::features::hashing::XxHashConfig,
}
impl<'a, W: Write> StreamingEncoder<'a, W> {
    /// Creates an encoder with no compression and no hashing.
    ///
    /// The stored compression level defaults to 6 and is only used if
    /// compression is later enabled via [`Self::with_compression`].
    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
        StreamingEncoder {
            dictionary,
            writer,
            compress_algo: None,
            compress_level: 6,
            hash_algo: None,
            xxhash_config: crate::features::hashing::XxHashConfig::default(),
        }
    }

    /// Enables compression of the input before dictionary encoding.
    pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
        self.compress_algo = Some(algo);
        self.compress_level = level;
        self
    }

    /// Enables hashing of the raw (pre-compression) input.
    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
        self.hash_algo = Some(algo);
        self
    }

    /// Overrides the xxHash configuration used when `hash_algo` is an
    /// xxHash variant.
    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
        self.xxhash_config = config;
        self
    }

    /// Encodes everything from `reader`, writing encoded text to the
    /// underlying writer.
    ///
    /// Returns `Some(hash)` of the raw input if hashing was enabled via
    /// [`Self::with_hashing`], otherwise `None`.
    ///
    /// # Errors
    /// Propagates I/O errors from the reader/writer; byte-range encoding
    /// failures surface as `InvalidData` errors.
    pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
        if let Some(algo) = self.compress_algo {
            return self.encode_with_compression(reader, algo);
        }
        let hash = match self.dictionary.mode() {
            crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
            crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
            crate::core::config::EncodingMode::Radix => {
                // Radix mode treats the whole input as one big number, so
                // it cannot be streamed chunk-by-chunk; read it all.
                let mut buffer = Vec::new();
                reader.read_to_end(&mut buffer)?;
                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
                self.writer.write_all(encoded.as_bytes())?;
                hash
            }
        };
        Ok(hash)
    }

    /// Compresses the input into memory, then dictionary-encodes the
    /// compressed bytes. The returned hash (if any) covers the raw,
    /// *uncompressed* input.
    fn encode_with_compression<R: Read>(
        &mut self,
        reader: &mut R,
        algo: CompressionAlgorithm,
    ) -> std::io::Result<Option<Vec<u8>>> {
        use std::io::Cursor;
        let mut compressed_data = Vec::new();
        let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
        let mut cursor = Cursor::new(compressed_data);
        // Hashing already happened against the raw input inside
        // `compress_stream`, so the no-hash encoding paths are used here.
        match self.dictionary.mode() {
            crate::core::config::EncodingMode::Chunked => {
                self.encode_chunked_no_hash(&mut cursor)?;
            }
            crate::core::config::EncodingMode::ByteRange => {
                self.encode_byte_range_no_hash(&mut cursor)?;
            }
            crate::core::config::EncodingMode::Radix => {
                let buffer = cursor.into_inner();
                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
                self.writer.write_all(encoded.as_bytes())?;
            }
        }
        Ok(hash)
    }

    /// Compresses `reader` into `output` with `algo`, hashing the raw
    /// input on the way through if hashing is enabled.
    ///
    /// Gzip/Zstd/Brotli/native-Lzma stream through `copy_with_hash`;
    /// Lz4/Snappy (and Lzma under WASM) are block compressors here and
    /// buffer the whole input first.
    fn compress_stream<R: Read>(
        &mut self,
        reader: &mut R,
        output: &mut Vec<u8>,
        algo: CompressionAlgorithm,
    ) -> std::io::Result<Option<Vec<u8>>> {
        use flate2::write::GzEncoder;
        // NOTE: the buffering branches below (Lz4/Snappy, WASM Lzma) hash
        // the buffer directly and never consume this hasher; it is only
        // used by the streaming branches.
        let hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
        match algo {
            CompressionAlgorithm::Gzip => {
                let mut encoder =
                    GzEncoder::new(output, flate2::Compression::new(self.compress_level));
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            #[cfg(feature = "native-compression")]
            CompressionAlgorithm::Zstd => {
                let mut encoder =
                    zstd::stream::write::Encoder::new(output, self.compress_level as i32)
                        .map_err(std::io::Error::other)?;
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
            CompressionAlgorithm::Zstd => {
                Err(std::io::Error::other(
                    "Zstd compression not supported in WASM (ruzstd is decode-only)",
                ))
            }
            CompressionAlgorithm::Brotli => {
                // NOTE(review): stream finalization relies on the encoder's
                // Drop at the end of this arm; errors there are swallowed.
                // Confirm brotli::CompressorWriter semantics if flush errors
                // must be surfaced.
                let mut encoder =
                    brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                Ok(hash)
            }
            #[cfg(feature = "native-compression")]
            CompressionAlgorithm::Lzma => {
                use xz2::write::XzEncoder;
                let mut encoder = XzEncoder::new(output, self.compress_level);
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
            CompressionAlgorithm::Lzma => {
                // lzma_rs has no streaming writer; buffer, hash, compress.
                let mut buffer = Vec::new();
                reader.read_to_end(&mut buffer)?;
                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
                use std::io::Cursor;
                lzma_rs::lzma_compress(&mut Cursor::new(&buffer), output)
                    .map_err(std::io::Error::other)?;
                Ok(hash)
            }
            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
                // Block compressors: whole input is buffered and hashed,
                // then compressed in one shot.
                let mut buffer = Vec::new();
                reader.read_to_end(&mut buffer)?;
                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
                let compressed = match algo {
                    #[cfg(feature = "native-compression")]
                    CompressionAlgorithm::Lz4 => {
                        lz4::block::compress(&buffer, None, false).map_err(std::io::Error::other)?
                    }
                    #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
                    CompressionAlgorithm::Lz4 => lz4_flex::compress_prepend_size(&buffer),
                    CompressionAlgorithm::Snappy => {
                        let mut encoder = snap::raw::Encoder::new();
                        encoder
                            .compress_vec(&buffer)
                            .map_err(std::io::Error::other)?
                    }
                    // Outer match arm only admits Lz4 | Snappy.
                    _ => unreachable!(),
                };
                output.extend_from_slice(&compressed);
                Ok(hash)
            }
        }
    }

    /// Copies `reader` to `writer` in `CHUNK_SIZE` pieces, feeding each
    /// piece to `hasher` if present. Returns the finalized hash.
    ///
    /// Short reads are harmless here: the destination is a compression
    /// encoder that accepts arbitrary write sizes.
    fn copy_with_hash<R: Read>(
        reader: &mut R,
        writer: &mut impl Write,
        mut hasher: Option<HasherWriter>,
    ) -> std::io::Result<Option<Vec<u8>>> {
        let mut buffer = vec![0u8; CHUNK_SIZE];
        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }
            writer.write_all(chunk)?;
        }
        Ok(hasher.map(|h| h.finalize()))
    }

    /// Reads from `reader` until `buffer` is full or EOF is reached,
    /// tolerating short reads. Returns the number of bytes filled.
    ///
    /// `Read::read` may legally return fewer bytes than requested even
    /// before EOF (pipes, sockets, buffered readers). Chunked encoding
    /// sizes its buffer to a whole number of encoding groups so that
    /// every chunk except the last stays group-aligned; encoding a
    /// short, unaligned mid-stream read would break that alignment, so
    /// the buffer must be filled completely before encoding.
    fn fill_buffer<R: Read>(reader: &mut R, buffer: &mut [u8]) -> std::io::Result<usize> {
        let mut filled = 0;
        while filled < buffer.len() {
            let n = reader.read(&mut buffer[filled..])?;
            if n == 0 {
                break; // EOF
            }
            filled += n;
        }
        Ok(filled)
    }

    /// Computes the group-aligned read-buffer size for chunked encoding.
    ///
    /// Truncating `log2(base)` mirrors the chunked algorithm's grouping;
    /// `.max(1)` guards against a division-by-zero panic if the
    /// dictionary base is degenerate (< 2).
    fn aligned_chunk_size(&self) -> usize {
        let base = self.dictionary.base();
        let bits_per_char = (base as f64).log2() as usize;
        let bytes_per_group = bits_per_char.max(1);
        ((CHUNK_SIZE / bytes_per_group) * bytes_per_group).max(bytes_per_group)
    }

    /// Streams `reader` through chunked encoding, hashing the raw bytes,
    /// and writes the encoded text to the underlying writer.
    fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
        let mut buffer = vec![0u8; self.aligned_chunk_size()];
        let mut hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
        loop {
            // Fill the whole buffer (not a single `read`) so every chunk
            // except the final one is a multiple of the group size.
            let bytes_read = Self::fill_buffer(reader, &mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }
            let encoded =
                crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
            self.writer.write_all(encoded.as_bytes())?;
        }
        Ok(hasher.map(|h| h.finalize()))
    }

    /// Same as [`Self::encode_chunked`] but without hashing; used on
    /// already-compressed data whose raw form was hashed earlier.
    fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
        let mut buffer = vec![0u8; self.aligned_chunk_size()];
        loop {
            // Fill completely to preserve group alignment (see fill_buffer).
            let bytes_read = Self::fill_buffer(reader, &mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            let encoded = crate::encoders::algorithms::chunked::encode_chunked(
                &buffer[..bytes_read],
                self.dictionary,
            );
            self.writer.write_all(encoded.as_bytes())?;
        }
        Ok(())
    }

    /// Streams `reader` through byte-range encoding, hashing the raw
    /// bytes. Byte-range encoding is applied per read chunk, so short
    /// reads do not affect the output.
    fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
        let mut buffer = vec![0u8; CHUNK_SIZE];
        let mut hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }
            let encoded =
                crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
            self.writer.write_all(encoded.as_bytes())?;
        }
        Ok(hasher.map(|h| h.finalize()))
    }

    /// Same as [`Self::encode_byte_range`] but without hashing; used on
    /// already-compressed data whose raw form was hashed earlier.
    fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
        let mut buffer = vec![0u8; CHUNK_SIZE];
        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
                &buffer[..bytes_read],
                self.dictionary,
            )
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
            self.writer.write_all(encoded.as_bytes())?;
        }
        Ok(())
    }
}