base_d/encoders/streaming/
encoder.rs

1use crate::core::dictionary::Dictionary;
2use crate::features::compression::CompressionAlgorithm;
3use crate::features::hashing::HashAlgorithm;
4use std::io::{Read, Write};
5
6use super::hasher::{HasherWriter, create_hasher_writer};
7
8const CHUNK_SIZE: usize = 4096; // 4KB chunks
9
/// Streaming encoder for processing large amounts of data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for encoding large files or network streams.
/// Supports optional compression and hashing during encoding.
pub struct StreamingEncoder<'a, W: Write> {
    /// Dictionary defining the output alphabet and the encoding mode
    /// (Chunked, ByteRange, or Radix) used for dispatch in `encode`.
    dictionary: &'a Dictionary,
    /// Sink that receives the encoded output.
    writer: W,
    /// Optional compression applied to the input before encoding.
    compress_algo: Option<CompressionAlgorithm>,
    /// Compression level; interpretation is algorithm-specific. Defaults to 6.
    compress_level: u32,
    /// Optional digest computed over the raw (uncompressed) input.
    hash_algo: Option<HashAlgorithm>,
    /// Seed/secret configuration consulted when `hash_algo` is an xxHash variant.
    xxhash_config: crate::features::hashing::XxHashConfig,
}
23
24impl<'a, W: Write> StreamingEncoder<'a, W> {
25    /// Creates a new streaming encoder.
26    ///
27    /// # Arguments
28    ///
29    /// * `dictionary` - The dictionary to use for encoding
30    /// * `writer` - The destination for encoded output
31    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32        StreamingEncoder {
33            dictionary,
34            writer,
35            compress_algo: None,
36            compress_level: 6,
37            hash_algo: None,
38            xxhash_config: crate::features::hashing::XxHashConfig::default(),
39        }
40    }
41
42    /// Sets compression algorithm and level.
43    pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
44        self.compress_algo = Some(algo);
45        self.compress_level = level;
46        self
47    }
48
49    /// Sets hash algorithm for computing hash during encoding.
50    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
51        self.hash_algo = Some(algo);
52        self
53    }
54
55    /// Sets xxHash configuration (seed and secret).
56    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
57        self.xxhash_config = config;
58        self
59    }
60
61    /// Encodes data from a reader in chunks.
62    ///
63    /// Note: Radix mode requires reading the entire input at once
64    /// due to the nature of true base conversion. For truly streaming
65    /// behavior, use Chunked or ByteRange modes.
66    ///
67    /// Returns the computed hash if hash_algo was set, otherwise None.
68    pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
69        // If compression is enabled, we need to compress then encode
70        if let Some(algo) = self.compress_algo {
71            return self.encode_with_compression(reader, algo);
72        }
73
74        // No compression - encode directly with optional hashing
75        let hash = match self.dictionary.mode() {
76            crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
77            crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
78            crate::core::config::EncodingMode::Radix => {
79                // Radix mode requires entire input - read all and encode
80                let mut buffer = Vec::new();
81                reader.read_to_end(&mut buffer)?;
82
83                let hash = self
84                    .hash_algo
85                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
86
87                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
88                self.writer.write_all(encoded.as_bytes())?;
89                hash
90            }
91        };
92
93        Ok(hash)
94    }
95
96    /// Encode with compression: compress stream then encode compressed data.
97    fn encode_with_compression<R: Read>(
98        &mut self,
99        reader: &mut R,
100        algo: CompressionAlgorithm,
101    ) -> std::io::Result<Option<Vec<u8>>> {
102        use std::io::Cursor;
103
104        // Compress the input stream
105        let mut compressed_data = Vec::new();
106        let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
107
108        // Encode the compressed data
109        let mut cursor = Cursor::new(compressed_data);
110        match self.dictionary.mode() {
111            crate::core::config::EncodingMode::Chunked => {
112                self.encode_chunked_no_hash(&mut cursor)?;
113            }
114            crate::core::config::EncodingMode::ByteRange => {
115                self.encode_byte_range_no_hash(&mut cursor)?;
116            }
117            crate::core::config::EncodingMode::Radix => {
118                let buffer = cursor.into_inner();
119                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
120                self.writer.write_all(encoded.as_bytes())?;
121            }
122        }
123
124        Ok(hash)
125    }
126
    /// Compress a stream with optional hashing.
    ///
    /// Reads `reader` to exhaustion, appends the compressed bytes to
    /// `output`, and returns the digest of the *uncompressed* input when a
    /// hash algorithm is configured — consistent with the no-compression
    /// encoding paths, which also hash the raw bytes.
    fn compress_stream<R: Read>(
        &mut self,
        reader: &mut R,
        output: &mut Vec<u8>,
        algo: CompressionAlgorithm,
    ) -> std::io::Result<Option<Vec<u8>>> {
        use flate2::write::GzEncoder;
        use xz2::write::XzEncoder;

        // Hasher is threaded through the copy loop so the raw input is
        // digested as it flows into the compressor (single pass).
        let hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));

        match algo {
            CompressionAlgorithm::Gzip => {
                let mut encoder =
                    GzEncoder::new(output, flate2::Compression::new(self.compress_level));
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                // finish() flushes the trailing gzip footer and surfaces errors.
                encoder.finish()?;
                Ok(hash)
            }
            CompressionAlgorithm::Zstd => {
                // zstd expects a signed level; construction can fail, so map
                // its error into io::Error for a uniform return type.
                let mut encoder =
                    zstd::stream::write::Encoder::new(output, self.compress_level as i32)
                        .map_err(std::io::Error::other)?;
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            CompressionAlgorithm::Brotli => {
                // 4096-byte internal buffer; 22 is the lg_window_size.
                let mut encoder =
                    brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                // NOTE(review): unlike the other arms there is no finish()
                // here — brotli's CompressorWriter finalizes the stream in
                // Drop, which cannot report I/O errors. Confirm whether an
                // explicit flush/into_inner is wanted.
                Ok(hash)
            }
            CompressionAlgorithm::Lzma => {
                let mut encoder = XzEncoder::new(output, self.compress_level);
                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
                encoder.finish()?;
                Ok(hash)
            }
            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
                // LZ4 and Snappy don't have streaming encoders in their crates
                // Read all, compress, write
                let mut buffer = Vec::new();
                reader.read_to_end(&mut buffer)?;

                // Whole input is in memory, so hash it directly instead of
                // going through the chunked copy loop.
                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&buffer, algo));

                let compressed = match algo {
                    CompressionAlgorithm::Lz4 => {
                        lz4::block::compress(&buffer, None, false).map_err(std::io::Error::other)?
                    }
                    CompressionAlgorithm::Snappy => {
                        let mut encoder = snap::raw::Encoder::new();
                        encoder
                            .compress_vec(&buffer)
                            .map_err(std::io::Error::other)?
                    }
                    // Outer match arm restricts algo to Lz4 | Snappy.
                    _ => unreachable!(),
                };
                output.extend_from_slice(&compressed);
                Ok(hash)
            }
        }
    }
