use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use minarrow::{Vec64, vec64};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use crate::enums::BufferChunkSize;
/// Reads a file from disk in fixed-size chunks.
///
/// Implements `Stream`, yielding each chunk as an owned `Vec64<u8>`, and
/// `AsyncRead`, which reads directly from the same buffered reader.
pub struct DiskByteStream {
/// Buffered async reader wrapping the opened file.
reader: BufReader<File>,
/// Set once a read returns zero bytes or an error; the stream yields `None`
/// on every poll after that.
eof: bool,
/// Buffer the next chunk is read into. After each successful read it is
/// handed to the consumer and replaced with a fresh zero-filled buffer.
buf: Vec64<u8>,
/// Maximum number of bytes requested per read, taken from
/// `BufferChunkSize::chunk_size()` when the stream is opened.
chunk_size: usize,
}
impl DiskByteStream {
    /// Opens `path` for asynchronous reading and prepares a chunked stream over it.
    ///
    /// `size` determines, through `BufferChunkSize::chunk_size()`, both the
    /// capacity of the internal `BufReader` and the length of the chunk buffer.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by `File::open` when the file cannot
    /// be opened.
    pub async fn open(path: impl AsRef<Path>, size: BufferChunkSize) -> io::Result<Self> {
        let chunk_size = size.chunk_size();

        // Open first so a failure returns before any buffer is allocated.
        let reader = File::open(path)
            .await
            .map(|file| BufReader::with_capacity(chunk_size, file))?;
        let buf = vec64![0u8; chunk_size];

        Ok(Self {
            reader,
            eof: false,
            buf,
            chunk_size,
        })
    }
}
impl Stream for DiskByteStream {
    type Item = Result<Vec64<u8>, io::Error>;

    /// Polls for the next chunk of the file.
    ///
    /// Yields:
    /// - `Some(Ok(chunk))` with between 1 and `chunk_size` bytes on a successful read;
    /// - `Some(Err(e))` once if a read fails, after which the stream is finished;
    /// - `None` when a read returns zero bytes (end of file) and on every
    ///   poll after the stream has finished.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.get_mut();
        if me.eof {
            return Poll::Ready(None);
        }

        let read = {
            // The read future is rebuilt on every poll and never outlives this
            // block, so it is pinned on the stack instead of boxed; this avoids
            // one heap allocation per `poll_next` call. Dropping it on
            // `Pending` is fine: tokio documents `AsyncReadExt::read` as
            // cancel safe.
            let fut = std::pin::pin!(me.reader.read(&mut me.buf[..me.chunk_size]));
            // Fully qualified so the call does not depend on `Future` being
            // in scope (it is only in the prelude on newer editions).
            futures_core::ready!(std::future::Future::poll(fut, cx))
        };

        match read {
            Ok(0) => {
                me.eof = true;
                Poll::Ready(None)
            }
            Ok(n) => {
                // Hand the filled buffer to the consumer and install a fresh
                // zeroed one for the next read.
                let mut out = std::mem::replace(&mut me.buf, vec64![0u8; me.chunk_size]);
                // Only the first `n` bytes were written by this read.
                out.truncate(n);
                Poll::Ready(Some(Ok(out)))
            }
            Err(e) => {
                // Report the error once, then terminate the stream.
                me.eof = true;
                Poll::Ready(Some(Err(e)))
            }
        }
    }
}
impl AsyncRead for DiskByteStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let me = self.get_mut();
Pin::new(&mut me.reader).poll_read(cx, buf)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::StreamExt;
    use std::fs::File as StdFile;
    use std::io::Write;
    use std::path::PathBuf;
    use tokio::runtime::Runtime;

    /// Writes `size` copies of `pattern` to a file in the system temp
    /// directory and returns its path. The file name is derived from `pattern`.
    fn create_test_file(size: usize, pattern: u8) -> PathBuf {
        let file_name = format!("disk_bytestream_test_{}.bin", pattern);
        let tmp_path = std::env::temp_dir().join(file_name);
        let mut f = StdFile::create(&tmp_path).expect("create temp file");
        let payload = vec![pattern; size];
        f.write_all(&payload).expect("write data");
        tmp_path
    }

    /// Streams a freshly written `file_size`-byte file filled with `pattern`
    /// and checks that every chunk is at most `max_chunk` bytes, contains only
    /// `pattern`, and that the chunks add up to the whole file. The temp file
    /// is removed afterwards.
    fn check_stream_roundtrip(
        size: BufferChunkSize,
        max_chunk: usize,
        file_size: usize,
        pattern: u8,
    ) {
        let path = create_test_file(file_size, pattern);
        let rt = Runtime::new().expect("create runtime");

        rt.block_on(async {
            let opened = DiskByteStream::open(&path, size)
                .await
                .expect("open stream");
            let mut s = Box::pin(opened);

            let mut count = 0usize;
            let mut total_bytes = 0usize;
            while let Some(item) = s.next().await {
                let chunk = item.expect("chunk read error");
                assert!(chunk.len() <= max_chunk);
                chunk.iter().for_each(|b| assert_eq!(*b, pattern));
                total_bytes += chunk.len();
                count += 1;
            }

            assert!(count > 0);
            assert_eq!(total_bytes, file_size);
        });

        std::fs::remove_file(path).unwrap();
    }

    #[test]
    fn test_disk_bytestream_fileio_chunks() {
        const FILE_SIZE: usize = 4 * 1024 * 1024;
        check_stream_roundtrip(
            BufferChunkSize::FileIO,
            BufferChunkSize::FileIO.chunk_size(),
            FILE_SIZE,
            0xAA,
        );
    }

    #[test]
    fn test_disk_bytestream_custom_chunk() {
        const FILE_SIZE: usize = 1 * 1024 * 1024;
        const CHUNK: usize = 128 * 1024;
        check_stream_roundtrip(BufferChunkSize::Custom(CHUNK), CHUNK, FILE_SIZE, 0x55);
    }
}