Skip to main content

embeddenator_io/io/
stream.rs

1//! Streaming I/O utilities for processing large datasets
2//!
3//! Provides efficient streaming interfaces for reading and writing data
4//! without loading entire files into memory.
5
6use std::io::{self, Read, Write};
7use std::path::Path;
8
/// Chunked reader that hands the contents of a wrapped [`Read`] source to
/// callers one buffer-sized slice at a time.
pub struct StreamReader<R> {
    /// Upper bound on the number of bytes delivered per chunk.
    buffer_size: usize,
    /// The underlying byte source.
    reader: R,
}
14
15impl<R: Read> StreamReader<R> {
16    /// Create a new stream reader with default buffer size
17    pub fn new(reader: R) -> Self {
18        Self::with_buffer_size(reader, super::buffer::DEFAULT_BUFFER_SIZE)
19    }
20
21    /// Create a new stream reader with custom buffer size
22    pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
23        Self {
24            reader,
25            buffer_size,
26        }
27    }
28
29    /// Read all data and apply a transformation function
30    pub fn read_all<F, T>(&mut self, mut transform: F) -> io::Result<Vec<T>>
31    where
32        F: FnMut(&[u8]) -> io::Result<T>,
33    {
34        let mut results = Vec::new();
35        let mut buffer = vec![0u8; self.buffer_size];
36
37        loop {
38            let n = self.reader.read(&mut buffer)?;
39            if n == 0 {
40                break;
41            }
42            let result = transform(&buffer[..n])?;
43            results.push(result);
44        }
45
46        Ok(results)
47    }
48
49    /// Read data and fold it into an accumulator
50    pub fn fold<F, T>(&mut self, init: T, mut fold_fn: F) -> io::Result<T>
51    where
52        F: FnMut(T, &[u8]) -> io::Result<T>,
53    {
54        let mut acc = init;
55        let mut buffer = vec![0u8; self.buffer_size];
56
57        loop {
58            let n = self.reader.read(&mut buffer)?;
59            if n == 0 {
60                break;
61            }
62            acc = fold_fn(acc, &buffer[..n])?;
63        }
64
65        Ok(acc)
66    }
67
68    /// Count bytes in the stream
69    pub fn count_bytes(&mut self) -> io::Result<u64> {
70        self.fold(0u64, |acc, chunk| Ok(acc + chunk.len() as u64))
71    }
72}
73
/// Buffered writer that coalesces small chunks into larger writes to the
/// wrapped [`Write`] sink.
pub struct StreamWriter<W> {
    /// Threshold: chunks are accumulated until adding one would exceed it.
    buffer_size: usize,
    /// Bytes accepted from callers but not yet written to `writer`.
    buffer: Vec<u8>,
    /// The underlying sink.
    writer: W,
}
80
81impl<W: Write> StreamWriter<W> {
82    /// Create a new stream writer with default buffer size
83    pub fn new(writer: W) -> Self {
84        Self::with_buffer_size(writer, super::buffer::DEFAULT_BUFFER_SIZE)
85    }
86
87    /// Create a new stream writer with custom buffer size
88    pub fn with_buffer_size(writer: W, buffer_size: usize) -> Self {
89        Self {
90            writer,
91            buffer: Vec::with_capacity(buffer_size),
92            buffer_size,
93        }
94    }
95
96    /// Write a chunk of data
97    pub fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
98        // If data fits in buffer, append it
99        if self.buffer.len() + data.len() <= self.buffer_size {
100            self.buffer.extend_from_slice(data);
101            return Ok(());
102        }
103
104        // Flush current buffer
105        if !self.buffer.is_empty() {
106            self.writer.write_all(&self.buffer)?;
107            self.buffer.clear();
108        }
109
110        // If data is larger than buffer, write directly
111        if data.len() > self.buffer_size {
112            self.writer.write_all(data)?;
113        } else {
114            self.buffer.extend_from_slice(data);
115        }
116
117        Ok(())
118    }
119
120    /// Flush any buffered data
121    pub fn flush(&mut self) -> io::Result<()> {
122        if !self.buffer.is_empty() {
123            self.writer.write_all(&self.buffer)?;
124            self.buffer.clear();
125        }
126        self.writer.flush()
127    }
128
129    /// Finish writing and return the inner writer
130    pub fn finish(mut self) -> io::Result<W> {
131        self.flush()?;
132        Ok(self.writer)
133    }
134}
135
136/// Write data to a file in streaming fashion
137///
138/// # Examples
139/// ```no_run
140/// use embeddenator_io::stream_write_file;
141///
142/// let chunks: Vec<&[u8]> = vec![b"Hello, ", b"world!"];
143/// stream_write_file("output.txt", chunks.iter().copied()).unwrap();
144/// ```
145pub fn stream_write_file<P, I, D>(path: P, chunks: I) -> io::Result<()>
146where
147    P: AsRef<Path>,
148    I: Iterator<Item = D>,
149    D: AsRef<[u8]>,
150{
151    let file = std::fs::File::create(path)?;
152    let mut writer = StreamWriter::new(file);
153
154    for chunk in chunks {
155        writer.write_chunk(chunk.as_ref())?;
156    }
157
158    writer.flush()?;
159    Ok(())
160}
161
162/// Read a file in streaming fashion
163///
164/// # Examples
165/// ```no_run
166/// use embeddenator_io::stream_read_file;
167///
168/// let mut total_size = 0;
169/// stream_read_file("input.txt", |chunk| {
170///     total_size += chunk.len();
171///     Ok(())
172/// }).unwrap();
173/// println!("Total size: {} bytes", total_size);
174/// ```
175pub fn stream_read_file<P, F>(path: P, mut callback: F) -> io::Result<()>
176where
177    P: AsRef<Path>,
178    F: FnMut(&[u8]) -> io::Result<()>,
179{
180    let file = std::fs::File::open(path)?;
181    let mut reader = StreamReader::new(file);
182    let mut buffer = vec![0u8; reader.buffer_size];
183
184    loop {
185        let n = reader.reader.read(&mut buffer)?;
186        if n == 0 {
187            break;
188        }
189        callback(&buffer[..n])?;
190    }
191
192    Ok(())
193}
194
#[cfg(feature = "async")]
pub mod async_stream {
    //! Async streaming I/O utilities
    //!
    //! Tokio-based counterparts of the synchronous stream reader, writer and
    //! file-writing helper in the parent module.

