// rs_transfer 8.0.0
//
// A simple crate to handle downloads and uploads on multiple providers.
// Documentation
use crate::{
  StreamData,
  endpoint::GcsEndpoint,
  error::Error,
  reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use futures_util::StreamExt;

/// Maximum number of stream items (bytes) grouped into one chunk when the
/// download stream is split with `chunks(BUFFER_SIZE)`: 1024 * 1024 = 1 MiB.
const BUFFER_SIZE: usize = 1024 * 1024;

#[async_trait]
impl StreamReader for GcsEndpoint {
  /// Streams the object stored at `path` in this endpoint's bucket to `sender`.
  ///
  /// Messages are emitted in this order: one `StreamData::Size` carrying the
  /// size reported by the object's metadata, then the payload in chunks of at
  /// most `BUFFER_SIZE` bytes (forwarded through `send_buffer`), and finally
  /// `StreamData::Eof`. If `channel` reports a stop before a chunk is fetched,
  /// `StreamData::Stop` is sent instead and the function returns early.
  ///
  /// Returns the number of bytes handed to `send_buffer` up to that point.
  ///
  /// # Errors
  ///
  /// Propagates failures from the metadata lookup, from opening the download,
  /// from any byte of the download stream, and from sending on `sender`.
  async fn read_stream(
    &self,
    path: &str,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
  ) -> Result<u64, Error> {
    let client = self.connection();

    // Announce the object size before any payload is sent.
    let metadata = client.object().read(self.bucket(), path).await?;
    sender.send(StreamData::Size(metadata.size)).await?;

    // Group the downloaded byte stream into chunks of up to BUFFER_SIZE items.
    let mut chunks = client
      .object()
      .download_streamed(self.bucket(), path)
      .await?
      .chunks(BUFFER_SIZE);

    let mut total_read_bytes = 0;

    loop {
      // The stop request is checked before each fetch so that no further
      // chunk is pulled once the reader has been told to stop.
      if channel.is_stopped() {
        sender.send(StreamData::Stop).await?;
        return Ok(total_read_bytes);
      }

      let chunk = match chunks.next().await {
        Some(chunk) => chunk,
        None => break,
      };

      // Every item of the chunk is a fallible byte; the first error aborts.
      let bytes = chunk
        .into_iter()
        .collect::<Result<Vec<u8>, cloud_storage::Error>>()?;

      let sent_bytes = bytes.len() as u64;
      send_buffer(&sender, channel, bytes).await?;
      total_read_bytes += sent_bytes;
    }

    sender.send(StreamData::Eof).await?;
    Ok(total_read_bytes)
  }
}

/// Sends `buffer` to `sender` as a `StreamData::Data` message.
///
/// The buffer is moved into the message rather than cloned, so no extra
/// allocation or copy is made per chunk.
///
/// # Errors
///
/// Returns the converted send error when the message cannot be delivered,
/// except when `channel` reports a stop and `sender` is closed: that case is
/// logged as a warning and treated as success.
async fn send_buffer(
  sender: &Sender<StreamData>,
  channel: &dyn ReaderNotification,
  buffer: Vec<u8>,
) -> Result<(), Error> {
  match sender.send(StreamData::Data(buffer)).await {
    Ok(()) => Ok(()),
    // A closed channel after a stop request is an expected shutdown, not a failure.
    Err(_) if channel.is_stopped() && sender.is_closed() => {
      log::warn!("Data channel closed: could not send read bytes.");
      Ok(())
    }
    Err(error) => Err(error.into()),
  }
}