base_d/encoders/streaming/
decoder.rs

1use crate::core::dictionary::Dictionary;
2use crate::encoders::algorithms::DecodeError;
3use crate::features::compression::CompressionAlgorithm;
4use crate::features::hashing::HashAlgorithm;
5use std::io::{Read, Write};
6
7use super::hasher::{HasherWriter, create_hasher_writer};
8
9const CHUNK_SIZE: usize = 4096; // 4KB chunks
10
/// Streaming decoder for processing large amounts of encoded data efficiently.
///
/// Processes data in chunks to avoid loading entire files into memory.
/// Suitable for decoding large files or network streams.
/// Supports optional decompression and hashing during decoding.
pub struct StreamingDecoder<'a, W: Write> {
    /// Dictionary defining the alphabet and encoding mode used by the input.
    dictionary: &'a Dictionary,
    /// Destination for decoded (and optionally decompressed) bytes.
    writer: W,
    /// When set, decoded output is treated as compressed data and decompressed
    /// before being written.
    decompress_algo: Option<CompressionAlgorithm>,
    /// When set, a hash of the output bytes is computed during decoding and
    /// returned from `decode`.
    hash_algo: Option<HashAlgorithm>,
    /// Seed/secret configuration applied when `hash_algo` is an xxHash variant.
    xxhash_config: crate::features::hashing::XxHashConfig,
}
23
24impl<'a, W: Write> StreamingDecoder<'a, W> {
25    /// Creates a new streaming decoder.
26    ///
27    /// # Arguments
28    ///
29    /// * `dictionary` - The dictionary used for encoding
30    /// * `writer` - The destination for decoded output
31    pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32        StreamingDecoder {
33            dictionary,
34            writer,
35            decompress_algo: None,
36            hash_algo: None,
37            xxhash_config: crate::features::hashing::XxHashConfig::default(),
38        }
39    }
40
41    /// Sets decompression algorithm.
42    pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
43        self.decompress_algo = Some(algo);
44        self
45    }
46
47    /// Sets hash algorithm for computing hash during decoding.
48    pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
49        self.hash_algo = Some(algo);
50        self
51    }
52
53    /// Sets xxHash configuration (seed and secret).
54    pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
55        self.xxhash_config = config;
56        self
57    }
58
59    /// Decodes data from a reader in chunks.
60    ///
61    /// Note: Radix mode requires reading the entire input at once
62    /// due to the nature of true base conversion. For truly streaming
63    /// behavior, use Chunked or ByteRange modes.
64    ///
65    /// Returns the computed hash if hash_algo was set, otherwise None.
66    pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
67        // If decompression is enabled, decode then decompress
68        if let Some(algo) = self.decompress_algo {
69            return self.decode_with_decompression(reader, algo);
70        }
71
72        // No decompression - decode directly with optional hashing
73        match self.dictionary.mode() {
74            crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
75            crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
76            crate::core::config::EncodingMode::Radix => {
77                // Radix mode requires entire input
78                let mut buffer = String::new();
79                reader
80                    .read_to_string(&mut buffer)
81                    .map_err(|_| DecodeError::InvalidCharacter {
82                        char: '\0',
83                        position: 0,
84                        input: String::new(),
85                        valid_chars: String::new(),
86                    })?;
87                let decoded = crate::encoders::algorithms::radix::decode(&buffer, self.dictionary)?;
88
89                let hash = self
90                    .hash_algo
91                    .map(|algo| crate::features::hashing::hash(&decoded, algo));
92
93                self.writer
94                    .write_all(&decoded)
95                    .map_err(|_| DecodeError::InvalidCharacter {
96                        char: '\0',
97                        position: 0,
98                        input: String::new(),
99                        valid_chars: String::new(),
100                    })?;
101                Ok(hash)
102            }
103        }
104    }
105
106    /// Decode with decompression: decode stream then decompress decoded data.
107    fn decode_with_decompression<R: Read>(
108        &mut self,
109        reader: &mut R,
110        algo: CompressionAlgorithm,
111    ) -> Result<Option<Vec<u8>>, DecodeError> {
112        use std::io::Cursor;
113
114        // Decode the input stream to get compressed data
115        let mut compressed_data = Vec::new();
116        {
117            let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
118            temp_decoder.decode(reader)?;
119        }
120
121        // Decompress and write to output with optional hashing
122        let mut cursor = Cursor::new(compressed_data);
123        let hash = self.decompress_stream(&mut cursor, algo).map_err(|_| {
124            DecodeError::InvalidCharacter {
125                char: '\0',
126                position: 0,
127                input: String::new(),
128                valid_chars: String::new(),
129            }
130        })?;
131
132        Ok(hash)
133    }
134
135    /// Decompress a stream with optional hashing.
136    fn decompress_stream<R: Read>(
137        &mut self,
138        reader: &mut R,
139        algo: CompressionAlgorithm,
140    ) -> std::io::Result<Option<Vec<u8>>> {
141        use flate2::read::GzDecoder;
142        use xz2::read::XzDecoder;
143
144        let mut hasher = self
145            .hash_algo
146            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
147
148        match algo {
149            CompressionAlgorithm::Gzip => {
150                let mut decoder = GzDecoder::new(reader);
151                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
152            }
153            CompressionAlgorithm::Zstd => {
154                let mut decoder =
155                    zstd::stream::read::Decoder::new(reader).map_err(std::io::Error::other)?;
156                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
157            }
158            CompressionAlgorithm::Brotli => {
159                let mut decoder = brotli::Decompressor::new(reader, 4096);
160                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
161            }
162            CompressionAlgorithm::Lzma => {
163                let mut decoder = XzDecoder::new(reader);
164                Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
165            }
166            CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
167                // LZ4 and Snappy don't have streaming decoders
168                let mut compressed = Vec::new();
169                reader.read_to_end(&mut compressed)?;
170
171                let decompressed = match algo {
172                    CompressionAlgorithm::Lz4 => {
173                        lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
174                            .map_err(std::io::Error::other)?
175                    }
176                    CompressionAlgorithm::Snappy => {
177                        let mut decoder = snap::raw::Decoder::new();
178                        decoder
179                            .decompress_vec(&compressed)
180                            .map_err(std::io::Error::other)?
181                    }
182                    _ => unreachable!(),
183                };
184
185                let hash = self
186                    .hash_algo
187                    .map(|algo| crate::features::hashing::hash(&decompressed, algo));
188                self.writer.write_all(&decompressed)?;
189                return Ok(hash);
190            }
191        }
192
193        Ok(hasher.map(|h| h.finalize()))
194    }
195
196    fn copy_with_hash_to_writer<R: Read>(
197        reader: &mut R,
198        writer: &mut W,
199        hasher: &mut Option<HasherWriter>,
200    ) -> std::io::Result<()> {
201        let mut buffer = vec![0u8; CHUNK_SIZE];
202
203        loop {
204            let bytes_read = reader.read(&mut buffer)?;
205            if bytes_read == 0 {
206                break;
207            }
208
209            let chunk = &buffer[..bytes_read];
210            if let Some(h) = hasher {
211                h.update(chunk);
212            }
213            writer.write_all(chunk)?;
214        }
215
216        Ok(())
217    }
218
219    fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
220        let base = self.dictionary.base();
221        let bits_per_char = (base as f64).log2() as usize;
222        let chars_per_group = 8 / bits_per_char;
223
224        // Read text in chunks
225        let mut text_buffer = String::new();
226        let mut char_buffer = vec![0u8; CHUNK_SIZE];
227        let mut hasher = self
228            .hash_algo
229            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
230
231        loop {
232            let bytes_read =
233                reader
234                    .read(&mut char_buffer)
235                    .map_err(|_| DecodeError::InvalidCharacter {
236                        char: '\0',
237                        position: 0,
238                        input: String::new(),
239                        valid_chars: String::new(),
240                    })?;
241            if bytes_read == 0 {
242                break;
243            }
244
245            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
246                DecodeError::InvalidCharacter {
247                    char: '\0',
248                    position: 0,
249                    input: String::new(),
250                    valid_chars: String::new(),
251                }
252            })?;
253            text_buffer.push_str(chunk_str);
254
255            // Process complete character groups
256            let chars: Vec<char> = text_buffer.chars().collect();
257            let complete_groups = (chars.len() / chars_per_group) * chars_per_group;
258
259            if complete_groups > 0 {
260                let to_decode: String = chars[..complete_groups].iter().collect();
261                let decoded = crate::encoders::algorithms::chunked::decode_chunked(
262                    &to_decode,
263                    self.dictionary,
264                )?;
265
266                if let Some(ref mut h) = hasher {
267                    h.update(&decoded);
268                }
269
270                self.writer
271                    .write_all(&decoded)
272                    .map_err(|_| DecodeError::InvalidCharacter {
273                        char: '\0',
274                        position: 0,
275                        input: String::new(),
276                        valid_chars: String::new(),
277                    })?;
278
279                // Keep remaining chars for next iteration
280                text_buffer = chars[complete_groups..].iter().collect();
281            }
282        }
283
284        // Process any remaining characters
285        if !text_buffer.is_empty() {
286            let decoded = crate::encoders::algorithms::chunked::decode_chunked(
287                &text_buffer,
288                self.dictionary,
289            )?;
290
291            if let Some(ref mut h) = hasher {
292                h.update(&decoded);
293            }
294
295            self.writer
296                .write_all(&decoded)
297                .map_err(|_| DecodeError::InvalidCharacter {
298                    char: '\0',
299                    position: 0,
300                    input: String::new(),
301                    valid_chars: String::new(),
302                })?;
303        }
304
305        Ok(hasher.map(|h| h.finalize()))
306    }
307
308    fn decode_byte_range<R: Read>(
309        &mut self,
310        reader: &mut R,
311    ) -> Result<Option<Vec<u8>>, DecodeError> {
312        let mut char_buffer = vec![0u8; CHUNK_SIZE];
313        let mut hasher = self
314            .hash_algo
315            .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
316
317        loop {
318            let bytes_read =
319                reader
320                    .read(&mut char_buffer)
321                    .map_err(|_| DecodeError::InvalidCharacter {
322                        char: '\0',
323                        position: 0,
324                        input: String::new(),
325                        valid_chars: String::new(),
326                    })?;
327            if bytes_read == 0 {
328                break;
329            }
330
331            let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
332                DecodeError::InvalidCharacter {
333                    char: '\0',
334                    position: 0,
335                    input: String::new(),
336                    valid_chars: String::new(),
337                }
338            })?;
339
340            let decoded = crate::encoders::algorithms::byte_range::decode_byte_range(
341                chunk_str,
342                self.dictionary,
343            )?;
344
345            if let Some(ref mut h) = hasher {
346                h.update(&decoded);
347            }
348
349            self.writer
350                .write_all(&decoded)
351                .map_err(|_| DecodeError::InvalidCharacter {
352                    char: '\0',
353                    position: 0,
354                    input: String::new(),
355                    valid_chars: String::new(),
356                })?;
357        }
358
359        Ok(hasher.map(|h| h.finalize()))
360    }
361}