use crate::prelude::*;
use aws_config::Region;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::head_bucket::HeadBucketError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use beet_core::prelude::*;
use bytes::Bytes;
#[derive(Clone, Deref, DerefMut, Resource)]
pub struct S3Provider(pub Client);
impl<T: TableRow> TableProvider<T> for S3Provider {
fn box_clone_table(&self) -> Box<dyn TableProvider<T>> {
Box::new(self.clone())
}
}
impl S3Provider {
pub async fn create() -> Self {
Self::create_with_region("us-west-2").await
}
pub async fn create_with_region(region: &str) -> Self {
let region = Region::new(region.to_string());
let config = aws_config::from_env()
.region(RegionProviderChain::default_provider().or_else(region))
.load()
.await;
Self(Client::new(&config))
}
fn resolve_key(&self, path: &RoutePath) -> String {
path.to_string().trim_start_matches('/').to_string()
}
}
impl BucketProvider for S3Provider {
fn box_clone(&self) -> Box<dyn BucketProvider> {
Box::new(Self(self.0.clone()))
}
fn region(&self) -> Option<String> {
self.0.config().region().map(|r| r.to_string())
}
fn bucket_exists(
&self,
bucket_name: &str,
) -> SendBoxedFuture<Result<bool>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
Box::pin(async move {
match client.head_bucket().bucket(&bucket_name).send().await {
Ok(_) => Ok(true),
Err(SdkError::ServiceError(service_err))
if let HeadBucketError::NotFound(_) = service_err.err() =>
{
Ok(false)
}
Err(other) => {
bevybail!("Failed to check bucket: {:?}", other)
}
}
})
}
fn bucket_create(&self, bucket_name: &str) -> SendBoxedFuture<Result<()>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
Box::pin(async move {
let mut create_bucket_req =
client.create_bucket().bucket(&bucket_name);
use aws_sdk_s3::types::CreateBucketConfiguration;
let mut bucket_config = CreateBucketConfiguration::builder();
if let Some(region) = client.config().region() {
bucket_config = bucket_config
.location_constraint(region.to_string().as_str().into());
}
create_bucket_req = create_bucket_req
.create_bucket_configuration(bucket_config.build());
create_bucket_req.send().await?;
Ok(())
})
}
fn bucket_remove(&self, bucket_name: &str) -> SendBoxedFuture<Result<()>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
Box::pin(async move {
let mut continuation_token = None;
loop {
let mut req = client.list_objects_v2().bucket(&bucket_name);
if let Some(token) = &continuation_token {
req = req.continuation_token(token);
}
let list_result = req.send().await?;
let contents = list_result.contents.unwrap_or_default();
if !contents.is_empty() {
let delete_objects = aws_sdk_s3::types::Delete::builder()
.set_objects(Some(
contents
.into_iter()
.filter_map(|obj| {
obj.key.map(|k| {
aws_sdk_s3::types::ObjectIdentifier::builder().key(k).build()
})
})
.collect::<Result<_, _>>()?,
))
.build()?;
client
.delete_objects()
.bucket(&bucket_name)
.delete(delete_objects)
.send()
.await?;
}
if list_result.is_truncated == Some(true) {
continuation_token = list_result.next_continuation_token;
if continuation_token.is_none() {
break;
}
} else {
break;
}
}
client.delete_bucket().bucket(&bucket_name).send().await?;
Ok(())
})
}
fn insert(
&self,
bucket_name: &str,
path: &RoutePath,
body: Bytes,
) -> SendBoxedFuture<Result<()>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
let key = self.resolve_key(path);
Box::pin(async move {
client
.put_object()
.bucket(&bucket_name)
.key(&key)
.body(body.to_vec().into())
.send()
.await?;
Ok(())
})
}
fn exists(
&self,
bucket_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<bool>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
let key = self.resolve_key(path);
Box::pin(async move {
let head_result = client
.head_object()
.bucket(&bucket_name)
.key(&key)
.send()
.await;
match head_result {
Ok(_) => Ok(true),
Err(SdkError::ServiceError(service_err))
if let HeadObjectError::NotFound(_) = service_err.err() =>
{
Ok(false)
}
Err(err) => Err(err.into()),
}
})
}
fn list(
&self,
bucket_name: &str,
) -> SendBoxedFuture<Result<Vec<RoutePath>>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
Box::pin(async move {
let mut paths = Vec::new();
let mut continuation_token = None;
loop {
let mut req = client.list_objects_v2().bucket(&bucket_name);
if let Some(token) = &continuation_token {
req = req.continuation_token(token);
}
let list_result = req.send().await?;
let contents = list_result.contents.unwrap_or_default();
paths.extend(
contents
.into_iter()
.filter_map(|obj| obj.key.map(RoutePath::new)),
);
if list_result.is_truncated == Some(true) {
continuation_token = list_result.next_continuation_token;
if continuation_token.is_none() {
break;
}
} else {
break;
}
}
Ok(paths)
})
}
fn get(
&self,
bucket_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<Bytes>> {
let client = self.0.clone();
let bucket_name = bucket_name.to_string();
let key = self.resolve_key(path);
Box::pin(async move {
let get_result = client
.get_object()
.bucket(&bucket_name)
.key(&key)
.send()
.await?;
let body_bytes = get_result.body.collect().await?.into_bytes();
Ok(body_bytes)
})
}
fn remove(
&self,
bucket_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<()>> {
let this = self.clone();
let bucket_name = bucket_name.to_string();
let path = path.clone();
let key = this.resolve_key(&path);
Box::pin(async move {
match this.exists(&bucket_name, &path).await? {
true => {
this.0
.delete_object()
.bucket(&bucket_name)
.key(&key)
.send()
.await?;
Ok(())
}
false => {
bevybail!("Object not found: {}", key);
}
}
})
}
fn public_url(
&self,
bucket_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<Option<String>>> {
let region = self.region().unwrap_or_else(|| "us-west-2".to_string());
let bucket_name = bucket_name.to_string();
let key = self.resolve_key(path);
let public_url =
format!("https://{bucket_name}.s3.{region}.amazonaws.com/{key}",);
Box::pin(async move { Ok(Some(public_url)) })
}
}
#[cfg(test)]
mod test {
use super::*;
use beet_core::prelude::*;
#[beet_core::test]
#[ignore = "hits remote s3"]
async fn works() {
let provider = S3Provider::create().await;
bucket_test::run(provider).await;
}
#[beet_core::test]
#[ignore = "hits remote s3"]
async fn infra_bucket() -> Result<()> {
let client = S3Provider::create().await;
let bucket = Bucket::new(client, "beet-site-bucket-dev".to_string());
bucket.bucket_try_create().await?;
bucket.bucket_exists().await.xpect_ok();
bucket
.get(&RoutePath::new("index.html"))
.await
.unwrap()
.xmap(|bytes| String::from_utf8(bytes.to_vec()).unwrap())
.xpect_starts_with("<!DOCTYPE html>");
Ok(())
}
#[beet_core::test]
#[ignore = "hits remote s3"]
async fn s3_public_url() -> Result<()> {
let bucket_name: &str = "beet-test";
let client = S3Provider::create().await;
let test_key = RoutePath::from("test-file.txt");
Bucket::new(client, bucket_name.to_string())
.public_url(&test_key)
.await?
.unwrap()
.xpect_eq(format!(
"https://{bucket_name}.s3.us-west-2.amazonaws.com{test_key}"
));
Ok(())
}
}