use crate::prelude::*;
use aws_config::Region;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_dynamodb::Client;
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::operation;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::types::TableStatus;
use beet_core::prelude::*;
use bytes::Bytes;
/// Provider backed by an AWS DynamoDB [`Client`].
///
/// The wrapped client is exposed through `Deref`/`DerefMut`, so the SDK's
/// operation builders (`describe_table`, `put_item`, ...) can be called
/// directly on the provider. Each table is used as a "bucket" keyed by a
/// string `id` attribute.
#[derive(Clone, Deref, DerefMut, Resource)]
pub struct DynamoDbProvider(pub Client);
impl DynamoDbProvider {
    /// Creates a provider using `us-west-2` as the fallback region.
    ///
    /// See [`Self::create_with_region`] for how the region is resolved.
    pub async fn create() -> Self {
        Self::create_with_region("us-west-2").await
    }

    /// Creates a provider from the AWS configuration found in the environment.
    ///
    /// The region is taken from the default region provider chain; `region`
    /// is appended to that chain as a fallback.
    pub async fn create_with_region(region: &str) -> Self {
        let fallback = Region::new(region.to_string());
        let region_chain = RegionProviderChain::default_provider().or_else(fallback);
        let config = aws_config::from_env().region(region_chain).load().await;
        Self(Client::new(&config))
    }

    /// Converts a route path into the string value stored in the `id`
    /// attribute, with any leading `/` characters stripped.
    fn resolve_key(&self, path: &RoutePath) -> AttributeValue {
        let key = path.to_string();
        AttributeValue::S(key.trim_start_matches('/').to_owned())
    }

    /// Looks up the current status of `table_name`.
    ///
    /// Returns `Ok(None)` when DynamoDB reports `ResourceNotFoundException`.
    ///
    /// # Errors
    ///
    /// Fails when the request fails for any other reason, or when the
    /// response carries no table description or no status.
    async fn table_status(
        &self,
        table_name: &str,
    ) -> Result<Option<TableStatus>> {
        match self.describe_table().table_name(table_name).send().await {
            Ok(out) => {
                let Some(desc) = out.table() else {
                    bevybail!("Failed to get table description: {out:?}")
                };
                let Some(status) = desc.table_status() else {
                    bevybail!("Failed to get table status: {out:?}")
                };
                Ok(Some(status.clone()))
            }
            Err(SdkError::ServiceError(service_err))
                if matches!(
                    service_err.err(),
                    operation::describe_table::DescribeTableError::ResourceNotFoundException(_)
                ) =>
            {
                Ok(None)
            }
            Err(other) => {
                bevybail!("Failed to check table: {other:?}")
            }
        }
    }

    /// Polls `table_name` with backoff (at most 20 attempts) until it
    /// becomes `Active`.
    ///
    /// # Errors
    ///
    /// Fails when the table enters any state other than `Creating`/`Active`,
    /// when a status check fails, or when the attempts run out.
    async fn await_table_create(&self, table_name: &str) -> Result<()> {
        let mut stream = Backoff::default().with_max_attempts(20).stream();
        while stream.next().await.is_some() {
            match self.table_status(table_name).await? {
                // `DescribeTable` is eventually consistent: immediately after
                // `CreateTable` it may still answer `ResourceNotFoundException`
                // (surfaced here as `None`), so keep polling instead of bailing.
                None | Some(TableStatus::Creating) => {}
                Some(TableStatus::Active) => return Ok(()),
                status => {
                    bevybail!("Unexpected table state: {:?}", status);
                }
            }
        }
        bevybail!("Table did not become active in time");
    }

