use crate::{
StreamData,
endpoint::SftpEndpoint,
error::Error,
reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use std::io::Read;
#[async_trait::async_trait]
impl StreamReader for SftpEndpoint {
async fn read_stream(
&self,
path: &str,
sender: Sender<StreamData>,
channel: &dyn ReaderNotification,
) -> Result<u64, Error> {
let connection = self.connection();
let absolute_path = self.absolute_path(path);
let mut sftp_reader = connection.lock().unwrap().read_over_sftp(&absolute_path)?;
let file_size = sftp_reader.get_size()?;
log::debug!("Size of {} remote file: {}", absolute_path, file_size);
sender.send(StreamData::Size(file_size)).await?;
log::info!("Start reading remote file {}...", absolute_path);
let buffer_size = if let Ok(buffer_size) = std::env::var("SFTP_READER_BUFFER_SIZE") {
buffer_size
.parse::<u32>()
.map_err(|error| Error::from(("SFTP_READER_BUFFER_SIZE", error)))? as usize
} else {
1024 * 1024
};
let mut total_read_bytes = 0;
loop {
if channel.is_stopped() {
sender.send(StreamData::Stop).await?;
return Ok(total_read_bytes as u64);
}
let mut buffer = vec![0; buffer_size];
let read_size = sftp_reader.read(&mut buffer)?;
if read_size == 0 {
sender.send(StreamData::Eof).await?;
log::debug!("Read {total_read_bytes} bytes on {file_size} expected.");
return Ok(total_read_bytes as u64);
}
total_read_bytes += read_size;
if let Err(error) = sender
.send(StreamData::Data(buffer[0..read_size].to_vec()))
.await
{
if channel.is_stopped() && sender.is_closed() {
log::warn!("Data channel closed: could not send {read_size} read bytes.");
return Ok(total_read_bytes as u64);
}
return Err(error.into());
}
}
}
}