embeddenator_io/io/
stream.rs1use std::io::{self, Read, Write};
7use std::path::Path;
8
9pub struct StreamReader<R> {
11 reader: R,
12 buffer_size: usize,
13}
14
15impl<R: Read> StreamReader<R> {
16 pub fn new(reader: R) -> Self {
18 Self::with_buffer_size(reader, super::buffer::DEFAULT_BUFFER_SIZE)
19 }
20
21 pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
23 Self {
24 reader,
25 buffer_size,
26 }
27 }
28
29 pub fn read_all<F, T>(&mut self, mut transform: F) -> io::Result<Vec<T>>
31 where
32 F: FnMut(&[u8]) -> io::Result<T>,
33 {
34 let mut results = Vec::new();
35 let mut buffer = vec![0u8; self.buffer_size];
36
37 loop {
38 let n = self.reader.read(&mut buffer)?;
39 if n == 0 {
40 break;
41 }
42 let result = transform(&buffer[..n])?;
43 results.push(result);
44 }
45
46 Ok(results)
47 }
48
49 pub fn fold<F, T>(&mut self, init: T, mut fold_fn: F) -> io::Result<T>
51 where
52 F: FnMut(T, &[u8]) -> io::Result<T>,
53 {
54 let mut acc = init;
55 let mut buffer = vec![0u8; self.buffer_size];
56
57 loop {
58 let n = self.reader.read(&mut buffer)?;
59 if n == 0 {
60 break;
61 }
62 acc = fold_fn(acc, &buffer[..n])?;
63 }
64
65 Ok(acc)
66 }
67
68 pub fn count_bytes(&mut self) -> io::Result<u64> {
70 self.fold(0u64, |acc, chunk| Ok(acc + chunk.len() as u64))
71 }
72}
73
74pub struct StreamWriter<W> {
76 writer: W,
77 buffer: Vec<u8>,
78 buffer_size: usize,
79}
80
81impl<W: Write> StreamWriter<W> {
82 pub fn new(writer: W) -> Self {
84 Self::with_buffer_size(writer, super::buffer::DEFAULT_BUFFER_SIZE)
85 }
86
87 pub fn with_buffer_size(writer: W, buffer_size: usize) -> Self {
89 Self {
90 writer,
91 buffer: Vec::with_capacity(buffer_size),
92 buffer_size,
93 }
94 }
95
96 pub fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
98 if self.buffer.len() + data.len() <= self.buffer_size {
100 self.buffer.extend_from_slice(data);
101 return Ok(());
102 }
103
104 if !self.buffer.is_empty() {
106 self.writer.write_all(&self.buffer)?;
107 self.buffer.clear();
108 }
109
110 if data.len() > self.buffer_size {
112 self.writer.write_all(data)?;
113 } else {
114 self.buffer.extend_from_slice(data);
115 }
116
117 Ok(())
118 }
119
120 pub fn flush(&mut self) -> io::Result<()> {
122 if !self.buffer.is_empty() {
123 self.writer.write_all(&self.buffer)?;
124 self.buffer.clear();
125 }
126 self.writer.flush()
127 }
128
129 pub fn finish(mut self) -> io::Result<W> {
131 self.flush()?;
132 Ok(self.writer)
133 }
134}
135
136pub fn stream_write_file<P, I, D>(path: P, chunks: I) -> io::Result<()>
146where
147 P: AsRef<Path>,
148 I: Iterator<Item = D>,
149 D: AsRef<[u8]>,
150{
151 let file = std::fs::File::create(path)?;
152 let mut writer = StreamWriter::new(file);
153
154 for chunk in chunks {
155 writer.write_chunk(chunk.as_ref())?;
156 }
157
158 writer.flush()?;
159 Ok(())
160}
161
162pub fn stream_read_file<P, F>(path: P, mut callback: F) -> io::Result<()>
176where
177 P: AsRef<Path>,
178 F: FnMut(&[u8]) -> io::Result<()>,
179{
180 let file = std::fs::File::open(path)?;
181 let mut reader = StreamReader::new(file);
182 let mut buffer = vec![0u8; reader.buffer_size];
183
184 loop {
185 let n = reader.reader.read(&mut buffer)?;
186 if n == 0 {
187 break;
188 }
189 callback(&buffer[..n])?;
190 }
191
192 Ok(())
193}
194
/// Tokio-based counterparts of the synchronous stream helpers, compiled only when the
/// `async` feature is enabled.
#[cfg(feature = "async")]
pub mod async_stream {
    use std::io;
    use std::path::Path;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

