use crate::{
StreamData,
endpoint::FtpEndpoint,
error::Error,
reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use ftp::FtpError;
use std::{io::Read, path::Path};
#[async_trait]
impl StreamReader for FtpEndpoint {
async fn read_stream(
&self,
path: &str,
sender: Sender<StreamData>,
channel: &dyn ReaderNotification,
) -> Result<u64, Error> {
let absolute_path = self.absolute_path(path);
let path = Path::new(&absolute_path);
let directory = path.parent().unwrap().to_str().unwrap();
let filename = path.file_name().unwrap().to_str().unwrap();
let ftp_stream = self.connection();
ftp_stream.lock().unwrap().cwd(directory)?;
let mut total_file_size = 0;
let size = ftp_stream.lock().unwrap().size(filename)?;
if let Some(file_size) = size {
total_file_size = file_size;
sender.send(StreamData::Size(file_size as u64)).await?;
}
let buffer_size = if let Ok(buffer_size) = std::env::var("FTP_READER_BUFFER_SIZE") {
buffer_size
.parse::<u32>()
.map_err(|error| Error::from(("FTP_READER_BUFFER_SIZE", error)))? as usize
} else {
1024 * 1024
};
let total_read_bytes =
ftp_stream
.lock()
.unwrap()
.retr(filename, |reader| -> Result<u64, FtpError> {
let sender = sender.clone();
async_std::task::block_on(async {
Self::retrieve(reader, sender, channel, buffer_size, total_file_size).await
})
.map_err(|e| e.into())
})?;
Ok(total_read_bytes)
}
}
impl FtpEndpoint {
async fn retrieve(
reader: &mut dyn Read,
sender: Sender<StreamData>,
channel: &dyn ReaderNotification,
buffer_size: usize,
total_file_size: usize,
) -> Result<u64, Error> {
let mut total_read_bytes: u64 = 0;
loop {
if channel.is_stopped() {
sender.send(StreamData::Stop).await?;
return Ok(total_read_bytes);
}
let mut buffer = vec![0; buffer_size];
let read_size = reader
.read(&mut buffer)
.map_err(FtpError::ConnectionError)?;
if read_size == 0 {
sender.send(StreamData::Eof).await?;
log::debug!("Read {total_read_bytes} bytes on {total_file_size} expected.");
return Ok(total_read_bytes);
}
total_read_bytes += read_size as u64;
if let Err(error) = sender
.send(StreamData::Data(buffer[0..read_size].to_vec()))
.await
{
if channel.is_stopped() && sender.is_closed() {
log::warn!("Data channel closed: could not send {read_size} read bytes.");
break;
}
return Err(error.into());
}
}
Ok(total_read_bytes)
}
}