use std::io::{self, Read, Write};
use std::path::Path;
/// Pairs a reader with the chunk size used when pulling data from it.
///
/// The struct itself places no bound on `R`; the reading methods are only
/// available when `R` implements `Read`.
pub struct StreamReader<R> {
/// Underlying source that chunks are read from.
reader: R,
/// Length in bytes of the scratch buffer allocated for each pass over the stream.
buffer_size: usize,
}
impl<R: Read> StreamReader<R> {
    /// Wraps `reader`, reading in chunks of `super::buffer::DEFAULT_BUFFER_SIZE` bytes.
    pub fn new(reader: R) -> Self {
        Self::with_buffer_size(reader, super::buffer::DEFAULT_BUFFER_SIZE)
    }

    /// Wraps `reader`, reading in chunks of at most `buffer_size` bytes.
    ///
    /// A `buffer_size` of zero is raised to one byte: reading into an empty
    /// buffer always yields `Ok(0)`, which cannot be told apart from
    /// end-of-stream and would make every stream look empty.
    pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
        Self {
            reader,
            buffer_size: buffer_size.max(1),
        }
    }

    /// Reads the stream to its end, applying `transform` to every chunk and
    /// collecting the produced values in read order.
    ///
    /// A chunk is whatever a single successful read returned, so it holds
    /// between one and `buffer_size` bytes.
    ///
    /// # Errors
    ///
    /// Returns the first read error (other than `ErrorKind::Interrupted`,
    /// which is retried) or the first error produced by `transform`. Values
    /// collected before the failure are discarded.
    pub fn read_all<F, T>(&mut self, mut transform: F) -> io::Result<Vec<T>>
    where
        F: FnMut(&[u8]) -> io::Result<T>,
    {
        // Collecting is just a fold whose accumulator is the output vector,
        // so the read loop lives in one place only.
        self.fold(Vec::new(), |mut results, chunk| {
            results.push(transform(chunk)?);
            Ok(results)
        })
    }

    /// Reads the stream to its end, threading an accumulator through
    /// `fold_fn` once per chunk, and returns the final accumulator.
    ///
    /// # Errors
    ///
    /// Returns the first read error (other than `ErrorKind::Interrupted`,
    /// which is retried) or the first error produced by `fold_fn`.
    pub fn fold<F, T>(&mut self, init: T, mut fold_fn: F) -> io::Result<T>
    where
        F: FnMut(T, &[u8]) -> io::Result<T>,
    {
        let mut acc = init;
        let mut buffer = vec![0u8; self.buffer_size];
        loop {
            let n = match self.reader.read(&mut buffer) {
                // The buffer is never empty, so zero bytes means end-of-stream.
                Ok(0) => return Ok(acc),
                Ok(n) => n,
                // An interrupted read is non-fatal; simply try again.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            };
            acc = fold_fn(acc, &buffer[..n])?;
        }
    }

    /// Consumes the remainder of the stream and returns how many bytes were read.
    pub fn count_bytes(&mut self) -> io::Result<u64> {
        self.fold(0u64, |total, chunk| Ok(total + chunk.len() as u64))
    }
}
/// Pairs a writer with an in-memory buffer that batches small chunks before
/// they are handed to the writer.
pub struct StreamWriter<W> {
/// Underlying destination that buffered bytes are eventually written to.
writer: W,
/// Bytes accepted by `write_chunk` that have not yet been passed to `writer`.
buffer: Vec<u8>,
/// Maximum number of bytes kept in `buffer`; also its initial capacity.
buffer_size: usize,
}
impl<W: Write> StreamWriter<W> {
    /// Wraps `writer` with a buffer of `super::buffer::DEFAULT_BUFFER_SIZE` bytes.
    pub fn new(writer: W) -> Self {
        Self::with_buffer_size(writer, super::buffer::DEFAULT_BUFFER_SIZE)
    }

    /// Wraps `writer`, holding back at most `buffer_size` bytes before they
    /// are written out.
    pub fn with_buffer_size(writer: W, buffer_size: usize) -> Self {
        let buffer = Vec::with_capacity(buffer_size);
        Self {
            writer,
            buffer,
            buffer_size,
        }
    }