    use std::io;
    use std::path::Path;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::super::buffer::DEFAULT_BUFFER_SIZE;

    /// Async stream reader that pulls data from `R` in buffer-sized chunks.
    pub struct AsyncStreamReader<R> {
        reader: R,
        buffer_size: usize,
    }

    impl<R: AsyncReadExt + Unpin> AsyncStreamReader<R> {
        /// Create a new async stream reader with the default buffer size.
        pub fn new(reader: R) -> Self {
            Self::with_buffer_size(reader, DEFAULT_BUFFER_SIZE)
        }

        /// Create a new async stream reader that reads at most `buffer_size`
        /// bytes per chunk.
        ///
        /// A `buffer_size` of zero is clamped to one: a read into an empty
        /// buffer returns `Ok(0)`, which the read loops here treat as end of
        /// stream, so a zero-sized buffer would silently yield no data.
        pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
            Self {
                reader,
                buffer_size: buffer_size.max(1),
            }
        }

        /// Read the whole stream, passing each chunk to an async `transform`.
        ///
        /// Each chunk is handed over as an owned `Vec<u8>` copy. Results are
        /// collected in stream order.
        ///
        /// # Errors
        ///
        /// Returns the first read error or the first error produced by
        /// `transform`; reading stops at that point.
        pub async fn read_all<F, Fut, T>(&mut self, mut transform: F) -> io::Result<Vec<T>>
        where
            F: FnMut(Vec<u8>) -> Fut,
            Fut: std::future::Future<Output = io::Result<T>>,
        {
            let mut results = Vec::new();
            let mut buffer = vec![0u8; self.buffer_size];

            loop {
                let n = self.reader.read(&mut buffer).await?;
                if n == 0 {
                    break;
                }
                let result = transform(buffer[..n].to_vec()).await?;
                results.push(result);
            }

            Ok(results)
        }

