use std::{future::Future, pin::Pin, task::ready, task::Poll};
use crate::client::{Error, SftpFuture, SftpReply, SftpRequest};
use crate::message::{Close, Data, Handle, Write};
use super::{File, OperationResult, PendingOperation};
impl File {
pub fn write(&self, offset: u64, data: impl Into<Data>) -> SftpFuture {
if let Some(handle) = &self.handle {
self.client.write(Handle::clone(handle), offset, data)
} else {
SftpFuture::Error(Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was already closed",
)))
}
}
}
impl tokio::io::AsyncWrite for File {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let result = match ready!(self.pending.poll(cx)) {
OperationResult::Write(write) => write,
_ => {
let Some(handle) = &self.handle else {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was closed",
)));
};
let handle = Handle::clone(handle);
let length = buf.len().min(32768);
self.pending = PendingOperation::Write(
self.client.request_with(
Write {
handle,
offset: self.offset,
data: buf[0..length].to_owned().into(),
}
.to_request_message(),
length,
|length, msg| {
<()>::from_reply_message(msg)?;
Ok(length)
},
),
);
if let PendingOperation::Write(pending) = &mut self.pending {
ready!(Pin::new(pending).poll(cx))
} else {
unreachable!()
}
}
};
match result {
Ok(len) => {
self.offset += len as u64;
std::task::Poll::Ready(Ok(len))
}
Err(err) => Poll::Ready(Err(err.into())),
}
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
match ready!(self.pending.poll(cx)) {
OperationResult::Write(Ok(len)) => {
self.pending = PendingOperation::None;
self.offset += len as u64;
Poll::Ready(Ok(()))
}
OperationResult::Write(Err(err)) => Poll::Ready(Err(err.into())),
_ => Poll::Ready(Ok(())),
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let result = match ready!(self.pending.poll(cx)) {
OperationResult::Close(close) => close,
_ => {
let Some(handle) = &self.handle else {
return Poll::Ready(Ok(()));
};
let handle = Handle::clone(handle);
self.pending = PendingOperation::Close(self.client.request(Close { handle }));
if let PendingOperation::Close(pending) = &mut self.pending {
ready!(Pin::new(pending).poll(cx))
} else {
unreachable!()
}
}
};
Poll::Ready(result.map_err(Into::into))
}
}