use std::{task::ready, task::Poll};
use bytes::Bytes;
use futures::Future;
use crate::client::Error;
use crate::message::{Handle, Read, Status, StatusCode};
use super::{File, OperationResult, PendingOperation};
impl File {
pub fn read(
&self,
offset: u64,
length: u32,
) -> impl Future<Output = Result<Bytes, Error>> + Send + Sync + 'static {
let future = if let Some(handle) = &self.handle {
Ok(self.client.request(Read {
handle: Handle::clone(handle),
offset,
length,
}))
} else {
Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was already closed",
)))
};
async move { Ok(future?.await?.0) }
}
}
impl tokio::io::AsyncRead for File {
    /// Reads from the remote file at the current `offset` into `buf`.
    ///
    /// If a previously started read has completed, its data is delivered.
    /// Otherwise a new request for at most 32768 bytes (and no more than
    /// `buf.remaining()`) is issued and stored in `self.pending`.
    ///
    /// An `Eof` status from the server is turned into an empty payload, so
    /// the call returns `Ok(())` without appending anything to `buf`.
    ///
    /// # Errors
    ///
    /// Returns a `BrokenPipe` error when the file has no handle any more, or
    /// the request's error converted into `std::io::Error`.
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        let result = match ready!(self.pending.poll(cx)) {
            // A read started by an earlier poll has finished.
            OperationResult::Read(read) => read,
            // Any other outcome is ignored here and a fresh read is started.
            _ => {
                let Some(handle) = &self.handle else {
                    return Poll::Ready(Err(std::io::Error::new(
                        std::io::ErrorKind::BrokenPipe,
                        "File was closed",
                    )));
                };
                let handle = Handle::clone(handle);
                let read = self.client.request(Read {
                    handle,
                    offset: self.offset,
                    // Capped at 32768, so the cast to `u32` cannot truncate.
                    length: buf.remaining().min(32768) as u32,
                });
                self.pending = PendingOperation::Read(Box::pin(async move {
                    match read.await {
                        Ok(data) => Ok(data.0),
                        // End of file is reported as an empty payload.
                        Err(Error::Sftp(Status {
                            code: StatusCode::Eof,
                            ..
                        })) => Ok(Bytes::default()),
                        Err(status) => Err(status.into()),
                    }
                }));
                // NOTE(review): if this first poll is already `Ready`, the
                // finished future stays in `self.pending` — confirm that
                // `PendingOperation::poll` tolerates that on the next call.
                if let PendingOperation::Read(pending) = &mut self.pending {
                    ready!(pending.as_mut().poll(cx))
                } else {
                    unreachable!()
                }
            }
        };
        match result {
            Ok(data) => {
                // The request may have been sized for a different (larger)
                // buffer than the one passed to this poll, and
                // `ReadBuf::put_slice` panics when given more than
                // `remaining()`. Copy only what fits; `offset` advances by
                // the copied amount, so the rest is requested again later.
                let count = data.len().min(buf.remaining());
                buf.put_slice(&data[..count]);
                self.offset += count as u64;
                Poll::Ready(Ok(()))
            }
            Err(err) => Poll::Ready(Err(err)),
        }
    }
}