rs_transfer 8.0.0

A simple crate to handle downloads and uploads on multiple providers
Documentation
use crate::{
  StreamData,
  endpoint::SftpEndpoint,
  error::Error,
  reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use std::io::Read;

#[async_trait::async_trait]
impl StreamReader for SftpEndpoint {
  async fn read_stream(
    &self,
    path: &str,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
  ) -> Result<u64, Error> {
    let connection = self.connection();

    let absolute_path = self.absolute_path(path);
    let mut sftp_reader = connection.lock().unwrap().read_over_sftp(&absolute_path)?;
    let file_size = sftp_reader.get_size()?;

    log::debug!("Size of {} remote file: {}", absolute_path, file_size);

    sender.send(StreamData::Size(file_size)).await?;

    log::info!("Start reading remote file {}...", absolute_path);

    let buffer_size = if let Ok(buffer_size) = std::env::var("SFTP_READER_BUFFER_SIZE") {
      buffer_size
        .parse::<u32>()
        .map_err(|error| Error::from(("SFTP_READER_BUFFER_SIZE", error)))? as usize
    } else {
      1024 * 1024
    };

    let mut total_read_bytes = 0;

    loop {
      if channel.is_stopped() {
        sender.send(StreamData::Stop).await?;
        return Ok(total_read_bytes as u64);
      }

      let mut buffer = vec![0; buffer_size];
      let read_size = sftp_reader.read(&mut buffer)?;

      if read_size == 0 {
        sender.send(StreamData::Eof).await?;
        log::debug!("Read {total_read_bytes} bytes on {file_size} expected.");
        return Ok(total_read_bytes as u64);
      }

      total_read_bytes += read_size;

      if let Err(error) = sender
        .send(StreamData::Data(buffer[0..read_size].to_vec()))
        .await
      {
        if channel.is_stopped() && sender.is_closed() {
          log::warn!("Data channel closed: could not send {read_size} read bytes.");
          return Ok(total_read_bytes as u64);
        }

        return Err(error.into());
      }
    }
  }
}