1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10 #[serde(rename = "TransactItems")]
11 pub transact_items: Vec<TransactWriteItem>,
12 #[serde(rename = "ClientRequestToken", default)]
13 pub client_request_token: Option<String>,
14 #[serde(rename = "ReturnConsumedCapacity", default)]
15 pub return_consumed_capacity: Option<String>,
16 #[serde(rename = "ReturnItemCollectionMetrics", default)]
17 pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22 #[serde(rename = "Put", default)]
23 pub put: Option<TransactPut>,
24 #[serde(rename = "Update", default)]
25 pub update: Option<TransactUpdate>,
26 #[serde(rename = "Delete", default)]
27 pub delete: Option<TransactDelete>,
28 #[serde(rename = "ConditionCheck", default)]
29 pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34 #[serde(rename = "TableName")]
35 pub table_name: String,
36 #[serde(rename = "Item")]
37 pub item: Item,
38 #[serde(rename = "ConditionExpression", default)]
39 pub condition_expression: Option<String>,
40 #[serde(rename = "ExpressionAttributeNames", default)]
41 pub expression_attribute_names: Option<HashMap<String, String>>,
42 #[serde(rename = "ExpressionAttributeValues", default)]
43 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45 pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50 #[serde(rename = "TableName")]
51 pub table_name: String,
52 #[serde(rename = "Key")]
53 pub key: HashMap<String, AttributeValue>,
54 #[serde(rename = "UpdateExpression")]
55 pub update_expression: String,
56 #[serde(rename = "ConditionExpression", default)]
57 pub condition_expression: Option<String>,
58 #[serde(rename = "ExpressionAttributeNames", default)]
59 pub expression_attribute_names: Option<HashMap<String, String>>,
60 #[serde(rename = "ExpressionAttributeValues", default)]
61 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63 pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68 #[serde(rename = "TableName")]
69 pub table_name: String,
70 #[serde(rename = "Key")]
71 pub key: HashMap<String, AttributeValue>,
72 #[serde(rename = "ConditionExpression", default)]
73 pub condition_expression: Option<String>,
74 #[serde(rename = "ExpressionAttributeNames", default)]
75 pub expression_attribute_names: Option<HashMap<String, String>>,
76 #[serde(rename = "ExpressionAttributeValues", default)]
77 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79 pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84 #[serde(rename = "TableName")]
85 pub table_name: String,
86 #[serde(rename = "Key")]
87 pub key: HashMap<String, AttributeValue>,
88 #[serde(rename = "ConditionExpression")]
89 pub condition_expression: String,
90 #[serde(rename = "ExpressionAttributeNames", default)]
91 pub expression_attribute_names: Option<HashMap<String, String>>,
92 #[serde(rename = "ExpressionAttributeValues", default)]
93 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95 pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102 #[serde(
105 rename = "ItemCollectionMetrics",
106 skip_serializing_if = "Option::is_none"
107 )]
108 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub fn execute(
112 storage: &Storage,
113 request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115 let items = &request.transact_items;
116
117 if items.is_empty() {
119 return Err(DynoxideError::ValidationException(
120 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121 ));
122 }
123
124 if items.len() > 100 {
130 let dump = format!("{items:?}");
131 return Err(DynoxideError::ValidationException(format!(
132 "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
133 )));
134 }
135
136 let mut seen_targets = HashSet::new();
138 for item in items {
139 let target = get_item_target(storage, item)?;
140 if !seen_targets.insert(target) {
141 return Err(DynoxideError::ValidationException(
142 "Transaction request cannot include multiple operations on one item".to_string(),
143 ));
144 }
145 }
146
147 let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
149 if total_size > 4 * 1024 * 1024 {
150 return Err(DynoxideError::ValidationException(
151 "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
152 ));
153 }
154
155 storage.begin_transaction()?;
157
158 let result = execute_within_transaction(storage, items);
159
160 match result {
161 Ok(()) => {
162 storage.commit()?;
163 let consumed_capacity = if matches!(
165 request.return_consumed_capacity.as_deref(),
166 Some("TOTAL") | Some("INDEXES")
167 ) {
168 let mut table_sizes: HashMap<String, usize> = HashMap::new();
169 for item in items {
170 let (table, size) = get_action_table_and_size(item);
171 *table_sizes.entry(table).or_default() += size;
172 }
173 let caps: Vec<_> = table_sizes
174 .iter()
175 .filter_map(|(table, &size)| {
176 crate::types::consumed_capacity(
177 table,
178 crate::types::write_capacity_units(size),
179 &request.return_consumed_capacity,
180 )
181 })
182 .collect();
183 Some(caps)
184 } else {
185 None
186 };
187 Ok(TransactWriteItemsResponse {
188 consumed_capacity,
189 item_collection_metrics: None,
190 })
191 }
192 Err(e) => {
193 if let Err(rb_err) = storage.rollback() {
194 return Err(DynoxideError::InternalServerError(format!(
195 "Transaction failed ({e}) and rollback also failed ({rb_err})"
196 )));
197 }
198 Err(e)
199 }
200 }
201}
202
203fn execute_within_transaction(storage: &Storage, items: &[TransactWriteItem]) -> Result<()> {
204 let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
205 let mut has_failure = false;
206
207 for item in items {
208 let reason = execute_single_action(storage, item);
209 match reason {
210 Ok(()) => {
211 cancellation_reasons.push(CancellationReason {
212 code: "None".to_string(),
213 message: None,
214 item: None,
215 });
216 }
217 Err(e) => {
218 has_failure = true;
219 let message = Some(e.to_string());
220 let (code, item) = match e {
221 DynoxideError::ConditionalCheckFailedException(_, item) => {
222 ("ConditionalCheckFailed".to_string(), item)
223 }
224 DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
225 _ => ("InternalError".to_string(), None),
226 };
227 cancellation_reasons.push(CancellationReason {
228 code,
229 message,
230 item,
231 });
232 }
233 }
234 }
235
236 if has_failure {
237 let codes: Vec<&str> = cancellation_reasons
238 .iter()
239 .map(|r| r.code.as_str())
240 .collect();
241 let message = format!(
242 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
243 codes.join(", ")
244 );
245 return Err(DynoxideError::TransactionCanceledException(
246 message,
247 cancellation_reasons,
248 ));
249 }
250
251 Ok(())
252}
253
254fn execute_single_action(storage: &Storage, item: &TransactWriteItem) -> Result<()> {
255 if let Some(ref put) = item.put {
256 execute_put(storage, put)
257 } else if let Some(ref update) = item.update {
258 execute_update(storage, update)
259 } else if let Some(ref delete) = item.delete {
260 execute_delete(storage, delete)
261 } else if let Some(ref check) = item.condition_check {
262 execute_condition_check(storage, check)
263 } else {
264 Err(DynoxideError::ValidationException(
265 "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
266 .to_string(),
267 ))
268 }
269}
270
271fn execute_put(storage: &Storage, put: &TransactPut) -> Result<()> {
272 crate::validation::validate_table_name(&put.table_name)?;
273 let meta = helpers::require_table_for_item_op(storage, &put.table_name)?;
274 let key_schema = helpers::parse_key_schema(&meta)?;
275
276 helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
277 crate::validation::validate_item_attribute_values(&put.item)?;
278
279 let mut item = put.item.clone();
281 crate::validation::normalize_item_sets(&mut item);
282
283 let size = types::item_size(&item);
284 if size > types::MAX_ITEM_SIZE {
285 return Err(DynoxideError::ValidationException(
286 "Item size has exceeded the maximum allowed size".to_string(),
287 ));
288 }
289
290 let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
292
293 let tracker = crate::expressions::TrackedExpressionAttributes::new(
294 &put.expression_attribute_names,
295 &put.expression_attribute_values,
296 );
297
298 if let Some(ref cond_expr) = put.condition_expression {
300 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
301 tracker.track_condition_expr(&parsed);
302 }
303 }
304
305 if let Some(ref cond_expr) = put.condition_expression {
307 let existing_json = storage.get_item(&put.table_name, &pk, &sk)?;
308 let existing_item: Item = existing_json
309 .as_ref()
310 .and_then(|j| serde_json::from_str(j).ok())
311 .unwrap_or_default();
312
313 let return_item = if put.return_values_on_condition_check_failure.as_deref()
314 == Some("ALL_OLD")
315 && !existing_item.is_empty()
316 {
317 Some(existing_item.clone())
318 } else {
319 None
320 };
321 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
322 }
323
324 tracker.check_unused()?;
325
326 let item_json = serde_json::to_string(&item)
327 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
328 let hash_prefix = item
329 .get(&key_schema.partition_key)
330 .map(crate::storage::compute_hash_prefix)
331 .unwrap_or_default();
332 let old_json =
333 storage.put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)?;
334
335 let _ = super::gsi::maintain_gsis_after_write(
336 storage,
337 &put.table_name,
338 &meta,
339 &pk,
340 &sk,
341 &item,
342 &key_schema.partition_key,
343 key_schema.sort_key.as_deref(),
344 )?;
345
346 super::lsi::maintain_lsis_after_write(
347 storage,
348 &put.table_name,
349 &meta,
350 &pk,
351 &sk,
352 &item,
353 &key_schema.partition_key,
354 key_schema.sort_key.as_deref(),
355 )?;
356
357 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
359 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
360
361 Ok(())
362}
363
364fn execute_update(storage: &Storage, update: &TransactUpdate) -> Result<()> {
365 crate::validation::validate_table_name(&update.table_name)?;
366 let meta = helpers::require_table_for_item_op(storage, &update.table_name)?;
367 let key_schema = helpers::parse_key_schema(&meta)?;
368
369 helpers::validate_key_only(&update.key, &key_schema)?;
370 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
372
373 let existing_json = storage.get_item(&update.table_name, &pk, &sk)?;
374 let existing_item: Item = existing_json
375 .as_ref()
376 .and_then(|j| serde_json::from_str(j).ok())
377 .unwrap_or_default();
378
379 let tracker = crate::expressions::TrackedExpressionAttributes::new(
380 &update.expression_attribute_names,
381 &update.expression_attribute_values,
382 );
383
384 if let Some(ref cond_expr) = update.condition_expression {
386 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
387 tracker.track_condition_expr(&parsed);
388 }
389 }
390 if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
391 tracker.track_update_expr(&parsed);
392 }
393
394 if let Some(ref cond_expr) = update.condition_expression {
398 let return_item = if update.return_values_on_condition_check_failure.as_deref()
399 == Some("ALL_OLD")
400 && existing_json.is_some()
401 {
402 Some(existing_item.clone())
403 } else {
404 None
405 };
406 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
407 }
408
409 let mut item = existing_item;
412 if existing_json.is_none() {
413 for (k, v) in &update.key {
414 item.insert(k.clone(), v.clone());
415 }
416 }
417
418 let parsed = crate::expressions::update::parse(&update.update_expression)
420 .map_err(DynoxideError::ValidationException)?;
421 crate::expressions::update::apply(&mut item, &parsed, &tracker)
422 .map_err(DynoxideError::ValidationException)?;
423
424 tracker.check_unused()?;
425
426 crate::validation::validate_item_attribute_values(&item)?;
428 crate::validation::normalize_item_sets(&mut item);
429
430 let size = types::item_size(&item);
431 if size > types::MAX_ITEM_SIZE {
432 return Err(DynoxideError::ValidationException(
433 "Item size has exceeded the maximum allowed size".to_string(),
434 ));
435 }
436
437 let old_for_stream = existing_json.clone();
439
440 let item_json = serde_json::to_string(&item)
441 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
442 let hash_prefix = update
443 .key
444 .get(&key_schema.partition_key)
445 .map(crate::storage::compute_hash_prefix)
446 .unwrap_or_default();
447 storage.put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)?;
448
449 let _ = super::gsi::maintain_gsis_after_write(
450 storage,
451 &update.table_name,
452 &meta,
453 &pk,
454 &sk,
455 &item,
456 &key_schema.partition_key,
457 key_schema.sort_key.as_deref(),
458 )?;
459
460 super::lsi::maintain_lsis_after_write(
461 storage,
462 &update.table_name,
463 &meta,
464 &pk,
465 &sk,
466 &item,
467 &key_schema.partition_key,
468 key_schema.sort_key.as_deref(),
469 )?;
470
471 let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
473 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
474
475 Ok(())
476}
477
478fn execute_delete(storage: &Storage, delete: &TransactDelete) -> Result<()> {
479 crate::validation::validate_table_name(&delete.table_name)?;
480 let meta = helpers::require_table_for_item_op(storage, &delete.table_name)?;
481 let key_schema = helpers::parse_key_schema(&meta)?;
482
483 helpers::validate_key_only(&delete.key, &key_schema)?;
484 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
486
487 let tracker = crate::expressions::TrackedExpressionAttributes::new(
488 &delete.expression_attribute_names,
489 &delete.expression_attribute_values,
490 );
491
492 if let Some(ref cond_expr) = delete.condition_expression {
494 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
495 tracker.track_condition_expr(&parsed);
496 }
497 }
498
499 if let Some(ref cond_expr) = delete.condition_expression {
501 let existing_json = storage.get_item(&delete.table_name, &pk, &sk)?;
502 let existing_item: Item = existing_json
503 .as_ref()
504 .and_then(|j| serde_json::from_str(j).ok())
505 .unwrap_or_default();
506
507 let return_item = if delete.return_values_on_condition_check_failure.as_deref()
508 == Some("ALL_OLD")
509 && !existing_item.is_empty()
510 {
511 Some(existing_item.clone())
512 } else {
513 None
514 };
515 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
516 }
517
518 tracker.check_unused()?;
519
520 let old_json = storage.delete_item(&delete.table_name, &pk, &sk)?;
521 let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)?;
522 super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)?;
523
524 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
526 if old_item.is_some() {
527 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
528 }
529
530 Ok(())
531}
532
533fn execute_condition_check(storage: &Storage, check: &TransactConditionCheck) -> Result<()> {
534 crate::validation::validate_table_name(&check.table_name)?;
535 let meta = helpers::require_table_for_item_op(storage, &check.table_name)?;
536 let key_schema = helpers::parse_key_schema(&meta)?;
537
538 helpers::validate_key_only(&check.key, &key_schema)?;
539 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
541
542 let existing_json = storage.get_item(&check.table_name, &pk, &sk)?;
543 let existing_item: Item = existing_json
544 .as_ref()
545 .and_then(|j| serde_json::from_str(j).ok())
546 .unwrap_or_default();
547
548 let tracker = crate::expressions::TrackedExpressionAttributes::new(
549 &check.expression_attribute_names,
550 &check.expression_attribute_values,
551 );
552
553 if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
555 tracker.track_condition_expr(&parsed);
556 }
557
558 let return_item = if check.return_values_on_condition_check_failure.as_deref()
559 == Some("ALL_OLD")
560 && !existing_item.is_empty()
561 {
562 Some(existing_item.clone())
563 } else {
564 None
565 };
566 check_condition_tracked(
567 &check.condition_expression,
568 &existing_item,
569 &tracker,
570 return_item,
571 )?;
572
573 tracker.check_unused()?;
574 Ok(())
575}
576
577fn check_condition_tracked(
578 expression: &str,
579 item: &Item,
580 tracker: &crate::expressions::TrackedExpressionAttributes,
581 return_item_on_failure: Option<Item>,
582) -> Result<()> {
583 let parsed = crate::expressions::condition::parse(expression)
584 .map_err(DynoxideError::ValidationException)?;
585 let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
586 .map_err(DynoxideError::ValidationException)?;
587 if !result {
588 return Err(DynoxideError::ConditionalCheckFailedException(
589 "The conditional request failed".to_string(),
590 return_item_on_failure,
591 ));
592 }
593 Ok(())
594}
595
596fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
602 if let Some(ref put) = item.put {
603 (put.table_name.clone(), types::item_size(&put.item))
604 } else if let Some(ref update) = item.update {
605 let key_size = types::item_size(&update.key);
606 let eav_size = update
607 .expression_attribute_values
608 .as_ref()
609 .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
610 .unwrap_or(0);
611 (update.table_name.clone(), key_size + eav_size)
612 } else if let Some(ref delete) = item.delete {
613 (delete.table_name.clone(), types::item_size(&delete.key))
614 } else if let Some(ref check) = item.condition_check {
615 (check.table_name.clone(), types::item_size(&check.key))
616 } else {
617 (String::new(), 0)
618 }
619}
620
621fn get_item_target(storage: &Storage, item: &TransactWriteItem) -> Result<String> {
623 if let Some(ref put) = item.put {
624 crate::validation::validate_table_name(&put.table_name)?;
625 let meta = helpers::require_table_for_item_op(storage, &put.table_name)?;
626 let key_schema = helpers::parse_key_schema(&meta)?;
627 let (pk, sk) = helpers::extract_key_strings(&put.item, &key_schema)?;
629 Ok(format!("{}#{}#{}", put.table_name, pk, sk))
630 } else if let Some(ref update) = item.update {
631 crate::validation::validate_table_name(&update.table_name)?;
632 let meta = helpers::require_table_for_item_op(storage, &update.table_name)?;
633 let key_schema = helpers::parse_key_schema(&meta)?;
634 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
636 Ok(format!("{}#{}#{}", update.table_name, pk, sk))
637 } else if let Some(ref delete) = item.delete {
638 crate::validation::validate_table_name(&delete.table_name)?;
639 let meta = helpers::require_table_for_item_op(storage, &delete.table_name)?;
640 let key_schema = helpers::parse_key_schema(&meta)?;
641 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
643 Ok(format!("{}#{}#{}", delete.table_name, pk, sk))
644 } else if let Some(ref check) = item.condition_check {
645 crate::validation::validate_table_name(&check.table_name)?;
646 let meta = helpers::require_table_for_item_op(storage, &check.table_name)?;
647 let key_schema = helpers::parse_key_schema(&meta)?;
648 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
650 Ok(format!("{}#{}#{}", check.table_name, pk, sk))
651 } else {
652 Err(DynoxideError::ValidationException(
653 "TransactItem must contain exactly one action".to_string(),
654 ))
655 }
656}