Skip to main content

haagenti_stream/
reader.rs

1//! Read adapters for streaming decompression.
2
3use std::io::{self, Read};
4
5use haagenti_core::Decompressor;
6
7use crate::{StreamBuffer, DEFAULT_BUFFER_SIZE};
8
9/// A reader that decompresses data from the inner reader.
10///
11/// Reads compressed data from the inner reader, decompresses it,
12/// and provides the decompressed data to the caller.
13pub struct DecompressReader<R: Read, D: Decompressor> {
14    inner: R,
15    decompressor: D,
16    input_buffer: StreamBuffer,
17    output_buffer: StreamBuffer,
18    finished: bool,
19}
20
21impl<R: Read, D: Decompressor> DecompressReader<R, D> {
22    /// Create a new decompressing reader with default buffer size.
23    pub fn new(inner: R, decompressor: D) -> Self {
24        Self::with_buffer_size(inner, decompressor, DEFAULT_BUFFER_SIZE)
25    }
26
27    /// Create a new decompressing reader with specified buffer size.
28    pub fn with_buffer_size(inner: R, decompressor: D, buffer_size: usize) -> Self {
29        Self {
30            inner,
31            decompressor,
32            input_buffer: StreamBuffer::with_capacity(buffer_size),
33            output_buffer: StreamBuffer::with_capacity(buffer_size * 4), // Decompressed is usually larger
34            finished: false,
35        }
36    }
37
38    /// Get a reference to the inner reader.
39    pub fn get_ref(&self) -> &R {
40        &self.inner
41    }
42
43    /// Get a mutable reference to the inner reader.
44    pub fn get_mut(&mut self) -> &mut R {
45        &mut self.inner
46    }
47
48    /// Get a reference to the decompressor.
49    pub fn decompressor(&self) -> &D {
50        &self.decompressor
51    }
52
53    /// Fill the input buffer from the inner reader.
54    fn fill_input(&mut self) -> io::Result<bool> {
55        self.input_buffer.compact();
56
57        let buf = self.input_buffer.writable();
58        if buf.is_empty() {
59            return Ok(true); // Buffer is full
60        }
61
62        let n = self.inner.read(buf)?;
63        if n == 0 {
64            self.finished = true;
65            return Ok(false);
66        }
67
68        self.input_buffer.advance(n);
69        Ok(true)
70    }
71
72    /// Decompress data from input buffer to output buffer.
73    fn decompress_chunk(&mut self) -> io::Result<()> {
74        if self.input_buffer.is_empty() {
75            return Ok(());
76        }
77
78        let input = self.input_buffer.readable();
79        let decompressed = self
80            .decompressor
81            .decompress(input)
82            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
83
84        self.output_buffer.clear();
85        self.output_buffer.write(&decompressed);
86        self.input_buffer.clear();
87
88        Ok(())
89    }
90}
91
92impl<R: Read, D: Decompressor> Read for DecompressReader<R, D> {
93    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
94        // First, try to read from output buffer
95        if self.output_buffer.available() > 0 {
96            return Ok(self.output_buffer.read(buf));
97        }
98
99        // If we've finished reading input, we're done
100        if self.finished && self.input_buffer.is_empty() {
101            return Ok(0);
102        }
103
104        // Read more input and decompress
105        self.fill_input()?;
106        if !self.input_buffer.is_empty() {
107            self.decompress_chunk()?;
108        }
109
110        // Return what we have
111        Ok(self.output_buffer.read(buf))
112    }
113}
114
/// A generic read adapter for transforming data.
///
/// This is a simpler interface that reads all input, transforms it,
/// and provides the result.
///
/// On the first [`Read::read`] the inner reader is drained to EOF and the complete
/// input is passed to `transform`; its output is then served by this and later
/// reads. Both the whole input and the whole output are held in memory.
pub struct ReadAdapter<R: Read, F> {
    /// Source of the untransformed bytes.
    inner: R,
    /// Applied to the complete input; its output is what this reader yields.
    transform: F,
    /// Input read so far, replaced by the transformed output once `transform` succeeds.
    buffer: Vec<u8>,
    /// Number of transformed bytes already handed out by `read`.
    position: usize,
    /// Whether `buffer` currently holds the transformed output.
    transformed: bool,
}

impl<R: Read, F> ReadAdapter<R, F>
where
    F: FnMut(Vec<u8>) -> io::Result<Vec<u8>>,
{
    /// Create a new read adapter.
    ///
    /// Nothing is read from `inner`, and `transform` is not called, until the
    /// first [`Read::read`] on the adapter.
    pub fn new(inner: R, transform: F) -> Self {
        Self {
            inner,
            transform,
            buffer: Vec::new(),
            position: 0,
            transformed: false,
        }
    }

    /// Get a reference to the inner reader.
    pub fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Get a mutable reference to the inner reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Read all remaining input and transform it, if that has not happened yet.
    ///
    /// Input is accumulated directly in `self.buffer`. When [`Read::read_to_end`]
    /// fails it keeps the bytes it already read, so if `inner` errors partway
    /// (for example `WouldBlock` from a non-blocking source) a later call resumes
    /// from there instead of transforming only the tail of the stream.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inner reader and from `transform`. `transform`
    /// takes ownership of the input, so after it fails the input is gone and the
    /// adapter should not be read again.
    fn ensure_transformed(&mut self) -> io::Result<()> {
        if self.transformed {
            return Ok(());
        }

        self.inner.read_to_end(&mut self.buffer)?;

        let input = std::mem::take(&mut self.buffer);
        self.buffer = (self.transform)(input)?;
        self.transformed = true;
        Ok(())
    }
}

