// base_d/encoders/streaming.rs

use crate::compression::CompressionAlgorithm;
use crate::core::dictionary::Dictionary;
use crate::encoders::encoding::DecodeError;
use crate::hashing::HashAlgorithm;
use std::io::{Read, Write};
6
// Size of the read buffer used by both the streaming encoder and decoder.
const CHUNK_SIZE: usize = 4096; // 4KB chunks
8
/// Streaming encoder for processing large amounts of data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for encoding large files or network streams.
/// Supports optional compression and hashing during encoding.
pub struct StreamingEncoder<'a, W: Write> {
    // Dictionary (alphabet + encoding mode) that drives text production.
    dictionary: &'a Dictionary,
    // Destination for the encoded output.
    writer: W,
    // When set, input bytes are compressed before being encoded.
    compress_algo: Option<CompressionAlgorithm>,
    // Level handed to the selected compression algorithm (defaults to 6).
    compress_level: u32,
    // When set, a hash of the raw (pre-compression) input is computed.
    hash_algo: Option<HashAlgorithm>,
    // Seed/secret used only by the xxHash family of algorithms.
    xxhash_config: crate::hashing::XxHashConfig,
}
22
23impl<'a, W: Write> StreamingEncoder<'a, W> {
24    /// Creates a new streaming encoder.
25    ///
26    /// # Arguments
27    ///
28    /// * `dictionary` - The dictionary to use for encoding
29    /// * `writer` - The destination for encoded output
30    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
31        StreamingEncoder {
32            dictionary,
33            writer,
34            compress_algo: None,
35            compress_level: 6,
36            hash_algo: None,
37            xxhash_config: crate::hashing::XxHashConfig::default(),
38        }
39    }
40
41    /// Sets compression algorithm and level.
42    pub fn with_compression(mut self, algo: CompressionAlgorithm, level: u32) -> Self {
43        self.compress_algo = Some(algo);
44        self.compress_level = level;
45        self
46    }
47
48    /// Sets hash algorithm for computing hash during encoding.
49    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
50        self.hash_algo = Some(algo);
51        self
52    }
53
54    /// Sets xxHash configuration (seed and secret).
55    pub fn with_xxhash_config(mut self, config: crate::hashing::XxHashConfig) -> Self {
56        self.xxhash_config = config;
57        self
58    }
59
60    /// Encodes data from a reader in chunks.
61    ///
62    /// Note: BaseConversion mode requires reading the entire input at once
63    /// due to the mathematical nature of the algorithm. For truly streaming
64    /// behavior, use Chunked or ByteRange modes.
65    ///
66    /// Returns the computed hash if hash_algo was set, otherwise None.
67    pub fn encode<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
68        // If compression is enabled, we need to compress then encode
69        if let Some(algo) = self.compress_algo {
70            return self.encode_with_compression(reader, algo);
71        }
72
73        // No compression - encode directly with optional hashing
74        let hash = match self.dictionary.mode() {
75            crate::core::config::EncodingMode::Chunked => self.encode_chunked(reader)?,
76            crate::core::config::EncodingMode::ByteRange => self.encode_byte_range(reader)?,
77            crate::core::config::EncodingMode::BaseConversion => {
78                // Mathematical mode requires entire input - read all and encode
79                let mut buffer = Vec::new();
80                reader.read_to_end(&mut buffer)?;
81
82                let hash = self
83                    .hash_algo
84                    .map(|algo| crate::hashing::hash(&buffer, algo));
85
86                let encoded = crate::encoders::encoding::encode(&buffer, self.dictionary);
87                self.writer.write_all(encoded.as_bytes())?;
88                hash
89            }
90        };
91
92        Ok(hash)
93    }
94
95    /// Encode with compression: compress stream then encode compressed data.
96    fn encode_with_compression<R: Read>(
97        &mut self,
98        reader: &mut R,
99        algo: CompressionAlgorithm,
100    ) -> std::io::Result<Option<Vec<u8>>> {
101        use std::io::Cursor;
102
103        // Compress the input stream
104        let mut compressed_data = Vec::new();
105        let hash = self.compress_stream(reader, &mut compressed_data, algo)?;
106
107        // Encode the compressed data
108        let mut cursor = Cursor::new(compressed_data);
109        match self.dictionary.mode() {
110            crate::core::config::EncodingMode::Chunked => {
111                self.encode_chunked_no_hash(&mut cursor)?;
112            }
113            crate::core::config::EncodingMode::ByteRange => {
114                self.encode_byte_range_no_hash(&mut cursor)?;
115            }
116            crate::core::config::EncodingMode::BaseConversion => {
117                let buffer = cursor.into_inner();
118                let encoded = crate::encoders::encoding::encode(&buffer, self.dictionary);
119                self.writer.write_all(encoded.as_bytes())?;
120            }
121        }
122
123        Ok(hash)
124    }
125
126    /// Compress a stream with optional hashing.
127    fn compress_stream<R: Read>(
128        &mut self,
129        reader: &mut R,
130        output: &mut Vec<u8>,
131        algo: CompressionAlgorithm,
132    ) -> std::io::Result<Option<Vec<u8>>> {
133        use flate2::write::GzEncoder;
134        use xz2::write::XzEncoder;
135
136        let hasher = self
137            .hash_algo
138            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
139
140        match algo {
141            CompressionAlgorithm::Gzip => {
142                let mut encoder =
143                    GzEncoder::new(output, flate2::Compression::new(self.compress_level));
144                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
145                encoder.finish()?;
146                Ok(hash)
147            }
148            CompressionAlgorithm::Zstd => {
149                let mut encoder =
150                    zstd::stream::write::Encoder::new(output, self.compress_level as i32)
151                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
152                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
153                encoder.finish()?;
154                Ok(hash)
155            }
156            CompressionAlgorithm::Brotli => {
157                let mut encoder =
158                    brotli::CompressorWriter::new(output, 4096, self.compress_level, 22);
159                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
160                Ok(hash)
161            }
162            CompressionAlgorithm::Lzma => {
163                let mut encoder = XzEncoder::new(output, self.compress_level);
164                let hash = Self::copy_with_hash(reader, &mut encoder, hasher)?;
165                encoder.finish()?;
166                Ok(hash)
167            }
168            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
169                // LZ4 and Snappy don't have streaming encoders in their crates
170                // Read all, compress, write
171                let mut buffer = Vec::new();
172                reader.read_to_end(&mut buffer)?;
173
174                let hash = self
175                    .hash_algo
176                    .map(|algo| crate::hashing::hash(&buffer, algo));
177
178                let compressed = match algo {
179                    CompressionAlgorithm::Lz4 => lz4::block::compress(&buffer, None, false)
180                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
181                    CompressionAlgorithm::Snappy => {
182                        let mut encoder = snap::raw::Encoder::new();
183                        encoder
184                            .compress_vec(&buffer)
185                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
186                    }
187                    _ => unreachable!(),
188                };
189                output.extend_from_slice(&compressed);
190                Ok(hash)
191            }
192        }
193    }
194
195    fn copy_with_hash<R: Read>(
196        reader: &mut R,
197        writer: &mut impl Write,
198        mut hasher: Option<HasherWriter>,
199    ) -> std::io::Result<Option<Vec<u8>>> {
200        let mut buffer = vec![0u8; CHUNK_SIZE];
201
202        loop {
203            let bytes_read = reader.read(&mut buffer)?;
204            if bytes_read == 0 {
205                break;
206            }
207
208            let chunk = &buffer[..bytes_read];
209            if let Some(ref mut h) = hasher {
210                h.update(chunk);
211            }
212            writer.write_all(chunk)?;
213        }
214
215        Ok(hasher.map(|h| h.finalize()))
216    }
217
218    fn encode_chunked<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
219        let base = self.dictionary.base();
220        let bits_per_char = (base as f64).log2() as usize;
221        let bytes_per_group = bits_per_char;
222
223        // Adjust chunk size to align with encoding groups
224        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
225        let mut buffer = vec![0u8; aligned_chunk_size];
226
227        let mut hasher = self
228            .hash_algo
229            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
230
231        loop {
232            let bytes_read = reader.read(&mut buffer)?;
233            if bytes_read == 0 {
234                break;
235            }
236
237            let chunk = &buffer[..bytes_read];
238            if let Some(ref mut h) = hasher {
239                h.update(chunk);
240            }
241
242            let encoded = crate::encoders::chunked::encode_chunked(chunk, self.dictionary);
243            self.writer.write_all(encoded.as_bytes())?;
244        }
245
246        Ok(hasher.map(|h| h.finalize()))
247    }
248
249    fn encode_chunked_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
250        let base = self.dictionary.base();
251        let bits_per_char = (base as f64).log2() as usize;
252        let bytes_per_group = bits_per_char;
253
254        let aligned_chunk_size = (CHUNK_SIZE / bytes_per_group) * bytes_per_group;
255        let mut buffer = vec![0u8; aligned_chunk_size];
256
257        loop {
258            let bytes_read = reader.read(&mut buffer)?;
259            if bytes_read == 0 {
260                break;
261            }
262
263            let encoded =
264                crate::encoders::chunked::encode_chunked(&buffer[..bytes_read], self.dictionary);
265            self.writer.write_all(encoded.as_bytes())?;
266        }
267
268        Ok(())
269    }
270
271    fn encode_byte_range<R: Read>(&mut self, reader: &mut R) -> std::io::Result<Option<Vec<u8>>> {
272        let mut buffer = vec![0u8; CHUNK_SIZE];
273        let mut hasher = self
274            .hash_algo
275            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
276
277        loop {
278            let bytes_read = reader.read(&mut buffer)?;
279            if bytes_read == 0 {
280                break;
281            }
282
283            let chunk = &buffer[..bytes_read];
284            if let Some(ref mut h) = hasher {
285                h.update(chunk);
286            }
287
288            let encoded = crate::encoders::byte_range::encode_byte_range(chunk, self.dictionary);
289            self.writer.write_all(encoded.as_bytes())?;
290        }
291
292        Ok(hasher.map(|h| h.finalize()))
293    }
294
295    fn encode_byte_range_no_hash<R: Read>(&mut self, reader: &mut R) -> std::io::Result<()> {
296        let mut buffer = vec![0u8; CHUNK_SIZE];
297
298        loop {
299            let bytes_read = reader.read(&mut buffer)?;
300            if bytes_read == 0 {
301                break;
302            }
303
304            let encoded = crate::encoders::byte_range::encode_byte_range(
305                &buffer[..bytes_read],
306                self.dictionary,
307            );
308            self.writer.write_all(encoded.as_bytes())?;
309        }
310
311        Ok(())
312    }
313}
314
/// Streaming decoder for processing large amounts of encoded data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for decoding large files or network streams.
/// Supports optional decompression and hashing during decoding.
pub struct StreamingDecoder<'a, W: Write> {
    // Dictionary (alphabet + encoding mode) the input was encoded with.
    dictionary: &'a Dictionary,
    // Destination for the decoded output.
    writer: W,
    // When set, the decoded bytes are decompressed before being written.
    decompress_algo: Option<CompressionAlgorithm>,
    // When set, a hash of the final output bytes (after any decompression)
    // is computed.
    hash_algo: Option<HashAlgorithm>,
    // Seed/secret used only by the xxHash family of algorithms.
    xxhash_config: crate::hashing::XxHashConfig,
}
327
328impl<'a, W: Write> StreamingDecoder<'a, W> {
329    /// Creates a new streaming decoder.
330    ///
331    /// # Arguments
332    ///
333    /// * `dictionary` - The dictionary used for encoding
334    /// * `writer` - The destination for decoded output
335    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
336        StreamingDecoder {
337            dictionary,
338            writer,
339            decompress_algo: None,
340            hash_algo: None,
341            xxhash_config: crate::hashing::XxHashConfig::default(),
342        }
343    }
344
345    /// Sets decompression algorithm.
346    pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
347        self.decompress_algo = Some(algo);
348        self
349    }
350
351    /// Sets hash algorithm for computing hash during decoding.
352    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
353        self.hash_algo = Some(algo);
354        self
355    }
356
357    /// Sets xxHash configuration (seed and secret).
358    pub fn with_xxhash_config(mut self, config: crate::hashing::XxHashConfig) -> Self {
359        self.xxhash_config = config;
360        self
361    }
362
363    /// Decodes data from a reader in chunks.
364    ///
365    /// Note: BaseConversion mode requires reading the entire input at once
366    /// due to the mathematical nature of the algorithm. For truly streaming
367    /// behavior, use Chunked or ByteRange modes.
368    ///
369    /// Returns the computed hash if hash_algo was set, otherwise None.
370    pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
371        // If decompression is enabled, decode then decompress
372        if let Some(algo) = self.decompress_algo {
373            return self.decode_with_decompression(reader, algo);
374        }
375
376        // No decompression - decode directly with optional hashing
377        match self.dictionary.mode() {
378            crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
379            crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
380            crate::core::config::EncodingMode::BaseConversion => {
381                // Mathematical mode requires entire input
382                let mut buffer = String::new();
383                reader
384                    .read_to_string(&mut buffer)
385                    .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
386                let decoded = crate::encoders::encoding::decode(&buffer, self.dictionary)?;
387
388                let hash = self
389                    .hash_algo
390                    .map(|algo| crate::hashing::hash(&decoded, algo));
391
392                self.writer
393                    .write_all(&decoded)
394                    .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
395                Ok(hash)
396            }
397        }
398    }
399
400    /// Decode with decompression: decode stream then decompress decoded data.
401    fn decode_with_decompression<R: Read>(
402        &mut self,
403        reader: &mut R,
404        algo: CompressionAlgorithm,
405    ) -> Result<Option<Vec<u8>>, DecodeError> {
406        use std::io::Cursor;
407
408        // Decode the input stream to get compressed data
409        let mut compressed_data = Vec::new();
410        {
411            let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
412            temp_decoder.decode(reader)?;
413        }
414
415        // Decompress and write to output with optional hashing
416        let mut cursor = Cursor::new(compressed_data);
417        let hash = self
418            .decompress_stream(&mut cursor, algo)
419            .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
420
421        Ok(hash)
422    }
423
424    /// Decompress a stream with optional hashing.
425    fn decompress_stream<R: Read>(
426        &mut self,
427        reader: &mut R,
428        algo: CompressionAlgorithm,
429    ) -> std::io::Result<Option<Vec<u8>>> {
430        use flate2::read::GzDecoder;
431        use xz2::read::XzDecoder;
432
433        let mut hasher = self
434            .hash_algo
435            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
436
437        match algo {
438            CompressionAlgorithm::Gzip => {
439                let mut decoder = GzDecoder::new(reader);
440                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
441            }
442            CompressionAlgorithm::Zstd => {
443                let mut decoder = zstd::stream::read::Decoder::new(reader)
444                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
445                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
446            }
447            CompressionAlgorithm::Brotli => {
448                let mut decoder = brotli::Decompressor::new(reader, 4096);
449                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
450            }
451            CompressionAlgorithm::Lzma => {
452                let mut decoder = XzDecoder::new(reader);
453                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
454            }
455            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
456                // LZ4 and Snappy don't have streaming decoders
457                let mut compressed = Vec::new();
458                reader.read_to_end(&mut compressed)?;
459
460                let decompressed = match algo {
461                    CompressionAlgorithm::Lz4 => {
462                        lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
463                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
464                    }
465                    CompressionAlgorithm::Snappy => {
466                        let mut decoder = snap::raw::Decoder::new();
467                        decoder
468                            .decompress_vec(&compressed)
469                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
470                    }
471                    _ => unreachable!(),
472                };
473
474                let hash = self
475                    .hash_algo
476                    .map(|algo| crate::hashing::hash(&decompressed, algo));
477                self.writer.write_all(&decompressed)?;
478                return Ok(hash);
479            }
480        }
481
482        Ok(hasher.map(|h| h.finalize()))
483    }
484
485    fn copy_with_hash_to_writer<R: Read>(
486        reader: &mut R,
487        writer: &mut W,
488        hasher: &mut Option<HasherWriter>,
489    ) -> std::io::Result<()> {
490        let mut buffer = vec![0u8; CHUNK_SIZE];
491
492        loop {
493            let bytes_read = reader.read(&mut buffer)?;
494            if bytes_read == 0 {
495                break;
496            }
497
498            let chunk = &buffer[..bytes_read];
499            if let Some(ref mut h) = hasher {
500                h.update(chunk);
501            }
502            writer.write_all(chunk)?;
503        }
504
505        Ok(())
506    }
507
508    fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
509        let base = self.dictionary.base();
510        let bits_per_char = (base as f64).log2() as usize;
511        let chars_per_group = 8 / bits_per_char;
512
513        // Read text in chunks
514        let mut text_buffer = String::new();
515        let mut char_buffer = vec![0u8; CHUNK_SIZE];
516        let mut hasher = self
517            .hash_algo
518            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
519
520        loop {
521            let bytes_read = reader
522                .read(&mut char_buffer)
523                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
524            if bytes_read == 0 {
525                break;
526            }
527
528            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read])
529                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
530            text_buffer.push_str(chunk_str);
531
532            // Process complete character groups
533            let chars: Vec<char> = text_buffer.chars().collect();
534            let complete_groups = (chars.len() / chars_per_group) * chars_per_group;
535
536            if complete_groups > 0 {
537                let to_decode: String = chars[..complete_groups].iter().collect();
538                let decoded =
539                    crate::encoders::chunked::decode_chunked(&to_decode, self.dictionary)?;
540
541                if let Some(ref mut h) = hasher {
542                    h.update(&decoded);
543                }
544
545                self.writer
546                    .write_all(&decoded)
547                    .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
548
549                // Keep remaining chars for next iteration
550                text_buffer = chars[complete_groups..].iter().collect();
551            }
552        }
553
554        // Process any remaining characters
555        if !text_buffer.is_empty() {
556            let decoded = crate::encoders::chunked::decode_chunked(&text_buffer, self.dictionary)?;
557
558            if let Some(ref mut h) = hasher {
559                h.update(&decoded);
560            }
561
562            self.writer
563                .write_all(&decoded)
564                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
565        }
566
567        Ok(hasher.map(|h| h.finalize()))
568    }
569
570    fn decode_byte_range<R: Read>(
571        &mut self,
572        reader: &mut R,
573    ) -> Result<Option<Vec<u8>>, DecodeError> {
574        let mut char_buffer = vec![0u8; CHUNK_SIZE];
575        let mut hasher = self
576            .hash_algo
577            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
578
579        loop {
580            let bytes_read = reader
581                .read(&mut char_buffer)
582                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
583            if bytes_read == 0 {
584                break;
585            }
586
587            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read])
588                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
589
590            let decoded =
591                crate::encoders::byte_range::decode_byte_range(chunk_str, self.dictionary)?;
592
593            if let Some(ref mut h) = hasher {
594                h.update(&decoded);
595            }
596
597            self.writer
598                .write_all(&decoded)
599                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
600        }
601
602        Ok(hasher.map(|h| h.finalize()))
603    }
604}
605
// Helper for managing hash state during streaming.
//
// Each variant owns the incremental state of one supported hash algorithm;
// `update` feeds bytes in, `finalize` consumes the state and yields the
// digest bytes.
enum HasherWriter {
    Md5(md5::Md5),
    Sha224(sha2::Sha224),
    Sha256(sha2::Sha256),
    Sha384(sha2::Sha384),
    Sha512(sha2::Sha512),
    Sha3_224(sha3::Sha3_224),
    Sha3_256(sha3::Sha3_256),
    Sha3_384(sha3::Sha3_384),
    Sha3_512(sha3::Sha3_512),
    Keccak224(sha3::Keccak224),
    Keccak256(sha3::Keccak256),
    Keccak384(sha3::Keccak384),
    Keccak512(sha3::Keccak512),
    Blake2b(blake2::Blake2b512),
    Blake2s(blake2::Blake2s256),
    Blake3(blake3::Hasher),
    // CRC digests borrow their lookup table from a `'static` `crc::Crc`
    // (see create_hasher_writer); boxed, presumably to keep the enum small.
    Crc16(Box<crc::Digest<'static, u16>>),
    Crc32(Box<crc::Digest<'static, u32>>),
    Crc32c(Box<crc::Digest<'static, u32>>),
    Crc64(Box<crc::Digest<'static, u64>>),
    XxHash32(twox_hash::XxHash32),
    XxHash64(twox_hash::XxHash64),
    XxHash3_64(twox_hash::xxhash3_64::Hasher),
    XxHash3_128(twox_hash::xxhash3_128::Hasher),
}
633
634impl HasherWriter {
635    fn update(&mut self, data: &[u8]) {
636        use sha2::Digest;
637        use std::hash::Hasher;
638
639        match self {
640            HasherWriter::Md5(h) => {
641                h.update(data);
642            }
643            HasherWriter::Sha224(h) => {
644                h.update(data);
645            }
646            HasherWriter::Sha256(h) => {
647                h.update(data);
648            }
649            HasherWriter::Sha384(h) => {
650                h.update(data);
651            }
652            HasherWriter::Sha512(h) => {
653                h.update(data);
654            }
655            HasherWriter::Sha3_224(h) => {
656                h.update(data);
657            }
658            HasherWriter::Sha3_256(h) => {
659                h.update(data);
660            }
661            HasherWriter::Sha3_384(h) => {
662                h.update(data);
663            }
664            HasherWriter::Sha3_512(h) => {
665                h.update(data);
666            }
667            HasherWriter::Keccak224(h) => {
668                h.update(data);
669            }
670            HasherWriter::Keccak256(h) => {
671                h.update(data);
672            }
673            HasherWriter::Keccak384(h) => {
674                h.update(data);
675            }
676            HasherWriter::Keccak512(h) => {
677                h.update(data);
678            }
679            HasherWriter::Blake2b(h) => {
680                h.update(data);
681            }
682            HasherWriter::Blake2s(h) => {
683                h.update(data);
684            }
685            HasherWriter::Blake3(h) => {
686                h.update(data);
687            }
688            HasherWriter::Crc16(digest) => {
689                digest.update(data);
690            }
691            HasherWriter::Crc32(digest) => {
692                digest.update(data);
693            }
694            HasherWriter::Crc32c(digest) => {
695                digest.update(data);
696            }
697            HasherWriter::Crc64(digest) => {
698                digest.update(data);
699            }
700            HasherWriter::XxHash32(h) => {
701                h.write(data);
702            }
703            HasherWriter::XxHash64(h) => {
704                h.write(data);
705            }
706            HasherWriter::XxHash3_64(h) => {
707                h.write(data);
708            }
709            HasherWriter::XxHash3_128(h) => {
710                h.write(data);
711            }
712        }
713    }
714
715    fn finalize(self) -> Vec<u8> {
716        use sha2::Digest;
717        use std::hash::Hasher;
718
719        match self {
720            HasherWriter::Md5(h) => h.finalize().to_vec(),
721            HasherWriter::Sha224(h) => h.finalize().to_vec(),
722            HasherWriter::Sha256(h) => h.finalize().to_vec(),
723            HasherWriter::Sha384(h) => h.finalize().to_vec(),
724            HasherWriter::Sha512(h) => h.finalize().to_vec(),
725            HasherWriter::Sha3_224(h) => h.finalize().to_vec(),
726            HasherWriter::Sha3_256(h) => h.finalize().to_vec(),
727            HasherWriter::Sha3_384(h) => h.finalize().to_vec(),
728            HasherWriter::Sha3_512(h) => h.finalize().to_vec(),
729            HasherWriter::Keccak224(h) => h.finalize().to_vec(),
730            HasherWriter::Keccak256(h) => h.finalize().to_vec(),
731            HasherWriter::Keccak384(h) => h.finalize().to_vec(),
732            HasherWriter::Keccak512(h) => h.finalize().to_vec(),
733            HasherWriter::Blake2b(h) => h.finalize().to_vec(),
734            HasherWriter::Blake2s(h) => h.finalize().to_vec(),
735            HasherWriter::Blake3(h) => h.finalize().as_bytes().to_vec(),
736            HasherWriter::Crc16(digest) => digest.finalize().to_be_bytes().to_vec(),
737            HasherWriter::Crc32(digest) => digest.finalize().to_be_bytes().to_vec(),
738            HasherWriter::Crc32c(digest) => digest.finalize().to_be_bytes().to_vec(),
739            HasherWriter::Crc64(digest) => digest.finalize().to_be_bytes().to_vec(),
740            HasherWriter::XxHash32(h) => (h.finish() as u32).to_be_bytes().to_vec(),
741            HasherWriter::XxHash64(h) => h.finish().to_be_bytes().to_vec(),
742            HasherWriter::XxHash3_64(h) => h.finish().to_be_bytes().to_vec(),
743            HasherWriter::XxHash3_128(h) => {
744                let hash = h.finish_128();
745                let mut result = Vec::with_capacity(16);
746                result.extend_from_slice(&hash.to_be_bytes());
747                result
748            }
749        }
750    }
751}
752
/// Constructs the incremental hash state for `algo`.
///
/// `config` supplies the seed (and optional secret) used by the xxHash
/// family; all other algorithms ignore it.
fn create_hasher_writer(
    algo: HashAlgorithm,
    config: &crate::hashing::XxHashConfig,
) -> HasherWriter {
    use sha2::Digest;

    match algo {
        HashAlgorithm::Md5 => HasherWriter::Md5(md5::Md5::new()),
        HashAlgorithm::Sha224 => HasherWriter::Sha224(sha2::Sha224::new()),
        HashAlgorithm::Sha256 => HasherWriter::Sha256(sha2::Sha256::new()),
        HashAlgorithm::Sha384 => HasherWriter::Sha384(sha2::Sha384::new()),
        HashAlgorithm::Sha512 => HasherWriter::Sha512(sha2::Sha512::new()),
        HashAlgorithm::Sha3_224 => HasherWriter::Sha3_224(sha3::Sha3_224::new()),
        HashAlgorithm::Sha3_256 => HasherWriter::Sha3_256(sha3::Sha3_256::new()),
        HashAlgorithm::Sha3_384 => HasherWriter::Sha3_384(sha3::Sha3_384::new()),
        HashAlgorithm::Sha3_512 => HasherWriter::Sha3_512(sha3::Sha3_512::new()),
        HashAlgorithm::Keccak224 => HasherWriter::Keccak224(sha3::Keccak224::new()),
        HashAlgorithm::Keccak256 => HasherWriter::Keccak256(sha3::Keccak256::new()),
        HashAlgorithm::Keccak384 => HasherWriter::Keccak384(sha3::Keccak384::new()),
        HashAlgorithm::Keccak512 => HasherWriter::Keccak512(sha3::Keccak512::new()),
        HashAlgorithm::Blake2b => HasherWriter::Blake2b(blake2::Blake2b512::new()),
        HashAlgorithm::Blake2s => HasherWriter::Blake2s(blake2::Blake2s256::new()),
        HashAlgorithm::Blake3 => HasherWriter::Blake3(blake3::Hasher::new()),
        HashAlgorithm::Crc16 => {
            // `crc::Digest` borrows its parameter table; a `static` table
            // provides the `'static` lifetime the HasherWriter variant needs.
            static CRC: crc::Crc<u16> = crc::Crc::<u16>::new(&crc::CRC_16_IBM_SDLC);
            HasherWriter::Crc16(Box::new(CRC.digest()))
        }
        HashAlgorithm::Crc32 => {
            static CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
            HasherWriter::Crc32(Box::new(CRC.digest()))
        }
        HashAlgorithm::Crc32c => {
            static CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISCSI);
            HasherWriter::Crc32c(Box::new(CRC.digest()))
        }
        HashAlgorithm::Crc64 => {
            static CRC: crc::Crc<u64> = crc::Crc::<u64>::new(&crc::CRC_64_ECMA_182);
            HasherWriter::Crc64(Box::new(CRC.digest()))
        }
        HashAlgorithm::XxHash32 => {
            // xxHash32 takes a 32-bit seed: the configured 64-bit seed is
            // truncated here.
            HasherWriter::XxHash32(twox_hash::XxHash32::with_seed(config.seed as u32))
        }
        HashAlgorithm::XxHash64 => {
            HasherWriter::XxHash64(twox_hash::XxHash64::with_seed(config.seed))
        }
        HashAlgorithm::XxHash3_64 => {
            if let Some(ref secret) = config.secret {
                HasherWriter::XxHash3_64(
                    twox_hash::xxhash3_64::Hasher::with_seed_and_secret(
                        config.seed,
                        secret.as_slice(),
                    )
                    .expect(
                        "XXH3 secret validation should have been done in XxHashConfig::with_secret",
                    ),
                )
            } else {
                HasherWriter::XxHash3_64(twox_hash::xxhash3_64::Hasher::with_seed(config.seed))
            }
        }
        HashAlgorithm::XxHash3_128 => {
            if let Some(ref secret) = config.secret {
                HasherWriter::XxHash3_128(
                    twox_hash::xxhash3_128::Hasher::with_seed_and_secret(
                        config.seed,
                        secret.as_slice(),
                    )
                    .expect(
                        "XXH3 secret validation should have been done in XxHashConfig::with_secret",
                    ),
                )
            } else {
                HasherWriter::XxHash3_128(twox_hash::xxhash3_128::Hasher::with_seed(config.seed))
            }
        }
    }
}
830
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DictionariesConfig, Dictionary};
    use std::io::Cursor;

    /// Builds a `Dictionary` from the named entry in the default config.
    ///
    /// Byte-range dictionaries are constructed from their start codepoint;
    /// all other modes are built from the configured character set plus an
    /// optional padding character (first char of the padding string).
    fn get_dictionary(name: &str) -> Dictionary {
        let config = DictionariesConfig::load_default().unwrap();
        let alphabet_config = config.get_dictionary(name).unwrap();

        match alphabet_config.mode {
            crate::core::config::EncodingMode::ByteRange => {
                let start = alphabet_config.start_codepoint.unwrap();
                Dictionary::new_with_mode_and_range(
                    Vec::new(),
                    alphabet_config.mode.clone(),
                    None,
                    Some(start),
                )
                .unwrap()
            }
            _ => {
                let chars: Vec<char> = alphabet_config.chars.chars().collect();
                let padding = alphabet_config
                    .padding
                    .as_ref()
                    .and_then(|s| s.chars().next());
                Dictionary::new_with_mode(chars, alphabet_config.mode.clone(), padding).unwrap()
            }
        }
    }

    /// Streams `data` through `StreamingEncoder`, then streams the encoded
    /// bytes back through `StreamingDecoder`, and asserts the round trip
    /// reproduces the original input exactly.
    fn assert_roundtrip(dictionary: &Dictionary, data: &[u8]) {
        // Encode
        let mut encoded_output = Vec::new();
        {
            let mut encoder = StreamingEncoder::new(dictionary, &mut encoded_output);
            let mut reader = Cursor::new(data);
            encoder.encode(&mut reader).unwrap();
        }

        // Decode
        let mut decoded_output = Vec::new();
        {
            let mut decoder = StreamingDecoder::new(dictionary, &mut decoded_output);
            let mut reader = Cursor::new(&encoded_output);
            decoder.decode(&mut reader).unwrap();
        }

        assert_eq!(data, &decoded_output[..]);
    }

    #[test]
    fn test_streaming_encode_decode_base64() {
        let dictionary = get_dictionary("base64");
        assert_roundtrip(
            &dictionary,
            b"Hello, World! This is a streaming test with multiple chunks of data.",
        );
    }

    #[test]
    fn test_streaming_encode_decode_base100() {
        let dictionary = get_dictionary("base100");
        assert_roundtrip(&dictionary, b"Test data for byte range streaming");
    }

    #[test]
    fn test_streaming_large_data() {
        let dictionary = get_dictionary("base64");
        // 100KB of data: large enough to span many 4KB chunks and cycle
        // through every possible byte value.
        let data: Vec<u8> = (0..100_000).map(|i| (i % 256) as u8).collect();
        assert_roundtrip(&dictionary, &data);
    }
}