embeddenator_io/io/
buffer.rs1use std::fs::File;
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::Path;
9
/// Buffer size in bytes (64 KiB) used by `buffered_reader`, `buffered_writer`,
/// `write_chunks` and `ChunkStream::new`.
pub const DEFAULT_BUFFER_SIZE: usize = 64 << 10;

/// A 1 MiB buffer size, e.g. for the `capacity` argument of the
/// `*_with_capacity` helpers.
pub const LARGE_BUFFER_SIZE: usize = 1 << 20;

/// A 4 KiB buffer size, e.g. for the `capacity` argument of the
/// `*_with_capacity` helpers.
pub const SMALL_BUFFER_SIZE: usize = 4 << 10;
18
19pub fn buffered_reader<P: AsRef<Path>>(path: P) -> io::Result<BufReader<File>> {
31 let file = File::open(path)?;
32 Ok(BufReader::with_capacity(DEFAULT_BUFFER_SIZE, file))
33}
34
/// Opens the file at `path` for reading and wraps it in a [`BufReader`] with an
/// internal buffer of `capacity` bytes.
///
/// # Errors
///
/// Propagates any error from [`File::open`].
pub fn buffered_reader_with_capacity<P: AsRef<Path>>(
    path: P,
    capacity: usize,
) -> io::Result<BufReader<File>> {
    File::open(path).map(|handle| BufReader::with_capacity(capacity, handle))
}
43
44pub fn buffered_writer<P: AsRef<Path>>(path: P) -> io::Result<BufWriter<File>> {
56 let file = File::create(path)?;
57 Ok(BufWriter::with_capacity(DEFAULT_BUFFER_SIZE, file))
58}
59
/// Creates (or truncates) the file at `path` and wraps it in a [`BufWriter`]
/// with a buffer of `capacity` bytes.
///
/// # Errors
///
/// Propagates any error from [`File::create`].
pub fn buffered_writer_with_capacity<P: AsRef<Path>>(
    path: P,
    capacity: usize,
) -> io::Result<BufWriter<File>> {
    let target = File::create(path)?;
    let buffered = BufWriter::with_capacity(capacity, target);
    Ok(buffered)
}
68
/// Reads the file at `path` in consecutive chunks of `chunk_size` bytes and
/// passes each chunk to `callback`.
///
/// Every chunk except possibly the last is exactly `chunk_size` bytes long; the
/// last chunk holds whatever remains. An empty file produces no callbacks.
/// Reads that fail with [`io::ErrorKind::Interrupted`] are retried.
///
/// # Errors
///
/// - [`io::ErrorKind::InvalidInput`] if `chunk_size` is zero (previously this
///   silently read nothing).
/// - Any error from opening or reading the file.
/// - The first error returned by `callback`, which stops iteration.
pub fn read_chunks<P, F>(path: P, chunk_size: usize, mut callback: F) -> io::Result<()>
where
    P: AsRef<Path>,
    F: FnMut(&[u8]) -> io::Result<()>,
{
    /// Reads into `buf` until it is full or the reader reports EOF, retrying
    /// interrupted reads. Returns the number of bytes read.
    fn fill<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize> {
        let mut filled = 0;
        while filled < buf.len() {
            match reader.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(filled)
    }

    if chunk_size == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "read_chunks: chunk_size must be greater than zero",
        ));
    }

    let file = File::open(path)?;
    // Keep at least a 4 KiB internal buffer so small chunk sizes do not turn
    // into equally small reads on the file.
    let mut reader = BufReader::with_capacity(chunk_size.max(4096), file);
    // A single buffer is reused for every chunk.
    let mut buffer = vec![0u8; chunk_size];

    loop {
        // A single `read` may return fewer bytes than requested (BufReader hands
        // back only what it has buffered), so fill the whole chunk explicitly.
        let n = fill(&mut reader, &mut buffer)?;
        if n == 0 {
            break;
        }
        callback(&buffer[..n])?;
    }

    Ok(())
}
103
104pub fn write_chunks<P, I, D>(path: P, chunks: I) -> io::Result<()>
114where
115 P: AsRef<Path>,
116 I: IntoIterator<Item = D>,
117 D: AsRef<[u8]>,
118{
119 let file = File::create(path)?;
120 let mut writer = BufWriter::with_capacity(DEFAULT_BUFFER_SIZE, file);
121
122 for chunk in chunks {
123 writer.write_all(chunk.as_ref())?;
124 }
125
126 writer.flush()?;
127 Ok(())
128}
129
/// Copies everything from `reader` to `writer` through a buffer of
/// `buffer_size` bytes and returns the number of bytes copied.
///
/// Reads that fail with [`io::ErrorKind::Interrupted`] are retried, matching
/// [`std::io::copy`]. The writer is not flushed.
///
/// # Errors
///
/// - [`io::ErrorKind::InvalidInput`] if `buffer_size` is zero (previously this
///   copied nothing and reported success).
/// - Any other error from reading or writing.
pub fn copy_buffered<R: Read, W: Write>(
    reader: &mut R,
    writer: &mut W,
    buffer_size: usize,
) -> io::Result<u64> {
    if buffer_size == 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "copy_buffered: buffer_size must be greater than zero",
        ));
    }

    let mut buffer = vec![0u8; buffer_size];
    let mut total = 0u64;

    loop {
        let n = match reader.read(&mut buffer) {
            Ok(0) => break,
            Ok(n) => n,
            // Interrupted reads are transient; retry as std::io::copy does.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        writer.write_all(&buffer[..n])?;
        total += n as u64;
    }

    Ok(total)
}
163
164pub struct ChunkStream<R> {
166 reader: BufReader<R>,
167 chunk_size: usize,
168}
169
170impl<R: Read> ChunkStream<R> {
171 pub fn new(reader: R) -> Self {
173 Self::with_chunk_size(reader, DEFAULT_BUFFER_SIZE)
174 }
175
176 pub fn with_chunk_size(reader: R, chunk_size: usize) -> Self {
178 Self {
179 reader: BufReader::with_capacity(chunk_size.max(4096), reader),
180 chunk_size,
181 }
182 }
183
184 pub fn next_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
188 let mut buffer = vec![0u8; self.chunk_size];
189 let n = self.reader.read(&mut buffer)?;
190 if n == 0 {
191 return Ok(None);
192 }
193 buffer.truncate(n);
194 Ok(Some(buffer))
195 }
196
197 pub fn process_all<F>(&mut self, mut callback: F) -> io::Result<()>
199 where
200 F: FnMut(&[u8]) -> io::Result<()>,
201 {
202 loop {
203 let mut buffer = vec![0u8; self.chunk_size];
204 let n = self.reader.read(&mut buffer)?;
205 if n == 0 {
206 break;
207 }
208 callback(&buffer[..n])?;
209 }
210 Ok(())
211 }
212}
213
#[cfg(feature = "async")]
pub mod async_buffer {
    //! Tokio-based async counterparts of the blocking helpers in the parent module.

