rs_transfer 8.0.0

A simple crate to handle downloads and uploads on multiple providers
Documentation
use crate::{
  StreamData,
  endpoint::FtpEndpoint,
  error::Error,
  writer::{StreamWriter, WriteJob},
};
use async_std::channel::Receiver;
use async_trait::async_trait;
use ftp::FtpStream;
use std::{
  io::Write,
  path::{Path, PathBuf},
  sync::{Arc, Mutex},
};

fn get_directory(path: &str) -> Vec<String> {
  let destination_path = Path::new(path);
  destination_path
    .parent()
    .unwrap_or_else(|| Path::new("/"))
    .iter()
    .map(|item| item.to_os_string().to_str().unwrap().to_string())
    .collect()
}

fn get_filename(path: &str) -> Result<String, Error> {
  let destination_path = Path::new(path);
  Ok(
    destination_path
      .file_name()
      .ok_or_else(|| Error::Other("Cannot get destination filename.".to_string()))?
      .to_str()
      .ok_or_else(|| Error::Other("Cannot get destination filename as string.".to_string()))?
      .to_string(),
  )
}

impl FtpEndpoint {
  async fn upload_file(
    &self,
    ftp_stream: Arc<Mutex<FtpStream>>,
    path: &str,
    receiver: Receiver<StreamData>,
    job_and_notification: &dyn WriteJob,
  ) -> Result<(), Error> {
    let absolute_path = self.absolute_path(path);
    let destination_directory = get_directory(&absolute_path);
    let filename = get_filename(&absolute_path)?;

    // create destination directories if not exists
    let mut root_dir = PathBuf::from("/");

    for folder in destination_directory.iter() {
      if folder == "/" {
        continue;
      }

      root_dir = root_dir.join(folder);
      let pathname = root_dir.to_str().unwrap();
      let mut ftp_stream = ftp_stream.lock().unwrap();
      if ftp_stream.cwd(pathname).is_err() {
        ftp_stream.mkdir(pathname)?;
      }
    }
    ftp_stream.lock().unwrap().cwd(root_dir.to_str().unwrap())?;

    let mut file_size = None;
    let mut received_bytes = 0;
    let mut prev_percent = 0;

    log::debug!(target: &job_and_notification.get_str_job_id(), "Start FTP upload to file: {filename}, directory: {root_dir:?}.");
    let mut stream = ftp_stream.lock().unwrap().start_put_file(&filename)?;

    while let Ok(stream_data) = receiver.recv().await {
      match stream_data {
        StreamData::Size(size) => file_size = Some(size),
        StreamData::Stop => break,
        StreamData::Eof => {
          stream.flush()?;
          break;
        }
        StreamData::Data(ref data) => {
          received_bytes += data.len();
          if let Some(file_size) = file_size {
            let percent = (received_bytes as f32 / file_size as f32 * 100.0) as u8;

            if percent > prev_percent {
              prev_percent = percent;
              job_and_notification.progress(percent)?;
            }
          }

          stream.write_all(data)?;
        }
      }
    }
    Ok(())
  }
}

#[async_trait]
impl StreamWriter for FtpEndpoint {
  async fn write_stream(
    &self,
    path: &str,
    receiver: Receiver<StreamData>,
    job_and_notification: &dyn WriteJob,
  ) -> Result<(), Error> {
    let ftp_stream = self.connection();

    self
      .upload_file(ftp_stream.clone(), path, receiver, job_and_notification)
      .await?;

    log::info!(target: &job_and_notification.get_str_job_id(), "ending FTP data connection");
    ftp_stream.lock().unwrap().finish_put_file()?;

    log::info!(target: &job_and_notification.get_str_job_id(), "closing FTP connection");
    ftp_stream.lock().unwrap().quit()?;

    Ok(())
  }
}

#[test]
pub fn test_get_directory() {
  let path = "/path/to/directory/file.ext";
  let directory = get_directory(path);
  assert_eq!(
    directory,
    vec![
      "/".to_string(),
      "path".to_string(),
      "to".to_string(),
      "directory".to_string()
    ]
  );
}

#[test]
pub fn test_get_filename() {
  let path = "/path/to/directory/file.ext";
  let filename = get_filename(path).unwrap();
  assert_eq!(filename, "file.ext");
}