use std::{
io,
marker::PhantomData,
mem::replace,
pin::Pin,
task::{Context, Poll},
};
use tempest_io::{Io, IoBufMut, OpHandle, ReadHandle};
use crate::context::{current_io, get_op_handle};
enum ReadAtState<B: IoBufMut> {
NeedsSubmit {
buf: B,
op_handle: Option<OpHandle>,
},
InFlight {
read_handle: ReadHandle<B>,
op_handle: OpHandle,
},
Done,
}
#[must_use = "futures do nothing unless awaited"]
pub struct ReadAt<B: IoBufMut, I: Io> {
state: ReadAtState<B>,
fd: I::Fd,
offset: u64,
_marker: PhantomData<fn() -> I>,
}
pub fn read_at<B: IoBufMut, I: Io>(fd: I::Fd, buf: B, offset: u64) -> ReadAt<B, I> {
ReadAt {
state: ReadAtState::NeedsSubmit {
buf,
op_handle: None,
},
fd,
offset,
_marker: PhantomData,
}
}
impl<B: IoBufMut, I: Io> Unpin for ReadAt<B, I> {}
impl<B: IoBufMut, I: Io> Future for ReadAt<B, I> {
type Output = (io::Result<usize>, B);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match replace(&mut this.state, ReadAtState::Done) {
ReadAtState::NeedsSubmit { buf, op_handle } => {
let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });
let io = unsafe { current_io::<I>() };
match io.read_at(this.fd, buf, this.offset, op_handle) {
Ok(read_handle) => {
this.state = ReadAtState::InFlight {
read_handle,
op_handle,
};
Poll::Pending
}
Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
this.state = ReadAtState::NeedsSubmit {
buf,
op_handle: Some(op_handle),
};
cx.waker().wake_by_ref();
Poll::Pending
}
Err((e, buf)) => Poll::Ready((Err(e), buf)),
}
}
ReadAtState::InFlight {
read_handle,
op_handle,
} => {
let io = unsafe { current_io::<I>() };
match io.get_cqe(op_handle) {
Some(Ok(bytes_written)) => {
let buf = unsafe { read_handle.complete(bytes_written) };
Poll::Ready((Ok(bytes_written as usize), buf))
}
Some(Err(e)) => {
let buf = unsafe { read_handle.complete(0) };
Poll::Ready((Err(e), buf))
}
None => Poll::Pending,
}
}
ReadAtState::Done => panic!("polled after completion"),
}
}
}