rs_transfer 8.0.0

A simple crate to handle downloads and uploads on multiple providers
Documentation
use crate::{
  StreamData,
  endpoint::HttpEndpoint,
  error::Error,
  reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use futures::StreamExt;
use reqwest::StatusCode;
use tokio::runtime::Runtime;

#[async_trait]
impl StreamReader for HttpEndpoint {
  /// Performs the HTTP request for `path` and forwards the response body,
  /// chunk by chunk, to `sender`.
  ///
  /// Each received chunk is sent as `StreamData::Data`. When the body is
  /// exhausted, `StreamData::Eof` is sent. If `channel.is_stopped()` becomes
  /// true between two chunks, `StreamData::Stop` is sent and reading ends.
  ///
  /// Returns the total number of body bytes read from the response.
  ///
  /// # Errors
  ///
  /// Returns an error when:
  /// - the client, method, URL, headers or request cannot be built;
  /// - the request fails to execute;
  /// - the response status is anything other than `200 OK`
  ///   (`Error::HttpStatus`);
  /// - reading a chunk of the response body fails (the stream is then
  ///   truncated, so no `StreamData::Eof` is sent);
  /// - sending on `sender` fails, unless the reader was stopped and the
  ///   channel is closed, in which case the failure is only logged.
  ///
  /// # Panics
  ///
  /// Panics if the Tokio runtime cannot be created.
  async fn read_stream(
    &self,
    path: &str,
    sender: Sender<StreamData>,
    channel: &dyn ReaderNotification,
  ) -> Result<u64, Error> {
    let mut total_read_bytes: u64 = 0;
    // The request is driven on a dedicated Tokio runtime created for this call
    // (presumably because reqwest needs a Tokio context while the caller may run
    // on another executor -- TODO confirm).
    Runtime::new()
      .expect("Failed to create Tokio runtime")
      .block_on(async {
        let client = reqwest::Client::builder().build()?;

        let method = self.get_method()?;
        let url = self.get_url(path)?;

        let mut request_builder = client.request(method, url);
        request_builder = request_builder.headers(self.get_headers()?);

        if let Some(body) = self.body() {
          request_builder = request_builder.body(body);
        }

        let request = request_builder.build()?;

        let response = client.execute(request).await?;

        if response.status() != StatusCode::OK {
          return Err(Error::HttpStatus(response.status()));
        }

        let mut bytes_stream = response.bytes_stream();

        loop {
          // Check for a stop request before waiting on the next chunk.
          if channel.is_stopped() {
            sender.send(StreamData::Stop).await?;
            return Ok(());
          }

          match bytes_stream.next().await {
            Some(Ok(bytes)) => {
              total_read_bytes += bytes.len() as u64;
              if let Err(error) = sender.send(StreamData::Data(bytes.to_vec())).await {
                // A closed channel after a stop request is an expected shutdown,
                // not a failure: log the dropped chunk and finish normally.
                if channel.is_stopped() && sender.is_closed() {
                  log::warn!(
                    "Data channel closed: could not send {} read bytes.",
                    bytes.len()
                  );
                  return Ok(());
                }

                return Err(error.into());
              }
            }
            // A failed chunk means the body was cut short. Report it instead of
            // signalling a clean end-of-stream on partial data.
            Some(Err(error)) => return Err(error.into()),
            None => {
              sender.send(StreamData::Eof).await?;
              return Ok(());
            }
          }
        }
      })?;
    Ok(total_read_bytes)
  }
}