impl<R: Read, F> Read for ReadAdapter<R, F>
where
    F: FnMut(Vec<u8>) -> io::Result<Vec<u8>>,
{
    /// Copy the next transformed bytes into `buf`, transforming on first use.
    ///
    /// Returns `Ok(0)` once every transformed byte has been returned.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.ensure_transformed()?;

        // `&[u8]` implements `Read`: it copies min(remaining, buf.len()) bytes.
        let mut unread: &[u8] = &self.buffer[self.position..];
        let copied = unread.read(buf)?;
        self.position += copied;
        Ok(copied)
    }
}
186
#[cfg(test)]
mod tests {
    use super::*;

    // Simple mock decompressor for testing
    struct MockDecompressor;

    impl Decompressor for MockDecompressor {
        fn algorithm(&self) -> haagenti_core::Algorithm {
            haagenti_core::Algorithm::Lz4
        }

        fn decompress(&self, input: &[u8]) -> haagenti_core::Result<Vec<u8>> {
            // Simple "decompression": read length prefix, return data
            if input.len() < 4 {
                return Err(haagenti_core::Error::corrupted("input too short"));
            }
            let len = u32::from_le_bytes(input[..4].try_into().unwrap()) as usize;
            if input.len() < 4 + len {
                return Err(haagenti_core::Error::corrupted("truncated data"));
            }
            Ok(input[4..4 + len].to_vec())
        }

        fn decompress_to(&self, input: &[u8], output: &mut [u8]) -> haagenti_core::Result<usize> {
            let decompressed = self.decompress(input)?;
            if decompressed.len() > output.len() {
                return Err(haagenti_core::Error::buffer_too_small(
                    decompressed.len(),
                    output.len(),
                ));
            }
            output[..decompressed.len()].copy_from_slice(&decompressed);
            Ok(decompressed.len())
        }
    }

    /// Build a mock compressed frame: a 4-byte little-endian length prefix
    /// followed by `payload`, the layout `MockDecompressor` expects.
    fn mock_frame(payload: &[u8]) -> Vec<u8> {
        let len = u32::try_from(payload.len()).expect("test payload fits in u32");
        let mut compressed = Vec::with_capacity(4 + payload.len());
        compressed.extend_from_slice(&len.to_le_bytes());
        compressed.extend_from_slice(payload);
        compressed
    }

    #[test]
    fn test_decompress_reader() {
        let cursor = io::Cursor::new(mock_frame(b"Hello"));
        let mut reader = DecompressReader::with_buffer_size(cursor, MockDecompressor, 64);

        let mut output = Vec::new();
        reader.read_to_end(&mut output).unwrap();

        assert_eq!(output, b"Hello");
    }

    #[test]
    fn test_decompress_reader_small_output_reads() {
        let cursor = io::Cursor::new(mock_frame(b"Hello"));
        let mut reader = DecompressReader::with_buffer_size(cursor, MockDecompressor, 64);

        // Drain through a caller buffer smaller than the decompressed data.
        let mut output = Vec::new();
        let mut chunk = [0u8; 2];
        loop {
            let n = reader.read(&mut chunk).unwrap();
            if n == 0 {
                break;
            }
            output.extend_from_slice(&chunk[..n]);
        }

        assert_eq!(output, b"Hello");
    }

    #[test]
    fn test_decompress_reader_empty_input() {
        let cursor = io::Cursor::new(Vec::<u8>::new());
        let mut reader = DecompressReader::with_buffer_size(cursor, MockDecompressor, 64);

        let mut output = Vec::new();
        reader.read_to_end(&mut output).unwrap();

        assert!(output.is_empty());
    }

    #[test]
    fn test_decompress_reader_corrupt_input() {
        // Too short to hold the mock's 4-byte length prefix.
        let cursor = io::Cursor::new(b"ab".to_vec());
        let mut reader = DecompressReader::with_buffer_size(cursor, MockDecompressor, 64);

        let err = reader.read_to_end(&mut Vec::new()).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_read_adapter() {
        let input = io::Cursor::new(b"hello".to_vec());
        let mut adapter = ReadAdapter::new(input, |data: Vec<u8>| {
            // Transform: uppercase
            Ok(data.to_ascii_uppercase())
        });

        let mut output = Vec::new();
        adapter.read_to_end(&mut output).unwrap();

        assert_eq!(output, b"HELLO");
    }

    #[test]
    fn test_read_adapter_empty_input() {
        let input = io::Cursor::new(Vec::<u8>::new());
        let mut adapter = ReadAdapter::new(input, |data: Vec<u8>| Ok(data.to_ascii_uppercase()));

        let mut output = Vec::new();
        adapter.read_to_end(&mut output).unwrap();

        assert!(output.is_empty());
    }

    #[test]
    fn test_read_adapter_transform_error() {
        let input = io::Cursor::new(b"hello".to_vec());
        let mut adapter = ReadAdapter::new(input, |_data: Vec<u8>| -> io::Result<Vec<u8>> {
            Err(io::Error::new(io::ErrorKind::InvalidData, "transform failed"))
        });

        let mut output = Vec::new();
        let err = adapter.read_to_end(&mut output).unwrap_err();

        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert!(output.is_empty());
    }
}