base_d/encoders/streaming/
decoder.rs

1use crate::core::dictionary::Dictionary;
2use crate::encoders::algorithms::DecodeError;
3use crate::features::compression::CompressionAlgorithm;
4use crate::features::hashing::HashAlgorithm;
5use std::io::{Read, Write};
6
7use super::hasher::{HasherWriter, create_hasher_writer};
8
const CHUNK_SIZE: usize = 4096; // 4 KiB read-buffer size used by all streaming loops below
10
/// Streaming decoder for processing large amounts of encoded data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for decoding large files or network streams.
/// Supports optional decompression and hashing during decoding.
pub struct StreamingDecoder<'a, W: Write> {
    // Dictionary defining the encoding alphabet and mode (Chunked/ByteRange/Radix).
    dictionary: &'a Dictionary,
    // Destination for decoded (and, if enabled, decompressed) bytes.
    writer: W,
    // When set, the decoded stream is treated as compressed data and is
    // decompressed with this algorithm before being written.
    decompress_algo: Option<CompressionAlgorithm>,
    // When set, a hash of the final output bytes is computed during decoding
    // and returned from `decode`.
    hash_algo: Option<HashAlgorithm>,
    // Seed/secret configuration used when `hash_algo` is an xxHash variant.
    xxhash_config: crate::features::hashing::XxHashConfig,
}
23
24impl<'a, W: Write> StreamingDecoder<'a, W> {
25    /// Creates a new streaming decoder.
26    ///
27    /// # Arguments
28    ///
29    /// * `dictionary` - The dictionary used for encoding
30    /// * `writer` - The destination for decoded output
31    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32        StreamingDecoder {
33            dictionary,
34            writer,
35            decompress_algo: None,
36            hash_algo: None,
37            xxhash_config: crate::features::hashing::XxHashConfig::default(),
38        }
39    }
40
41    /// Sets decompression algorithm.
42    pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
43        self.decompress_algo = Some(algo);
44        self
45    }
46
47    /// Sets hash algorithm for computing hash during decoding.
48    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
49        self.hash_algo = Some(algo);
50        self
51    }
52
53    /// Sets xxHash configuration (seed and secret).
54    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
55        self.xxhash_config = config;
56        self
57    }
58
59    /// Decodes data from a reader in chunks.
60    ///
61    /// Note: Radix mode requires reading the entire input at once
62    /// due to the nature of true base conversion. For truly streaming
63    /// behavior, use Chunked or ByteRange modes.
64    ///
65    /// Returns the computed hash if hash_algo was set, otherwise None.
66    pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
67        // If decompression is enabled, decode then decompress
68        if let Some(algo) = self.decompress_algo {
69            return self.decode_with_decompression(reader, algo);
70        }
71
72        // No decompression - decode directly with optional hashing
73        match self.dictionary.mode() {
74            crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
75            crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
76            crate::core::config::EncodingMode::Radix => {
77                // Radix mode requires entire input
78                let mut buffer = String::new();
79                reader
80                    .read_to_string(&mut buffer)
81                    .map_err(|_| DecodeError::InvalidCharacter {
82                        char: '\0',
83                        position: 0,
84                        input: String::new(),
85                        valid_chars: String::new(),
86                    })?;
87                let decoded = crate::encoders::algorithms::radix::decode(&buffer, self.dictionary)?;
88
89                let hash = self
90                    .hash_algo
91                    .map(|algo| crate::features::hashing::hash(&decoded, algo));
92
93                self.writer
94                    .write_all(&decoded)
95                    .map_err(|_| DecodeError::InvalidCharacter {
96                        char: '\0',
97                        position: 0,
98                        input: String::new(),
99                        valid_chars: String::new(),
100                    })?;
101                Ok(hash)
102            }
103        }
104    }
105
106    /// Decode with decompression: decode stream then decompress decoded data.
107    fn decode_with_decompression<R: Read>(
108        &mut self,
109        reader: &mut R,
110        algo: CompressionAlgorithm,
111    ) -> Result<Option<Vec<u8>>, DecodeError> {
112        use std::io::Cursor;
113
114        // Decode the input stream to get compressed data
115        let mut compressed_data = Vec::new();
116        {
117            let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
118            temp_decoder.decode(reader)?;
119        }
120
121        // Decompress and write to output with optional hashing
122        let mut cursor = Cursor::new(compressed_data);
123        let hash = self.decompress_stream(&mut cursor, algo).map_err(|_| {
124            DecodeError::InvalidCharacter {
125                char: '\0',
126                position: 0,
127                input: String::new(),
128                valid_chars: String::new(),
129            }
130        })?;
131
132        Ok(hash)
133    }
134
135    /// Decompress a stream with optional hashing.
136    fn decompress_stream<R: Read>(
137        &mut self,
138        reader: &mut R,
139        algo: CompressionAlgorithm,
140    ) -> std::io::Result<Option<Vec<u8>>> {
141        use flate2::read::GzDecoder;
142
143        let mut hasher = self
144            .hash_algo
145            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
146
147        match algo {
148            CompressionAlgorithm::Gzip => {
149                let mut decoder = GzDecoder::new(reader);
150                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
151            }
152            #[cfg(feature = "native-compression")]
153            CompressionAlgorithm::Zstd => {
154                let mut decoder =
155                    zstd::stream::read::Decoder::new(reader).map_err(std::io::Error::other)?;
156                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
157            }
158            #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
159            CompressionAlgorithm::Zstd => {
160                let mut decoder =
161                    ruzstd::StreamingDecoder::new(reader).map_err(std::io::Error::other)?;
162                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
163            }
164            CompressionAlgorithm::Brotli => {
165                let mut decoder = brotli::Decompressor::new(reader, 4096);
166                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
167            }
168            #[cfg(feature = "native-compression")]
169            CompressionAlgorithm::Lzma => {
170                use xz2::read::XzDecoder;
171                let mut decoder = XzDecoder::new(reader);
172                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
173            }
174            #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
175            CompressionAlgorithm::Lzma => {
176                // lzma-rs doesn't have streaming reader, buffer all
177                let mut compressed = Vec::new();
178                reader.read_to_end(&mut compressed)?;
179
180                use std::io::Cursor;
181                let mut decompressed = Vec::new();
182                lzma_rs::lzma_decompress(&mut Cursor::new(&compressed), &mut decompressed)
183                    .map_err(std::io::Error::other)?;
184
185                let hash = self
186                    .hash_algo
187                    .map(|algo| crate::features::hashing::hash(&decompressed, algo));
188                self.writer.write_all(&decompressed)?;
189                return Ok(hash);
190            }
191            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
192                // LZ4 and Snappy don't have streaming decoders
193                let mut compressed = Vec::new();
194                reader.read_to_end(&mut compressed)?;
195
196                let decompressed = match algo {
197                    #[cfg(feature = "native-compression")]
198                    CompressionAlgorithm::Lz4 => {
199                        lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
200                            .map_err(std::io::Error::other)?
201                    }
202                    #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
203                    CompressionAlgorithm::Lz4 => lz4_flex::decompress_size_prepended(&compressed)
204                        .map_err(std::io::Error::other)?,
205                    CompressionAlgorithm::Snappy => {
206                        let mut decoder = snap::raw::Decoder::new();
207                        decoder
208                            .decompress_vec(&compressed)
209                            .map_err(std::io::Error::other)?
210                    }
211                    _ => unreachable!(),
212                };
213
214                let hash = self
215                    .hash_algo
216                    .map(|algo| crate::features::hashing::hash(&decompressed, algo));
217                self.writer.write_all(&decompressed)?;
218                return Ok(hash);
219            }
220        }
221
222        Ok(hasher.map(|h| h.finalize()))
223    }
224
225    fn copy_with_hash_to_writer<R: Read>(
226        reader: &mut R,
227        writer: &mut W,
228        hasher: &mut Option<HasherWriter>,
229    ) -> std::io::Result<()> {
230        let mut buffer = vec![0u8; CHUNK_SIZE];
231
232        loop {
233            let bytes_read = reader.read(&mut buffer)?;
234            if bytes_read == 0 {
235                break;
236            }
237
238            let chunk = &buffer[..bytes_read];
239            if let Some(h) = hasher {
240                h.update(chunk);
241            }
242            writer.write_all(chunk)?;
243        }
244
245        Ok(())
246    }
247
248    fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
249        let base = self.dictionary.base();
250        let bits_per_char = (base as f64).log2() as usize;
251        let chars_per_group = 8 / bits_per_char;
252
253        // Read text in chunks
254        let mut text_buffer = String::new();
255        let mut char_buffer = vec![0u8; CHUNK_SIZE];
256        let mut hasher = self
257            .hash_algo
258            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
259
260        loop {
261            let bytes_read =
262                reader
263                    .read(&mut char_buffer)
264                    .map_err(|_| DecodeError::InvalidCharacter {
265                        char: '\0',
266                        position: 0,
267                        input: String::new(),
268                        valid_chars: String::new(),
269                    })?;
270            if bytes_read == 0 {
271                break;
272            }
273
274            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
275                DecodeError::InvalidCharacter {
276                    char: '\0',
277                    position: 0,
278                    input: String::new(),
279                    valid_chars: String::new(),
280                }
281            })?;
282            text_buffer.push_str(chunk_str);
283
284            // Process complete character groups
285            let chars: Vec<char> = text_buffer.chars().collect();
286            let complete_groups = (chars.len() / chars_per_group) * chars_per_group;
287
288            if complete_groups > 0 {
289                let to_decode: String = chars[..complete_groups].iter().collect();
290                let decoded = crate::encoders::algorithms::chunked::decode_chunked(
291                    &to_decode,
292                    self.dictionary,
293                )?;
294
295                if let Some(ref mut h) = hasher {
296                    h.update(&decoded);
297                }
298
299                self.writer
300                    .write_all(&decoded)
301                    .map_err(|_| DecodeError::InvalidCharacter {
302                        char: '\0',
303                        position: 0,
304                        input: String::new(),
305                        valid_chars: String::new(),
306                    })?;
307
308                // Keep remaining chars for next iteration
309                text_buffer = chars[complete_groups..].iter().collect();
310            }
311        }
312
313        // Process any remaining characters
314        if !text_buffer.is_empty() {
315            let decoded = crate::encoders::algorithms::chunked::decode_chunked(
316                &text_buffer,
317                self.dictionary,
318            )?;
319
320            if let Some(ref mut h) = hasher {
321                h.update(&decoded);
322            }
323
324            self.writer
325                .write_all(&decoded)
326                .map_err(|_| DecodeError::InvalidCharacter {
327                    char: '\0',
328                    position: 0,
329                    input: String::new(),
330                    valid_chars: String::new(),
331                })?;
332        }
333
334        Ok(hasher.map(|h| h.finalize()))
335    }
336
337    fn decode_byte_range<R: Read>(
338        &mut self,
339        reader: &mut R,
340    ) -> Result<Option<Vec<u8>>, DecodeError> {
341        let mut char_buffer = vec![0u8; CHUNK_SIZE];
342        let mut hasher = self
343            .hash_algo
344            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
345
346        loop {
347            let bytes_read =
348                reader
349                    .read(&mut char_buffer)
350                    .map_err(|_| DecodeError::InvalidCharacter {
351                        char: '\0',
352                        position: 0,
353                        input: String::new(),
354                        valid_chars: String::new(),
355                    })?;
356            if bytes_read == 0 {
357                break;
358            }
359
360            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
361                DecodeError::InvalidCharacter {
362                    char: '\0',
363                    position: 0,
364                    input: String::new(),
365                    valid_chars: String::new(),
366                }
367            })?;
368
369            let decoded = crate::encoders::algorithms::byte_range::decode_byte_range(
370                chunk_str,
371                self.dictionary,
372            )?;
373
374            if let Some(ref mut h) = hasher {
375                h.update(&decoded);
376            }
377
378            self.writer
379                .write_all(&decoded)
380                .map_err(|_| DecodeError::InvalidCharacter {
381                    char: '\0',
382                    position: 0,
383                    input: String::new(),
384                    valid_chars: String::new(),
385                })?;
386        }
387
388        Ok(hasher.map(|h| h.finalize()))
389    }
390}