use bytes::{Bytes, BytesMut};
use futures_util::Stream;
use std::fs::Metadata;
use std::io::Read;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::Result;
/// Fallback read buffer size in bytes on Unix targets (4 KiB).
///
/// `get_block_size` uses this as the lower bound for the block size reported
/// by the file's metadata.
#[cfg(unix)]
const DEFAULT_READ_BUF_SIZE: usize = 4 * 1024;

/// Read buffer size in bytes on non-Unix targets (8 KiB).
///
/// `get_block_size` returns this value unconditionally on these platforms.
#[cfg(not(unix))]
const DEFAULT_READ_BUF_SIZE: usize = 8 * 1024;
/// Adapter that exposes a reader as a stream of byte chunks.
///
/// The `Stream` implementation for this type requires `T: Read + Unpin` and
/// performs one `read` call per poll.
#[derive(Debug)]
pub(crate) struct FileStream<T> {
    /// Underlying source the chunks are read from.
    pub(crate) reader: T,
    /// Number of bytes allocated for the buffer handed to each `read` call;
    /// a yielded chunk is never larger than this.
    pub(crate) buf_size: usize,
}
impl<T: Read + Unpin> Stream for FileStream<T> {
type Item = Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = BytesMut::zeroed(self.buf_size);
match Pin::into_inner(self).reader.read(&mut buf[..]) {
Ok(n) => {
if n == 0 {
Poll::Ready(None)
} else {
buf.truncate(n);
Poll::Ready(Some(Ok(buf.freeze())))
}
}
Err(err) => Poll::Ready(Some(Err(anyhow::Error::from(err)))),
}
}
}
/// Picks a read buffer size for the file described by `metadata`.
///
/// Returns the smaller of the value from `get_block_size` and the file
/// length, so a file shorter than one block does not get a larger buffer
/// than it can fill.
pub(crate) fn optimal_buf_size(metadata: &Metadata) -> usize {
    let file_len = metadata.len();
    let block_size = get_block_size(metadata);

    if file_len < block_size as u64 {
        // Strictly below `block_size`, which is a `usize`, so the cast is lossless.
        file_len as usize
    } else {
        block_size
    }
}
#[cfg(unix)]
/// Returns the block size to use for reads on Unix.
///
/// This is the I/O block size reported by the file's metadata (`blksize`),
/// raised to `DEFAULT_READ_BUF_SIZE` when the reported value is smaller.
fn get_block_size(metadata: &Metadata) -> usize {
    use std::os::unix::fs::MetadataExt;

    let reported = metadata.blksize() as usize;
    reported.max(DEFAULT_READ_BUF_SIZE)
}

#[cfg(not(unix))]
/// Returns the block size to use for reads on non-Unix platforms.
///
/// The metadata is not consulted here; the result is always
/// `DEFAULT_READ_BUF_SIZE`.
fn get_block_size(_metadata: &Metadata) -> usize {
    DEFAULT_READ_BUF_SIZE
}