haagenti_stream/
reader.rs1use std::io::{self, Read};
4
5use haagenti_core::Decompressor;
6
7use crate::{StreamBuffer, DEFAULT_BUFFER_SIZE};
8
9pub struct DecompressReader<R: Read, D: Decompressor> {
14 inner: R,
15 decompressor: D,
16 input_buffer: StreamBuffer,
17 output_buffer: StreamBuffer,
18 finished: bool,
19}
20
21impl<R: Read, D: Decompressor> DecompressReader<R, D> {
22 pub fn new(inner: R, decompressor: D) -> Self {
24 Self::with_buffer_size(inner, decompressor, DEFAULT_BUFFER_SIZE)
25 }
26
27 pub fn with_buffer_size(inner: R, decompressor: D, buffer_size: usize) -> Self {
29 Self {
30 inner,
31 decompressor,
32 input_buffer: StreamBuffer::with_capacity(buffer_size),
33 output_buffer: StreamBuffer::with_capacity(buffer_size * 4), finished: false,
35 }
36 }
37
38 pub fn get_ref(&self) -> &R {
40 &self.inner
41 }
42
43 pub fn get_mut(&mut self) -> &mut R {
45 &mut self.inner
46 }
47
48 pub fn decompressor(&self) -> &D {
50 &self.decompressor
51 }
52
53 fn fill_input(&mut self) -> io::Result<bool> {
55 self.input_buffer.compact();
56
57 let buf = self.input_buffer.writable();
58 if buf.is_empty() {
59 return Ok(true); }
61
62 let n = self.inner.read(buf)?;
63 if n == 0 {
64 self.finished = true;
65 return Ok(false);
66 }
67
68 self.input_buffer.advance(n);
69 Ok(true)
70 }
71
72 fn decompress_chunk(&mut self) -> io::Result<()> {
74 if self.input_buffer.is_empty() {
75 return Ok(());
76 }
77
78 let input = self.input_buffer.readable();
79 let decompressed = self
80 .decompressor
81 .decompress(input)
82 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
83
84 self.output_buffer.clear();
85 self.output_buffer.write(&decompressed);
86 self.input_buffer.clear();
87
88 Ok(())
89 }
90}
91
92impl<R: Read, D: Decompressor> Read for DecompressReader<R, D> {
93 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
94 if self.output_buffer.available() > 0 {
96 return Ok(self.output_buffer.read(buf));
97 }
98
99 if self.finished && self.input_buffer.is_empty() {
101 return Ok(0);
102 }
103
104 self.fill_input()?;
106 if !self.input_buffer.is_empty() {
107 self.decompress_chunk()?;
108 }
109
110 Ok(self.output_buffer.read(buf))
112 }
113}
114
/// Reader that slurps an inner reader, applies a whole-buffer transform once,
/// and then serves the transformed bytes through [`Read`].
///
/// The transform runs lazily on the first `read` call, after the inner reader
/// has been read to its end.
pub struct ReadAdapter<R: Read, F> {
    /// Source of the untransformed bytes.
    inner: R,
    /// Whole-buffer transform; invoked at most once.
    transform: F,
    /// Raw input while it is being gathered, transformed output afterwards.
    buffer: Vec<u8>,
    /// Offset of the next unread byte in `buffer` (only advanced after the transform).
    position: usize,
    /// `true` once `buffer` holds the transform's output.
    transformed: bool,
    /// `true` once the transform has returned an error; the input is gone by then.
    failed: bool,
}

