rs_transfer 8.0.0

A simple crate to handle downloads and uploads on multiple providers
Documentation
use crate::{
  StreamData,
  endpoint::FtpEndpoint,
  error::Error,
  reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use ftp::FtpError;
use std::{io::Read, path::Path};

#[async_trait]
impl StreamReader for FtpEndpoint {
  async fn read_stream(
    &self,
    path: &str,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
  ) -> Result<u64, Error> {
    let absolute_path = self.absolute_path(path);
    let path = Path::new(&absolute_path);
    let directory = path.parent().unwrap().to_str().unwrap();
    let filename = path.file_name().unwrap().to_str().unwrap();

    let ftp_stream = self.connection();

    ftp_stream.lock().unwrap().cwd(directory)?;

    let mut total_file_size = 0;
    let size = ftp_stream.lock().unwrap().size(filename)?;
    if let Some(file_size) = size {
      total_file_size = file_size;
      sender.send(StreamData::Size(file_size as u64)).await?;
    }

    let buffer_size = if let Ok(buffer_size) = std::env::var("FTP_READER_BUFFER_SIZE") {
      buffer_size
        .parse::<u32>()
        .map_err(|error| Error::from(("FTP_READER_BUFFER_SIZE", error)))? as usize
    } else {
      1024 * 1024
    };

    let total_read_bytes =
      ftp_stream
        .lock()
        .unwrap()
        .retr(filename, |reader| -> Result<u64, FtpError> {
          let sender = sender.clone();
          async_std::task::block_on(async {
            Self::retrieve(reader, sender, channel, buffer_size, total_file_size).await
          })
          .map_err(|e| e.into())
        })?;

    Ok(total_read_bytes)
  }
}

impl FtpEndpoint {
  async fn retrieve(
    reader: &mut dyn Read,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
    buffer_size: usize,
    total_file_size: usize,
  ) -> Result<u64, Error> {
    let mut total_read_bytes: u64 = 0;
    loop {
      if channel.is_stopped() {
        sender.send(StreamData::Stop).await?;
        return Ok(total_read_bytes);
      }

      let mut buffer = vec![0; buffer_size];
      let read_size = reader
        .read(&mut buffer)
        .map_err(FtpError::ConnectionError)?;

      if read_size == 0 {
        sender.send(StreamData::Eof).await?;
        log::debug!("Read {total_read_bytes} bytes on {total_file_size} expected.");
        return Ok(total_read_bytes);
      }

      total_read_bytes += read_size as u64;

      if let Err(error) = sender
        .send(StreamData::Data(buffer[0..read_size].to_vec()))
        .await
      {
        if channel.is_stopped() && sender.is_closed() {
          log::warn!("Data channel closed: could not send {read_size} read bytes.");
          break;
        }

        return Err(error.into());
      }
    }
    Ok(total_read_bytes)
  }
}