use crate::{
StreamData,
endpoint::S3Endpoint,
error::Error,
reader::{ReaderNotification, StreamReader},
};
use async_std::channel::Sender;
use async_trait::async_trait;
use aws_sdk_s3::Client;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
impl S3Endpoint {
async fn read_file(
&self,
client: Arc<Client>,
path: &str,
bucket: &str,
sender: Sender<StreamData>,
channel: &dyn ReaderNotification,
) -> Result<u64, Error> {
let mut total_read_bytes: u64 = 0;
let cloned_bucket = bucket.to_string();
let cloned_key = path.to_string();
let cloned_client = client.clone();
let cloned_cloned_key = cloned_key.clone();
let cloned_cloned_bucket = cloned_bucket.clone();
let head = cloned_client
.head_object()
.bucket(cloned_cloned_bucket)
.key(cloned_cloned_key)
.send()
.await?;
if let Some(size) = head.content_length {
sender.send(StreamData::Size(size as u64)).await?;
}
let object = client
.get_object()
.bucket(cloned_bucket)
.key(cloned_key)
.send()
.await?;
let s3_byte_stream = object.body;
let mut reader = s3_byte_stream.into_async_read();
let buffer_size = if let Ok(buffer_size) = std::env::var("S3_READER_BUFFER_SIZE") {
buffer_size
.parse::<u32>()
.map_err(|error| Error::from(("S3_READER_BUFFER_SIZE", error)))? as usize
} else {
1024 * 1024
};
loop {
if channel.is_stopped() {
sender.send(StreamData::Stop).await?;
return Ok(total_read_bytes);
}
let mut buffer: Vec<u8> = vec![0; buffer_size];
let size = reader.read(&mut buffer).await?;
total_read_bytes += size as u64;
if size == 0 {
break;
}
if let Err(error) = sender
.send(StreamData::Data(buffer[0..size].to_vec()))
.await
{
if channel.is_stopped() && sender.is_closed() {
log::warn!("Data channel closed: could not send {size} read bytes.");
return Ok(total_read_bytes);
}
return Err(error.into());
}
}
sender.send(StreamData::Eof).await?;
Ok(total_read_bytes)
}
}
#[async_trait]
impl StreamReader for S3Endpoint {
    /// Reads the object at `path` from this endpoint's bucket and forwards its
    /// content to `sender`.
    ///
    /// This delegates to `S3Endpoint::read_file`, using the endpoint's own
    /// connection and bucket; `channel` is passed through unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `S3Endpoint::read_file`.
    async fn read_stream(
        &self,
        path: &str,
        sender: Sender<StreamData>,
        channel: &dyn ReaderNotification,
    ) -> Result<u64, Error> {
        // `path` is already a `&str`, so it is forwarded without an owned copy.
        // NOTE(review): the return type of `bucket()` is not visible from here,
        // so the owned copy of the bucket name is kept as-is.
        let bucket = self.bucket().to_string();
        self.read_file(self.connection(), path, &bucket, sender, channel)
            .await
    }
}