dynamo-subscriber 0.1.1

Subscribe DynamoDB Streams as tokio-stream
Documentation
use aws_config::{retry::RetryConfig, BehaviorVersion, Region, SdkConfig};
use aws_credential_types::{provider::SharedCredentialsProvider, Credentials};
use aws_sdk_dynamodb::{
    types::{
        AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
        ScalarAttributeType, StreamSpecification, StreamViewType,
    },
    Client,
};
use aws_sdk_dynamodbstreams::types::Record;
use dynamo_subscriber::stream::ConsumerChannel;
use tokio::time::{sleep, Duration};
use ulid::Ulid;

const TABLE: &str = "People";
const PK: &str = "Id";

pub struct TestConfig {
    table_name: String,
    config: SdkConfig,
}

impl TestConfig {
    pub fn table_name(&self) -> &str {
        self.table_name.as_str()
    }

    pub fn aws_sdk_config(&self) -> &SdkConfig {
        &self.config
    }
}

pub async fn setup() -> TestConfig {
    let creds = Credentials::from_keys(Ulid::new(), Ulid::new(), None);
    let creds_provider = SharedCredentialsProvider::new(creds);

    let retry = RetryConfig::standard().with_max_attempts(5);

    let config = SdkConfig::builder()
        .endpoint_url("http://localhost:8000")
        .credentials_provider(creds_provider)
        .retry_config(retry)
        .behavior_version(BehaviorVersion::latest())
        .region(Some(Region::from_static("us-east-1")))
        .build();

    create_table(&config).await;

    TestConfig {
        table_name: TABLE.to_string(),
        config,
    }
}

pub async fn put_item(pk: &str, config: &SdkConfig) {
    Client::new(config)
        .put_item()
        .table_name(TABLE)
        .item(PK, AttributeValue::S(pk.into()))
        .send()
        .await
        .unwrap();
}

pub async fn teardown(config: &SdkConfig) {
    drop_table(config).await;
}

pub async fn wait_until_initialized(channel: &mut ConsumerChannel) {
    let mut res = channel.initialized();

    while !res {
        sleep(Duration::from_millis(100)).await;
        res = channel.initialized();
    }
}

pub fn pk(record: &Record) -> String {
    record
        .dynamodb()
        .and_then(|dynamodb| dynamodb.keys())
        .unwrap()
        .get(PK)
        .unwrap()
        .as_s()
        .unwrap()
        .to_string()
}

async fn create_table(config: &SdkConfig) {
    Client::new(config)
        .create_table()
        .attribute_definitions(
            AttributeDefinition::builder()
                .attribute_name(PK)
                .attribute_type(ScalarAttributeType::S)
                .build()
                .unwrap(),
        )
        .table_name(TABLE)
        .key_schema(
            KeySchemaElement::builder()
                .attribute_name(PK)
                .key_type(KeyType::Hash)
                .build()
                .unwrap(),
        )
        .billing_mode(BillingMode::PayPerRequest)
        .stream_specification(
            StreamSpecification::builder()
                .stream_enabled(true)
                .stream_view_type(StreamViewType::NewImage)
                .build()
                .unwrap(),
        )
        .send()
        .await
        .unwrap();
}

async fn drop_table(config: &SdkConfig) {
    Client::new(config)
        .delete_table()
        .table_name(TABLE)
        .send()
        .await
        .unwrap();
}