dynamodb_tools/
connector.rs1use crate::TableConfig;
2use anyhow::Result;
3use aws_config::BehaviorVersion;
4use aws_sdk_dynamodb::{operation::create_table::CreateTableInput, Client};
5use std::path::Path;
6#[cfg(test)]
7use tokio::runtime::Runtime;
8
9#[derive(Debug, Clone)]
10pub struct DynamodbConnector {
11 client: Option<Client>,
12 table_name: String,
13 #[cfg(test)]
14 delete_on_exit: bool,
15}
16
17impl DynamodbConnector {
18 pub async fn load(config: impl AsRef<Path>) -> Result<Self> {
20 let config = TableConfig::load_from_file(config)?;
21 DynamodbConnector::try_new(config).await
22 }
23
24 pub fn client(&self) -> &Client {
25 self.client.as_ref().expect("client should exists")
26 }
27
28 pub fn table_name(&self) -> &str {
29 self.table_name.as_str()
30 }
31
32 pub async fn try_new(table_config: TableConfig) -> Result<Self> {
34 let local_endpoint = table_config.local_endpoint.clone();
35 #[cfg(test)]
36 let delete_on_exit = if local_endpoint.is_some() {
37 table_config.delete_on_exit
38 } else {
39 false
40 };
41 let config = aws_config::load_from_env().await;
42
43 let config = aws_sdk_dynamodb::Config::builder()
44 .region(config.region().cloned())
45 .behavior_version(
46 config
47 .behavior_version()
48 .unwrap_or(BehaviorVersion::latest()),
49 )
50 .credentials_provider(
51 config
52 .credentials_provider()
53 .expect("cred should exists")
54 .clone(),
55 );
56
57 let config = if let Some(url) = local_endpoint.as_ref() {
58 config.endpoint_url(url)
59 } else {
60 config
61 };
62 let client = Client::from_conf(config.build());
63
64 let table_name = if let Some(info) = table_config.info {
65 let mut input = CreateTableInput::try_from(info)?;
66 let table_name = format!(
67 "{}-{}",
68 input.table_name.expect("table name must exist"),
69 xid::new()
70 );
71 input.table_name = Some(table_name.clone());
72
73 let mut req = client
74 .create_table()
75 .table_name(&table_name)
76 .set_key_schema(input.key_schema)
77 .set_attribute_definitions(input.attribute_definitions)
78 .set_global_secondary_indexes(input.global_secondary_indexes)
79 .set_local_secondary_indexes(input.local_secondary_indexes);
80 match input.provisioned_throughput {
81 Some(pt) => {
82 req = req.provisioned_throughput(pt);
83 }
84 None => {
85 req = req.billing_mode(input.billing_mode.expect("billing mode should exist"));
86 }
87 }
88 req.send().await?;
89 table_name
90 } else {
91 table_config.table_name
92 };
93
94 Ok(Self {
95 client: Some(client),
96 table_name,
97 #[cfg(test)]
98 delete_on_exit,
99 })
100 }
101}
102
103#[cfg(test)]
104impl Drop for DynamodbConnector {
105 fn drop(&mut self) {
106 let client = self.client.take().expect("client");
107 let table_name = self.table_name.clone();
108 #[cfg(test)]
109 if !self.delete_on_exit {
110 return;
111 }
112 std::thread::spawn(move || {
113 let rt = Runtime::new().expect("runtime");
114 rt.block_on(async move {
115 if let Err(e) = client.delete_table().table_name(&table_name).send().await {
116 println!("failed to delete table: {:?}", e);
117 }
118 });
119 });
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 #[tokio::test]
128 async fn dev_config_should_work() {
129 let config = TableConfig::load_from_file("fixtures/dev.yml").unwrap();
130 let connector = DynamodbConnector::try_new(config).await.unwrap();
131 let table_name = connector.table_name();
132 let resp = connector
133 .client()
134 .describe_table()
135 .table_name(table_name)
136 .send()
137 .await
138 .unwrap();
139 assert_eq!(resp.table.and_then(|v| v.table_name).unwrap(), table_name);
140 }
141
142 #[tokio::test]
143 async fn prod_config_should_work() {
144 let config = TableConfig::load_from_file("fixtures/prod.yml").unwrap();
145 let connector = DynamodbConnector::try_new(config).await.unwrap();
146 let table_name = connector.table_name();
147 assert_eq!(table_name, "users");
148 assert!(!connector.delete_on_exit);
149 }
150}