impl<R: Read, F> ReadAdapter<R, F>
where
    F: FnMut(Vec<u8>) -> io::Result<Vec<u8>>,
{
    /// Wraps `inner`; `transform` is not called until the first read.
    pub fn new(inner: R, transform: F) -> Self {
        Self {
            inner,
            transform,
            buffer: Vec::new(),
            position: 0,
            transformed: false,
            failed: false,
        }
    }

    /// Shared access to the wrapped reader.
    pub fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Exclusive access to the wrapped reader.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Reads all of `inner` and runs the transform, unless that already happened.
    ///
    /// # Errors
    ///
    /// * An I/O error from `inner` is returned as-is. Bytes gathered so far are
    ///   kept, so a later call resumes instead of transforming truncated input.
    /// * An error from the transform is returned as-is the first time. Because
    ///   the transform consumed the input, later calls fail with
    ///   `io::ErrorKind::Other` rather than re-running it on empty data.
    fn ensure_transformed(&mut self) -> io::Result<()> {
        if self.transformed {
            return Ok(());
        }
        if self.failed {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "transform failed on a previous read",
            ));
        }

        // Accumulate directly into `self.buffer`: `read_to_end` leaves the bytes
        // it already read in the vector when it fails, and keeping them on
        // `self` lets a retry continue where the failed attempt stopped.
        self.inner.read_to_end(&mut self.buffer)?;

        let input = std::mem::take(&mut self.buffer);
        match (self.transform)(input) {
            Ok(output) => {
                self.buffer = output;
                self.transformed = true;
                Ok(())
            }
            Err(err) => {
                self.failed = true;
                Err(err)
            }
        }
    }
}

impl<R: Read, F> Read for ReadAdapter<R, F>
where
    F: FnMut(Vec<u8>) -> io::Result<Vec<u8>>,
{
    /// Copies the next transformed bytes into `buf`.
    ///
    /// Returns `Ok(0)` once all transformed bytes have been handed out (or when
    /// `buf` is empty).
    ///
    /// # Errors
    ///
    /// Fails if reading the inner reader or running the transform fails.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.ensure_transformed()?;

        let pending = &self.buffer[self.position..];
        let count = pending.len().min(buf.len());
        buf[..count].copy_from_slice(&pending[..count]);
        self.position += count;

        Ok(count)
    }
}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Test double: input is a little-endian `u32` length followed by that
    /// many payload bytes; "decompressing" returns the payload unchanged.
    struct MockDecompressor;

    impl Decompressor for MockDecompressor {
        fn algorithm(&self) -> haagenti_core::Algorithm {
            haagenti_core::Algorithm::Lz4
        }

        fn decompress(&self, input: &[u8]) -> haagenti_core::Result<Vec<u8>> {
            // The 4-byte length header must be present.
            if input.len() < 4 {
                return Err(haagenti_core::Error::corrupted("input too short"));
            }

            let (header, body) = input.split_at(4);
            let len = u32::from_le_bytes(header.try_into().unwrap()) as usize;
            if body.len() < len {
                return Err(haagenti_core::Error::corrupted("truncated data"));
            }

            Ok(body[..len].to_vec())
        }

        fn decompress_to(&self, input: &[u8], output: &mut [u8]) -> haagenti_core::Result<usize> {
            let payload = self.decompress(input)?;
            let needed = payload.len();

            if needed > output.len() {
                return Err(haagenti_core::Error::buffer_too_small(
                    needed,
                    output.len(),
                ));
            }

            output[..needed].copy_from_slice(&payload);
            Ok(needed)
        }
    }

    #[test]
    fn test_decompress_reader() {
        // Frame: 4-byte length prefix, then the payload itself.
        let payload = b"Hello";
        let mut framed = 5u32.to_le_bytes().to_vec();
        framed.extend_from_slice(payload);

        let mut reader =
            DecompressReader::with_buffer_size(io::Cursor::new(framed), MockDecompressor, 64);

        let mut decoded = Vec::new();
        reader.read_to_end(&mut decoded).unwrap();

        assert_eq!(decoded, b"Hello");
    }

    #[test]
    fn test_read_adapter() {
        let source = io::Cursor::new(b"hello".to_vec());
        let upper = |data: Vec<u8>| -> io::Result<Vec<u8>> { Ok(data.to_ascii_uppercase()) };
        let mut adapter = ReadAdapter::new(source, upper);

        let mut collected = Vec::new();
        adapter.read_to_end(&mut collected).unwrap();

        assert_eq!(collected, b"HELLO");
    }
}