#![allow(clippy::default_trait_access)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::redundant_closure)]
#![allow(clippy::redundant_closure_for_method_calls)]
#![allow(clippy::should_implement_trait)]
#![allow(clippy::must_use_candidate)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::type_repetition_in_bounds)]
pub mod iter;
use crate::iter::{GetObjectStream, ObjectStream};
pub mod error;
use crate::error::{S3ExtError, S3ExtResult};
mod upload;
use async_trait::async_trait;
use log::debug;
use rusoto_core::{
request::{HttpClient, TlsError},
Region,
};
use rusoto_credential::StaticProvider;
use rusoto_s3::{
CompleteMultipartUploadOutput, GetObjectOutput, GetObjectRequest, PutObjectOutput,
PutObjectRequest, S3Client, StreamingBody, S3,
};
use std::{convert::AsRef, path::Path};
use tokio::{
fs::{File, OpenOptions},
io,
};
pub fn new_s3client_with_credentials(
region: Region,
access_key: String,
secret_key: String,
) -> Result<S3Client, TlsError> {
Ok(S3Client::new_with(
HttpClient::new()?,
StaticProvider::new_minimal(access_key, secret_key),
region,
))
}
#[async_trait]
pub trait S3Ext {
async fn download_to_file<F>(
&self,
source: GetObjectRequest,
target: F,
) -> S3ExtResult<GetObjectOutput>
where
F: AsRef<Path> + Send;
async fn upload_from_file<F>(
&self,
source: F,
target: PutObjectRequest,
) -> S3ExtResult<PutObjectOutput>
where
F: AsRef<Path> + Send;
async fn upload_from_file_multipart<F>(
&self,
source: F,
target: &PutObjectRequest,
part_size: usize,
) -> S3ExtResult<CompleteMultipartUploadOutput>
where
F: AsRef<Path> + Send;
async fn download<W>(
&self,
source: GetObjectRequest,
target: &mut W,
) -> S3ExtResult<GetObjectOutput>
where
W: io::AsyncWrite + Unpin + Send;
async fn upload<R>(
&self,
source: &mut R,
target: PutObjectRequest,
) -> S3ExtResult<PutObjectOutput>
where
R: io::AsyncRead + Unpin + Send;
async fn upload_multipart<R>(
&self,
source: &mut R,
target: &PutObjectRequest,
part_size: usize,
) -> S3ExtResult<CompleteMultipartUploadOutput>
where
R: io::AsyncRead + Unpin + Send;
fn stream_objects(&self, bucket: &str) -> ObjectStream;
fn stream_objects_with_prefix(&self, bucket: &str, prefix: &str) -> ObjectStream;
fn stream_get_objects(&self, bucket: &str) -> GetObjectStream;
fn stream_get_objects_with_prefix(&self, bucket: &str, prefix: &str) -> GetObjectStream;
}
#[async_trait]
impl S3Ext for S3Client {
async fn download_to_file<F>(
&self,
source: GetObjectRequest,
target: F,
) -> Result<GetObjectOutput, S3ExtError>
where
F: AsRef<Path> + Send,
{
debug!("downloading to file {:?}", target.as_ref());
let mut resp = self.get_object(source).await?;
let body = resp.body.take().expect("no body");
let mut target = OpenOptions::new()
.write(true)
.create_new(true)
.open(target)
.await?;
copy(body, &mut target).await?;
Ok(resp)
}
#[inline]
async fn upload_from_file<F>(
&self,
source: F,
target: PutObjectRequest,
) -> S3ExtResult<PutObjectOutput>
where
F: AsRef<Path> + Send,
{
debug!("uploading file {:?}", source.as_ref());
let mut source = File::open(source).await?;
upload::upload(&self, &mut source, target).await
}
#[inline]
async fn upload_from_file_multipart<F>(
&self,
source: F,
target: &PutObjectRequest,
part_size: usize,
) -> S3ExtResult<CompleteMultipartUploadOutput>
where
F: AsRef<Path> + Send,
{
debug!("uploading file {:?}", source.as_ref());
let mut source = File::open(source).await?;
upload::upload_multipart(&self, &mut source, target, part_size).await
}
async fn download<W>(
&self,
source: GetObjectRequest,
mut target: &mut W,
) -> S3ExtResult<GetObjectOutput>
where
W: io::AsyncWrite + Unpin + Send,
{
let mut resp = self.get_object(source).await?;
let body = resp.body.take().expect("no body");
copy(body, &mut target).await?;
Ok(resp)
}
#[inline]
async fn upload<R>(
&self,
source: &mut R,
target: PutObjectRequest,
) -> S3ExtResult<PutObjectOutput>
where
R: io::AsyncRead + Unpin + Send,
{
upload::upload(&self, source, target).await
}
#[inline]
async fn upload_multipart<R>(
&self,
mut source: &mut R,
target: &PutObjectRequest,
part_size: usize,
) -> S3ExtResult<CompleteMultipartUploadOutput>
where
R: io::AsyncRead + Unpin + Send,
{
upload::upload_multipart(&self, &mut source, target, part_size).await
}
#[inline]
fn stream_objects(&self, bucket: &str) -> ObjectStream {
ObjectStream::new(self, bucket, None)
}
#[inline]
fn stream_objects_with_prefix(&self, bucket: &str, prefix: &str) -> ObjectStream {
ObjectStream::new(self, bucket, Some(prefix))
}
#[inline]
fn stream_get_objects(&self, bucket: &str) -> GetObjectStream {
GetObjectStream::new(self, bucket, None)
}
#[inline]
fn stream_get_objects_with_prefix(&self, bucket: &str, prefix: &str) -> GetObjectStream {
GetObjectStream::new(self, bucket, Some(prefix))
}
}
async fn copy<W>(src: StreamingBody, dest: &mut W) -> S3ExtResult<()>
where
W: io::AsyncWrite + Unpin + Send,
{
io::copy(&mut src.into_async_read(), dest).await?;
Ok(())
}