196
197    fn copy_with_hash<R: Read>(
198        reader: &mut R,
199        writer: &mut impl Write,
200        mut hasher: Option<HasherWriter>,
201    ) -> std::io::Result<Option<Vec<u8>>> {
202        let mut buffer = vec![0u8; CHUNK_SIZE];
203
204        loop {
205            let bytes_read = reader.read(&mut buffer)?;
206            if bytes_read == 0 {
207                break;
208            }
209
210            let chunk = &buffer[..bytes_read];
211            if let Some(ref mut h) = hasher {
212                h.update(chunk);
213            }
214            writer.write_all(chunk)?;
215        }
216
217        Ok(hasher.map(|h| h.finalize()))
218    }
219
220    fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
221        let base = self.dictionary.base();
222        let bits_per_char = (base as f64).log2() as usize;
223        let bytes_per_group = bits_per_char;
224
225        // Adjust chunk size to align with encoding groups
226        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
227        let mut buffer = vec![0u8; aligned_chunk_size];
228
229        let mut hasher = self
230            .hash_algo
231            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
232
233        loop {
234            let bytes_read = reader.read(&mut buffer)?;
235            if bytes_read == 0 {
236                break;
237            }
238
239            let chunk = &buffer[..bytes_read];
240            if let Some(ref mut h) = hasher {
241                h.update(chunk);
242            }
243
244            let encoded =
245                crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
246            self.writer.write_all(encoded.as_bytes())?;
247        }
248
249        Ok(hasher.map(|h| h.finalize()))
250    }
251
252    fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
253        let base = self.dictionary.base();
254        let bits_per_char = (base as f64).log2() as usize;
255        let bytes_per_group = bits_per_char;
256
257        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
258        let mut buffer = vec![0u8; aligned_chunk_size];
259
260        loop {
261            let bytes_read = reader.read(&mut buffer)?;
262            if bytes_read == 0 {
263                break;
264            }
265
266            let encoded = crate::encoders::algorithms::chunked::encode_chunked(
267                &buffer[..bytes_read],
268                self.dictionary,
269            );
270            self.writer.write_all(encoded.as_bytes())?;
271        }
272
273        Ok(())
274    }
275
276    fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
277        let mut buffer = vec![0u8; CHUNK_SIZE];
278        let mut hasher = self
279            .hash_algo
280            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
281
282        loop {
283            let bytes_read = reader.read(&mut buffer)?;
284            if bytes_read == 0 {
285                break;
286            }
287
288            let chunk = &buffer[..bytes_read];
289            if let Some(ref mut h) = hasher {
290                h.update(chunk);
291            }
292
293            let encoded =
294                crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary);
295            self.writer.write_all(encoded.as_bytes())?;
296        }
297
298        Ok(hasher.map(|h| h.finalize()))
299    }
300
301    fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
302        let mut buffer = vec![0u8; CHUNK_SIZE];
303
304        loop {
305            let bytes_read = reader.read(&mut buffer)?;
306            if bytes_read == 0 {
307                break;
308            }
309
310            let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
311                &buffer[..bytes_read],
312                self.dictionary,
313            );
314            self.writer.write_all(encoded.as_bytes())?;
315        }
316
317        Ok(())
318    }
319}