1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage_backend::StorageBackend;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10 #[serde(rename = "TransactItems")]
11 pub transact_items: Vec<TransactWriteItem>,
12 #[serde(rename = "ClientRequestToken", default)]
13 pub client_request_token: Option<String>,
14 #[serde(rename = "ReturnConsumedCapacity", default)]
15 pub return_consumed_capacity: Option<String>,
16 #[serde(rename = "ReturnItemCollectionMetrics", default)]
17 pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22 #[serde(rename = "Put", default)]
23 pub put: Option<TransactPut>,
24 #[serde(rename = "Update", default)]
25 pub update: Option<TransactUpdate>,
26 #[serde(rename = "Delete", default)]
27 pub delete: Option<TransactDelete>,
28 #[serde(rename = "ConditionCheck", default)]
29 pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34 #[serde(rename = "TableName")]
35 pub table_name: String,
36 #[serde(rename = "Item")]
37 pub item: Item,
38 #[serde(rename = "ConditionExpression", default)]
39 pub condition_expression: Option<String>,
40 #[serde(rename = "ExpressionAttributeNames", default)]
41 pub expression_attribute_names: Option<HashMap<String, String>>,
42 #[serde(rename = "ExpressionAttributeValues", default)]
43 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45 pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50 #[serde(rename = "TableName")]
51 pub table_name: String,
52 #[serde(rename = "Key")]
53 pub key: HashMap<String, AttributeValue>,
54 #[serde(rename = "UpdateExpression")]
55 pub update_expression: String,
56 #[serde(rename = "ConditionExpression", default)]
57 pub condition_expression: Option<String>,
58 #[serde(rename = "ExpressionAttributeNames", default)]
59 pub expression_attribute_names: Option<HashMap<String, String>>,
60 #[serde(rename = "ExpressionAttributeValues", default)]
61 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63 pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68 #[serde(rename = "TableName")]
69 pub table_name: String,
70 #[serde(rename = "Key")]
71 pub key: HashMap<String, AttributeValue>,
72 #[serde(rename = "ConditionExpression", default)]
73 pub condition_expression: Option<String>,
74 #[serde(rename = "ExpressionAttributeNames", default)]
75 pub expression_attribute_names: Option<HashMap<String, String>>,
76 #[serde(rename = "ExpressionAttributeValues", default)]
77 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79 pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84 #[serde(rename = "TableName")]
85 pub table_name: String,
86 #[serde(rename = "Key")]
87 pub key: HashMap<String, AttributeValue>,
88 #[serde(rename = "ConditionExpression")]
89 pub condition_expression: String,
90 #[serde(rename = "ExpressionAttributeNames", default)]
91 pub expression_attribute_names: Option<HashMap<String, String>>,
92 #[serde(rename = "ExpressionAttributeValues", default)]
93 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95 pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102 #[serde(
105 rename = "ItemCollectionMetrics",
106 skip_serializing_if = "Option::is_none"
107 )]
108 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub async fn execute<S: StorageBackend>(
112 storage: &S,
113 request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115 let items = &request.transact_items;
116
117 if items.is_empty() {
119 return Err(DynoxideError::ValidationException(
120 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121 ));
122 }
123
124 if items.len() > 100 {
130 let dump = format!("{items:?}");
131 return Err(DynoxideError::ValidationException(format!(
132 "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
133 )));
134 }
135
136 let mut seen_targets = HashSet::new();
139 for item in items {
140 if let Some(target) = get_item_target(storage, item).await? {
141 if !seen_targets.insert(target) {
142 return Err(DynoxideError::ValidationException(
143 "Transaction request cannot include multiple operations on one item"
144 .to_string(),
145 ));
146 }
147 }
148 }
149
150 let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
152 if total_size > 4 * 1024 * 1024 {
153 return Err(DynoxideError::ValidationException(
154 "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
155 ));
156 }
157
158 helpers::with_write_transaction(storage, execute_within_transaction(storage, items)).await?;
160
161 let consumed_capacity = if matches!(
163 request.return_consumed_capacity.as_deref(),
164 Some("TOTAL") | Some("INDEXES")
165 ) {
166 let mut table_units: HashMap<String, f64> = HashMap::new();
171 for item in items {
172 let (table, size) = get_action_table_and_size(item);
173 *table_units.entry(table).or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
174 * crate::types::write_capacity_units(size);
175 }
176 let caps: Vec<_> = table_units
177 .iter()
178 .filter_map(|(table, &units)| {
179 crate::types::transactional_write_capacity(
180 table,
181 units,
182 &request.return_consumed_capacity,
183 )
184 })
185 .collect();
186 Some(caps)
187 } else {
188 None
189 };
190 Ok(TransactWriteItemsResponse {
191 consumed_capacity,
192 item_collection_metrics: None,
193 })
194}
195
196async fn execute_within_transaction<S: StorageBackend>(
197 storage: &S,
198 items: &[TransactWriteItem],
199) -> Result<()> {
200 let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
201 let mut has_failure = false;
202
203 for item in items {
204 let reason = execute_single_action(storage, item).await;
205 match reason {
206 Ok(()) => {
207 cancellation_reasons.push(CancellationReason {
208 code: "None".to_string(),
209 message: None,
210 item: None,
211 });
212 }
213 Err(e) => {
214 if matches!(e, DynoxideError::KeyEmptyValueValidation(_)) {
218 return Err(e);
219 }
220 has_failure = true;
221 let message = Some(e.to_string());
222 let (code, item) = match e {
223 DynoxideError::ConditionalCheckFailedException(_, item) => {
224 ("ConditionalCheckFailed".to_string(), item)
225 }
226 DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
227 _ => ("InternalError".to_string(), None),
228 };
229 cancellation_reasons.push(CancellationReason {
230 code,
231 message,
232 item,
233 });
234 }
235 }
236 }
237
238 if has_failure {
239 let codes: Vec<&str> = cancellation_reasons
240 .iter()
241 .map(|r| r.code.as_str())
242 .collect();
243 let message = format!(
244 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
245 codes.join(", ")
246 );
247 return Err(DynoxideError::TransactionCanceledException(
248 message,
249 cancellation_reasons,
250 ));
251 }
252
253 Ok(())
254}
255
256async fn execute_single_action<S: StorageBackend>(
257 storage: &S,
258 item: &TransactWriteItem,
259) -> Result<()> {
260 if let Some(ref put) = item.put {
261 execute_put(storage, put).await
262 } else if let Some(ref update) = item.update {
263 execute_update(storage, update).await
264 } else if let Some(ref delete) = item.delete {
265 execute_delete(storage, delete).await
266 } else if let Some(ref check) = item.condition_check {
267 execute_condition_check(storage, check).await
268 } else {
269 Err(DynoxideError::ValidationException(
270 "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
271 .to_string(),
272 ))
273 }
274}
275
276async fn execute_put<S: StorageBackend>(storage: &S, put: &TransactPut) -> Result<()> {
277 crate::validation::validate_table_name(&put.table_name)?;
278 let meta = helpers::require_table_for_item_op(storage, &put.table_name).await?;
279 let key_schema = helpers::parse_key_schema(&meta)?;
280
281 helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
282 crate::validation::validate_item_attribute_values(&put.item)?;
283
284 let mut item = put.item.clone();
286 crate::validation::normalize_item_sets(&mut item);
287
288 let size = types::item_size(&item);
289 if size > types::MAX_ITEM_SIZE {
290 return Err(DynoxideError::ValidationException(
291 "Item size has exceeded the maximum allowed size".to_string(),
292 ));
293 }
294
295 let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
297
298 let tracker = crate::expressions::TrackedExpressionAttributes::new(
299 &put.expression_attribute_names,
300 &put.expression_attribute_values,
301 );
302
303 if let Some(ref cond_expr) = put.condition_expression {
305 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
306 tracker.track_condition_expr(&parsed);
307 }
308 }
309
310 if let Some(ref cond_expr) = put.condition_expression {
312 let existing_json = storage.get_item(&put.table_name, &pk, &sk).await?;
313 let existing_item: Item = existing_json
314 .as_ref()
315 .and_then(|j| serde_json::from_str(j).ok())
316 .unwrap_or_default();
317
318 let return_item = if put.return_values_on_condition_check_failure.as_deref()
319 == Some("ALL_OLD")
320 && !existing_item.is_empty()
321 {
322 Some(existing_item.clone())
323 } else {
324 None
325 };
326 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
327 }
328
329 tracker.check_unused()?;
330
331 let item_json = serde_json::to_string(&item)
332 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
333 let hash_prefix = item
334 .get(&key_schema.partition_key)
335 .map(crate::storage::compute_hash_prefix)
336 .unwrap_or_default();
337 let old_json = storage
338 .put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)
339 .await?;
340
341 let _ = super::gsi::maintain_gsis_after_write(
342 storage,
343 &put.table_name,
344 &meta,
345 &pk,
346 &sk,
347 &item,
348 &key_schema.partition_key,
349 key_schema.sort_key.as_deref(),
350 )
351 .await?;
352
353 super::lsi::maintain_lsis_after_write(
354 storage,
355 &put.table_name,
356 &meta,
357 &pk,
358 &sk,
359 &item,
360 &key_schema.partition_key,
361 key_schema.sort_key.as_deref(),
362 )
363 .await?;
364
365 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
367 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
368
369 Ok(())
370}
371
372async fn execute_update<S: StorageBackend>(storage: &S, update: &TransactUpdate) -> Result<()> {
373 crate::validation::validate_table_name(&update.table_name)?;
374 let meta = helpers::require_table_for_item_op(storage, &update.table_name).await?;
375 let key_schema = helpers::parse_key_schema(&meta)?;
376
377 helpers::validate_key_only(&update.key, &key_schema)?;
378 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
380
381 let existing_json = storage.get_item(&update.table_name, &pk, &sk).await?;
382 let existing_item: Item = existing_json
383 .as_ref()
384 .and_then(|j| serde_json::from_str(j).ok())
385 .unwrap_or_default();
386
387 let tracker = crate::expressions::TrackedExpressionAttributes::new(
388 &update.expression_attribute_names,
389 &update.expression_attribute_values,
390 );
391
392 if let Some(ref cond_expr) = update.condition_expression {
394 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
395 tracker.track_condition_expr(&parsed);
396 }
397 }
398 if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
399 tracker.track_update_expr(&parsed);
400 }
401
402 if let Some(ref cond_expr) = update.condition_expression {
406 let return_item = if update.return_values_on_condition_check_failure.as_deref()
407 == Some("ALL_OLD")
408 && existing_json.is_some()
409 {
410 Some(existing_item.clone())
411 } else {
412 None
413 };
414 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
415 }
416
417 let mut item = existing_item;
420 if existing_json.is_none() {
421 for (k, v) in &update.key {
422 item.insert(k.clone(), v.clone());
423 }
424 }
425 let before_item = item.clone();
426
427 let parsed = crate::expressions::update::parse(&update.update_expression)
429 .map_err(DynoxideError::ValidationException)?;
430 crate::expressions::update::apply(&mut item, &parsed, &tracker)
431 .map_err(DynoxideError::ValidationException)?;
432
433 tracker.check_unused()?;
434
435 crate::validation::validate_item_attribute_values(&item)?;
437 crate::validation::normalize_item_sets(&mut item);
438
439 helpers::validate_updated_index_keys(&before_item, &item, &meta)?;
441
442 let size = types::item_size(&item);
443 if size > types::MAX_ITEM_SIZE {
444 return Err(DynoxideError::ValidationException(
445 "Item size has exceeded the maximum allowed size".to_string(),
446 ));
447 }
448
449 let old_for_stream = existing_json.clone();
451
452 let item_json = serde_json::to_string(&item)
453 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
454 let hash_prefix = update
455 .key
456 .get(&key_schema.partition_key)
457 .map(crate::storage::compute_hash_prefix)
458 .unwrap_or_default();
459 storage
460 .put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)
461 .await?;
462
463 let _ = super::gsi::maintain_gsis_after_write(
464 storage,
465 &update.table_name,
466 &meta,
467 &pk,
468 &sk,
469 &item,
470 &key_schema.partition_key,
471 key_schema.sort_key.as_deref(),
472 )
473 .await?;
474
475 super::lsi::maintain_lsis_after_write(
476 storage,
477 &update.table_name,
478 &meta,
479 &pk,
480 &sk,
481 &item,
482 &key_schema.partition_key,
483 key_schema.sort_key.as_deref(),
484 )
485 .await?;
486
487 let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
489 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
490
491 Ok(())
492}
493
494async fn execute_delete<S: StorageBackend>(storage: &S, delete: &TransactDelete) -> Result<()> {
495 crate::validation::validate_table_name(&delete.table_name)?;
496 let meta = helpers::require_table_for_item_op(storage, &delete.table_name).await?;
497 let key_schema = helpers::parse_key_schema(&meta)?;
498
499 helpers::validate_key_only(&delete.key, &key_schema)?;
500 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
502
503 let tracker = crate::expressions::TrackedExpressionAttributes::new(
504 &delete.expression_attribute_names,
505 &delete.expression_attribute_values,
506 );
507
508 if let Some(ref cond_expr) = delete.condition_expression {
510 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
511 tracker.track_condition_expr(&parsed);
512 }
513 }
514
515 if let Some(ref cond_expr) = delete.condition_expression {
517 let existing_json = storage.get_item(&delete.table_name, &pk, &sk).await?;
518 let existing_item: Item = existing_json
519 .as_ref()
520 .and_then(|j| serde_json::from_str(j).ok())
521 .unwrap_or_default();
522
523 let return_item = if delete.return_values_on_condition_check_failure.as_deref()
524 == Some("ALL_OLD")
525 && !existing_item.is_empty()
526 {
527 Some(existing_item.clone())
528 } else {
529 None
530 };
531 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
532 }
533
534 tracker.check_unused()?;
535
536 let old_json = storage.delete_item(&delete.table_name, &pk, &sk).await?;
537 let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)
538 .await?;
539 super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk).await?;
540
541 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
543 if old_item.is_some() {
544 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
545 }
546
547 Ok(())
548}
549
550async fn execute_condition_check<S: StorageBackend>(
551 storage: &S,
552 check: &TransactConditionCheck,
553) -> Result<()> {
554 crate::validation::validate_table_name(&check.table_name)?;
555 let meta = helpers::require_table_for_item_op(storage, &check.table_name).await?;
556 let key_schema = helpers::parse_key_schema(&meta)?;
557
558 helpers::validate_key_only(&check.key, &key_schema)?;
559 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
561
562 let existing_json = storage.get_item(&check.table_name, &pk, &sk).await?;
563 let existing_item: Item = existing_json
564 .as_ref()
565 .and_then(|j| serde_json::from_str(j).ok())
566 .unwrap_or_default();
567
568 let tracker = crate::expressions::TrackedExpressionAttributes::new(
569 &check.expression_attribute_names,
570 &check.expression_attribute_values,
571 );
572
573 if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
575 tracker.track_condition_expr(&parsed);
576 }
577
578 let return_item = if check.return_values_on_condition_check_failure.as_deref()
579 == Some("ALL_OLD")
580 && !existing_item.is_empty()
581 {
582 Some(existing_item.clone())
583 } else {
584 None
585 };
586 check_condition_tracked(
587 &check.condition_expression,
588 &existing_item,
589 &tracker,
590 return_item,
591 )?;
592
593 tracker.check_unused()?;
594 Ok(())
595}
596
597fn check_condition_tracked(
598 expression: &str,
599 item: &Item,
600 tracker: &crate::expressions::TrackedExpressionAttributes,
601 return_item_on_failure: Option<Item>,
602) -> Result<()> {
603 let parsed = crate::expressions::condition::parse(expression)
604 .map_err(DynoxideError::ValidationException)?;
605 let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
606 .map_err(DynoxideError::ValidationException)?;
607 if !result {
608 return Err(DynoxideError::ConditionalCheckFailedException(
609 "The conditional request failed".to_string(),
610 return_item_on_failure,
611 ));
612 }
613 Ok(())
614}
615
616fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
622 if let Some(ref put) = item.put {
623 (put.table_name.clone(), types::item_size(&put.item))
624 } else if let Some(ref update) = item.update {
625 let key_size = types::item_size(&update.key);
626 let eav_size = update
627 .expression_attribute_values
628 .as_ref()
629 .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
630 .unwrap_or(0);
631 (update.table_name.clone(), key_size + eav_size)
632 } else if let Some(ref delete) = item.delete {
633 (delete.table_name.clone(), types::item_size(&delete.key))
634 } else if let Some(ref check) = item.condition_check {
635 (check.table_name.clone(), types::item_size(&check.key))
636 } else {
637 (String::new(), 0)
638 }
639}
640
641async fn target_for<S: StorageBackend>(
645 storage: &S,
646 table_name: &str,
647 key_source: &HashMap<String, AttributeValue>,
648) -> Result<Option<String>> {
649 crate::validation::validate_table_name(table_name)?;
650 let meta = helpers::require_table_for_item_op(storage, table_name).await?;
651 let key_schema = helpers::parse_key_schema(&meta)?;
652 match helpers::extract_key_strings(key_source, &key_schema) {
653 Ok((pk, sk)) => Ok(Some(format!("{table_name}#{pk}#{sk}"))),
654 Err(_) => Ok(None),
655 }
656}
657
658async fn get_item_target<S: StorageBackend>(
661 storage: &S,
662 item: &TransactWriteItem,
663) -> Result<Option<String>> {
664 if let Some(ref put) = item.put {
665 target_for(storage, &put.table_name, &put.item).await
666 } else if let Some(ref update) = item.update {
667 target_for(storage, &update.table_name, &update.key).await
668 } else if let Some(ref delete) = item.delete {
669 target_for(storage, &delete.table_name, &delete.key).await
670 } else if let Some(ref check) = item.condition_check {
671 target_for(storage, &check.table_name, &check.key).await
672 } else {
673 Err(DynoxideError::ValidationException(
674 "TransactItem must contain exactly one action".to_string(),
675 ))
676 }
677}