use std::{
io,
marker::PhantomData,
mem::replace,
pin::Pin,
task::{Context, Poll},
};
use tempest_io::{Io, IoBuf, OpHandle, WriteHandle};
use crate::context::{current_io, get_op_handle};
/// Internal state machine backing [`WriteAt`].
///
/// `poll` moves the state out with `mem::replace`, so each variant owns
/// everything needed to continue the operation from that point.
enum WriteAtState<B: IoBuf> {
/// The write has not yet been accepted by the I/O backend.
NeedsSubmit {
/// Buffer to write; owned here until it is passed to `Io::write_at`.
buf: B,
/// Op handle retained from an earlier submission attempt that failed
/// with `WouldBlock`; `None` before the first attempt.
op_handle: Option<OpHandle>,
},
/// The backend accepted the write and its completion is awaited.
InFlight {
/// Handle returned by `Io::write_at`; `complete()` yields the buffer back.
write_handle: WriteHandle<B>,
/// Handle passed to `Io::get_cqe` to look up the completion.
op_handle: OpHandle,
},
/// Placeholder left in the slot while `poll` runs and the state that
/// remains once the future has resolved; polling in this state panics.
Done,
}
/// Future that writes a buffer through the `Io` backend `I` at a fixed offset.
///
/// Created by [`write_at`]. It resolves to `(io::Result<usize>, B)`, so the
/// buffer is handed back to the caller together with the result.
#[must_use = "futures do nothing unless awaited"]
pub struct WriteAt<B: IoBuf, I: Io> {
/// Current stage of the operation; also owns the buffer or write handle.
state: WriteAtState<B>,
/// Backend descriptor passed to `Io::write_at`.
fd: I::Fd,
/// Offset passed to `Io::write_at`.
offset: u64,
/// Ties the future to the backend type `I` without storing a value of it;
/// `fn() -> I` avoids implying ownership of an `I`.
_marker: PhantomData<fn() -> I>,
}
/// Builds a [`WriteAt`] future that writes `buf` to `fd` at `offset`.
///
/// This function only constructs the future; the write is handed to the
/// `Io` backend `I` when the future is polled. The future resolves to
/// `(io::Result<usize>, B)`, returning the buffer alongside the result.
pub fn write_at<B: IoBuf, I: Io>(fd: I::Fd, buf: B, offset: u64) -> WriteAt<B, I> {
    // Start unsubmitted, with no op handle acquired yet.
    let state = WriteAtState::NeedsSubmit {
        buf,
        op_handle: None,
    };

    WriteAt {
        state,
        fd,
        offset,
        _marker: PhantomData,
    }
}
// Declared `Unpin` for every `B` and `I`; `poll` depends on this to obtain
// `&mut Self` through `Pin::get_mut`.
impl<B: IoBuf, I: Io> Unpin for WriteAt<B, I> {}
impl<B: IoBuf, I: Io> Future for WriteAt<B, I> {
    /// Result of the write paired with the buffer, which is handed back to
    /// the caller on both success and failure.
    type Output = (io::Result<usize>, B);

    /// Drives the write through submission and completion.
    ///
    /// * `NeedsSubmit`: submits the write via `Io::write_at`. On success the
    ///   future becomes `InFlight` and returns `Pending`. On `WouldBlock` the
    ///   buffer and op handle are kept, the task is woken immediately so the
    ///   submission is retried, and `Pending` is returned. Any other error
    ///   resolves the future with that error and the buffer.
    /// * `InFlight`: looks up the completion via `Io::get_cqe`. If one is
    ///   available the buffer is recovered with `WriteHandle::complete` and the
    ///   future resolves; otherwise it stays `InFlight` and returns `Pending`.
    ///
    /// # Panics
    ///
    /// Panics with "polled after completion" if polled again after it has
    /// returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Move the state out so the buffer / write handle can be consumed by
        // value. `Done` is only a placeholder here: every arm that returns
        // `Poll::Pending` must store a live state back before returning.
        match replace(&mut this.state, WriteAtState::Done) {
            WriteAtState::NeedsSubmit { buf, op_handle } => {
                // Reuse the handle from a previous `WouldBlock` attempt if any.
                // SAFETY: NOTE(review): the contract of `get_op_handle` lives in
                // `crate::context` and is not visible here; presumably `cx` must
                // belong to a task driven by this crate's runtime. Confirm.
                let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });
                // SAFETY: NOTE(review): the contract of `current_io` is not
                // visible here; presumably an `I` must be installed as the
                // current backend while this future is polled. Confirm.
                let io = unsafe { current_io::<I>() };

                match io.write_at(this.fd, buf, this.offset, op_handle) {
                    Ok(write_handle) => {
                        this.state = WriteAtState::InFlight {
                            write_handle,
                            op_handle,
                        };
                        Poll::Pending
                    }
                    Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
                        // The backend could not accept the submission right
                        // now: keep everything and ask to be polled again.
                        this.state = WriteAtState::NeedsSubmit {
                            buf,
                            op_handle: Some(op_handle),
                        };
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                    Err((e, buf)) => Poll::Ready((Err(e), buf)),
                }
            }
            WriteAtState::InFlight {
                write_handle,
                op_handle,
            } => {
                // SAFETY: NOTE(review): same unverified contract as the
                // `current_io` call in the `NeedsSubmit` arm. Confirm.
                let io = unsafe { current_io::<I>() };

                match io.get_cqe(op_handle) {
                    Some(Ok(bytes_written)) => {
                        let buf = write_handle.complete();
                        Poll::Ready((Ok(bytes_written as usize), buf))
                    }
                    Some(Err(e)) => {
                        let buf = write_handle.complete();
                        Poll::Ready((Err(e), buf))
                    }
                    None => {
                        // No completion yet (e.g. the task was polled for an
                        // unrelated reason). Restore the in-flight state;
                        // otherwise the write handle and its buffer would be
                        // dropped here and the next poll would panic on `Done`.
                        this.state = WriteAtState::InFlight {
                            write_handle,
                            op_handle,
                        };
                        Poll::Pending
                    }
                }
            }
            WriteAtState::Done => panic!("polled after completion"),
        }
    }
}