1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage_backend::StorageBackend;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10 #[serde(rename = "TransactItems")]
11 pub transact_items: Vec<TransactWriteItem>,
12 #[serde(rename = "ClientRequestToken", default)]
13 pub client_request_token: Option<String>,
14 #[serde(rename = "ReturnConsumedCapacity", default)]
15 pub return_consumed_capacity: Option<String>,
16 #[serde(rename = "ReturnItemCollectionMetrics", default)]
17 pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22 #[serde(rename = "Put", default)]
23 pub put: Option<TransactPut>,
24 #[serde(rename = "Update", default)]
25 pub update: Option<TransactUpdate>,
26 #[serde(rename = "Delete", default)]
27 pub delete: Option<TransactDelete>,
28 #[serde(rename = "ConditionCheck", default)]
29 pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34 #[serde(rename = "TableName")]
35 pub table_name: String,
36 #[serde(rename = "Item")]
37 pub item: Item,
38 #[serde(rename = "ConditionExpression", default)]
39 pub condition_expression: Option<String>,
40 #[serde(rename = "ExpressionAttributeNames", default)]
41 pub expression_attribute_names: Option<HashMap<String, String>>,
42 #[serde(rename = "ExpressionAttributeValues", default)]
43 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45 pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50 #[serde(rename = "TableName")]
51 pub table_name: String,
52 #[serde(rename = "Key")]
53 pub key: HashMap<String, AttributeValue>,
54 #[serde(rename = "UpdateExpression")]
55 pub update_expression: String,
56 #[serde(rename = "ConditionExpression", default)]
57 pub condition_expression: Option<String>,
58 #[serde(rename = "ExpressionAttributeNames", default)]
59 pub expression_attribute_names: Option<HashMap<String, String>>,
60 #[serde(rename = "ExpressionAttributeValues", default)]
61 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63 pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68 #[serde(rename = "TableName")]
69 pub table_name: String,
70 #[serde(rename = "Key")]
71 pub key: HashMap<String, AttributeValue>,
72 #[serde(rename = "ConditionExpression", default)]
73 pub condition_expression: Option<String>,
74 #[serde(rename = "ExpressionAttributeNames", default)]
75 pub expression_attribute_names: Option<HashMap<String, String>>,
76 #[serde(rename = "ExpressionAttributeValues", default)]
77 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79 pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84 #[serde(rename = "TableName")]
85 pub table_name: String,
86 #[serde(rename = "Key")]
87 pub key: HashMap<String, AttributeValue>,
88 #[serde(rename = "ConditionExpression")]
89 pub condition_expression: String,
90 #[serde(rename = "ExpressionAttributeNames", default)]
91 pub expression_attribute_names: Option<HashMap<String, String>>,
92 #[serde(rename = "ExpressionAttributeValues", default)]
93 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95 pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102 #[serde(
105 rename = "ItemCollectionMetrics",
106 skip_serializing_if = "Option::is_none"
107 )]
108 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub async fn execute<S: StorageBackend>(
112 storage: &S,
113 request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115 let items = &request.transact_items;
116
117 if items.is_empty() {
119 return Err(DynoxideError::ValidationException(
120 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121 ));
122 }
123
124 if items.len() > 100 {
130 let dump = format!("{items:?}");
131 return Err(DynoxideError::ValidationException(format!(
132 "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
133 )));
134 }
135
136 let mut seen_targets = HashSet::new();
138 for item in items {
139 let target = get_item_target(storage, item).await?;
140 if !seen_targets.insert(target) {
141 return Err(DynoxideError::ValidationException(
142 "Transaction request cannot include multiple operations on one item".to_string(),
143 ));
144 }
145 }
146
147 let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
149 if total_size > 4 * 1024 * 1024 {
150 return Err(DynoxideError::ValidationException(
151 "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
152 ));
153 }
154
155 helpers::with_write_transaction(storage, execute_within_transaction(storage, items)).await?;
157
158 let consumed_capacity = if matches!(
160 request.return_consumed_capacity.as_deref(),
161 Some("TOTAL") | Some("INDEXES")
162 ) {
163 let mut table_units: HashMap<String, f64> = HashMap::new();
168 for item in items {
169 let (table, size) = get_action_table_and_size(item);
170 *table_units.entry(table).or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
171 * crate::types::write_capacity_units(size);
172 }
173 let caps: Vec<_> = table_units
174 .iter()
175 .filter_map(|(table, &units)| {
176 crate::types::transactional_write_capacity(
177 table,
178 units,
179 &request.return_consumed_capacity,
180 )
181 })
182 .collect();
183 Some(caps)
184 } else {
185 None
186 };
187 Ok(TransactWriteItemsResponse {
188 consumed_capacity,
189 item_collection_metrics: None,
190 })
191}
192
193async fn execute_within_transaction<S: StorageBackend>(
194 storage: &S,
195 items: &[TransactWriteItem],
196) -> Result<()> {
197 let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
198 let mut has_failure = false;
199
200 for item in items {
201 let reason = execute_single_action(storage, item).await;
202 match reason {
203 Ok(()) => {
204 cancellation_reasons.push(CancellationReason {
205 code: "None".to_string(),
206 message: None,
207 item: None,
208 });
209 }
210 Err(e) => {
211 has_failure = true;
212 let message = Some(e.to_string());
213 let (code, item) = match e {
214 DynoxideError::ConditionalCheckFailedException(_, item) => {
215 ("ConditionalCheckFailed".to_string(), item)
216 }
217 DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
218 _ => ("InternalError".to_string(), None),
219 };
220 cancellation_reasons.push(CancellationReason {
221 code,
222 message,
223 item,
224 });
225 }
226 }
227 }
228
229 if has_failure {
230 let codes: Vec<&str> = cancellation_reasons
231 .iter()
232 .map(|r| r.code.as_str())
233 .collect();
234 let message = format!(
235 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
236 codes.join(", ")
237 );
238 return Err(DynoxideError::TransactionCanceledException(
239 message,
240 cancellation_reasons,
241 ));
242 }
243
244 Ok(())
245}
246
247async fn execute_single_action<S: StorageBackend>(
248 storage: &S,
249 item: &TransactWriteItem,
250) -> Result<()> {
251 if let Some(ref put) = item.put {
252 execute_put(storage, put).await
253 } else if let Some(ref update) = item.update {
254 execute_update(storage, update).await
255 } else if let Some(ref delete) = item.delete {
256 execute_delete(storage, delete).await
257 } else if let Some(ref check) = item.condition_check {
258 execute_condition_check(storage, check).await
259 } else {
260 Err(DynoxideError::ValidationException(
261 "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
262 .to_string(),
263 ))
264 }
265}
266
267async fn execute_put<S: StorageBackend>(storage: &S, put: &TransactPut) -> Result<()> {
268 crate::validation::validate_table_name(&put.table_name)?;
269 let meta = helpers::require_table_for_item_op(storage, &put.table_name).await?;
270 let key_schema = helpers::parse_key_schema(&meta)?;
271
272 helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
273 crate::validation::validate_item_attribute_values(&put.item)?;
274
275 let mut item = put.item.clone();
277 crate::validation::normalize_item_sets(&mut item);
278
279 let size = types::item_size(&item);
280 if size > types::MAX_ITEM_SIZE {
281 return Err(DynoxideError::ValidationException(
282 "Item size has exceeded the maximum allowed size".to_string(),
283 ));
284 }
285
286 let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
288
289 let tracker = crate::expressions::TrackedExpressionAttributes::new(
290 &put.expression_attribute_names,
291 &put.expression_attribute_values,
292 );
293
294 if let Some(ref cond_expr) = put.condition_expression {
296 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
297 tracker.track_condition_expr(&parsed);
298 }
299 }
300
301 if let Some(ref cond_expr) = put.condition_expression {
303 let existing_json = storage.get_item(&put.table_name, &pk, &sk).await?;
304 let existing_item: Item = existing_json
305 .as_ref()
306 .and_then(|j| serde_json::from_str(j).ok())
307 .unwrap_or_default();
308
309 let return_item = if put.return_values_on_condition_check_failure.as_deref()
310 == Some("ALL_OLD")
311 && !existing_item.is_empty()
312 {
313 Some(existing_item.clone())
314 } else {
315 None
316 };
317 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
318 }
319
320 tracker.check_unused()?;
321
322 let item_json = serde_json::to_string(&item)
323 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
324 let hash_prefix = item
325 .get(&key_schema.partition_key)
326 .map(crate::storage::compute_hash_prefix)
327 .unwrap_or_default();
328 let old_json = storage
329 .put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)
330 .await?;
331
332 let _ = super::gsi::maintain_gsis_after_write(
333 storage,
334 &put.table_name,
335 &meta,
336 &pk,
337 &sk,
338 &item,
339 &key_schema.partition_key,
340 key_schema.sort_key.as_deref(),
341 )
342 .await?;
343
344 super::lsi::maintain_lsis_after_write(
345 storage,
346 &put.table_name,
347 &meta,
348 &pk,
349 &sk,
350 &item,
351 &key_schema.partition_key,
352 key_schema.sort_key.as_deref(),
353 )
354 .await?;
355
356 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
358 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
359
360 Ok(())
361}
362
363async fn execute_update<S: StorageBackend>(storage: &S, update: &TransactUpdate) -> Result<()> {
364 crate::validation::validate_table_name(&update.table_name)?;
365 let meta = helpers::require_table_for_item_op(storage, &update.table_name).await?;
366 let key_schema = helpers::parse_key_schema(&meta)?;
367
368 helpers::validate_key_only(&update.key, &key_schema)?;
369 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
371
372 let existing_json = storage.get_item(&update.table_name, &pk, &sk).await?;
373 let existing_item: Item = existing_json
374 .as_ref()
375 .and_then(|j| serde_json::from_str(j).ok())
376 .unwrap_or_default();
377
378 let tracker = crate::expressions::TrackedExpressionAttributes::new(
379 &update.expression_attribute_names,
380 &update.expression_attribute_values,
381 );
382
383 if let Some(ref cond_expr) = update.condition_expression {
385 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
386 tracker.track_condition_expr(&parsed);
387 }
388 }
389 if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
390 tracker.track_update_expr(&parsed);
391 }
392
393 if let Some(ref cond_expr) = update.condition_expression {
397 let return_item = if update.return_values_on_condition_check_failure.as_deref()
398 == Some("ALL_OLD")
399 && existing_json.is_some()
400 {
401 Some(existing_item.clone())
402 } else {
403 None
404 };
405 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
406 }
407
408 let mut item = existing_item;
411 if existing_json.is_none() {
412 for (k, v) in &update.key {
413 item.insert(k.clone(), v.clone());
414 }
415 }
416
417 let parsed = crate::expressions::update::parse(&update.update_expression)
419 .map_err(DynoxideError::ValidationException)?;
420 crate::expressions::update::apply(&mut item, &parsed, &tracker)
421 .map_err(DynoxideError::ValidationException)?;
422
423 tracker.check_unused()?;
424
425 crate::validation::validate_item_attribute_values(&item)?;
427 crate::validation::normalize_item_sets(&mut item);
428
429 let size = types::item_size(&item);
430 if size > types::MAX_ITEM_SIZE {
431 return Err(DynoxideError::ValidationException(
432 "Item size has exceeded the maximum allowed size".to_string(),
433 ));
434 }
435
436 let old_for_stream = existing_json.clone();
438
439 let item_json = serde_json::to_string(&item)
440 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
441 let hash_prefix = update
442 .key
443 .get(&key_schema.partition_key)
444 .map(crate::storage::compute_hash_prefix)
445 .unwrap_or_default();
446 storage
447 .put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)
448 .await?;
449
450 let _ = super::gsi::maintain_gsis_after_write(
451 storage,
452 &update.table_name,
453 &meta,
454 &pk,
455 &sk,
456 &item,
457 &key_schema.partition_key,
458 key_schema.sort_key.as_deref(),
459 )
460 .await?;
461
462 super::lsi::maintain_lsis_after_write(
463 storage,
464 &update.table_name,
465 &meta,
466 &pk,
467 &sk,
468 &item,
469 &key_schema.partition_key,
470 key_schema.sort_key.as_deref(),
471 )
472 .await?;
473
474 let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
476 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
477
478 Ok(())
479}
480
481async fn execute_delete<S: StorageBackend>(storage: &S, delete: &TransactDelete) -> Result<()> {
482 crate::validation::validate_table_name(&delete.table_name)?;
483 let meta = helpers::require_table_for_item_op(storage, &delete.table_name).await?;
484 let key_schema = helpers::parse_key_schema(&meta)?;
485
486 helpers::validate_key_only(&delete.key, &key_schema)?;
487 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
489
490 let tracker = crate::expressions::TrackedExpressionAttributes::new(
491 &delete.expression_attribute_names,
492 &delete.expression_attribute_values,
493 );
494
495 if let Some(ref cond_expr) = delete.condition_expression {
497 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
498 tracker.track_condition_expr(&parsed);
499 }
500 }
501
502 if let Some(ref cond_expr) = delete.condition_expression {
504 let existing_json = storage.get_item(&delete.table_name, &pk, &sk).await?;
505 let existing_item: Item = existing_json
506 .as_ref()
507 .and_then(|j| serde_json::from_str(j).ok())
508 .unwrap_or_default();
509
510 let return_item = if delete.return_values_on_condition_check_failure.as_deref()
511 == Some("ALL_OLD")
512 && !existing_item.is_empty()
513 {
514 Some(existing_item.clone())
515 } else {
516 None
517 };
518 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
519 }
520
521 tracker.check_unused()?;
522
523 let old_json = storage.delete_item(&delete.table_name, &pk, &sk).await?;
524 let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)
525 .await?;
526 super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk).await?;
527
528 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
530 if old_item.is_some() {
531 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
532 }
533
534 Ok(())
535}
536
537async fn execute_condition_check<S: StorageBackend>(
538 storage: &S,
539 check: &TransactConditionCheck,
540) -> Result<()> {
541 crate::validation::validate_table_name(&check.table_name)?;
542 let meta = helpers::require_table_for_item_op(storage, &check.table_name).await?;
543 let key_schema = helpers::parse_key_schema(&meta)?;
544
545 helpers::validate_key_only(&check.key, &key_schema)?;
546 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
548
549 let existing_json = storage.get_item(&check.table_name, &pk, &sk).await?;
550 let existing_item: Item = existing_json
551 .as_ref()
552 .and_then(|j| serde_json::from_str(j).ok())
553 .unwrap_or_default();
554
555 let tracker = crate::expressions::TrackedExpressionAttributes::new(
556 &check.expression_attribute_names,
557 &check.expression_attribute_values,
558 );
559
560 if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
562 tracker.track_condition_expr(&parsed);
563 }
564
565 let return_item = if check.return_values_on_condition_check_failure.as_deref()
566 == Some("ALL_OLD")
567 && !existing_item.is_empty()
568 {
569 Some(existing_item.clone())
570 } else {
571 None
572 };
573 check_condition_tracked(
574 &check.condition_expression,
575 &existing_item,
576 &tracker,
577 return_item,
578 )?;
579
580 tracker.check_unused()?;
581 Ok(())
582}
583
584fn check_condition_tracked(
585 expression: &str,
586 item: &Item,
587 tracker: &crate::expressions::TrackedExpressionAttributes,
588 return_item_on_failure: Option<Item>,
589) -> Result<()> {
590 let parsed = crate::expressions::condition::parse(expression)
591 .map_err(DynoxideError::ValidationException)?;
592 let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
593 .map_err(DynoxideError::ValidationException)?;
594 if !result {
595 return Err(DynoxideError::ConditionalCheckFailedException(
596 "The conditional request failed".to_string(),
597 return_item_on_failure,
598 ));
599 }
600 Ok(())
601}
602
603fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
609 if let Some(ref put) = item.put {
610 (put.table_name.clone(), types::item_size(&put.item))
611 } else if let Some(ref update) = item.update {
612 let key_size = types::item_size(&update.key);
613 let eav_size = update
614 .expression_attribute_values
615 .as_ref()
616 .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
617 .unwrap_or(0);
618 (update.table_name.clone(), key_size + eav_size)
619 } else if let Some(ref delete) = item.delete {
620 (delete.table_name.clone(), types::item_size(&delete.key))
621 } else if let Some(ref check) = item.condition_check {
622 (check.table_name.clone(), types::item_size(&check.key))
623 } else {
624 (String::new(), 0)
625 }
626}
627
628async fn get_item_target<S: StorageBackend>(
630 storage: &S,
631 item: &TransactWriteItem,
632) -> Result<String> {
633 if let Some(ref put) = item.put {
634 crate::validation::validate_table_name(&put.table_name)?;
635 let meta = helpers::require_table_for_item_op(storage, &put.table_name).await?;
636 let key_schema = helpers::parse_key_schema(&meta)?;
637 let (pk, sk) = helpers::extract_key_strings(&put.item, &key_schema)?;
639 Ok(format!("{}#{}#{}", put.table_name, pk, sk))
640 } else if let Some(ref update) = item.update {
641 crate::validation::validate_table_name(&update.table_name)?;
642 let meta = helpers::require_table_for_item_op(storage, &update.table_name).await?;
643 let key_schema = helpers::parse_key_schema(&meta)?;
644 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
646 Ok(format!("{}#{}#{}", update.table_name, pk, sk))
647 } else if let Some(ref delete) = item.delete {
648 crate::validation::validate_table_name(&delete.table_name)?;
649 let meta = helpers::require_table_for_item_op(storage, &delete.table_name).await?;
650 let key_schema = helpers::parse_key_schema(&meta)?;
651 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
653 Ok(format!("{}#{}#{}", delete.table_name, pk, sk))
654 } else if let Some(ref check) = item.condition_check {
655 crate::validation::validate_table_name(&check.table_name)?;
656 let meta = helpers::require_table_for_item_op(storage, &check.table_name).await?;
657 let key_schema = helpers::parse_key_schema(&meta)?;
658 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
660 Ok(format!("{}#{}#{}", check.table_name, pk, sk))
661 } else {
662 Err(DynoxideError::ValidationException(
663 "TransactItem must contain exactly one action".to_string(),
664 ))
665 }
666}