use crate::error::ZeroFsError;
use crate::file::File;
use std::future::Future;
use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
enum State {
Idle,
Reading(BoxFut<Result<bytes::Bytes, ZeroFsError>>),
Writing {
fut: BoxFut<Result<(), ZeroFsError>>,
len: u64,
},
SeekingEnd {
fut: BoxFut<Result<u64, ZeroFsError>>,
delta: i64,
},
}
pub struct FileCursor {
file: Arc<File>,
pos: u64,
state: State,
}
impl FileCursor {
pub(crate) fn new(file: Arc<File>) -> Self {
Self {
file,
pos: 0,
state: State::Idle,
}
}
pub fn position(&self) -> u64 {
self.pos
}
}
impl AsyncRead for FileCursor {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
loop {
match &mut self.state {
State::Reading(fut) => {
let data = ready!(fut.as_mut().poll(cx));
self.state = State::Idle;
let data = data.map_err(io::Error::from)?;
let n = data.len().min(buf.remaining());
buf.put_slice(&data[..n]);
self.pos += n as u64;
return Poll::Ready(Ok(()));
}
State::Idle => {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}
let chunk = self.file.max_read_chunk().max(1) as usize;
let len = buf.remaining().min(chunk) as u32;
let offset = self.pos;
let file = Arc::clone(&self.file);
self.state =
State::Reading(Box::pin(async move { file.read_at(offset, len).await }));
}
_ => return Poll::Ready(Err(busy())),
}
}
}
}
impl AsyncWrite for FileCursor {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
loop {
match &mut self.state {
State::Writing { fut, len } => {
let res = ready!(fut.as_mut().poll(cx));
let len = *len;
self.state = State::Idle;
res.map_err(io::Error::from)?;
self.pos += len;
return Poll::Ready(Ok(len as usize));
}
State::Idle => {
if buf.is_empty() {
return Poll::Ready(Ok(0));
}
let offset = self.pos;
let data = buf.to_vec();
let len = data.len() as u64;
let file = Arc::clone(&self.file);
self.state = State::Writing {
fut: Box::pin(async move { file.write_at(offset, &data).await }),
len,
};
}
_ => return Poll::Ready(Err(busy())),
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut self.state {
State::Writing { fut, len } => {
let res = ready!(fut.as_mut().poll(cx));
let len = *len;
self.state = State::Idle;
res.map_err(io::Error::from)?;
self.pos += len;
Poll::Ready(Ok(()))
}
_ => Poll::Ready(Ok(())),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}
}
impl AsyncSeek for FileCursor {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
if !matches!(self.state, State::Idle) {
return Err(busy());
}
match position {
SeekFrom::Start(n) => self.pos = n,
SeekFrom::Current(delta) => {
self.pos = self
.pos
.checked_add_signed(delta)
.ok_or_else(negative_seek)?;
}
SeekFrom::End(delta) => {
let file = Arc::clone(&self.file);
self.state = State::SeekingEnd {
fut: Box::pin(async move { file.metadata().await.map(|m| m.size) }),
delta,
};
}
}
Ok(())
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
if let State::SeekingEnd { fut, delta } = &mut self.state {
let size = ready!(fut.as_mut().poll(cx));
let delta = *delta;
self.state = State::Idle;
let size = size.map_err(io::Error::from)?;
self.pos = size.checked_add_signed(delta).ok_or_else(negative_seek)?;
}
Poll::Ready(Ok(self.pos))
}
}
fn busy() -> io::Error {
io::Error::other("zerofs FileCursor: another I/O operation is already in flight")
}
fn negative_seek() -> io::Error {
io::Error::new(
io::ErrorKind::InvalidInput,
"zerofs FileCursor: seek to a negative or out-of-range offset",
)
}