dynamodb_tools/
connector.rs

1use crate::TableConfig;
2use anyhow::Result;
3use aws_config::BehaviorVersion;
4use aws_sdk_dynamodb::{operation::create_table::CreateTableInput, Client};
5use std::path::Path;
6#[cfg(test)]
7use tokio::runtime::Runtime;
8
9#[derive(Debug, Clone)]
10pub struct DynamodbConnector {
11    client: Option<Client>,
12    table_name: String,
13    #[cfg(test)]
14    delete_on_exit: bool,
15}
16
17impl DynamodbConnector {
18    /// Load dynamodb connector from configuration file
19    pub async fn load(config: impl AsRef<Path>) -> Result<Self> {
20        let config = TableConfig::load_from_file(config)?;
21        DynamodbConnector::try_new(config).await
22    }
23
24    pub fn client(&self) -> &Client {
25        self.client.as_ref().expect("client should exists")
26    }
27
28    pub fn table_name(&self) -> &str {
29        self.table_name.as_str()
30    }
31
32    /// create a new local client
33    pub async fn try_new(table_config: TableConfig) -> Result<Self> {
34        let local_endpoint = table_config.local_endpoint.clone();
35        #[cfg(test)]
36        let delete_on_exit = if local_endpoint.is_some() {
37            table_config.delete_on_exit
38        } else {
39            false
40        };
41        let config = aws_config::load_from_env().await;
42
43        let config = aws_sdk_dynamodb::Config::builder()
44            .region(config.region().cloned())
45            .behavior_version(
46                config
47                    .behavior_version()
48                    .unwrap_or(BehaviorVersion::latest()),
49            )
50            .credentials_provider(
51                config
52                    .credentials_provider()
53                    .expect("cred should exists")
54                    .clone(),
55            );
56
57        let config = if let Some(url) = local_endpoint.as_ref() {
58            config.endpoint_url(url)
59        } else {
60            config
61        };
62        let client = Client::from_conf(config.build());
63
64        let table_name = if let Some(info) = table_config.info {
65            let mut input = CreateTableInput::try_from(info)?;
66            let table_name = format!(
67                "{}-{}",
68                input.table_name.expect("table name must exist"),
69                xid::new()
70            );
71            input.table_name = Some(table_name.clone());
72
73            let mut req = client
74                .create_table()
75                .table_name(&table_name)
76                .set_key_schema(input.key_schema)
77                .set_attribute_definitions(input.attribute_definitions)
78                .set_global_secondary_indexes(input.global_secondary_indexes)
79                .set_local_secondary_indexes(input.local_secondary_indexes);
80            match input.provisioned_throughput {
81                Some(pt) => {
82                    req = req.provisioned_throughput(pt);
83                }
84                None => {
85                    req = req.billing_mode(input.billing_mode.expect("billing mode should exist"));
86                }
87            }
88            req.send().await?;
89            table_name
90        } else {
91            table_config.table_name
92        };
93
94        Ok(Self {
95            client: Some(client),
96            table_name,
97            #[cfg(test)]
98            delete_on_exit,
99        })
100    }
101}
102
103#[cfg(test)]
104impl Drop for DynamodbConnector {
105    fn drop(&mut self) {
106        let client = self.client.take().expect("client");
107        let table_name = self.table_name.clone();
108        #[cfg(test)]
109        if !self.delete_on_exit {
110            return;
111        }
112        std::thread::spawn(move || {
113            let rt = Runtime::new().expect("runtime");
114            rt.block_on(async move {
115                if let Err(e) = client.delete_table().table_name(&table_name).send().await {
116                    println!("failed to delete table: {:?}", e);
117                }
118            });
119        });
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126
127    #[tokio::test]
128    async fn dev_config_should_work() {
129        let config = TableConfig::load_from_file("fixtures/dev.yml").unwrap();
130        let connector = DynamodbConnector::try_new(config).await.unwrap();
131        let table_name = connector.table_name();
132        let resp = connector
133            .client()
134            .describe_table()
135            .table_name(table_name)
136            .send()
137            .await
138            .unwrap();
139        assert_eq!(resp.table.and_then(|v| v.table_name).unwrap(), table_name);
140    }
141
142    #[tokio::test]
143    async fn prod_config_should_work() {
144        let config = TableConfig::load_from_file("fixtures/prod.yml").unwrap();
145        let connector = DynamodbConnector::try_new(config).await.unwrap();
146        let table_name = connector.table_name();
147        assert_eq!(table_name, "users");
148        assert!(!connector.delete_on_exit);
149    }
150}