use std::{
path::{Path, PathBuf},
str::FromStr,
time::SystemTime,
};
use async_trait::async_trait;
use aws_sdk_s3::{
config::Credentials,
error::SdkError,
primitives::ByteStream,
types::{Delete, ObjectIdentifier},
Client,
};
use aws_types::region::Region;
use super::{Driver, DriverError, DriverResult};
use crate::contents::Contents;
/// Settings consumed by `AwsS3::new` to build an S3 client for one bucket.
pub struct Config {
/// Name of the bucket every request issued by the driver is sent to.
pub bucket: String,
/// Region name; handed to `Region::new` when the client is configured.
pub region: String,
/// Optional static credentials. When `None`, `AwsS3::new` does not set a
/// credentials provider on the client builder.
pub credentials: Option<ClientCredentials>,
}
/// Static access credentials that `AwsS3::new` turns into an SDK
/// `Credentials` value (created without an expiry, under the provider name
/// `"active-store"`).
pub struct ClientCredentials {
/// Access key id, passed as the first argument to `Credentials::new`.
pub access_key: String,
/// Secret access key, passed as the second argument to `Credentials::new`.
pub secret_key: String,
/// Optional session token, forwarded to `Credentials::new` unchanged.
pub session_token: Option<String>,
}
/// Storage driver that keeps its data in a single S3 bucket.
///
/// Construct it with `AwsS3::new` (from a `Config`) or `AwsS3::with_client`
/// (from an already configured SDK client).
#[derive(Clone)]
// NOTE(review): presumably allowed because the type name repeats the name of
// the enclosing module — confirm against the module path.
#[allow(clippy::module_name_repetitions)]
pub struct AwsS3 {
// SDK client used for every request the driver sends.
client: Client,
// Bucket name attached to every request.
bucket: String,
}
impl AwsS3 {
    /// Builds a driver for `config.bucket` in `config.region`.
    ///
    /// Path-style addressing is always enabled on the client. When
    /// `config.credentials` is present it is installed as the client's
    /// credentials provider (without an expiry, under the provider name
    /// `"active-store"`); otherwise no provider is set on the builder.
    #[must_use]
    pub fn new(config: Config) -> Self {
        let mut client_builder = aws_sdk_s3::Config::builder()
            .force_path_style(true)
            .region(Region::new(config.region));

        if let Some(credentials) = config.credentials {
            let cred = Credentials::new(
                credentials.access_key,
                credentials.secret_key,
                credentials.session_token,
                None,
                "active-store",
            );
            client_builder = client_builder.credentials_provider(cred);
        }

        Self {
            bucket: config.bucket,
            client: Client::from_conf(client_builder.build()),
        }
    }

    /// Wraps an already configured SDK `client`, targeting `bucket`.
    #[must_use]
    pub fn with_client(client: Client, bucket: &str) -> Self {
        Self {
            client,
            bucket: bucket.to_string(),
        }
    }

