use std::{pin::Pin, task::ready, task::Poll};
use futures::Future;
use crate::client::Error;
use crate::message::{Close, Data, Handle, Write};
use super::{File, OperationResult, PendingOperation};
impl File {
pub fn write(
&self,
offset: u64,
data: impl Into<Data>,
) -> impl Future<Output = Result<(), Error>> + Send + Sync + 'static {
let future = if let Some(handle) = &self.handle {
Ok(self.client.request(Write {
handle: Handle::clone(handle),
offset,
data: data.into(),
}))
} else {
Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was already closed",
)))
};
async move { future?.await }
}
}
impl tokio::io::AsyncWrite for File {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let result = match ready!(self.pending.poll(cx)) {
OperationResult::Write(write) => write,
_ => {
let Some(handle) = &self.handle else {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was closed",
)));
};
let handle = Handle::clone(handle);
let length = buf.len().min(32768);
let write = self.client.request(Write {
handle,
offset: self.offset,
data: buf[0..length].to_owned().into(),
});
self.pending = PendingOperation::Write(Box::pin(async move {
match write.await {
Ok(()) => Ok(length),
Err(status) => Err(std::io::Error::from(status)),
}
}));
if let PendingOperation::Write(pending) = &mut self.pending {
ready!(pending.as_mut().poll(cx))
} else {
unreachable!()
}
}
};
match result {
Ok(len) => {
self.offset += len as u64;
std::task::Poll::Ready(Ok(len))
}
Err(err) => Poll::Ready(Err(err)),
}
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
match ready!(self.pending.poll(cx)) {
OperationResult::Write(Ok(len)) => {
self.pending = PendingOperation::None;
self.offset += len as u64;
Poll::Ready(Ok(()))
}
OperationResult::Write(Err(err)) => Poll::Ready(Err(err)),
_ => Poll::Ready(Ok(())),
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let result = match ready!(self.pending.poll(cx)) {
OperationResult::Close(close) => close,
_ => {
let Some(handle) = &self.handle else {
return Poll::Ready(Ok(()));
};
let handle = Handle::clone(handle);
let close = self.client.request(Close { handle });
self.pending = PendingOperation::Close(Box::pin(async move {
close.await.map_err(std::io::Error::from)
}));
if let PendingOperation::Close(pending) = &mut self.pending {
ready!(pending.as_mut().poll(cx))
} else {
unreachable!()
}
}
};
Poll::Ready(result)
}
}