    use super::super::buffer::DEFAULT_BUFFER_SIZE;

    /// Chunked reader over a Tokio [`AsyncRead`] source.
    ///
    /// Each chunk is whatever a single `read` call returns: at most `buffer_size` bytes, but
    /// possibly fewer.
    pub struct AsyncStreamReader<R> {
        reader: R,
        buffer_size: usize,
    }

    impl<R: AsyncRead + Unpin> AsyncStreamReader<R> {
        /// Creates a reader that uses `DEFAULT_BUFFER_SIZE` for each chunk.
        pub fn new(reader: R) -> Self {
            Self::with_buffer_size(reader, DEFAULT_BUFFER_SIZE)
        }

        /// Creates a reader that reads at most `buffer_size` bytes per chunk.
        ///
        /// A `buffer_size` of 0 is treated as 1. Reading into an empty buffer returns `Ok(0)`,
        /// which would otherwise be indistinguishable from end-of-stream and silently yield
        /// no data.
        pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
            Self {
                reader,
                buffer_size: buffer_size.max(1),
            }
        }

        /// Reads to EOF, awaiting `transform` on every chunk and collecting the results in
        /// read order.
        ///
        /// Each chunk is passed as an owned copy, so the returned future does not borrow the
        /// internal read buffer.
        ///
        /// # Errors
        /// Returns the first read error or the first error produced by `transform`.
        pub async fn read_all<F, Fut, T>(&mut self, mut transform: F) -> io::Result<Vec<T>>
        where
            F: FnMut(Vec<u8>) -> Fut,
            Fut: std::future::Future<Output = io::Result<T>>,
        {
            let mut results = Vec::new();
            let mut buffer = vec![0u8; self.buffer_size];

            loop {
                let n = self.reader.read(&mut buffer).await?;
                if n == 0 {
                    break;
                }
                let result = transform(buffer[..n].to_vec()).await?;
                results.push(result);
            }

            Ok(results)
        }