    /// Lists the key of every object stored under the directory `path`.
    ///
    /// The listing is paginated, so the returned vector holds the keys of all
    /// pages. Objects that come back without a key are skipped.
    ///
    /// # Errors
    ///
    /// Returns [`DriverError::InvalidPath`] when `path` is not valid UTF-8, or
    /// the converted SDK error when any page of the listing fails.
    async fn get_all_files_in_path(&self, path: &Path) -> DriverResult<Vec<std::path::PathBuf>> {
        let directory = path.to_str().ok_or(DriverError::InvalidPath)?;
        // `Path` treats "dir" and "dir/" as the same path, so strip trailing
        // separators before appending ours; otherwise "dir/" would turn into
        // the prefix "dir//" and match nothing.
        let prefix = format!("{}/", directory.trim_end_matches('/'));

        let mut pages = self
            .client
            .list_objects_v2()
            .bucket(&self.bucket)
            .prefix(prefix)
            .into_paginator()
            .send();

        let mut paths = Vec::new();
        while let Some(page) = pages.next().await {
            let objects = page?.contents.unwrap_or_default();
            paths.extend(
                objects
                    .iter()
                    .filter_map(|object| object.key())
                    // Parsing a `PathBuf` from a string cannot fail; matching
                    // on the uninhabited error proves that without a panic path.
                    .map(|key| PathBuf::from_str(key).unwrap_or_else(|never| match never {})),
            );
        }
        Ok(paths)
    }
}
#[async_trait]
impl Driver for AwsS3 {
async fn read(&self, path: &Path) -> DriverResult<Vec<u8>> {
let request = match self
.client
.get_object()
.bucket(&self.bucket)
.key(path.to_str().ok_or(DriverError::InvalidPath)?)
.send()
.await
{
Ok(request) => request,
Err(error) => {
return Err(error.into());
}
};
Ok(Contents::from_bytestream(request.body)
.await
.map_err(|_| DriverError::DecodeError)?
.into())
}
async fn file_exists(&self, path: &Path) -> DriverResult<bool> {
let request = self
.client
.head_object()
.bucket(&self.bucket)
.key(path.to_str().ok_or(DriverError::InvalidPath)?)
.send()
.await;
match request {
Ok(_) => Ok(true),
Err(SdkError::ServiceError(error)) => {
if error.err().is_not_found() {
return Ok(false);
}
Err(SdkError::ServiceError(error).into())
}
Err(e) => Err(e.into()),
}
}
async fn write(&self, path: &Path, content: Vec<u8>) -> DriverResult<()> {
match self
.client
.put_object()
.bucket(&self.bucket)
.key(path.to_str().ok_or(DriverError::InvalidPath)?)
.body(ByteStream::from(content))
.send()
.await
{
Ok(_put) => Ok(()),
Err(error) => Err(error.into()),
}
}
async fn delete(&self, path: &Path) -> DriverResult<()> {
if !self.file_exists(path).await? {
return Err(DriverError::ResourceNotFound);
}
if let Err(err) = self
.client
.delete_object()
.bucket(&self.bucket)
.key(path.to_str().ok_or(DriverError::InvalidPath)?)
.send()
.await
{
return Err(err.into());
}
Ok(())
}
async fn delete_directory(&self, path: &Path) -> DriverResult<()> {
let paths_to_delete = self.get_all_files_in_path(path).await?;
if paths_to_delete.is_empty() {
return Err(DriverError::ResourceNotFound);
}
if let Err(err) = self
.client
.delete_objects()
.bucket(&self.bucket)
.delete(
Delete::builder()
.set_objects(Some(
paths_to_delete
.iter()
.map(|path| {
ObjectIdentifier::builder()
.key(path.to_str().unwrap().to_string())
.build()
.unwrap()
})
.collect(),
))
.build()
.unwrap(),
)
.send()
.await
{
return Err(err.into());
}
Ok(())
}
async fn last_modified(&self, path: &Path) -> DriverResult<SystemTime> {
let response = self
.client
.head_object()
.bucket(&self.bucket)
.key(path.to_str().ok_or(DriverError::InvalidPath)?)
.send()
.await;
match response {
Ok(response) => Ok(SystemTime::try_from(
response
.last_modified
.ok_or(DriverError::Any("last modify is missing".into()))?,
)
.map_err(Box::from)?),
Err(e) => Err(e.into()),
}
}
}
/// Shorthand for the SDK's `SdkError` with the raw response type fixed to
/// `HttpResponse`; `T` is the operation-specific error type. Used as the
/// source type of the `From<…> for DriverError` conversions in this file.
type AwsApiError<T> = aws_smithy_runtime_api::client::result::SdkError<
T,
aws_smithy_runtime_api::client::orchestrator::HttpResponse,
>;
/// Converts a failed `GetObject` call into a [`DriverError`].
///
/// * construction, timeout and dispatch failures become `Network`;
/// * an unparsable response becomes `ResourceNotFound` when its HTTP status
///   is 404 and `Network` otherwise;
/// * the `NoSuchKey` service error becomes `ResourceNotFound`, any other
///   service error becomes `Any` carrying its display text;
/// * every remaining variant is boxed into `Any`.
impl From<AwsApiError<aws_sdk_s3::operation::get_object::GetObjectError>> for DriverError {
    fn from(kind: AwsApiError<aws_sdk_s3::operation::get_object::GetObjectError>) -> Self {
        use aws_sdk_s3::operation::get_object::GetObjectError;
        use aws_smithy_runtime_api::client::result::SdkError;

        match kind {
            SdkError::ConstructionFailure(_)
            | SdkError::TimeoutError(_)
            | SdkError::DispatchFailure(_) => Self::Network(),
            SdkError::ResponseError(response) if response.raw().status().as_u16() == 404 => {
                Self::ResourceNotFound
            }
            SdkError::ResponseError(_) => Self::Network(),
            SdkError::ServiceError(service) => match service.err() {
                GetObjectError::NoSuchKey(_) => Self::ResourceNotFound,
                other => Self::Any(other.to_string().into()),
            },
            unhandled => Self::Any(Box::new(unhandled) as Box<_>),
        }
    }
}
/// Converts a failed `HeadObject` call into a [`DriverError`].
///
/// * construction, timeout and dispatch failures become `Network`;
/// * an unparsable response becomes `ResourceNotFound` when its HTTP status
///   is 404 and `Network` otherwise;
/// * the `NotFound` service error becomes `ResourceNotFound`, any other
///   service error becomes `Any` carrying its display text;
/// * every remaining variant is boxed into `Any`.
impl From<AwsApiError<aws_sdk_s3::operation::head_object::HeadObjectError>> for DriverError {
    fn from(kind: AwsApiError<aws_sdk_s3::operation::head_object::HeadObjectError>) -> Self {
        use aws_sdk_s3::operation::head_object::HeadObjectError;
        use aws_smithy_runtime_api::client::result::SdkError;

        match kind {
            SdkError::ConstructionFailure(_)
            | SdkError::TimeoutError(_)
            | SdkError::DispatchFailure(_) => Self::Network(),
            SdkError::ResponseError(response) if response.raw().status().as_u16() == 404 => {
                Self::ResourceNotFound
            }
            SdkError::ResponseError(_) => Self::Network(),
            SdkError::ServiceError(service) => match service.err() {
                HeadObjectError::NotFound(_) => Self::ResourceNotFound,
                other => Self::Any(other.to_string().into()),
            },
            unhandled => Self::Any(Box::new(unhandled)),
        }
    }
}
/// Converts a failed `PutObject` call into a [`DriverError`].
///
/// * construction, timeout and dispatch failures become `Network`;
/// * an unparsable response becomes `ResourceNotFound` when its HTTP status
///   is 404 and `Network` otherwise;
/// * a service error becomes `Any` carrying its display text;
/// * every remaining variant is boxed into `Any`.
impl From<AwsApiError<aws_sdk_s3::operation::put_object::PutObjectError>> for DriverError {
    fn from(kind: AwsApiError<aws_sdk_s3::operation::put_object::PutObjectError>) -> Self {
        use aws_smithy_runtime_api::client::result::SdkError;

        match kind {
            SdkError::ConstructionFailure(_)
            | SdkError::TimeoutError(_)
            | SdkError::DispatchFailure(_) => Self::Network(),
            SdkError::ResponseError(response) if response.raw().status().as_u16() == 404 => {
                Self::ResourceNotFound
            }
            SdkError::ResponseError(_) => Self::Network(),
            SdkError::ServiceError(service) => Self::Any(service.err().to_string().into()),
            unhandled => Self::Any(Box::new(unhandled)),
        }
    }
}
/// Converts a failed `DeleteObject` call into a [`DriverError`].
///
/// * construction, timeout and dispatch failures become `Network`;
/// * an unparsable response becomes `ResourceNotFound` when its HTTP status
///   is 404 and `Network` otherwise;
/// * a service error becomes `Any` carrying its display text;
/// * every remaining variant is boxed into `Any`.
impl From<AwsApiError<aws_sdk_s3::operation::delete_object::DeleteObjectError>> for DriverError {
    fn from(kind: AwsApiError<aws_sdk_s3::operation::delete_object::DeleteObjectError>) -> Self {
        use aws_smithy_runtime_api::client::result::SdkError;

        match kind {
            SdkError::ConstructionFailure(_)
            | SdkError::TimeoutError(_)
            | SdkError::DispatchFailure(_) => Self::Network(),
            SdkError::ResponseError(response) if response.raw().status().as_u16() == 404 => {
                Self::ResourceNotFound
            }
            SdkError::ResponseError(_) => Self::Network(),
            SdkError::ServiceError(service) => Self::Any(service.err().to_string().into()),
            unhandled => Self::Any(Box::new(unhandled)),
        }
    }
}
/// Converts a failed `DeleteObjects` call into a [`DriverError`].
///
/// * construction, timeout and dispatch failures become `Network`;
/// * an unparsable response becomes `ResourceNotFound` when its HTTP status
///   is 404 and `Network` otherwise;
/// * a service error becomes `Any` carrying its display text;
/// * every remaining variant is boxed into `Any`.
impl From<AwsApiError<aws_sdk_s3::operation::delete_objects::DeleteObjectsError>> for DriverError {
    fn from(kind: AwsApiError<aws_sdk_s3::operation::delete_objects::DeleteObjectsError>) -> Self {
        use aws_smithy_runtime_api::client::result::SdkError;

        match kind {
            SdkError::ConstructionFailure(_)
            | SdkError::TimeoutError(_)
            | SdkError::DispatchFailure(_) => Self::Network(),
            SdkError::ResponseError(response) if response.raw().status().as_u16() == 404 => {
                Self::ResourceNotFound
            }
            SdkError::ResponseError(_) => Self::Network(),
            SdkError::ServiceError(service) => Self::Any(service.err().to_string().into()),
            unhandled => Self::Any(Box::new(unhandled)),
        }
    }
}
/// Converts a failed `ListObjectsV2` call into a [`DriverError`].
///
/// * construction, timeout and dispatch failures become `Network`;
/// * an unparsable response becomes `ResourceNotFound` when its HTTP status
///   is 404 and `Network` otherwise;
/// * the `NoSuchBucket` service error becomes `ResourceNotFound`, any other
///   service error becomes `Any` carrying its display text;
/// * every remaining variant is boxed into `Any`.
impl From<AwsApiError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>> for DriverError {
    fn from(kind: AwsApiError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>) -> Self {
        use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error;
        use aws_smithy_runtime_api::client::result::SdkError;

        match kind {
            SdkError::ConstructionFailure(_)
            | SdkError::TimeoutError(_)
            | SdkError::DispatchFailure(_) => Self::Network(),
            SdkError::ResponseError(response) if response.raw().status().as_u16() == 404 => {
                Self::ResourceNotFound
            }
            SdkError::ResponseError(_) => Self::Network(),
            SdkError::ServiceError(service) => match service.err() {
                ListObjectsV2Error::NoSuchBucket(_) => Self::ResourceNotFound,
                other => Self::Any(other.to_string().into()),
            },
            unhandled => Self::Any(Box::new(unhandled)),
        }
    }
}