1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage_backend::StorageBackend;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10 #[serde(rename = "TransactItems")]
11 pub transact_items: Vec<TransactWriteItem>,
12 #[serde(rename = "ClientRequestToken", default)]
13 pub client_request_token: Option<String>,
14 #[serde(rename = "ReturnConsumedCapacity", default)]
15 pub return_consumed_capacity: Option<String>,
16 #[serde(rename = "ReturnItemCollectionMetrics", default)]
17 pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22 #[serde(rename = "Put", default)]
23 pub put: Option<TransactPut>,
24 #[serde(rename = "Update", default)]
25 pub update: Option<TransactUpdate>,
26 #[serde(rename = "Delete", default)]
27 pub delete: Option<TransactDelete>,
28 #[serde(rename = "ConditionCheck", default)]
29 pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34 #[serde(rename = "TableName")]
35 pub table_name: String,
36 #[serde(rename = "Item")]
37 pub item: Item,
38 #[serde(rename = "ConditionExpression", default)]
39 pub condition_expression: Option<String>,
40 #[serde(rename = "ExpressionAttributeNames", default)]
41 pub expression_attribute_names: Option<HashMap<String, String>>,
42 #[serde(rename = "ExpressionAttributeValues", default)]
43 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45 pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50 #[serde(rename = "TableName")]
51 pub table_name: String,
52 #[serde(rename = "Key")]
53 pub key: HashMap<String, AttributeValue>,
54 #[serde(rename = "UpdateExpression")]
55 pub update_expression: String,
56 #[serde(rename = "ConditionExpression", default)]
57 pub condition_expression: Option<String>,
58 #[serde(rename = "ExpressionAttributeNames", default)]
59 pub expression_attribute_names: Option<HashMap<String, String>>,
60 #[serde(rename = "ExpressionAttributeValues", default)]
61 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63 pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68 #[serde(rename = "TableName")]
69 pub table_name: String,
70 #[serde(rename = "Key")]
71 pub key: HashMap<String, AttributeValue>,
72 #[serde(rename = "ConditionExpression", default)]
73 pub condition_expression: Option<String>,
74 #[serde(rename = "ExpressionAttributeNames", default)]
75 pub expression_attribute_names: Option<HashMap<String, String>>,
76 #[serde(rename = "ExpressionAttributeValues", default)]
77 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79 pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84 #[serde(rename = "TableName")]
85 pub table_name: String,
86 #[serde(rename = "Key")]
87 pub key: HashMap<String, AttributeValue>,
88 #[serde(rename = "ConditionExpression")]
89 pub condition_expression: String,
90 #[serde(rename = "ExpressionAttributeNames", default)]
91 pub expression_attribute_names: Option<HashMap<String, String>>,
92 #[serde(rename = "ExpressionAttributeValues", default)]
93 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95 pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102 #[serde(
105 rename = "ItemCollectionMetrics",
106 skip_serializing_if = "Option::is_none"
107 )]
108 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub async fn execute<S: StorageBackend>(
112 storage: &S,
113 request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115 let items = &request.transact_items;
116
117 if items.is_empty() {
119 return Err(DynoxideError::ValidationException(
120 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121 ));
122 }
123
124 if items.len() > 100 {
130 let dump = format!("{items:?}");
131 return Err(DynoxideError::ValidationException(format!(
132 "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
133 )));
134 }
135
136 let mut seen_targets = HashSet::new();
139 for item in items {
140 if let Some(target) = get_item_target(storage, item).await? {
141 if !seen_targets.insert(target) {
142 return Err(DynoxideError::ValidationException(
143 "Transaction request cannot include multiple operations on one item"
144 .to_string(),
145 ));
146 }
147 }
148 }
149
150 let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
152 if total_size > 4 * 1024 * 1024 {
153 return Err(DynoxideError::ValidationException(
154 "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
155 ));
156 }
157
158 helpers::with_write_transaction(storage, execute_within_transaction(storage, items)).await?;
160
161 let consumed_capacity = if matches!(
163 request.return_consumed_capacity.as_deref(),
164 Some("TOTAL") | Some("INDEXES")
165 ) {
166 let mut table_units: HashMap<String, f64> = HashMap::new();
171 for item in items {
172 let (table, size) = get_action_table_and_size(item);
173 *table_units.entry(table).or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
174 * crate::types::write_capacity_units(size);
175 }
176 let caps: Vec<_> = table_units
177 .iter()
178 .filter_map(|(table, &units)| {
179 crate::types::transactional_write_capacity(
180 table,
181 units,
182 &request.return_consumed_capacity,
183 )
184 })
185 .collect();
186 Some(caps)
187 } else {
188 None
189 };
190 Ok(TransactWriteItemsResponse {
191 consumed_capacity,
192 item_collection_metrics: None,
193 })
194}
195
196async fn execute_within_transaction<S: StorageBackend>(
197 storage: &S,
198 items: &[TransactWriteItem],
199) -> Result<()> {
200 let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
201 let mut has_failure = false;
202
203 for item in items {
204 let reason = execute_single_action(storage, item).await;
205 match reason {
206 Ok(()) => {
207 cancellation_reasons.push(CancellationReason {
208 code: "None".to_string(),
209 message: None,
210 item: None,
211 });
212 }
213 Err(e) => {
214 if matches!(e, DynoxideError::KeyEmptyValueValidation(_)) {
218 return Err(e);
219 }
220 has_failure = true;
221 let message = Some(e.to_string());
222 let (code, item) = match e {
223 DynoxideError::ConditionalCheckFailedException(_, item) => {
224 ("ConditionalCheckFailed".to_string(), item)
225 }
226 DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
227 _ => ("InternalError".to_string(), None),
228 };
229 cancellation_reasons.push(CancellationReason {
230 code,
231 message,
232 item,
233 });
234 }
235 }
236 }
237
238 if has_failure {
239 let codes: Vec<&str> = cancellation_reasons
240 .iter()
241 .map(|r| r.code.as_str())
242 .collect();
243 let message = format!(
244 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
245 codes.join(", ")
246 );
247 return Err(DynoxideError::TransactionCanceledException(
248 message,
249 cancellation_reasons,
250 ));
251 }
252
253 Ok(())
254}
255
256async fn execute_single_action<S: StorageBackend>(
257 storage: &S,
258 item: &TransactWriteItem,
259) -> Result<()> {
260 if let Some(ref put) = item.put {
261 execute_put(storage, put).await
262 } else if let Some(ref update) = item.update {
263 execute_update(storage, update).await
264 } else if let Some(ref delete) = item.delete {
265 execute_delete(storage, delete).await
266 } else if let Some(ref check) = item.condition_check {
267 execute_condition_check(storage, check).await
268 } else {
269 Err(DynoxideError::ValidationException(
270 "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
271 .to_string(),
272 ))
273 }
274}
275
276fn validate_eav_nesting(values: &Option<HashMap<String, AttributeValue>>) -> Result<()> {
280 if let Some(map) = values {
281 for value in map.values() {
282 crate::validation::validate_nesting_depth(value)?;
283 }
284 }
285 Ok(())
286}
287
288async fn execute_put<S: StorageBackend>(storage: &S, put: &TransactPut) -> Result<()> {
289 crate::validation::validate_table_name(&put.table_name)?;
290 let meta = helpers::require_table_for_item_op(storage, &put.table_name).await?;
291 let key_schema = helpers::parse_key_schema(&meta)?;
292
293 helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
294 crate::validation::validate_item_attribute_values(&put.item)?;
295
296 let mut item = put.item.clone();
298 crate::validation::normalize_item_sets(&mut item);
299
300 let size = types::item_size(&item);
301 if size > types::MAX_ITEM_SIZE {
302 return Err(DynoxideError::ValidationException(
303 "Item size has exceeded the maximum allowed size".to_string(),
304 ));
305 }
306
307 let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
309
310 validate_eav_nesting(&put.expression_attribute_values)?;
311
312 let tracker = crate::expressions::TrackedExpressionAttributes::new(
313 &put.expression_attribute_names,
314 &put.expression_attribute_values,
315 );
316
317 if let Some(ref cond_expr) = put.condition_expression {
319 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
320 tracker.track_condition_expr(&parsed);
321 }
322 }
323
324 if let Some(ref cond_expr) = put.condition_expression {
326 let existing_json = storage.get_item(&put.table_name, &pk, &sk).await?;
327 let existing_item: Item = existing_json
328 .as_ref()
329 .and_then(|j| serde_json::from_str(j).ok())
330 .unwrap_or_default();
331
332 let return_item = if put.return_values_on_condition_check_failure.as_deref()
333 == Some("ALL_OLD")
334 && !existing_item.is_empty()
335 {
336 Some(existing_item.clone())
337 } else {
338 None
339 };
340 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
341 }
342
343 tracker.check_unused()?;
344
345 let item_json = serde_json::to_string(&item)
346 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
347 let hash_prefix = item
348 .get(&key_schema.partition_key)
349 .map(crate::storage::compute_hash_prefix)
350 .unwrap_or_default();
351 let old_json = storage
352 .put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)
353 .await?;
354
355 let _ = super::gsi::maintain_gsis_after_write(
356 storage,
357 &put.table_name,
358 &meta,
359 &pk,
360 &sk,
361 &item,
362 &key_schema.partition_key,
363 key_schema.sort_key.as_deref(),
364 )
365 .await?;
366
367 super::lsi::maintain_lsis_after_write(
368 storage,
369 &put.table_name,
370 &meta,
371 &pk,
372 &sk,
373 &item,
374 &key_schema.partition_key,
375 key_schema.sort_key.as_deref(),
376 )
377 .await?;
378
379 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
381 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
382
383 Ok(())
384}
385
386async fn execute_update<S: StorageBackend>(storage: &S, update: &TransactUpdate) -> Result<()> {
387 crate::validation::validate_table_name(&update.table_name)?;
388 let meta = helpers::require_table_for_item_op(storage, &update.table_name).await?;
389 let key_schema = helpers::parse_key_schema(&meta)?;
390
391 helpers::validate_key_only(&update.key, &key_schema)?;
392 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
394
395 let existing_json = storage.get_item(&update.table_name, &pk, &sk).await?;
396 let existing_item: Item = existing_json
397 .as_ref()
398 .and_then(|j| serde_json::from_str(j).ok())
399 .unwrap_or_default();
400
401 validate_eav_nesting(&update.expression_attribute_values)?;
402
403 let tracker = crate::expressions::TrackedExpressionAttributes::new(
404 &update.expression_attribute_names,
405 &update.expression_attribute_values,
406 );
407
408 if let Some(ref cond_expr) = update.condition_expression {
410 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
411 tracker.track_condition_expr(&parsed);
412 }
413 }
414 if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
415 tracker.track_update_expr(&parsed);
416 }
417
418 if let Some(ref cond_expr) = update.condition_expression {
422 let return_item = if update.return_values_on_condition_check_failure.as_deref()
423 == Some("ALL_OLD")
424 && existing_json.is_some()
425 {
426 Some(existing_item.clone())
427 } else {
428 None
429 };
430 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
431 }
432
433 let mut item = existing_item;
436 if existing_json.is_none() {
437 for (k, v) in &update.key {
438 item.insert(k.clone(), v.clone());
439 }
440 }
441 let before_item = item.clone();
442
443 let parsed = crate::expressions::update::parse(&update.update_expression)
445 .map_err(DynoxideError::ValidationException)?;
446 crate::expressions::update::apply(&mut item, &parsed, &tracker)
447 .map_err(DynoxideError::ValidationException)?;
448
449 tracker.check_unused()?;
450
451 crate::validation::validate_item_attribute_values(&item)?;
453 crate::validation::normalize_item_sets(&mut item);
454
455 helpers::validate_updated_index_keys(&before_item, &item, &meta)?;
457
458 let size = types::item_size(&item);
459 if size > types::MAX_ITEM_SIZE {
460 return Err(DynoxideError::ValidationException(
461 "Item size has exceeded the maximum allowed size".to_string(),
462 ));
463 }
464
465 let old_for_stream = existing_json.clone();
467
468 let item_json = serde_json::to_string(&item)
469 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
470 let hash_prefix = update
471 .key
472 .get(&key_schema.partition_key)
473 .map(crate::storage::compute_hash_prefix)
474 .unwrap_or_default();
475 storage
476 .put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)
477 .await?;
478
479 let _ = super::gsi::maintain_gsis_after_write(
480 storage,
481 &update.table_name,
482 &meta,
483 &pk,
484 &sk,
485 &item,
486 &key_schema.partition_key,
487 key_schema.sort_key.as_deref(),
488 )
489 .await?;
490
491 super::lsi::maintain_lsis_after_write(
492 storage,
493 &update.table_name,
494 &meta,
495 &pk,
496 &sk,
497 &item,
498 &key_schema.partition_key,
499 key_schema.sort_key.as_deref(),
500 )
501 .await?;
502
503 let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
505 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
506
507 Ok(())
508}
509
510async fn execute_delete<S: StorageBackend>(storage: &S, delete: &TransactDelete) -> Result<()> {
511 crate::validation::validate_table_name(&delete.table_name)?;
512 let meta = helpers::require_table_for_item_op(storage, &delete.table_name).await?;
513 let key_schema = helpers::parse_key_schema(&meta)?;
514
515 helpers::validate_key_only(&delete.key, &key_schema)?;
516 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
518
519 validate_eav_nesting(&delete.expression_attribute_values)?;
520
521 let tracker = crate::expressions::TrackedExpressionAttributes::new(
522 &delete.expression_attribute_names,
523 &delete.expression_attribute_values,
524 );
525
526 if let Some(ref cond_expr) = delete.condition_expression {
528 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
529 tracker.track_condition_expr(&parsed);
530 }
531 }
532
533 if let Some(ref cond_expr) = delete.condition_expression {
535 let existing_json = storage.get_item(&delete.table_name, &pk, &sk).await?;
536 let existing_item: Item = existing_json
537 .as_ref()
538 .and_then(|j| serde_json::from_str(j).ok())
539 .unwrap_or_default();
540
541 let return_item = if delete.return_values_on_condition_check_failure.as_deref()
542 == Some("ALL_OLD")
543 && !existing_item.is_empty()
544 {
545 Some(existing_item.clone())
546 } else {
547 None
548 };
549 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
550 }
551
552 tracker.check_unused()?;
553
554 let old_json = storage.delete_item(&delete.table_name, &pk, &sk).await?;
555 let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)
556 .await?;
557 super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk).await?;
558
559 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
561 if old_item.is_some() {
562 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
563 }
564
565 Ok(())
566}
567
568async fn execute_condition_check<S: StorageBackend>(
569 storage: &S,
570 check: &TransactConditionCheck,
571) -> Result<()> {
572 crate::validation::validate_table_name(&check.table_name)?;
573 let meta = helpers::require_table_for_item_op(storage, &check.table_name).await?;
574 let key_schema = helpers::parse_key_schema(&meta)?;
575
576 helpers::validate_key_only(&check.key, &key_schema)?;
577 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
579
580 let existing_json = storage.get_item(&check.table_name, &pk, &sk).await?;
581 let existing_item: Item = existing_json
582 .as_ref()
583 .and_then(|j| serde_json::from_str(j).ok())
584 .unwrap_or_default();
585
586 validate_eav_nesting(&check.expression_attribute_values)?;
587
588 let tracker = crate::expressions::TrackedExpressionAttributes::new(
589 &check.expression_attribute_names,
590 &check.expression_attribute_values,
591 );
592
593 if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
595 tracker.track_condition_expr(&parsed);
596 }
597
598 let return_item = if check.return_values_on_condition_check_failure.as_deref()
599 == Some("ALL_OLD")
600 && !existing_item.is_empty()
601 {
602 Some(existing_item.clone())
603 } else {
604 None
605 };
606 check_condition_tracked(
607 &check.condition_expression,
608 &existing_item,
609 &tracker,
610 return_item,
611 )?;
612
613 tracker.check_unused()?;
614 Ok(())
615}
616
617fn check_condition_tracked(
618 expression: &str,
619 item: &Item,
620 tracker: &crate::expressions::TrackedExpressionAttributes,
621 return_item_on_failure: Option<Item>,
622) -> Result<()> {
623 let parsed = crate::expressions::condition::parse(expression)
624 .map_err(DynoxideError::ValidationException)?;
625 let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
626 .map_err(DynoxideError::ValidationException)?;
627 if !result {
628 return Err(DynoxideError::ConditionalCheckFailedException(
629 "The conditional request failed".to_string(),
630 return_item_on_failure,
631 ));
632 }
633 Ok(())
634}
635
636fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
642 if let Some(ref put) = item.put {
643 (put.table_name.clone(), types::item_size(&put.item))
644 } else if let Some(ref update) = item.update {
645 let key_size = types::item_size(&update.key);
646 let eav_size = update
647 .expression_attribute_values
648 .as_ref()
649 .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
650 .unwrap_or(0);
651 (update.table_name.clone(), key_size + eav_size)
652 } else if let Some(ref delete) = item.delete {
653 (delete.table_name.clone(), types::item_size(&delete.key))
654 } else if let Some(ref check) = item.condition_check {
655 (check.table_name.clone(), types::item_size(&check.key))
656 } else {
657 (String::new(), 0)
658 }
659}
660
661async fn target_for<S: StorageBackend>(
665 storage: &S,
666 table_name: &str,
667 key_source: &HashMap<String, AttributeValue>,
668) -> Result<Option<String>> {
669 crate::validation::validate_table_name(table_name)?;
670 let meta = helpers::require_table_for_item_op(storage, table_name).await?;
671 let key_schema = helpers::parse_key_schema(&meta)?;
672 match helpers::extract_key_strings(key_source, &key_schema) {
673 Ok((pk, sk)) => Ok(Some(format!("{table_name}#{pk}#{sk}"))),
674 Err(_) => Ok(None),
675 }
676}
677
678async fn get_item_target<S: StorageBackend>(
681 storage: &S,
682 item: &TransactWriteItem,
683) -> Result<Option<String>> {
684 if let Some(ref put) = item.put {
685 target_for(storage, &put.table_name, &put.item).await
686 } else if let Some(ref update) = item.update {
687 target_for(storage, &update.table_name, &update.key).await
688 } else if let Some(ref delete) = item.delete {
689 target_for(storage, &delete.table_name, &delete.key).await
690 } else if let Some(ref check) = item.condition_check {
691 target_for(storage, &check.table_name, &check.key).await
692 } else {
693 Err(DynoxideError::ValidationException(
694 "TransactItem must contain exactly one action".to_string(),
695 ))
696 }
697}