        /// Consumes the stream to EOF and returns the total number of bytes read.
        ///
        /// # Errors
        /// Returns the first read error.
        pub async fn count_bytes(&mut self) -> io::Result<u64> {
            let mut total = 0u64;
            let mut buffer = vec![0u8; self.buffer_size];

            loop {
                let n = self.reader.read(&mut buffer).await?;
                if n == 0 {
                    break;
                }
                total += n as u64;
            }

            Ok(total)
        }
    }

    /// Buffered chunk writer over a Tokio [`AsyncWrite`] sink, mirroring the synchronous
    /// `StreamWriter`.
    ///
    /// Small chunks are accumulated until adding another would exceed `buffer_size`. A
    /// single chunk longer than `buffer_size` is written directly. There is no `Drop` impl,
    /// so call `flush` or `finish` to avoid losing buffered bytes.
    pub struct AsyncStreamWriter<W> {
        writer: W,
        buffer: Vec<u8>,
        buffer_size: usize,
    }

    impl<W: AsyncWrite + Unpin> AsyncStreamWriter<W> {
        /// Wraps `writer` using the `DEFAULT_BUFFER_SIZE` threshold.
        pub fn new(writer: W) -> Self {
            Self::with_buffer_size(writer, DEFAULT_BUFFER_SIZE)
        }

        /// Wraps `writer`, buffering up to `buffer_size` bytes before writing them out.
        pub fn with_buffer_size(writer: W, buffer_size: usize) -> Self {
            Self {
                writer,
                buffer: Vec::with_capacity(buffer_size),
                buffer_size,
            }
        }

        /// Queues `data` for writing, spilling the pending buffer first if `data` would not
        /// fit alongside it.
        ///
        /// # Errors
        /// Propagates any error from the underlying `write_all`.
        pub async fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
            if self.buffer.len() + data.len() <= self.buffer_size {
                self.buffer.extend_from_slice(data);
                return Ok(());
            }

            if !self.buffer.is_empty() {
                self.writer.write_all(&self.buffer).await?;
                self.buffer.clear();
            }

            // Oversized chunks bypass the buffer; copying them would only delay the write.
            if data.len() > self.buffer_size {
                self.writer.write_all(data).await?;
            } else {
                self.buffer.extend_from_slice(data);
            }

            Ok(())
        }

        /// Writes out all buffered bytes and then flushes the underlying sink.
        ///
        /// # Errors
        /// Propagates errors from `write_all` or the sink's `flush`. On a write error the
        /// buffer is not cleared.
        pub async fn flush(&mut self) -> io::Result<()> {
            if !self.buffer.is_empty() {
                self.writer.write_all(&self.buffer).await?;
                self.buffer.clear();
            }
            self.writer.flush().await
        }

        /// Flushes everything and returns the underlying sink.
        ///
        /// # Errors
        /// Same as `flush`.
        pub async fn finish(mut self) -> io::Result<W> {
            self.flush().await?;
            Ok(self.writer)
        }
    }

    /// Writes every chunk produced by `chunks` to the file at `path`, creating the file or
    /// truncating its existing contents.
    ///
    /// `chunks` may be any [`IntoIterator`], not only an [`Iterator`]. Every iterator still
    /// qualifies, so existing callers are unaffected.
    ///
    /// # Errors
    /// Returns any error from creating the file or from writing or flushing data.
    pub async fn stream_write_file<P, I, D>(path: P, chunks: I) -> io::Result<()>
    where
        P: AsRef<Path>,
        I: IntoIterator<Item = D>,
        D: AsRef<[u8]>,
    {
        let file = tokio::fs::File::create(path).await?;
        let mut writer = AsyncStreamWriter::new(file);

        for chunk in chunks {
            writer.write_chunk(chunk.as_ref()).await?;
        }

        // Flush before the file is dropped so buffered and in-flight bytes reach the file.
        writer.flush().await
    }
}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_stream_reader_count_bytes() {
        let data = b"Hello, world!";
        let mut reader = StreamReader::new(Cursor::new(data));

        let count = reader.count_bytes().unwrap();
        assert_eq!(count, data.len() as u64);
    }

    #[test]
    fn test_stream_writer() {
        let mut buffer = Vec::new();
        let mut writer = StreamWriter::with_buffer_size(&mut buffer, 10);

        writer.write_chunk(b"Hello").unwrap();
        writer.write_chunk(b", ").unwrap();
        writer.write_chunk(b"world!").unwrap();
        writer.flush().unwrap();

        assert_eq!(buffer, b"Hello, world!");
    }

    #[test]
    fn test_stream_writer_oversized_chunk_is_written_directly() {
        let mut writer = StreamWriter::with_buffer_size(Vec::new(), 4);

        writer.write_chunk(b"ab").unwrap();
        // Still pending: nothing has reached the inner writer yet.
        assert!(writer.writer.is_empty());

        // 2 + 6 > 4 spills the pending "ab", and 6 > 4 bypasses the buffer entirely.
        writer.write_chunk(b"cdefgh").unwrap();
        assert_eq!(writer.writer, b"abcdefgh");
        assert!(writer.buffer.is_empty());

        assert_eq!(writer.finish().unwrap(), b"abcdefgh");
    }

    #[test]
    fn test_stream_writer_finish_returns_inner_writer() {
        let mut writer = StreamWriter::with_buffer_size(Vec::new(), 8);
        writer.write_chunk(b"abc").unwrap();

        assert_eq!(writer.finish().unwrap(), b"abc");
    }

    #[test]
    fn test_stream_reader_read_all_yields_each_chunk() {
        let mut reader = StreamReader::with_buffer_size(Cursor::new(b"abcdefghij"), 4);

        let chunks = reader.read_all(|chunk| Ok(chunk.to_vec())).unwrap();
        assert_eq!(
            chunks,
            vec![b"abcd".to_vec(), b"efgh".to_vec(), b"ij".to_vec()]
        );
    }

    #[test]
    fn test_stream_reader_fold() {
        let data = b"abcdefghij";
        let mut reader = StreamReader::with_buffer_size(Cursor::new(data), 3);

        let result = reader.fold(0, |acc, chunk| Ok(acc + chunk.len())).unwrap();
        assert_eq!(result, data.len());
    }
}