    use std::io;
    use std::path::Path;
    use tokio::fs::File;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};

    use super::DEFAULT_BUFFER_SIZE;

    /// Opens `path` for reading, wrapped in a tokio [`BufReader`] with a
    /// [`DEFAULT_BUFFER_SIZE`]-byte buffer.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`File::open`].
    pub async fn buffered_reader<P: AsRef<Path>>(path: P) -> io::Result<BufReader<File>> {
        let file = File::open(path).await?;
        Ok(BufReader::with_capacity(DEFAULT_BUFFER_SIZE, file))
    }

    /// Creates (or truncates) `path` for writing, wrapped in a tokio
    /// [`BufWriter`] with a [`DEFAULT_BUFFER_SIZE`]-byte buffer.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`File::create`].
    pub async fn buffered_writer<P: AsRef<Path>>(path: P) -> io::Result<BufWriter<File>> {
        let file = File::create(path).await?;
        Ok(BufWriter::with_capacity(DEFAULT_BUFFER_SIZE, file))
    }

    /// Reads into `buf` until it is full or `reader` reports EOF, retrying
    /// reads that fail with `ErrorKind::Interrupted`. Returns the number of
    /// bytes read.
    async fn read_full<R>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize>
    where
        R: AsyncRead + Unpin,
    {
        let mut filled = 0;
        while filled < buf.len() {
            match reader.read(&mut buf[filled..]).await {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(filled)
    }

    /// Reads the file at `path` in consecutive chunks of `chunk_size` bytes and
    /// awaits `callback` on an owned copy of each chunk.
    ///
    /// Every chunk except possibly the last is exactly `chunk_size` bytes long.
    /// An empty file produces no callbacks.
    ///
    /// # Errors
    ///
    /// - [`io::ErrorKind::InvalidInput`] if `chunk_size` is zero.
    /// - Any error from opening or reading the file.
    /// - The first error produced by a callback future, which stops iteration.
    pub async fn read_chunks<P, F, Fut>(
        path: P,
        chunk_size: usize,
        mut callback: F,
    ) -> io::Result<()>
    where
        P: AsRef<Path>,
        F: FnMut(Vec<u8>) -> Fut,
        Fut: std::future::Future<Output = io::Result<()>>,
    {
        if chunk_size == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "read_chunks: chunk_size must be greater than zero",
            ));
        }

        let file = File::open(path).await?;
        // Keep at least a 4 KiB internal buffer so small chunk sizes do not
        // turn into equally small reads on the file.
        let mut reader = BufReader::with_capacity(chunk_size.max(4096), file);
        let mut buffer = vec![0u8; chunk_size];

        loop {
            // A single read may be short, so fill the whole chunk explicitly.
            let n = read_full(&mut reader, &mut buffer).await?;
            if n == 0 {
                break;
            }
            callback(buffer[..n].to_vec()).await?;
        }

        Ok(())
    }

    /// Copies everything from `reader` to `writer` through a buffer of
    /// `buffer_size` bytes and returns the number of bytes copied. The writer
    /// is not flushed.
    ///
    /// # Errors
    ///
    /// - [`io::ErrorKind::InvalidInput`] if `buffer_size` is zero.
    /// - Any other error from reading or writing; `Interrupted` reads are
    ///   retried.
    pub async fn copy_buffered<R, W>(
        reader: &mut R,
        writer: &mut W,
        buffer_size: usize,
    ) -> io::Result<u64>
    where
        R: AsyncRead + Unpin,
        W: AsyncWrite + Unpin,
    {
        if buffer_size == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "copy_buffered: buffer_size must be greater than zero",
            ));
        }

        let mut buffer = vec![0u8; buffer_size];
        let mut total = 0u64;

        loop {
            let n = match reader.read(&mut buffer).await {
                Ok(0) => break,
                Ok(n) => n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            writer.write_all(&buffer[..n]).await?;
            total += n as u64;
        }

        Ok(total)
    }
}
288
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Reassembling the chunks from `next_chunk` must reproduce the input.
    #[test]
    fn test_chunk_stream() {
        let data = b"Hello, world! This is a test.";
        let mut stream = ChunkStream::with_chunk_size(Cursor::new(data), 10);

        let mut reconstructed: Vec<u8> = Vec::new();
        while let Some(chunk) = stream.next_chunk().unwrap() {
            reconstructed.extend_from_slice(&chunk);
        }

        assert_eq!(reconstructed, data);
    }

    /// A small copy buffer still transfers every byte and reports the count.
    #[test]
    fn test_copy_buffered() {
        let data = b"Test data for copying";
        let mut sink: Vec<u8> = Vec::new();

        let copied = copy_buffered(&mut Cursor::new(data), &mut sink, 8).unwrap();

        assert_eq!(copied, data.len() as u64);
        assert_eq!(sink, data);
    }

    /// `process_all` hands every input byte to the callback exactly once.
    #[test]
    fn test_process_all() {
        let data = b"Process all chunks";
        let mut stream = ChunkStream::with_chunk_size(Cursor::new(data), 5);

        let mut seen = 0usize;
        let outcome = stream.process_all(|chunk| {
            seen += chunk.len();
            Ok(())
        });

        outcome.unwrap();
        assert_eq!(seen, data.len());
    }
}