base_d/encoders/streaming/decoder.rs

use crate::core::dictionary::Dictionary;
use crate::encoders::algorithms::DecodeError;
use crate::features::compression::CompressionAlgorithm;
use crate::features::hashing::HashAlgorithm;
use std::io::{Read, Write};

use super::hasher::{create_hasher_writer, HasherWriter};

const CHUNK_SIZE: usize = 4096; // 4KB chunks

/// Streaming decoder for processing large amounts of encoded data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for decoding large files or network streams.
/// Supports optional decompression and hashing during decoding.
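///
/// # Example
///
/// A minimal sketch (not run as a doctest): how the `Dictionary` and the I/O
/// handles are obtained is outside this module, so the caller passes them in.
///
/// ```ignore
/// use std::fs::File;
/// use std::io::BufReader;
///
/// fn decode_to_file(
///     dictionary: &Dictionary,
///     encoded: File,
///     output: File,
/// ) -> Result<(), DecodeError> {
///     let mut reader = BufReader::new(encoded);
///     let mut decoder = StreamingDecoder::new(dictionary, output);
///     decoder.decode(&mut reader)?;
///     Ok(())
/// }
/// ```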
pub struct StreamingDecoder<'a, W: Write> {
    dictionary: &'a Dictionary,
    writer: W,
    decompress_algo: Option<CompressionAlgorithm>,
    hash_algo: Option<HashAlgorithm>,
    xxhash_config: crate::features::hashing::XxHashConfig,
}

impl<'a, W: Write> StreamingDecoder<'a, W> {
    /// Creates a new streaming decoder.
    ///
    /// # Arguments
    ///
    /// * `dictionary` - The dictionary the data was encoded with
    /// * `writer` - The destination for decoded output
    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
        StreamingDecoder {
            dictionary,
            writer,
            decompress_algo: None,
            hash_algo: None,
            xxhash_config: crate::features::hashing::XxHashConfig::default(),
        }
    }

    /// Sets the decompression algorithm to apply to the decoded data.
    pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
        self.decompress_algo = Some(algo);
        self
    }

    /// Sets the hash algorithm for computing a hash of the output during decoding.
    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
        self.hash_algo = Some(algo);
        self
    }

    /// Sets the xxHash configuration (seed and secret).
    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
        self.xxhash_config = config;
        self
    }

    /// Decodes data from a reader in chunks.
    ///
    /// Note: BaseConversion mode requires reading the entire input at once
    /// due to the mathematical nature of the algorithm. For truly streaming
    /// behavior, use Chunked or ByteRange modes.
    ///
    /// Returns the computed hash if `hash_algo` was set, otherwise `None`.
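    ///
    /// A sketch of reading the hash back (assumes `dictionary`, `output`,
    /// `reader`, and a `HashAlgorithm` value `algo` are already in scope):
    ///
    /// ```ignore
    /// let mut decoder = StreamingDecoder::new(dictionary, output).with_hashing(algo);
    /// let hash = decoder.decode(&mut reader)?;
    /// assert!(hash.is_some()); // Some(..) because a hash algorithm was set
    /// ```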
    pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
        // If decompression is enabled, decode then decompress
        if let Some(algo) = self.decompress_algo {
            return self.decode_with_decompression(reader, algo);
        }

        // No decompression - decode directly with optional hashing
        match self.dictionary.mode() {
            crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
            crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
            crate::core::config::EncodingMode::BaseConversion => {
                // Mathematical mode requires entire input
                let mut buffer = String::new();
                reader
                    .read_to_string(&mut buffer)
                    .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
                let decoded = crate::encoders::algorithms::math::decode(&buffer, self.dictionary)?;

                let hash = self
                    .hash_algo
                    .map(|algo| crate::features::hashing::hash(&decoded, algo));

                self.writer
                    .write_all(&decoded)
                    .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
                Ok(hash)
            }
        }
    }

    /// Decode with decompression: decode stream then decompress decoded data.
    fn decode_with_decompression<R: Read>(
        &mut self,
        reader: &mut R,
        algo: CompressionAlgorithm,
    ) -> Result<Option<Vec<u8>>, DecodeError> {
        use std::io::Cursor;

        // Decode the input stream to get compressed data
        let mut compressed_data = Vec::new();
        {
            let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
            temp_decoder.decode(reader)?;
        }

        // Decompress and write to output with optional hashing
        let mut cursor = Cursor::new(compressed_data);
        let hash = self
            .decompress_stream(&mut cursor, algo)
            .map_err(|_| DecodeError::InvalidCharacter('\0'))?;

        Ok(hash)
    }

    /// Decompress a stream with optional hashing.
    fn decompress_stream<R: Read>(
        &mut self,
        reader: &mut R,
        algo: CompressionAlgorithm,
    ) -> std::io::Result<Option<Vec<u8>>> {
        use flate2::read::GzDecoder;
        use xz2::read::XzDecoder;

        let mut hasher = self
            .hash_algo
            .map(|hash_algo| create_hasher_writer(hash_algo, &self.xxhash_config));

        match algo {
            CompressionAlgorithm::Gzip => {
                let mut decoder = GzDecoder::new(reader);
                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
            }
            CompressionAlgorithm::Zstd => {
                let mut decoder = zstd::stream::read::Decoder::new(reader)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
            }
            CompressionAlgorithm::Brotli => {
                let mut decoder = brotli::Decompressor::new(reader, 4096);
                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
            }
            CompressionAlgorithm::Lzma => {
                let mut decoder = XzDecoder::new(reader);
                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
            }
            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
                // The LZ4 block format and raw Snappy APIs used here are
                // buffer-oriented, so read the whole payload before decompressing
                let mut compressed = Vec::new();
                reader.read_to_end(&mut compressed)?;

                let decompressed = match algo {
                    CompressionAlgorithm::Lz4 => {
                        // Bound the decompressed output at 100 MiB
                        lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                    }
                    CompressionAlgorithm::Snappy => {
                        let mut decoder = snap::raw::Decoder::new();
                        decoder
                            .decompress_vec(&compressed)
                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                    }
                    _ => unreachable!(),
                };

                let hash = self
                    .hash_algo
                    .map(|hash_algo| crate::features::hashing::hash(&decompressed, hash_algo));
                self.writer.write_all(&decompressed)?;
                return Ok(hash);
            }
        }

        Ok(hasher.map(|h| h.finalize()))
    }

    fn copy_with_hash_to_writer<R: Read>(
        reader: &mut R,
        writer: &mut W,
        hasher: &mut Option<HasherWriter>,
    ) -> std::io::Result<()> {
        let mut buffer = vec![0u8; CHUNK_SIZE];

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            let chunk = &buffer[..bytes_read];
            if let Some(ref mut h) = hasher {
                h.update(chunk);
            }
            writer.write_all(chunk)?;
        }

        Ok(())
    }

    fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
        let base = self.dictionary.base();
        // Bits per character (truncated) and characters per group; the integer
        // arithmetic is exact only when bits_per_char divides 8 evenly
        let bits_per_char = (base as f64).log2() as usize;
        let chars_per_group = 8 / bits_per_char;

        // Read text in chunks
        let mut text_buffer = String::new();
        let mut char_buffer = vec![0u8; CHUNK_SIZE];
        let mut hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));

        loop {
            let bytes_read = reader
                .read(&mut char_buffer)
                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
            if bytes_read == 0 {
                break;
            }

            // Note: a multi-byte UTF-8 character split across a chunk boundary
            // fails this check and surfaces as a decode error
            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read])
                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
            text_buffer.push_str(chunk_str);

            // Process complete character groups
            let chars: Vec<char> = text_buffer.chars().collect();
            let complete_groups = (chars.len() / chars_per_group) * chars_per_group;

            if complete_groups > 0 {
                let to_decode: String = chars[..complete_groups].iter().collect();
                let decoded = crate::encoders::algorithms::chunked::decode_chunked(
                    &to_decode,
                    self.dictionary,
                )?;

                if let Some(ref mut h) = hasher {
                    h.update(&decoded);
                }

                self.writer
                    .write_all(&decoded)
                    .map_err(|_| DecodeError::InvalidCharacter('\0'))?;

                // Keep remaining chars for next iteration
                text_buffer = chars[complete_groups..].iter().collect();
            }
        }

        // Process any remaining characters
        if !text_buffer.is_empty() {
            let decoded = crate::encoders::algorithms::chunked::decode_chunked(
                &text_buffer,
                self.dictionary,
            )?;

            if let Some(ref mut h) = hasher {
                h.update(&decoded);
            }

            self.writer
                .write_all(&decoded)
                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
        }

        Ok(hasher.map(|h| h.finalize()))
    }

    fn decode_byte_range<R: Read>(
        &mut self,
        reader: &mut R,
    ) -> Result<Option<Vec<u8>>, DecodeError> {
        let mut char_buffer = vec![0u8; CHUNK_SIZE];
        let mut hasher = self
            .hash_algo
            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));

        loop {
            let bytes_read = reader
                .read(&mut char_buffer)
                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
            if bytes_read == 0 {
                break;
            }

            // As in decode_chunked, a multi-byte UTF-8 character split across
            // a chunk boundary fails this check
            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read])
                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;

            let decoded = crate::encoders::algorithms::byte_range::decode_byte_range(
                chunk_str,
                self.dictionary,
            )?;

            if let Some(ref mut h) = hasher {
                h.update(&decoded);
            }

            self.writer
                .write_all(&decoded)
                .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
        }

        Ok(hasher.map(|h| h.finalize()))
    }
}