use aws_config::{retry::RetryConfig, BehaviorVersion, Region, SdkConfig};
use aws_credential_types::{provider::SharedCredentialsProvider, Credentials};
use aws_sdk_dynamodb::{
types::{
AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
ScalarAttributeType, StreamSpecification, StreamViewType,
},
Client,
};
use aws_sdk_dynamodbstreams::types::Record;
use dynamo_subscriber::stream::ConsumerChannel;
use tokio::time::{sleep, Duration};
use ulid::Ulid;
/// Name of the DynamoDB table that the helpers in this file create, write to and delete.
const TABLE: &str = "People";
/// Name of the string attribute used as the table's hash key.
const PK: &str = "Id";
/// Bundle of the values a test needs to talk to the table prepared by `setup`.
pub struct TestConfig {
/// Name of the table the test operates on.
table_name: String,
/// AWS SDK configuration used to build clients.
config: SdkConfig,
}
impl TestConfig {
    /// Returns the name of the table as a borrowed string slice.
    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    /// Returns a reference to the stored AWS SDK configuration.
    pub fn aws_sdk_config(&self) -> &SdkConfig {
        &self.config
    }
}
/// Builds an SDK configuration for the endpoint at `http://localhost:8000`
/// and creates the test table there.
///
/// The access key and secret are freshly generated ULIDs, the region is
/// `us-east-1`, and requests use the standard retry policy capped at five
/// attempts.
///
/// # Panics
///
/// Panics if creating the table fails.
pub async fn setup() -> TestConfig {
    // Two independent random values: one for the access key id, one for the secret.
    let credentials = SharedCredentialsProvider::new(Credentials::from_keys(
        Ulid::new(),
        Ulid::new(),
        None,
    ));

    let config = SdkConfig::builder()
        .behavior_version(BehaviorVersion::latest())
        .region(Some(Region::from_static("us-east-1")))
        .endpoint_url("http://localhost:8000")
        .retry_config(RetryConfig::standard().with_max_attempts(5))
        .credentials_provider(credentials)
        .build();

    create_table(&config).await;

    TestConfig {
        config,
        table_name: TABLE.to_owned(),
    }
}
/// Writes an item to the test table whose only attribute is the key
/// attribute, set to the string `pk`.
///
/// # Panics
///
/// Panics if the `PutItem` request fails.
pub async fn put_item(pk: &str, config: &SdkConfig) {
    let client = Client::new(config);
    let key_value = AttributeValue::S(pk.to_owned());

    client
        .put_item()
        .table_name(TABLE)
        .item(PK, key_value)
        .send()
        .await
        .unwrap();
}
/// Deletes the test table by delegating to `drop_table`.
///
/// `config` is the SDK configuration used to build the client for the request.
pub async fn teardown(config: &SdkConfig) {
drop_table(config).await;
}
/// Waits until `channel.initialized()` returns `true`, checking it every
/// 100 milliseconds.
///
/// There is no timeout: the function keeps polling for as long as the
/// channel reports that it is not initialized.
pub async fn wait_until_initialized(channel: &mut ConsumerChannel) {
    while !channel.initialized() {
        sleep(Duration::from_millis(100)).await;
    }
}
/// Extracts the string value of the key attribute from a stream record.
///
/// # Panics
///
/// Panics with a message naming the failed step if the record has no
/// DynamoDB payload or no key map, if the key map lacks the key attribute,
/// or if that attribute is not a string.
pub fn pk(record: &Record) -> String {
    let keys = record
        .dynamodb()
        .and_then(|stream_record| stream_record.keys())
        .expect("stream record has no DynamoDB payload or no key attributes");

    keys.get(PK)
        .expect("stream record keys do not contain the partition key attribute")
        .as_s()
        .expect("partition key attribute is not a string")
        .clone()
}
/// Creates the test table with a single string hash key, pay-per-request
/// billing, and a stream enabled with the new-image view type.
///
/// # Panics
///
/// Panics if any of the request components fails to build or if the
/// `CreateTable` request fails.
async fn create_table(config: &SdkConfig) {
    let key_attribute = AttributeDefinition::builder()
        .attribute_name(PK)
        .attribute_type(ScalarAttributeType::S)
        .build()
        .unwrap();

    let hash_key = KeySchemaElement::builder()
        .attribute_name(PK)
        .key_type(KeyType::Hash)
        .build()
        .unwrap();

    let stream = StreamSpecification::builder()
        .stream_enabled(true)
        .stream_view_type(StreamViewType::NewImage)
        .build()
        .unwrap();

    let client = Client::new(config);
    client
        .create_table()
        .attribute_definitions(key_attribute)
        .table_name(TABLE)
        .key_schema(hash_key)
        .billing_mode(BillingMode::PayPerRequest)
        .stream_specification(stream)
        .send()
        .await
        .unwrap();
}
/// Sends a `DeleteTable` request for the test table.
///
/// # Panics
///
/// Panics if the request fails.
async fn drop_table(config: &SdkConfig) {
    let client = Client::new(config);

    client
        .delete_table()
        .table_name(TABLE)
        .send()
        .await
        .unwrap();
}