// rs_transfer 8.0.0
//
// A simple crate to handle downloads and uploads on multiple providers.
// Documentation
use crate::{
  StreamData,
  endpoint::S3Endpoint,
  error::Error,
  reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use aws_sdk_s3::Client;
use std::sync::Arc;
use tokio::io::AsyncReadExt;

impl S3Endpoint {
  async fn read_file(
    &self,
    client: Arc<Client>,
    path: &str,
    bucket: &str,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
  ) -> Result<u64, Error> {
    let mut total_read_bytes: u64 = 0;
    let cloned_bucket = bucket.to_string();
    let cloned_key = path.to_string();
    let cloned_client = client.clone();
    let cloned_cloned_key = cloned_key.clone();
    let cloned_cloned_bucket = cloned_bucket.clone();

    let head = cloned_client
      .head_object()
      .bucket(cloned_cloned_bucket)
      .key(cloned_cloned_key)
      .send()
      .await?;

    if let Some(size) = head.content_length {
      sender.send(StreamData::Size(size as u64)).await?;
    }

    let object = client
      .get_object()
      .bucket(cloned_bucket)
      .key(cloned_key)
      .send()
      .await?;

    let s3_byte_stream = object.body;

    let mut reader = s3_byte_stream.into_async_read();

    let buffer_size = if let Ok(buffer_size) = std::env::var("S3_READER_BUFFER_SIZE") {
      buffer_size
        .parse::<u32>()
        .map_err(|error| Error::from(("S3_READER_BUFFER_SIZE", error)))? as usize
    } else {
      1024 * 1024
    };

    loop {
      if channel.is_stopped() {
        sender.send(StreamData::Stop).await?;
        return Ok(total_read_bytes);
      }

      let mut buffer: Vec<u8> = vec![0; buffer_size];

      let size = reader.read(&mut buffer).await?;

      total_read_bytes += size as u64;

      if size == 0 {
        break;
      }

      if let Err(error) = sender
        .send(StreamData::Data(buffer[0..size].to_vec()))
        .await
      {
        if channel.is_stopped() && sender.is_closed() {
          log::warn!("Data channel closed: could not send {size} read bytes.");
          return Ok(total_read_bytes);
        }

        return Err(error.into());
      }
    }
    sender.send(StreamData::Eof).await?;
    Ok(total_read_bytes)
  }
}

#[async_trait]
impl StreamReader for S3Endpoint {
  /// Reads the object at `path` from this endpoint's bucket and forwards it to
  /// `sender` by delegating to `read_file` with this endpoint's connection.
  ///
  /// `channel` is handed to `read_file` unchanged. Returns whatever
  /// `read_file` returns: the number of bytes read on success, or its error.
  async fn read_stream(
    &self,
    path: &str,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
  ) -> Result<u64, Error> {
    // NOTE(review): the return type of `bucket()` is not visible here, so the
    // owned copy is kept; drop it if `bucket()` already yields a `&str`.
    let bucket = self.bucket().to_string();
    let client = self.connection();

    // `path` is already a `&str`, so it is forwarded without an extra copy.
    self
      .read_file(client, path, &bucket, sender, channel)
      .await
  }
}