        /// Count the bytes remaining in the stream by reading it to the end.
        pub async fn count_bytes(&mut self) -> io::Result<u64> {
            let mut total = 0u64;
            let mut buffer = vec![0u8; self.buffer_size];

            loop {
                let n = self.reader.read(&mut buffer).await?;
                if n == 0 {
                    break;
                }
                total += n as u64;
            }

            Ok(total)
        }
    }

    /// Async stream writer that coalesces small chunks into larger writes.
    ///
    /// Buffered bytes reach the underlying writer only through `write_chunk`,
    /// `flush`, or `finish`; call `flush` or `finish` before dropping it.
    pub struct AsyncStreamWriter<W> {
        writer: W,
        buffer: Vec<u8>,
        buffer_size: usize,
    }

    impl<W: AsyncWriteExt + Unpin> AsyncStreamWriter<W> {
        /// Create a new async stream writer with the default buffer size.
        pub fn new(writer: W) -> Self {
            Self::with_buffer_size(writer, DEFAULT_BUFFER_SIZE)
        }

        /// Create a new async stream writer that accumulates up to
        /// `buffer_size` bytes before writing.
        pub fn with_buffer_size(writer: W, buffer_size: usize) -> Self {
            Self {
                writer,
                buffer: Vec::with_capacity(buffer_size),
                buffer_size,
            }
        }

        /// Queue `data` for output.
        ///
        /// A chunk that fits in the remaining buffer space is only buffered.
        /// Otherwise pending bytes are written first (preserving order), then
        /// `data` is buffered or, if larger than `buffer_size`, written through.
        pub async fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
            if self.buffer.len() + data.len() <= self.buffer_size {
                self.buffer.extend_from_slice(data);
                return Ok(());
            }

            if !self.buffer.is_empty() {
                self.writer.write_all(&self.buffer).await?;
                self.buffer.clear();
            }

            if data.len() > self.buffer_size {
                self.writer.write_all(data).await?;
            } else {
                self.buffer.extend_from_slice(data);
            }

            Ok(())
        }

        /// Write out any buffered bytes, then flush the underlying writer.
        pub async fn flush(&mut self) -> io::Result<()> {
            if !self.buffer.is_empty() {
                self.writer.write_all(&self.buffer).await?;
                self.buffer.clear();
            }
            self.writer.flush().await
        }

        /// Flush everything and hand back the underlying writer.
        pub async fn finish(mut self) -> io::Result<W> {
            self.flush().await?;
            Ok(self.writer)
        }
    }

    /// Create (or truncate) the file at `path` and write every chunk from
    /// `chunks` to it asynchronously, flushing before returning.
    ///
    /// `chunks` may be any [`IntoIterator`]: an iterator, a `Vec`, an array, etc.
    ///
    /// # Errors
    ///
    /// Returns any error from creating the file or from writing/flushing it.
    pub async fn stream_write_file<P, I, D>(path: P, chunks: I) -> io::Result<()>
    where
        P: AsRef<Path>,
        I: IntoIterator<Item = D>,
        D: AsRef<[u8]>,
    {
        let file = tokio::fs::File::create(path).await?;
        let mut writer = AsyncStreamWriter::new(file);

        for chunk in chunks {
            writer.write_chunk(chunk.as_ref()).await?;
        }

        writer.flush().await?;
        Ok(())
    }
}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_stream_reader_count_bytes() {
        let data = b"Hello, world!";
        let cursor = Cursor::new(data);
        let mut reader = StreamReader::new(cursor);

        let count = reader.count_bytes().unwrap();
        assert_eq!(count, data.len() as u64);
    }

    #[test]
    fn test_stream_reader_count_bytes_empty() {
        let mut reader = StreamReader::new(Cursor::new(Vec::<u8>::new()));
        assert_eq!(reader.count_bytes().unwrap(), 0);
    }

    #[test]
    fn test_stream_reader_read_all_chunks() {
        let data = b"abcdefgh";
        let mut reader = StreamReader::with_buffer_size(Cursor::new(data), 3);

        let chunks = reader.read_all(|chunk| Ok(chunk.to_vec())).unwrap();
        assert_eq!(
            chunks,
            vec![b"abc".to_vec(), b"def".to_vec(), b"gh".to_vec()]
        );
    }

    #[test]
    fn test_stream_writer() {
        let mut buffer = Vec::new();
        let mut writer = StreamWriter::with_buffer_size(&mut buffer, 10);

        writer.write_chunk(b"Hello").unwrap();
        writer.write_chunk(b", ").unwrap();
        writer.write_chunk(b"world!").unwrap();
        writer.flush().unwrap();

        assert_eq!(buffer, b"Hello, world!");
    }

    #[test]
    fn test_stream_writer_buffers_until_flush() {
        let mut writer = StreamWriter::with_buffer_size(Vec::new(), 8);
        writer.write_chunk(b"abc").unwrap();
        // Small chunks stay in the internal buffer until flushed.
        assert!(writer.writer.is_empty());

        let out = writer.finish().unwrap();
        assert_eq!(out, b"abc");
    }

    #[test]
    fn test_stream_writer_large_chunk_written_through() {
        let mut writer = StreamWriter::with_buffer_size(Vec::new(), 4);
        writer.write_chunk(b"ab").unwrap();
        writer.write_chunk(b"0123456789").unwrap();
        // Pending bytes are written first, then the oversized chunk bypasses
        // the buffer entirely.
        assert_eq!(writer.writer, b"ab0123456789");
        assert!(writer.buffer.is_empty());
        assert_eq!(writer.finish().unwrap(), b"ab0123456789");
    }

    #[test]
    fn test_stream_reader_fold() {
        let data = b"abcdefghij";
        let cursor = Cursor::new(data);
        let mut reader = StreamReader::with_buffer_size(cursor, 3);

        let result = reader.fold(0, |acc, chunk| Ok(acc + chunk.len())).unwrap();
        assert_eq!(result, data.len());
    }

    #[test]
    fn test_file_round_trip() {
        let path = std::env::temp_dir().join(format!(
            "embeddenator_io_stream_round_trip_{}.bin",
            std::process::id()
        ));
        let chunks: [&[u8]; 3] = [b"alpha ", b"beta ", b"gamma"];
        stream_write_file(&path, chunks.iter().copied()).unwrap();

        let mut read_back = Vec::new();
        let result = stream_read_file(&path, |chunk| {
            read_back.extend_from_slice(chunk);
            Ok(())
        });
        // Clean up before asserting so a failure does not leave the file behind.
        let _ = std::fs::remove_file(&path);

        result.unwrap();
        assert_eq!(read_back, b"alpha beta gamma");
    }
}