pub mod config;
#[cfg(feature = "__test_helpers")]
#[doc(hidden)]
pub mod test_helpers;
use std::time::Duration;
use aoc_leaderboard::aoc::Leaderboard;
use aoc_leaderbot_lib::ErrorKind;
use aoc_leaderbot_lib::leaderbot::Storage;
use aws_config::SdkConfig;
use aws_sdk_dynamodb::operation::create_table::CreateTableOutput;
use aws_sdk_dynamodb::types::{
AttributeDefinition, AttributeValue, KeySchemaElement, KeyType, ScalarAttributeType,
TableDescription, TableStatus,
};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::error::DynamoDbError;
use crate::leaderbot::storage::aws::dynamodb::config::table::{CreateTableBuilderExt, TableConfig};
pub const HASH_KEY: &str = "leaderboard_id";
pub const RANGE_KEY: &str = "year";
pub const LEADERBOARD_DATA: &str = "leaderboard_data";
pub const LAST_ERROR: &str = "last_error";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DynamoDbLeaderboardData {
pub leaderboard_id: u64,
pub year: i32,
#[serde(default)]
pub leaderboard_data: Option<Leaderboard>,
#[serde(default)]
pub last_error: Option<ErrorKind>,
}
impl DynamoDbLeaderboardData {
pub fn for_success(year: i32, leaderboard_id: u64, leaderboard: Leaderboard) -> Self {
Self { leaderboard_id, year, leaderboard_data: Some(leaderboard), last_error: None }
}
}
#[derive(Debug, Clone)]
pub struct DynamoDbStorage {
client: aws_sdk_dynamodb::Client,
table_name: String,
}
impl DynamoDbStorage {
#[cfg_attr(coverage_nightly, coverage(off))]
pub async fn new<T>(table_name: T) -> Self
where
T: Into<String>,
{
let config = aws_config::load_from_env().await;
Self::with_config(&config, table_name).await
}
pub async fn with_config<T>(config: &SdkConfig, table_name: T) -> Self
where
T: Into<String>,
{
Self { client: aws_sdk_dynamodb::Client::new(config), table_name: table_name.into() }
}
#[cfg_attr(not(coverage), tracing::instrument(skip(self), ret, err))]
pub async fn create_table(&self, table_config: Option<TableConfig>) -> crate::Result<()> {
let output = self
.client
.create_table()
.table_name(self.table_name.clone())
.set_attribute_definitions(Some(vec![
Self::attribute_definition(HASH_KEY, ScalarAttributeType::N),
Self::attribute_definition(RANGE_KEY, ScalarAttributeType::N),
]))
.set_key_schema(Some(vec![
Self::key_schema_element(HASH_KEY, KeyType::Hash),
Self::key_schema_element(RANGE_KEY, KeyType::Range),
]))
.table_config(table_config)
.send()
.await
.map_err(|source| DynamoDbError::CreateTable {
table_name: self.table_name.clone(),
source: Box::new(source).into(),
})?;
self.wait_for_table_creation(&output).await
}
fn attribute_definition(
attribute_name: &str,
attribute_type: ScalarAttributeType,
) -> AttributeDefinition {
AttributeDefinition::builder()
.attribute_name(attribute_name)
.attribute_type(attribute_type)
.build()
.expect("all attributes for attribution definition should be set")
}
fn key_schema_element(attribute_name: &str, key_type: KeyType) -> KeySchemaElement {
KeySchemaElement::builder()
.attribute_name(attribute_name)
.key_type(key_type)
.build()
.expect("all attributes for key schema element should be set")
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[tracing::instrument(skip_all, level = "trace", ret, err)]
async fn wait_for_table_creation(
&self,
create_output: &CreateTableOutput,
) -> crate::Result<()> {
let mut status = create_output
.table_description()
.and_then(TableDescription::table_status)
.cloned();
while let Some(TableStatus::Creating) = status {
sleep(Duration::from_millis(100)).await;
let output = self
.client
.describe_table()
.table_name(self.table_name.clone())
.send()
.await
.map_err(|source| DynamoDbError::CreateTable {
table_name: self.table_name.clone(),
source: Box::new(source).into(),
})?;
status = output
.table()
.and_then(TableDescription::table_status)
.cloned();
}
Ok(())
}
}
impl Storage for DynamoDbStorage {
type Err = crate::Error;
#[cfg_attr(not(coverage), tracing::instrument(skip(self), ret, err))]
async fn load_previous(
&self,
year: i32,
leaderboard_id: u64,
) -> Result<(Option<Leaderboard>, Option<ErrorKind>), Self::Err> {
let load_previous_error =
|source| DynamoDbError::LoadPreviousLeaderboard { leaderboard_id, year, source };
Ok(self
.client
.get_item()
.table_name(self.table_name.clone())
.key(HASH_KEY, AttributeValue::N(leaderboard_id.to_string()))
.key(RANGE_KEY, AttributeValue::N(year.to_string()))
.send()
.await
.map_err(|err| load_previous_error(Box::new(err).into()))?
.item
.map(|item| {
let data: Result<DynamoDbLeaderboardData, _> = serde_dynamo::from_item(item);
data.map(|data| (data.leaderboard_data, data.last_error))
})
.transpose()
.map(Option::unwrap_or_default)
.map_err(|err| load_previous_error(err.into()))?)
}
#[cfg_attr(not(coverage), tracing::instrument(skip(self), ret, err))]
async fn save_success(
&mut self,
year: i32,
leaderboard_id: u64,
leaderboard: &Leaderboard,
) -> Result<(), Self::Err> {
let save_error = |source| DynamoDbError::SaveLeaderboard { leaderboard_id, year, source };
let leaderboard_data =
DynamoDbLeaderboardData::for_success(year, leaderboard_id, leaderboard.clone());
let item = serde_dynamo::to_item(leaderboard_data).map_err(|err| save_error(err.into()))?;
self.client
.put_item()
.table_name(self.table_name.clone())
.set_item(Some(item))
.send()
.await
.map_err(|err| save_error(Box::new(err).into()))?;
Ok(())
}
#[cfg_attr(not(coverage), tracing::instrument(skip(self), ret, err))]
async fn save_error(
&mut self,
year: i32,
leaderboard_id: u64,
error_kind: ErrorKind,
) -> Result<(), Self::Err> {
let save_error = |source| DynamoDbError::SaveLastError { leaderboard_id, year, source };
let attribute_value =
serde_dynamo::to_attribute_value(error_kind).map_err(|err| save_error(err.into()))?;
self.client
.update_item()
.table_name(self.table_name.clone())
.key(HASH_KEY, AttributeValue::N(leaderboard_id.to_string()))
.key(RANGE_KEY, AttributeValue::N(year.to_string()))
.update_expression("SET #last_error = :last_error")
.expression_attribute_names("#last_error", LAST_ERROR)
.expression_attribute_values(":last_error", attribute_value)
.send()
.await
.map_err(|err| save_error(Box::new(err).into()))?;
Ok(())
}
}