    /// Accepts one chunk of output.
    ///
    /// The chunk is appended to the internal buffer when it fits next to the
    /// bytes already pending. Otherwise the pending bytes are written out
    /// first; the chunk is then either buffered or, when it is longer than
    /// `buffer_size` on its own, written straight to the underlying writer.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying writer.
    pub fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
        let overflows = self.buffer.len() + data.len() > self.buffer_size;
        if overflows {
            self.drain_pending()?;
            if data.len() > self.buffer_size {
                // Too large to ever fit in the buffer: bypass it entirely.
                return self.writer.write_all(data);
            }
        }
        self.buffer.extend_from_slice(data);
        Ok(())
    }

    /// Writes out any pending bytes and then flushes the underlying writer.
    pub fn flush(&mut self) -> io::Result<()> {
        self.drain_pending()?;
        self.writer.flush()
    }

    /// Flushes everything and hands back the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns the flush error, in which case the writer is dropped together
    /// with `self`.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush()?;
        let Self { writer, .. } = self;
        Ok(writer)
    }

    /// Writes the buffered bytes, if any, to the underlying writer and empties
    /// the buffer.
    fn drain_pending(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        self.writer.write_all(&self.buffer)?;
        self.buffer.clear();
        Ok(())
    }
}
/// Creates (or truncates) the file at `path` and writes every chunk to it
/// through a [`StreamWriter`], flushing at the end.
///
/// `chunks` may be any `IntoIterator` — an iterator, a `Vec`, a slice of
/// byte strings, and so on — whose items can be viewed as byte slices.
///
/// # Errors
///
/// Returns the error from creating the file, or the first error reported
/// while writing or flushing.
pub fn stream_write_file<P, I, D>(path: P, chunks: I) -> io::Result<()>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = D>,
    D: AsRef<[u8]>,
{
    let file = std::fs::File::create(path)?;
    let mut writer = StreamWriter::new(file);
    for chunk in chunks {
        writer.write_chunk(chunk.as_ref())?;
    }
    // Push out whatever is still sitting in the writer's buffer.
    writer.flush()
}
pub fn stream_read_file<P, F>(path: P, mut callback: F) -> io::Result<()>
where
P: AsRef<Path>,
F: FnMut(&[u8]) -> io::Result<()>,
{
let file = std::fs::File::open(path)?;
let mut reader = StreamReader::new(file);
let mut buffer = vec![0u8; reader.buffer_size];
loop {
let n = reader.reader.read(&mut buffer)?;
if n == 0 {
break;
}
callback(&buffer[..n])?;
}
Ok(())
}
/// Asynchronous (Tokio-based) counterparts of the streaming reader and writer.
#[cfg(feature = "async")]
pub mod async_stream {
    use std::io;
    use std::path::Path;

    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use super::super::buffer::DEFAULT_BUFFER_SIZE;

    /// Pairs an asynchronous reader with the chunk size used when reading from it.
    pub struct AsyncStreamReader<R> {
        /// Underlying source that chunks are read from.
        reader: R,
        /// Length in bytes of the scratch buffer allocated for each pass.
        buffer_size: usize,
    }

    impl<R: AsyncReadExt + Unpin> AsyncStreamReader<R> {
        /// Wraps `reader`, reading in chunks of `DEFAULT_BUFFER_SIZE` bytes.
        pub fn new(reader: R) -> Self {
            Self::with_buffer_size(reader, DEFAULT_BUFFER_SIZE)
        }

        /// Wraps `reader`, reading in chunks of at most `buffer_size` bytes.
        ///
        /// A `buffer_size` of zero is raised to one byte: reading into an
        /// empty buffer always yields `Ok(0)`, which cannot be told apart from
        /// end-of-stream and would make every stream look empty.
        pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
            Self {
                reader,
                buffer_size: buffer_size.max(1),
            }
        }

        /// Reads the stream to its end, awaiting `transform` on an owned copy
        /// of every chunk and collecting the produced values in read order.
        ///
        /// # Errors
        ///
        /// Returns the first read error or the first error produced by
        /// `transform`; values collected before the failure are discarded.
        pub async fn read_all<F, Fut, T>(&mut self, mut transform: F) -> io::Result<Vec<T>>
        where
            F: FnMut(Vec<u8>) -> Fut,
            Fut: std::future::Future<Output = io::Result<T>>,
        {
            let mut results = Vec::new();
            let mut buffer = vec![0u8; self.buffer_size];
            loop {
                let n = self.reader.read(&mut buffer).await?;
                if n == 0 {
                    return Ok(results);
                }
                // The future may outlive this iteration's borrow of `buffer`,
                // so it receives its own copy of the chunk.
                results.push(transform(buffer[..n].to_vec()).await?);
            }
        }