    /// Polls `table_name` with backoff (at most 20 attempts) until it no
    /// longer exists.
    ///
    /// # Errors
    ///
    /// Fails when the table is in any state other than `Deleting`, when a
    /// status check fails, or when the attempts run out.
    async fn await_table_remove(&self, table_name: &str) -> Result<()> {
        let mut stream = Backoff::default().with_max_attempts(20).stream();
        while stream.next().await.is_some() {
            match self.table_status(table_name).await? {
                Some(TableStatus::Deleting) => {}
                None => return Ok(()),
                status => {
                    bevybail!("Unexpected table state: {:?}", status);
                }
            }
        }
        bevybail!("Table did not delete in time");
    }
}
impl BucketProvider for DynamoDbProvider {
fn box_clone(&self) -> Box<dyn BucketProvider> {
Box::new(Self(self.0.clone()))
}
fn region(&self) -> Option<String> {
self.0.config().region().map(|r| r.to_string())
}
fn bucket_exists(&self, table_name: &str) -> SendBoxedFuture<Result<bool>> {
let table_name = table_name.to_string();
let this = self.clone();
Box::pin(async move {
match this.table_status(&table_name).await {
Ok(Some(TableStatus::Active)) => Ok(true),
Ok(Some(_)) => Ok(false),
Ok(None) => Ok(false),
Err(err) => Err(err),
}
})
}
fn bucket_create(&self, table_name: &str) -> SendBoxedFuture<Result<()>> {
let builder = self.create_table().table_name(table_name);
let table_name = table_name.to_string();
let this = self.clone();
Box::pin(async move {
let result = builder
.attribute_definitions(
aws_sdk_dynamodb::types::AttributeDefinition::builder()
.attribute_name("id")
.attribute_type(
aws_sdk_dynamodb::types::ScalarAttributeType::S,
)
.build()?,
)
.key_schema(
aws_sdk_dynamodb::types::KeySchemaElement::builder()
.attribute_name("id")
.key_type(aws_sdk_dynamodb::types::KeyType::Hash)
.build()?,
)
.provisioned_throughput(
aws_sdk_dynamodb::types::ProvisionedThroughput::builder()
.read_capacity_units(1)
.write_capacity_units(1)
.build()?,
)
.send()
.await;
match result {
Ok(_) => {
this.await_table_create(&table_name).await?;
Ok(())
}
Err(err) => bevybail!("Failed to create table: {:?}", err),
}
})
}
fn bucket_remove(&self, table_name: &str) -> SendBoxedFuture<Result<()>> {
let delete_fut = self.delete_table().table_name(table_name).send();
let this = self.clone();
let table_name = table_name.to_string();
Box::pin(async move {
delete_fut.await?;
this.await_table_remove(&table_name).await?;
Ok(())
})
}
fn insert(
&self,
table_name: &str,
path: &RoutePath,
body: Bytes,
) -> SendBoxedFuture<Result<()>> {
let fut = self
.put_item()
.table_name(table_name)
.item("id", self.resolve_key(path))
.item("data", AttributeValue::B(body.to_vec().into()))
.send();
Box::pin(async move {
fut.await?;
Ok(())
})
}
fn exists(
&self,
table_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<bool>> {
let fut = self
.get_item()
.table_name(table_name)
.key("id", self.resolve_key(path))
.send();
Box::pin(async move {
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::operation::get_item::GetItemError;
match fut.await {
Ok(out) => Ok(out.item.is_some()),
Err(SdkError::ServiceError(service_err))
if matches!(
service_err.err(),
GetItemError::ResourceNotFoundException(_)
) =>
{
Ok(false)
}
Err(other) => Err(other.into()),
}
})
}
fn list(
&self,
table_name: &str,
) -> SendBoxedFuture<Result<Vec<RoutePath>>> {
let fut = self.scan().table_name(table_name).send();
Box::pin(async move {
let out = fut.await?;
let mut paths = Vec::new();
if let Some(items) = out.items {
for item in items {
if let Some(AttributeValue::S(id)) = item.get("id") {
paths.push(RoutePath::new(id));
}
}
}
Ok(paths)
})
}
fn get(
&self,
table_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<Bytes>> {
let fut = self
.get_item()
.table_name(table_name)
.key("id", self.resolve_key(path))
.send();
Box::pin(async move {
let out = fut.await?;
let Some(item) = out.item else {
bevybail!("Item not found");
};
let Some(AttributeValue::B(data)) = item.get("data") else {
bevybail!("No data field found");
};
Ok(Bytes::from(data.clone().into_inner()))
})
}
fn remove(
&self,
table_name: &str,
path: &RoutePath,
) -> SendBoxedFuture<Result<()>> {
let fut = self
.delete_item()
.table_name(table_name)
.key("id", self.resolve_key(path))
.return_values(aws_sdk_dynamodb::types::ReturnValue::AllOld)
.send();
Box::pin(async move {
let result = fut.await?;
if result.attributes.is_none() {
bevybail!("Item not found");
}
Ok(())
})
}
fn public_url(
&self,
_table_name: &str,
_path: &RoutePath,
) -> SendBoxedFuture<Result<Option<String>>> {
Box::pin(async move { Ok(None) })
}
}
/// Typed row storage on top of DynamoDB, using `serde_dynamo` to convert
/// between `T` and DynamoDB items.
impl<T: TableRow> TableProvider<T> for DynamoDbProvider {
    /// Clones the provider into a boxed trait object.
    fn box_clone_table(&self) -> Box<dyn TableProvider<T>> {
        Box::new(self.clone())
    }

    /// Serializes `body` into an item and puts it into `table_name`.
    ///
    /// # Errors
    ///
    /// Fails when `body` cannot be serialized (the underlying cause is
    /// included in the message) or when the put request fails.
    fn insert_row(&self, table_name: &str, body: T) -> SendBoxedFuture<Result> {
        let item = match serde_dynamo::to_item(body) {
            Ok(item) => item,
            Err(err) => {
                // Render the cause eagerly so the returned future only
                // captures an owned string.
                let reason = err.to_string();
                return Box::pin(async move {
                    bevybail!("Failed to serialize item for dynamo: {reason}");
                });
            }
        };
        let fut = self
            .put_item()
            .table_name(table_name)
            .set_item(Some(item))
            .send();
        Box::pin(async move {
            fut.await?;
            Ok(())
        })
    }

    /// Fetches the row whose `id` attribute equals the string form of `id`
    /// and deserializes it into `T`.
    ///
    /// # Errors
    ///
    /// Fails when the request fails, no such item exists, or the item cannot
    /// be deserialized.
    fn get_row(
        &self,
        table_name: &str,
        id: Uuid,
    ) -> SendBoxedFuture<Result<T>> {
        let fut = self
            .get_item()
            .table_name(table_name)
            .key("id", AttributeValue::S(id.to_string()))
            .send();
        Box::pin(async move {
            let out = fut.await?;
            let Some(item) = out.item else {
                bevybail!("Item not found");
            };
            let row = serde_dynamo::from_item(item)?;
            Ok(row)
        })
    }
}
// Integration-style tests for `DynamoDbProvider`.
//
// Both tests are `#[ignore]`d (reason: "takes ages"), so they only run when
// ignored tests are requested explicitly.
// NOTE(review): they presumably need real AWS credentials and network
// access, since the provider is built from the environment — confirm.
#[cfg(test)]
mod test {
use super::*;
// Runs the shared `bucket_test` suite against a provider created with the
// default fallback region.
#[beet_core::test]
#[ignore = "takes ages"]
async fn bucket() {
let provider = DynamoDbProvider::create().await;
bucket_test::run(provider).await;
}
// Runs the shared `table_test` suite against a provider created with the
// default fallback region.
#[beet_core::test]
#[ignore = "takes ages"]
async fn table() {
let provider = DynamoDbProvider::create().await;
table_test::run(provider).await;
}
}