use std::{future::Future, pin::Pin, task::ready, task::Poll};
use bytes::Bytes;
use crate::client::{Error, SftpFuture};
use crate::message::{Handle, Read, Status, StatusCode};
use super::{File, OperationResult, PendingOperation};
impl File {
pub fn read(&self, offset: u64, length: u32) -> SftpFuture<Bytes> {
if let Some(handle) = &self.handle {
self.client.read(Handle::clone(handle), offset, length)
} else {
SftpFuture::Error(Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was already closed",
)))
}
}
}
impl tokio::io::AsyncRead for File {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
let result = match ready!(self.pending.poll(cx)) {
OperationResult::Read(read) => read,
_ => {
let Some(handle) = &self.handle else {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"File was closed",
)));
};
let handle = Handle::clone(handle);
self.pending = PendingOperation::Read(self.client.request(Read {
handle,
offset: self.offset,
length: buf.remaining().min(32768) as u32, }));
if let PendingOperation::Read(pending) = &mut self.pending {
ready!(Pin::new(pending).poll(cx))
} else {
unreachable!()
}
}
};
match result {
Ok(data) => {
buf.put_slice(&data);
self.offset += data.len() as u64;
std::task::Poll::Ready(Ok(()))
}
Err(Error::Sftp(Status {
code: StatusCode::Eof,
..
})) => std::task::Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(err.into())),
}
}
}