        /// Consumes the remainder of the stream and returns how many bytes were read.
        pub async fn count_bytes(&mut self) -> io::Result<u64> {
            let mut total = 0u64;
            let mut buffer = vec![0u8; self.buffer_size];
            loop {
                let n = self.reader.read(&mut buffer).await?;
                if n == 0 {
                    return Ok(total);
                }
                total += n as u64;
            }
        }
    }

    /// Pairs an asynchronous writer with an in-memory buffer that batches
    /// small chunks before they are handed to the writer.
    pub struct AsyncStreamWriter<W> {
        /// Underlying destination that buffered bytes are eventually written to.
        writer: W,
        /// Bytes accepted by `write_chunk` that have not yet been passed to `writer`.
        buffer: Vec<u8>,
        /// Maximum number of bytes kept in `buffer`; also its initial capacity.
        buffer_size: usize,
    }

    impl<W: AsyncWriteExt + Unpin> AsyncStreamWriter<W> {
        /// Wraps `writer` with a buffer of `DEFAULT_BUFFER_SIZE` bytes.
        pub fn new(writer: W) -> Self {
            Self::with_buffer_size(writer, DEFAULT_BUFFER_SIZE)
        }

        /// Wraps `writer`, holding back at most `buffer_size` bytes before
        /// they are written out.
        pub fn with_buffer_size(writer: W, buffer_size: usize) -> Self {
            Self {
                writer,
                buffer: Vec::with_capacity(buffer_size),
                buffer_size,
            }
        }

        /// Accepts one chunk of output.
        ///
        /// The chunk is appended to the internal buffer when it fits next to
        /// the bytes already pending. Otherwise the pending bytes are written
        /// out first; the chunk is then either buffered or, when it is longer
        /// than `buffer_size` on its own, written straight to the writer.
        ///
        /// # Errors
        ///
        /// Propagates any error from the underlying writer.
        pub async fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
            if self.buffer.len() + data.len() > self.buffer_size {
                self.drain_pending().await?;
                if data.len() > self.buffer_size {
                    // Too large to ever fit in the buffer: bypass it entirely.
                    return self.writer.write_all(data).await;
                }
            }
            self.buffer.extend_from_slice(data);
            Ok(())
        }

        /// Writes out any pending bytes and then flushes the underlying writer.
        pub async fn flush(&mut self) -> io::Result<()> {
            self.drain_pending().await?;
            self.writer.flush().await
        }

        /// Flushes everything and hands back the underlying writer.
        ///
        /// # Errors
        ///
        /// Returns the flush error, in which case the writer is dropped
        /// together with `self`.
        pub async fn finish(mut self) -> io::Result<W> {
            self.flush().await?;
            Ok(self.writer)
        }

        /// Writes the buffered bytes, if any, to the underlying writer and
        /// empties the buffer.
        async fn drain_pending(&mut self) -> io::Result<()> {
            if !self.buffer.is_empty() {
                self.writer.write_all(&self.buffer).await?;
                self.buffer.clear();
            }
            Ok(())
        }
    }

    /// Creates (or truncates) the file at `path` and writes every chunk to it
    /// through an [`AsyncStreamWriter`], flushing at the end.
    ///
    /// `chunks` may be any `IntoIterator` whose items can be viewed as byte
    /// slices, so both iterators and collections are accepted.
    ///
    /// # Errors
    ///
    /// Returns the error from creating the file, or the first error reported
    /// while writing or flushing.
    pub async fn stream_write_file<P, I, D>(path: P, chunks: I) -> io::Result<()>
    where
        P: AsRef<Path>,
        I: IntoIterator<Item = D>,
        D: AsRef<[u8]>,
    {
        let file = tokio::fs::File::create(path).await?;
        let mut writer = AsyncStreamWriter::new(file);
        for chunk in chunks {
            writer.write_chunk(chunk.as_ref()).await?;
        }
        // Push out whatever is still sitting in the writer's buffer.
        writer.flush().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// `count_bytes` reports the full length of an in-memory source.
    #[test]
    fn test_stream_reader_count_bytes() {
        let payload = b"Hello, world!";
        let mut reader = StreamReader::new(Cursor::new(payload));

        let total = reader.count_bytes().unwrap();

        assert_eq!(total, payload.len() as u64);
    }

    /// Chunks written through a 10-byte buffer arrive complete and in order
    /// once the writer has been flushed.
    #[test]
    fn test_stream_writer() {
        let mut sink = Vec::new();
        let mut writer = StreamWriter::with_buffer_size(&mut sink, 10);

        let pieces: [&[u8]; 3] = [b"Hello", b", ", b"world!"];
        for piece in pieces {
            writer.write_chunk(piece).unwrap();
        }
        writer.flush().unwrap();

        assert_eq!(sink, b"Hello, world!");
    }

    /// Folding chunk lengths with a 3-byte buffer adds up to the input length.
    #[test]
    fn test_stream_reader_fold() {
        let payload = b"abcdefghij";
        let mut reader = StreamReader::with_buffer_size(Cursor::new(payload), 3);

        let summed = reader
            .fold(0, |seen, chunk| Ok(seen + chunk.len()))
            .unwrap();

        assert_eq!(summed, payload.len());
    }
}