base_d/encoders/streaming/
encoder.rs

1use crate::core::dictionary::Dictionary;
2use crate::features::compression::CompressionAlgorithm;
3use crate::features::hashing::HashAlgorithm;
4use std::io::{Read, Write};
5
6use super::hasher::{create_hasher_writer, HasherWriter};
7
const CHUNK_SIZE: usize = 4096; // 4 KiB read buffer used by all streaming paths
9
/// Streaming encoder for processing large amounts of data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for encoding large files or network streams.
/// Supports optional compression and hashing during encoding.
pub struct StreamingEncoder<'a, W: Write> {
    /// Dictionary defining the output alphabet and encoding mode.
    dictionary: &'a Dictionary,
    /// Destination for the encoded output.
    writer: W,
    /// Optional compression applied before encoding; `None` = no compression.
    compress_algo: Option<CompressionAlgorithm>,
    /// Level passed to the selected compression algorithm (default 6).
    compress_level: u32,
    /// Optional hash computed over the raw (uncompressed) input.
    hash_algo: Option<HashAlgorithm>,
    /// Seed/secret configuration used when `hash_algo` is an xxHash variant.
    xxhash_config: crate::features::hashing::XxHashConfig,
}
23
24impl<'a, W: Write> StreamingEncoder<'a, W> {
25    /// Creates a new streaming encoder.
26    ///
27    /// # Arguments
28    ///
29    /// * `dictionary` - The dictionary to use for encoding
30    /// * `writer` - The destination for encoded output
31    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32        StreamingEncoder {
33            dictionary,
34            writer,
35            compress_algo: None,
36            compress_level: 6,
37            hash_algo: None,
38            xxhash_config: crate::features::hashing::XxHashConfig::default(),
39        }
40    }
41
42    /// Sets compression algorithm and level.
43    pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
44        self.compress_algo = Some(algo);
45        self.compress_level = level;
46        self
47    }
48
49    /// Sets hash algorithm for computing hash during encoding.
50    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
51        self.hash_algo = Some(algo);
52        self
53    }
54
55    /// Sets xxHash configuration (seed and secret).
56    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
57        self.xxhash_config = config;
58        self
59    }
60
61    /// Encodes data from a reader in chunks.
62    ///
63    /// Note: BaseConversion mode requires reading the entire input at once
64    /// due to the mathematical nature of the algorithm. For truly streaming
65    /// behavior, use Chunked or ByteRange modes.
66    ///
67    /// Returns the computed hash if hash_algo was set, otherwise None.
68    pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
69        // If compression is enabled, we need to compress then encode
70        if let Some(algo) = self.compress_algo {
71            return self.encode_with_compression(reader, algo);
72        }
73
74        // No compression - encode directly with optional hashing
75        let hash = match self.dictionary.mode() {
76            crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
77            crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
78            crate::core::config::EncodingMode::BaseConversion => {
79                // Mathematical mode requires entire input - read all and encode
80                let mut buffer = Vec::new();
81                reader.read_to_end(&mut buffer)?;
82
83                let hash = self
84                    .hash_algo
85                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
86
87                let encoded = crate::encoders::algorithms::math::encode(&buffer, self.dictionary);
88                self.writer.write_all(encoded.as_bytes())?;
89                hash
90            }
91        };
92
93        Ok(hash)
94    }
95
96    /// Encode with compression: compress stream then encode compressed data.
97    fn encode_with_compression<R: Read>(
98        &mut self,
99        reader: &mut R,
100        algo: CompressionAlgorithm,
101    ) -> std::io::Result<Option<Vec<u8>>> {
102        use std::io::Cursor;
103
104        // Compress the input stream
105        let mut compressed_data = Vec::new();
106        let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
107
108        // Encode the compressed data
109        let mut cursor = Cursor::new(compressed_data);
110        match self.dictionary.mode() {
111            crate::core::config::EncodingMode::Chunked => {
112                self.encode_chunked_no_hash(&mut cursor)?;
113            }
114            crate::core::config::EncodingMode::ByteRange => {
115                self.encode_byte_range_no_hash(&mut cursor)?;
116            }
117            crate::core::config::EncodingMode::BaseConversion => {
118                let buffer = cursor.into_inner();
119                let encoded = crate::encoders::algorithms::math::encode(&buffer, self.dictionary);
120                self.writer.write_all(encoded.as_bytes())?;
121            }
122        }
123
124        Ok(hash)
125    }
126
127    /// Compress a stream with optional hashing.
128    fn compress_stream<R: Read>(
129        &mut self,
130        reader: &mut R,
131        output: &mut Vec<u8>,
132        algo: CompressionAlgorithm,
133    ) -> std::io::Result<Option<Vec<u8>>> {
134        use flate2::write::GzEncoder;
135        use xz2::write::XzEncoder;
136
137        let hasher = self
138            .hash_algo
139            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
140
141        match algo {
142            CompressionAlgorithm::Gzip => {
143                let mut encoder =
144                    GzEncoder::new(output, flate2::Compression::new(self.compress_level));
145                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
146                encoder.finish()?;
147                Ok(hash)
148            }
149            CompressionAlgorithm::Zstd => {
150                let mut encoder =
151                    zstd::stream::write::Encoder::new(output, self.compress_level as i32)
152                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
153                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
154                encoder.finish()?;
155                Ok(hash)
156            }
157            CompressionAlgorithm::Brotli => {
158                let mut encoder =
159                    brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
160                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
161                Ok(hash)
162            }
163            CompressionAlgorithm::Lzma => {
164                let mut encoder = XzEncoder::new(output, self.compress_level);
165                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
166                encoder.finish()?;
167                Ok(hash)
168            }
169            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
170                // LZ4 and Snappy don't have streaming encoders in their crates
171                // Read all, compress, write
172                let mut buffer = Vec::new();
173                reader.read_to_end(&mut buffer)?;
174
175                let hash = self
176                    .hash_algo
177                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
178
179                let compressed = match algo {
180                    CompressionAlgorithm::Lz4 => lz4::block::compress(&buffer, None, false)
181                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
182                    CompressionAlgorithm::Snappy => {
183                        let mut encoder = snap::raw::Encoder::new();
184                        encoder
185                            .compress_vec(&buffer)
186                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
187                    }
188                    _ => unreachable!(),
189                };
190                output.extend_from_slice(&compressed);
191                Ok(hash)
192            }
193        }
194    }
195
196    fn copy_with_hash<R: Read>(
197        reader: &mut R,
198        writer: &mut impl Write,
199        mut hasher: Option<HasherWriter>,
200    ) -> std::io::Result<Option<Vec<u8>>> {
201        let mut buffer = vec![0u8; CHUNK_SIZE];
202
203        loop {
204            let bytes_read = reader.read(&mut buffer)?;
205            if bytes_read == 0 {
206                break;
207            }
208
209            let chunk = &buffer[..bytes_read];
210            if let Some(ref mut h) = hasher {
211                h.update(chunk);
212            }
213            writer.write_all(chunk)?;
214        }
215
216        Ok(hasher.map(|h| h.finalize()))
217    }
218
219    fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
220        let base = self.dictionary.base();
221        let bits_per_char = (base as f64).log2() as usize;
222        let bytes_per_group = bits_per_char;
223
224        // Adjust chunk size to align with encoding groups
225        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
226        let mut buffer = vec![0u8; aligned_chunk_size];
227
228        let mut hasher = self
229            .hash_algo
230            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
231
232        loop {
233            let bytes_read = reader.read(&mut buffer)?;
234            if bytes_read == 0 {
235                break;
236            }
237
238            let chunk = &buffer[..bytes_read];
239            if let Some(ref mut h) = hasher {
240                h.update(chunk);
241            }
242
243            let encoded =
244                crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
245            self.writer.write_all(encoded.as_bytes())?;
246        }
247
248        Ok(hasher.map(|h| h.finalize()))
249    }
250
251    fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
252        let base = self.dictionary.base();
253        let bits_per_char = (base as f64).log2() as usize;
254        let bytes_per_group = bits_per_char;
255
256        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
257        let mut buffer = vec![0u8; aligned_chunk_size];
258
259        loop {
260            let bytes_read = reader.read(&mut buffer)?;
261            if bytes_read == 0 {
262                break;
263            }
264
265            let encoded = crate::encoders::algorithms::chunked::encode_chunked(
266                &buffer[..bytes_read],
267                self.dictionary,
268            );
269            self.writer.write_all(encoded.as_bytes())?;
270        }
271
272        Ok(())
273    }
274
275    fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
276        let mut buffer = vec![0u8; CHUNK_SIZE];
277        let mut hasher = self
278            .hash_algo
279            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
280
281        loop {
282            let bytes_read = reader.read(&mut buffer)?;
283            if bytes_read == 0 {
284                break;
285            }
286
287            let chunk = &buffer[..bytes_read];
288            if let Some(ref mut h) = hasher {
289                h.update(chunk);
290            }
291
292            let encoded =
293                crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary);
294            self.writer.write_all(encoded.as_bytes())?;
295        }
296
297        Ok(hasher.map(|h| h.finalize()))
298    }
299
300    fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
301        let mut buffer = vec![0u8; CHUNK_SIZE];
302
303        loop {
304            let bytes_read = reader.read(&mut buffer)?;
305            if bytes_read == 0 {
306                break;
307            }
308
309            let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
310                &buffer[..bytes_read],
311                self.dictionary,
312            );
313            self.writer.write_all(encoded.as_bytes())?;
314        }
315
316        Ok(())
317    }
318}