dynamodb_tools/
connector.rs1use crate::TableConfig;
2use crate::error::{DynamoToolsError, Result};
3use aws_config::meta::region::RegionProviderChain;
4use aws_config::{BehaviorVersion, Region};
5use aws_sdk_dynamodb::config::Credentials;
6use aws_sdk_dynamodb::types::{AttributeValue, PutRequest, WriteRequest};
7use aws_sdk_dynamodb::{Client, operation::create_table::CreateTableInput};
8use serde_json::Value;
9use std::{collections::HashMap, fs, path::Path};
10#[cfg(feature = "test_utils")]
11use tokio::runtime::Runtime;
12
13#[derive(Debug)]
23pub struct DynamodbConnector {
24 client: Option<Client>,
25 created_tables: HashMap<String, String>,
27 #[cfg(feature = "test_utils")]
29 config: TableConfig,
30}
31
32impl DynamodbConnector {
33 pub async fn load(config_path: impl AsRef<Path>) -> Result<Self> {
42 let config = TableConfig::load_from_file(config_path)?;
43 DynamodbConnector::try_new(config).await
44 }
45
46 pub fn client(&self) -> Result<&Client> {
53 self.client
54 .as_ref()
55 .ok_or_else(|| DynamoToolsError::Internal("Client instance is missing".to_string()))
56 }
57
58 pub fn get_created_table_name(&self, base_name: &str) -> Option<&str> {
63 self.created_tables.get(base_name).map(|s| s.as_str())
64 }
65
66 pub fn get_all_created_table_names(&self) -> &HashMap<String, String> {
69 &self.created_tables
70 }
71
72 pub async fn try_new(config: TableConfig) -> Result<Self> {
84 let endpoint = config.endpoint.clone();
85 #[cfg(feature = "test_utils")]
87 let connector_config = config.clone();
88
89 let base_sdk_config_builder = aws_config::defaults(BehaviorVersion::latest()).region(
90 RegionProviderChain::first_try(Region::new(config.region.clone()))
91 .or_default_provider(),
92 );
93 let loaded_sdk_config = base_sdk_config_builder.load().await;
94 let builder = aws_sdk_dynamodb::config::Builder::from(&loaded_sdk_config);
95 let dynamodb_config = if let Some(url) = endpoint.as_ref() {
96 builder
97 .endpoint_url(url)
98 .credentials_provider(Credentials::for_tests())
99 .build()
100 } else {
101 builder.build()
102 };
103 let client = Client::from_conf(dynamodb_config);
104
105 let mut created_tables = HashMap::new();
106
107 for table_info in config.tables {
108 let base_table_name = table_info.table_name.clone();
109 let seed_file = table_info.seed_data_file.clone(); let mut input = CreateTableInput::try_from(table_info)?;
111
112 let unique_table_name = format!("{}-{}", base_table_name, xid::new());
113 input.table_name = Some(unique_table_name.clone());
114
115 let create_table_builder = client
117 .create_table()
118 .table_name(&unique_table_name)
119 .set_key_schema(input.key_schema)
120 .set_attribute_definitions(input.attribute_definitions)
121 .set_global_secondary_indexes(input.global_secondary_indexes)
122 .set_local_secondary_indexes(input.local_secondary_indexes);
123
124 let create_table_builder = match input.provisioned_throughput {
125 Some(pt) => create_table_builder.provisioned_throughput(pt),
126 None => create_table_builder.billing_mode(input.billing_mode.ok_or_else(|| {
127 DynamoToolsError::MissingField(format!(
128 "Billing mode missing for table '{}' with no throughput",
129 base_table_name
130 ))
131 })?),
132 };
133
134 create_table_builder
136 .send()
137 .await
138 .map_err(DynamoToolsError::TableCreation)?; created_tables.insert(base_table_name.clone(), unique_table_name.clone());
141
142 if let Some(file_path) = seed_file {
144 println!(
145 "[INFO] Seeding data for table '{}' from file '{}'",
146 unique_table_name, file_path
147 );
148 let content = fs::read_to_string(&file_path)
150 .map_err(|e| DynamoToolsError::SeedFileRead(file_path.clone(), e))?;
151
152 let items_json: Vec<Value> = serde_json::from_str(&content)
154 .map_err(|e| DynamoToolsError::SeedJsonParse(file_path.clone(), e))?;
155
156 let mut write_requests = Vec::new();
158 for item_value in items_json {
159 let item_map: HashMap<String, AttributeValue> =
160 serde_dynamo::to_item(item_value)?;
161 let put_request = PutRequest::builder()
162 .set_item(Some(item_map))
163 .build()
164 .map_err(|e| {
165 DynamoToolsError::Internal(format!("Failed to build PutRequest: {}", e))
166 })?;
167 write_requests.push(WriteRequest::builder().put_request(put_request).build());
168 }
169
170 for chunk in write_requests.chunks(25) {
172 let request_items =
173 HashMap::from([(unique_table_name.clone(), chunk.to_vec())]);
174 client
175 .batch_write_item()
176 .set_request_items(Some(request_items))
177 .send()
178 .await
179 .map_err(|e| {
180 DynamoToolsError::SeedBatchWrite(unique_table_name.clone(), e)
181 })?;
182 println!(
183 "[INFO] Wrote batch of {} items to table '{}'",
184 chunk.len(),
185 unique_table_name
186 );
187 }
188 }
189 }
191
192 Ok(Self {
193 client: Some(client),
194 created_tables,
195 #[cfg(feature = "test_utils")]
196 config: connector_config,
197 })
198 }
199}
200
/// Best-effort cleanup of the tables this connector created (only with the `test_utils`
/// feature).
///
/// Tables are deleted only when `delete_on_exit` is set and an endpoint is configured.
/// Each deletion runs on its own detached thread with its own Tokio runtime, because `drop`
/// is synchronous. The threads are not joined, so a deletion may not finish if the process
/// exits first; failures are logged and never panic.
#[cfg(feature = "test_utils")]
impl Drop for DynamodbConnector {
    fn drop(&mut self) {
        if !self.config.delete_on_exit || self.config.endpoint.is_none() {
            println!(
                "[INFO] Skipping delete on drop (delete_on_exit: {}, endpoint: {:?})",
                self.config.delete_on_exit, self.config.endpoint
            );
            return;
        }

        // Nothing can be deleted without a client.
        let Some(client) = self.client.take() else {
            return;
        };

        // The connector is going away, so move the map out instead of cloning it.
        let tables_to_delete = std::mem::take(&mut self.created_tables);
        println!(
            "[INFO] Drop: Attempting to delete tables: {:?}",
            tables_to_delete.values()
        );

        for unique_name in tables_to_delete.into_values() {
            let client = client.clone();
            std::thread::spawn(move || {
                let rt = match Runtime::new() {
                    Ok(rt) => rt,
                    Err(e) => {
                        eprintln!(
                            "[ERROR] Failed to create Tokio runtime for table deletion: {}",
                            e
                        );
                        return;
                    }
                };

                rt.block_on(async move {
                    match client.delete_table().table_name(&unique_name).send().await {
                        Ok(_) => println!("[INFO] Deleted table: {}", unique_name),
                        Err(e) => {
                            eprintln!("[ERROR] Failed to delete table '{}': {}", unique_name, e)
                        }
                    }
                });
            });
        }
    }
}