// base_d/encoders/streaming/encoder.rs

1use crate::core::dictionary::Dictionary;
2use crate::features::compression::CompressionAlgorithm;
3use crate::features::hashing::HashAlgorithm;
4use std::io::{Read, Write};
5
6use super::hasher::{HasherWriter, create_hasher_writer};
7
8const CHUNK_SIZE: usize = 4096; // 4KB chunks
9
/// Streaming encoder for processing large amounts of data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for encoding large files or network streams.
/// Supports optional compression and hashing during encoding.
pub struct StreamingEncoder<'a, W: Write> {
    /// Dictionary describing the target base and encoding mode.
    dictionary: &'a Dictionary,
    /// Destination for the encoded output.
    writer: W,
    /// Optional compression applied to the input before encoding.
    compress_algo: Option<CompressionAlgorithm>,
    /// Level passed to the selected compression algorithm (default: 6).
    compress_level: u32,
    /// Optional hash computed over the uncompressed input during encoding.
    hash_algo: Option<HashAlgorithm>,
    /// Seed/secret configuration used when `hash_algo` is an xxHash variant.
    xxhash_config: crate::features::hashing::XxHashConfig,
}
23
24impl<'a, W: Write> StreamingEncoder<'a, W> {
25    /// Creates a new streaming encoder.
26    ///
27    /// # Arguments
28    ///
29    /// * `dictionary` - The dictionary to use for encoding
30    /// * `writer` - The destination for encoded output
31    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32        StreamingEncoder {
33            dictionary,
34            writer,
35            compress_algo: None,
36            compress_level: 6,
37            hash_algo: None,
38            xxhash_config: crate::features::hashing::XxHashConfig::default(),
39        }
40    }
41
42    /// Sets compression algorithm and level.
43    pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
44        self.compress_algo = Some(algo);
45        self.compress_level = level;
46        self
47    }
48
49    /// Sets hash algorithm for computing hash during encoding.
50    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
51        self.hash_algo = Some(algo);
52        self
53    }
54
55    /// Sets xxHash configuration (seed and secret).
56    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
57        self.xxhash_config = config;
58        self
59    }
60
61    /// Encodes data from a reader in chunks.
62    ///
63    /// Note: Radix mode requires reading the entire input at once
64    /// due to the nature of true base conversion. For truly streaming
65    /// behavior, use Chunked or ByteRange modes.
66    ///
67    /// Returns the computed hash if hash_algo was set, otherwise None.
68    pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
69        // If compression is enabled, we need to compress then encode
70        if let Some(algo) = self.compress_algo {
71            return self.encode_with_compression(reader, algo);
72        }
73
74        // No compression - encode directly with optional hashing
75        let hash = match self.dictionary.mode() {
76            crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
77            crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
78            crate::core::config::EncodingMode::Radix => {
79                // Radix mode requires entire input - read all and encode
80                let mut buffer = Vec::new();
81                reader.read_to_end(&mut buffer)?;
82
83                let hash = self
84                    .hash_algo
85                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
86
87                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
88                self.writer.write_all(encoded.as_bytes())?;
89                hash
90            }
91        };
92
93        Ok(hash)
94    }
95
96    /// Encode with compression: compress stream then encode compressed data.
97    fn encode_with_compression<R: Read>(
98        &mut self,
99        reader: &mut R,
100        algo: CompressionAlgorithm,
101    ) -> std::io::Result<Option<Vec<u8>>> {
102        use std::io::Cursor;
103
104        // Compress the input stream
105        let mut compressed_data = Vec::new();
106        let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
107
108        // Encode the compressed data
109        let mut cursor = Cursor::new(compressed_data);
110        match self.dictionary.mode() {
111            crate::core::config::EncodingMode::Chunked => {
112                self.encode_chunked_no_hash(&mut cursor)?;
113            }
114            crate::core::config::EncodingMode::ByteRange => {
115                self.encode_byte_range_no_hash(&mut cursor)?;
116            }
117            crate::core::config::EncodingMode::Radix => {
118                let buffer = cursor.into_inner();
119                let encoded = crate::encoders::algorithms::radix::encode(&buffer, self.dictionary);
120                self.writer.write_all(encoded.as_bytes())?;
121            }
122        }
123
124        Ok(hash)
125    }
126
127    /// Compress a stream with optional hashing.
128    fn compress_stream<R: Read>(
129        &mut self,
130        reader: &mut R,
131        output: &mut Vec<u8>,
132        algo: CompressionAlgorithm,
133    ) -> std::io::Result<Option<Vec<u8>>> {
134        use flate2::write::GzEncoder;
135
136        let hasher = self
137            .hash_algo
138            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
139
140        match algo {
141            CompressionAlgorithm::Gzip => {
142                let mut encoder =
143                    GzEncoder::new(output, flate2::Compression::new(self.compress_level));
144                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
145                encoder.finish()?;
146                Ok(hash)
147            }
148            #[cfg(feature = "native-compression")]
149            CompressionAlgorithm::Zstd => {
150                let mut encoder =
151                    zstd::stream::write::Encoder::new(output, self.compress_level as i32)
152                        .map_err(std::io::Error::other)?;
153                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
154                encoder.finish()?;
155                Ok(hash)
156            }
157            #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
158            CompressionAlgorithm::Zstd => {
159                // ruzstd is decode-only, buffer and use block compression
160                Err(std::io::Error::other(
161                    "Zstd compression not supported in WASM (ruzstd is decode-only)",
162                ))
163            }
164            CompressionAlgorithm::Brotli => {
165                let mut encoder =
166                    brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
167                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
168                Ok(hash)
169            }
170            #[cfg(feature = "native-compression")]
171            CompressionAlgorithm::Lzma => {
172                use xz2::write::XzEncoder;
173                let mut encoder = XzEncoder::new(output, self.compress_level);
174                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
175                encoder.finish()?;
176                Ok(hash)
177            }
178            #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
179            CompressionAlgorithm::Lzma => {
180                // lzma-rs doesn't have streaming writer, buffer all
181                let mut buffer = Vec::new();
182                reader.read_to_end(&mut buffer)?;
183
184                let hash = self
185                    .hash_algo
186                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
187
188                use std::io::Cursor;
189                lzma_rs::lzma_compress(&mut Cursor::new(&buffer), output)
190                    .map_err(std::io::Error::other)?;
191                Ok(hash)
192            }
193            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
194                // LZ4 and Snappy don't have streaming encoders in their crates
195                // Read all, compress, write
196                let mut buffer = Vec::new();
197                reader.read_to_end(&mut buffer)?;
198
199                let hash = self
200                    .hash_algo
201                    .map(|algo| crate::features::hashing::hash(&buffer, algo));
202
203                let compressed = match algo {
204                    #[cfg(feature = "native-compression")]
205                    CompressionAlgorithm::Lz4 => {
206                        lz4::block::compress(&buffer, None, false).map_err(std::io::Error::other)?
207                    }
208                    #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
209                    CompressionAlgorithm::Lz4 => lz4_flex::compress_prepend_size(&buffer),
210                    CompressionAlgorithm::Snappy => {
211                        let mut encoder = snap::raw::Encoder::new();
212                        encoder
213                            .compress_vec(&buffer)
214                            .map_err(std::io::Error::other)?
215                    }
216                    _ => unreachable!(),
217                };
218                output.extend_from_slice(&compressed);
219                Ok(hash)
220            }
221        }
222    }
223
224    fn copy_with_hash<R: Read>(
225        reader: &mut R,
226        writer: &mut impl Write,
227        mut hasher: Option<HasherWriter>,
228    ) -> std::io::Result<Option<Vec<u8>>> {
229        let mut buffer = vec![0u8; CHUNK_SIZE];
230
231        loop {
232            let bytes_read = reader.read(&mut buffer)?;
233            if bytes_read == 0 {
234                break;
235            }
236
237            let chunk = &buffer[..bytes_read];
238            if let Some(ref mut h) = hasher {
239                h.update(chunk);
240            }
241            writer.write_all(chunk)?;
242        }
243
244        Ok(hasher.map(|h| h.finalize()))
245    }
246
247    fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
248        let base = self.dictionary.base();
249        let bits_per_char = (base as f64).log2() as usize;
250        let bytes_per_group = bits_per_char;
251
252        // Adjust chunk size to align with encoding groups
253        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
254        let mut buffer = vec![0u8; aligned_chunk_size];
255
256        let mut hasher = self
257            .hash_algo
258            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
259
260        loop {
261            let bytes_read = reader.read(&mut buffer)?;
262            if bytes_read == 0 {
263                break;
264            }
265
266            let chunk = &buffer[..bytes_read];
267            if let Some(ref mut h) = hasher {
268                h.update(chunk);
269            }
270
271            let encoded =
272                crate::encoders::algorithms::chunked::encode_chunked(chunk, self.dictionary);
273            self.writer.write_all(encoded.as_bytes())?;
274        }
275
276        Ok(hasher.map(|h| h.finalize()))
277    }
278
279    fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
280        let base = self.dictionary.base();
281        let bits_per_char = (base as f64).log2() as usize;
282        let bytes_per_group = bits_per_char;
283
284        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
285        let mut buffer = vec![0u8; aligned_chunk_size];
286
287        loop {
288            let bytes_read = reader.read(&mut buffer)?;
289            if bytes_read == 0 {
290                break;
291            }
292
293            let encoded = crate::encoders::algorithms::chunked::encode_chunked(
294                &buffer[..bytes_read],
295                self.dictionary,
296            );
297            self.writer.write_all(encoded.as_bytes())?;
298        }
299
300        Ok(())
301    }
302
303    fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
304        let mut buffer = vec![0u8; CHUNK_SIZE];
305        let mut hasher = self
306            .hash_algo
307            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
308
309        loop {
310            let bytes_read = reader.read(&mut buffer)?;
311            if bytes_read == 0 {
312                break;
313            }
314
315            let chunk = &buffer[..bytes_read];
316            if let Some(ref mut h) = hasher {
317                h.update(chunk);
318            }
319
320            let encoded =
321                crate::encoders::algorithms::byte_range::encode_byte_range(chunk, self.dictionary);
322            self.writer.write_all(encoded.as_bytes())?;
323        }
324
325        Ok(hasher.map(|h| h.finalize()))
326    }
327
328    fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
329        let mut buffer = vec![0u8; CHUNK_SIZE];
330
331        loop {
332            let bytes_read = reader.read(&mut buffer)?;
333            if bytes_read == 0 {
334                break;
335            }
336
337            let encoded = crate::encoders::algorithms::byte_range::encode_byte_range(
338                &buffer[..bytes_read],
339                self.dictionary,
340            );
341            self.writer.write_all(encoded.as_bytes())?;
342        }
343
344        Ok(())
345    }
346}