1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10 #[serde(rename = "TransactItems")]
11 pub transact_items: Vec<TransactWriteItem>,
12 #[serde(rename = "ClientRequestToken", default)]
13 pub client_request_token: Option<String>,
14 #[serde(rename = "ReturnConsumedCapacity", default)]
15 pub return_consumed_capacity: Option<String>,
16 #[serde(rename = "ReturnItemCollectionMetrics", default)]
17 pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22 #[serde(rename = "Put", default)]
23 pub put: Option<TransactPut>,
24 #[serde(rename = "Update", default)]
25 pub update: Option<TransactUpdate>,
26 #[serde(rename = "Delete", default)]
27 pub delete: Option<TransactDelete>,
28 #[serde(rename = "ConditionCheck", default)]
29 pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34 #[serde(rename = "TableName")]
35 pub table_name: String,
36 #[serde(rename = "Item")]
37 pub item: Item,
38 #[serde(rename = "ConditionExpression", default)]
39 pub condition_expression: Option<String>,
40 #[serde(rename = "ExpressionAttributeNames", default)]
41 pub expression_attribute_names: Option<HashMap<String, String>>,
42 #[serde(rename = "ExpressionAttributeValues", default)]
43 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45 pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50 #[serde(rename = "TableName")]
51 pub table_name: String,
52 #[serde(rename = "Key")]
53 pub key: HashMap<String, AttributeValue>,
54 #[serde(rename = "UpdateExpression")]
55 pub update_expression: String,
56 #[serde(rename = "ConditionExpression", default)]
57 pub condition_expression: Option<String>,
58 #[serde(rename = "ExpressionAttributeNames", default)]
59 pub expression_attribute_names: Option<HashMap<String, String>>,
60 #[serde(rename = "ExpressionAttributeValues", default)]
61 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63 pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68 #[serde(rename = "TableName")]
69 pub table_name: String,
70 #[serde(rename = "Key")]
71 pub key: HashMap<String, AttributeValue>,
72 #[serde(rename = "ConditionExpression", default)]
73 pub condition_expression: Option<String>,
74 #[serde(rename = "ExpressionAttributeNames", default)]
75 pub expression_attribute_names: Option<HashMap<String, String>>,
76 #[serde(rename = "ExpressionAttributeValues", default)]
77 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79 pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84 #[serde(rename = "TableName")]
85 pub table_name: String,
86 #[serde(rename = "Key")]
87 pub key: HashMap<String, AttributeValue>,
88 #[serde(rename = "ConditionExpression")]
89 pub condition_expression: String,
90 #[serde(rename = "ExpressionAttributeNames", default)]
91 pub expression_attribute_names: Option<HashMap<String, String>>,
92 #[serde(rename = "ExpressionAttributeValues", default)]
93 pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94 #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95 pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102 #[serde(
105 rename = "ItemCollectionMetrics",
106 skip_serializing_if = "Option::is_none"
107 )]
108 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub fn execute(
112 storage: &Storage,
113 request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115 let items = &request.transact_items;
116
117 if items.is_empty() {
119 return Err(DynoxideError::ValidationException(
120 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121 ));
122 }
123
124 if items.len() > 100 {
126 return Err(DynoxideError::ValidationException(
127 "Member must have length less than or equal to 100".to_string(),
128 ));
129 }
130
131 let mut seen_targets = HashSet::new();
133 for item in items {
134 let target = get_item_target(storage, item)?;
135 if !seen_targets.insert(target) {
136 return Err(DynoxideError::ValidationException(
137 "Transaction request cannot include multiple operations on one item".to_string(),
138 ));
139 }
140 }
141
142 let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
144 if total_size > 4 * 1024 * 1024 {
145 return Err(DynoxideError::ValidationException(
146 "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
147 ));
148 }
149
150 storage.begin_transaction()?;
152
153 let result = execute_within_transaction(storage, items);
154
155 match result {
156 Ok(()) => {
157 storage.commit()?;
158 let consumed_capacity = if matches!(
160 request.return_consumed_capacity.as_deref(),
161 Some("TOTAL") | Some("INDEXES")
162 ) {
163 let mut table_sizes: HashMap<String, usize> = HashMap::new();
164 for item in items {
165 let (table, size) = get_action_table_and_size(item);
166 *table_sizes.entry(table).or_default() += size;
167 }
168 let caps: Vec<_> = table_sizes
169 .iter()
170 .filter_map(|(table, &size)| {
171 crate::types::consumed_capacity(
172 table,
173 crate::types::write_capacity_units(size),
174 &request.return_consumed_capacity,
175 )
176 })
177 .collect();
178 Some(caps)
179 } else {
180 None
181 };
182 Ok(TransactWriteItemsResponse {
183 consumed_capacity,
184 item_collection_metrics: None,
185 })
186 }
187 Err(e) => {
188 if let Err(rb_err) = storage.rollback() {
189 return Err(DynoxideError::InternalServerError(format!(
190 "Transaction failed ({e}) and rollback also failed ({rb_err})"
191 )));
192 }
193 Err(e)
194 }
195 }
196}
197
198fn execute_within_transaction(storage: &Storage, items: &[TransactWriteItem]) -> Result<()> {
199 let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
200 let mut has_failure = false;
201
202 for item in items {
203 let reason = execute_single_action(storage, item);
204 match reason {
205 Ok(()) => {
206 cancellation_reasons.push(CancellationReason {
207 code: "None".to_string(),
208 message: None,
209 item: None,
210 });
211 }
212 Err(e) => {
213 has_failure = true;
214 let message = Some(e.to_string());
215 let (code, item) = match e {
216 DynoxideError::ConditionalCheckFailedException(_, item) => {
217 ("ConditionalCheckFailed".to_string(), item)
218 }
219 DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
220 _ => ("InternalError".to_string(), None),
221 };
222 cancellation_reasons.push(CancellationReason {
223 code,
224 message,
225 item,
226 });
227 }
228 }
229 }
230
231 if has_failure {
232 let codes: Vec<&str> = cancellation_reasons
233 .iter()
234 .map(|r| r.code.as_str())
235 .collect();
236 let message = format!(
237 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
238 codes.join(", ")
239 );
240 return Err(DynoxideError::TransactionCanceledException(
241 message,
242 cancellation_reasons,
243 ));
244 }
245
246 Ok(())
247}
248
249fn execute_single_action(storage: &Storage, item: &TransactWriteItem) -> Result<()> {
250 if let Some(ref put) = item.put {
251 execute_put(storage, put)
252 } else if let Some(ref update) = item.update {
253 execute_update(storage, update)
254 } else if let Some(ref delete) = item.delete {
255 execute_delete(storage, delete)
256 } else if let Some(ref check) = item.condition_check {
257 execute_condition_check(storage, check)
258 } else {
259 Err(DynoxideError::ValidationException(
260 "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
261 .to_string(),
262 ))
263 }
264}
265
266fn execute_put(storage: &Storage, put: &TransactPut) -> Result<()> {
267 crate::validation::validate_table_name(&put.table_name)?;
268 let meta = helpers::require_table_for_item_op(storage, &put.table_name)?;
269 let key_schema = helpers::parse_key_schema(&meta)?;
270
271 helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
272 crate::validation::validate_item_attribute_values(&put.item)?;
273
274 let mut item = put.item.clone();
276 crate::validation::normalize_item_sets(&mut item);
277
278 let size = types::item_size(&item);
279 if size > types::MAX_ITEM_SIZE {
280 return Err(DynoxideError::ValidationException(
281 "Item size has exceeded the maximum allowed size".to_string(),
282 ));
283 }
284
285 let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
286
287 let tracker = crate::expressions::TrackedExpressionAttributes::new(
288 &put.expression_attribute_names,
289 &put.expression_attribute_values,
290 );
291
292 if let Some(ref cond_expr) = put.condition_expression {
294 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
295 tracker.track_condition_expr(&parsed);
296 }
297 }
298
299 if let Some(ref cond_expr) = put.condition_expression {
301 let existing_json = storage.get_item(&put.table_name, &pk, &sk)?;
302 let existing_item: Item = existing_json
303 .as_ref()
304 .and_then(|j| serde_json::from_str(j).ok())
305 .unwrap_or_default();
306
307 let return_item = if put.return_values_on_condition_check_failure.as_deref()
308 == Some("ALL_OLD")
309 && !existing_item.is_empty()
310 {
311 Some(existing_item.clone())
312 } else {
313 None
314 };
315 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
316 }
317
318 tracker.check_unused()?;
319
320 let item_json = serde_json::to_string(&item)
321 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
322 let hash_prefix = item
323 .get(&key_schema.partition_key)
324 .map(crate::storage::compute_hash_prefix)
325 .unwrap_or_default();
326 let old_json =
327 storage.put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)?;
328
329 let _ = super::gsi::maintain_gsis_after_write(
330 storage,
331 &put.table_name,
332 &meta,
333 &pk,
334 &sk,
335 &item,
336 &key_schema.partition_key,
337 key_schema.sort_key.as_deref(),
338 )?;
339
340 super::lsi::maintain_lsis_after_write(
341 storage,
342 &put.table_name,
343 &meta,
344 &pk,
345 &sk,
346 &item,
347 &key_schema.partition_key,
348 key_schema.sort_key.as_deref(),
349 )?;
350
351 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
353 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
354
355 Ok(())
356}
357
358fn execute_update(storage: &Storage, update: &TransactUpdate) -> Result<()> {
359 crate::validation::validate_table_name(&update.table_name)?;
360 let meta = helpers::require_table_for_item_op(storage, &update.table_name)?;
361 let key_schema = helpers::parse_key_schema(&meta)?;
362
363 helpers::validate_key_only(&update.key, &key_schema)?;
364 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
365
366 let existing_json = storage.get_item(&update.table_name, &pk, &sk)?;
367 let existing_item: Item = existing_json
368 .as_ref()
369 .and_then(|j| serde_json::from_str(j).ok())
370 .unwrap_or_default();
371
372 let tracker = crate::expressions::TrackedExpressionAttributes::new(
373 &update.expression_attribute_names,
374 &update.expression_attribute_values,
375 );
376
377 if let Some(ref cond_expr) = update.condition_expression {
379 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
380 tracker.track_condition_expr(&parsed);
381 }
382 }
383 if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
384 tracker.track_update_expr(&parsed);
385 }
386
387 if let Some(ref cond_expr) = update.condition_expression {
391 let return_item = if update.return_values_on_condition_check_failure.as_deref()
392 == Some("ALL_OLD")
393 && existing_json.is_some()
394 {
395 Some(existing_item.clone())
396 } else {
397 None
398 };
399 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
400 }
401
402 let mut item = existing_item;
405 if existing_json.is_none() {
406 for (k, v) in &update.key {
407 item.insert(k.clone(), v.clone());
408 }
409 }
410
411 let parsed = crate::expressions::update::parse(&update.update_expression)
413 .map_err(DynoxideError::ValidationException)?;
414 crate::expressions::update::apply(&mut item, &parsed, &tracker)
415 .map_err(DynoxideError::ValidationException)?;
416
417 tracker.check_unused()?;
418
419 crate::validation::validate_item_attribute_values(&item)?;
421 crate::validation::normalize_item_sets(&mut item);
422
423 let size = types::item_size(&item);
424 if size > types::MAX_ITEM_SIZE {
425 return Err(DynoxideError::ValidationException(
426 "Item size has exceeded the maximum allowed size".to_string(),
427 ));
428 }
429
430 let old_for_stream = existing_json.clone();
432
433 let item_json = serde_json::to_string(&item)
434 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
435 let hash_prefix = update
436 .key
437 .get(&key_schema.partition_key)
438 .map(crate::storage::compute_hash_prefix)
439 .unwrap_or_default();
440 storage.put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)?;
441
442 let _ = super::gsi::maintain_gsis_after_write(
443 storage,
444 &update.table_name,
445 &meta,
446 &pk,
447 &sk,
448 &item,
449 &key_schema.partition_key,
450 key_schema.sort_key.as_deref(),
451 )?;
452
453 super::lsi::maintain_lsis_after_write(
454 storage,
455 &update.table_name,
456 &meta,
457 &pk,
458 &sk,
459 &item,
460 &key_schema.partition_key,
461 key_schema.sort_key.as_deref(),
462 )?;
463
464 let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
466 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
467
468 Ok(())
469}
470
471fn execute_delete(storage: &Storage, delete: &TransactDelete) -> Result<()> {
472 crate::validation::validate_table_name(&delete.table_name)?;
473 let meta = helpers::require_table_for_item_op(storage, &delete.table_name)?;
474 let key_schema = helpers::parse_key_schema(&meta)?;
475
476 helpers::validate_key_only(&delete.key, &key_schema)?;
477 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
478
479 let tracker = crate::expressions::TrackedExpressionAttributes::new(
480 &delete.expression_attribute_names,
481 &delete.expression_attribute_values,
482 );
483
484 if let Some(ref cond_expr) = delete.condition_expression {
486 if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
487 tracker.track_condition_expr(&parsed);
488 }
489 }
490
491 if let Some(ref cond_expr) = delete.condition_expression {
493 let existing_json = storage.get_item(&delete.table_name, &pk, &sk)?;
494 let existing_item: Item = existing_json
495 .as_ref()
496 .and_then(|j| serde_json::from_str(j).ok())
497 .unwrap_or_default();
498
499 let return_item = if delete.return_values_on_condition_check_failure.as_deref()
500 == Some("ALL_OLD")
501 && !existing_item.is_empty()
502 {
503 Some(existing_item.clone())
504 } else {
505 None
506 };
507 check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
508 }
509
510 tracker.check_unused()?;
511
512 let old_json = storage.delete_item(&delete.table_name, &pk, &sk)?;
513 let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)?;
514 super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)?;
515
516 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
518 if old_item.is_some() {
519 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
520 }
521
522 Ok(())
523}
524
525fn execute_condition_check(storage: &Storage, check: &TransactConditionCheck) -> Result<()> {
526 crate::validation::validate_table_name(&check.table_name)?;
527 let meta = helpers::require_table_for_item_op(storage, &check.table_name)?;
528 let key_schema = helpers::parse_key_schema(&meta)?;
529
530 helpers::validate_key_only(&check.key, &key_schema)?;
531 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
532
533 let existing_json = storage.get_item(&check.table_name, &pk, &sk)?;
534 let existing_item: Item = existing_json
535 .as_ref()
536 .and_then(|j| serde_json::from_str(j).ok())
537 .unwrap_or_default();
538
539 let tracker = crate::expressions::TrackedExpressionAttributes::new(
540 &check.expression_attribute_names,
541 &check.expression_attribute_values,
542 );
543
544 if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
546 tracker.track_condition_expr(&parsed);
547 }
548
549 let return_item = if check.return_values_on_condition_check_failure.as_deref()
550 == Some("ALL_OLD")
551 && !existing_item.is_empty()
552 {
553 Some(existing_item.clone())
554 } else {
555 None
556 };
557 check_condition_tracked(
558 &check.condition_expression,
559 &existing_item,
560 &tracker,
561 return_item,
562 )?;
563
564 tracker.check_unused()?;
565 Ok(())
566}
567
568fn check_condition_tracked(
569 expression: &str,
570 item: &Item,
571 tracker: &crate::expressions::TrackedExpressionAttributes,
572 return_item_on_failure: Option<Item>,
573) -> Result<()> {
574 let parsed = crate::expressions::condition::parse(expression)
575 .map_err(DynoxideError::ValidationException)?;
576 let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
577 .map_err(DynoxideError::ValidationException)?;
578 if !result {
579 return Err(DynoxideError::ConditionalCheckFailedException(
580 "The conditional request failed".to_string(),
581 return_item_on_failure,
582 ));
583 }
584 Ok(())
585}
586
587fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
593 if let Some(ref put) = item.put {
594 (put.table_name.clone(), types::item_size(&put.item))
595 } else if let Some(ref update) = item.update {
596 let key_size = types::item_size(&update.key);
597 let eav_size = update
598 .expression_attribute_values
599 .as_ref()
600 .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
601 .unwrap_or(0);
602 (update.table_name.clone(), key_size + eav_size)
603 } else if let Some(ref delete) = item.delete {
604 (delete.table_name.clone(), types::item_size(&delete.key))
605 } else if let Some(ref check) = item.condition_check {
606 (check.table_name.clone(), types::item_size(&check.key))
607 } else {
608 (String::new(), 0)
609 }
610}
611
612fn get_item_target(storage: &Storage, item: &TransactWriteItem) -> Result<String> {
614 if let Some(ref put) = item.put {
615 crate::validation::validate_table_name(&put.table_name)?;
616 let meta = helpers::require_table_for_item_op(storage, &put.table_name)?;
617 let key_schema = helpers::parse_key_schema(&meta)?;
618 let (pk, sk) = helpers::extract_key_strings(&put.item, &key_schema)?;
619 Ok(format!("{}#{}#{}", put.table_name, pk, sk))
620 } else if let Some(ref update) = item.update {
621 crate::validation::validate_table_name(&update.table_name)?;
622 let meta = helpers::require_table_for_item_op(storage, &update.table_name)?;
623 let key_schema = helpers::parse_key_schema(&meta)?;
624 let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
625 Ok(format!("{}#{}#{}", update.table_name, pk, sk))
626 } else if let Some(ref delete) = item.delete {
627 crate::validation::validate_table_name(&delete.table_name)?;
628 let meta = helpers::require_table_for_item_op(storage, &delete.table_name)?;
629 let key_schema = helpers::parse_key_schema(&meta)?;
630 let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
631 Ok(format!("{}#{}#{}", delete.table_name, pk, sk))
632 } else if let Some(ref check) = item.condition_check {
633 crate::validation::validate_table_name(&check.table_name)?;
634 let meta = helpers::require_table_for_item_op(storage, &check.table_name)?;
635 let key_schema = helpers::parse_key_schema(&meta)?;
636 let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
637 Ok(format!("{}#{}#{}", check.table_name, pk, sk))
638 } else {
639 Err(DynoxideError::ValidationException(
640 "TransactItem must contain exactly one action".to_string(),
641 ))
642 }
643}