Skip to main content

dynoxide/actions/
transact_write_items.rs

1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10    #[serde(rename = "TransactItems")]
11    pub transact_items: Vec<TransactWriteItem>,
12    #[serde(rename = "ClientRequestToken", default)]
13    pub client_request_token: Option<String>,
14    #[serde(rename = "ReturnConsumedCapacity", default)]
15    pub return_consumed_capacity: Option<String>,
16    #[serde(rename = "ReturnItemCollectionMetrics", default)]
17    pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22    #[serde(rename = "Put", default)]
23    pub put: Option<TransactPut>,
24    #[serde(rename = "Update", default)]
25    pub update: Option<TransactUpdate>,
26    #[serde(rename = "Delete", default)]
27    pub delete: Option<TransactDelete>,
28    #[serde(rename = "ConditionCheck", default)]
29    pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34    #[serde(rename = "TableName")]
35    pub table_name: String,
36    #[serde(rename = "Item")]
37    pub item: Item,
38    #[serde(rename = "ConditionExpression", default)]
39    pub condition_expression: Option<String>,
40    #[serde(rename = "ExpressionAttributeNames", default)]
41    pub expression_attribute_names: Option<HashMap<String, String>>,
42    #[serde(rename = "ExpressionAttributeValues", default)]
43    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45    pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50    #[serde(rename = "TableName")]
51    pub table_name: String,
52    #[serde(rename = "Key")]
53    pub key: HashMap<String, AttributeValue>,
54    #[serde(rename = "UpdateExpression")]
55    pub update_expression: String,
56    #[serde(rename = "ConditionExpression", default)]
57    pub condition_expression: Option<String>,
58    #[serde(rename = "ExpressionAttributeNames", default)]
59    pub expression_attribute_names: Option<HashMap<String, String>>,
60    #[serde(rename = "ExpressionAttributeValues", default)]
61    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63    pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68    #[serde(rename = "TableName")]
69    pub table_name: String,
70    #[serde(rename = "Key")]
71    pub key: HashMap<String, AttributeValue>,
72    #[serde(rename = "ConditionExpression", default)]
73    pub condition_expression: Option<String>,
74    #[serde(rename = "ExpressionAttributeNames", default)]
75    pub expression_attribute_names: Option<HashMap<String, String>>,
76    #[serde(rename = "ExpressionAttributeValues", default)]
77    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79    pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84    #[serde(rename = "TableName")]
85    pub table_name: String,
86    #[serde(rename = "Key")]
87    pub key: HashMap<String, AttributeValue>,
88    #[serde(rename = "ConditionExpression")]
89    pub condition_expression: String,
90    #[serde(rename = "ExpressionAttributeNames", default)]
91    pub expression_attribute_names: Option<HashMap<String, String>>,
92    #[serde(rename = "ExpressionAttributeValues", default)]
93    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95    pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102    /// Item collection metrics per table. Currently always `None` — full metrics
103    /// computation for transactional writes is deferred to a future release.
104    #[serde(
105        rename = "ItemCollectionMetrics",
106        skip_serializing_if = "Option::is_none"
107    )]
108    pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub fn execute(
112    storage: &Storage,
113    request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115    let items = &request.transact_items;
116
117    // Validate: at least 1 action
118    if items.is_empty() {
119        return Err(DynoxideError::ValidationException(
120            "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121        ));
122    }
123
124    // Validate: up to 100 actions
125    if items.len() > 100 {
126        return Err(DynoxideError::ValidationException(
127            "Member must have length less than or equal to 100".to_string(),
128        ));
129    }
130
131    // Validate: no duplicate item targets
132    let mut seen_targets = HashSet::new();
133    for item in items {
134        let target = get_item_target(storage, item)?;
135        if !seen_targets.insert(target) {
136            return Err(DynoxideError::ValidationException(
137                "Transaction request cannot include multiple operations on one item".to_string(),
138            ));
139        }
140    }
141
142    // Validate: aggregate item size must not exceed 4MB
143    let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
144    if total_size > 4 * 1024 * 1024 {
145        return Err(DynoxideError::ValidationException(
146            "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
147        ));
148    }
149
150    // Begin SQLite transaction
151    storage.begin_transaction()?;
152
153    let result = execute_within_transaction(storage, items);
154
155    match result {
156        Ok(()) => {
157            storage.commit()?;
158            // Build consumed capacity per table
159            let consumed_capacity = if matches!(
160                request.return_consumed_capacity.as_deref(),
161                Some("TOTAL") | Some("INDEXES")
162            ) {
163                let mut table_sizes: HashMap<String, usize> = HashMap::new();
164                for item in items {
165                    let (table, size) = get_action_table_and_size(item);
166                    *table_sizes.entry(table).or_default() += size;
167                }
168                let caps: Vec<_> = table_sizes
169                    .iter()
170                    .filter_map(|(table, &size)| {
171                        crate::types::consumed_capacity(
172                            table,
173                            crate::types::write_capacity_units(size),
174                            &request.return_consumed_capacity,
175                        )
176                    })
177                    .collect();
178                Some(caps)
179            } else {
180                None
181            };
182            Ok(TransactWriteItemsResponse {
183                consumed_capacity,
184                item_collection_metrics: None,
185            })
186        }
187        Err(e) => {
188            if let Err(rb_err) = storage.rollback() {
189                return Err(DynoxideError::InternalServerError(format!(
190                    "Transaction failed ({e}) and rollback also failed ({rb_err})"
191                )));
192            }
193            Err(e)
194        }
195    }
196}
197
198fn execute_within_transaction(storage: &Storage, items: &[TransactWriteItem]) -> Result<()> {
199    let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
200    let mut has_failure = false;
201
202    for item in items {
203        let reason = execute_single_action(storage, item);
204        match reason {
205            Ok(()) => {
206                cancellation_reasons.push(CancellationReason {
207                    code: "None".to_string(),
208                    message: None,
209                    item: None,
210                });
211            }
212            Err(e) => {
213                has_failure = true;
214                let message = Some(e.to_string());
215                let (code, item) = match e {
216                    DynoxideError::ConditionalCheckFailedException(_, item) => {
217                        ("ConditionalCheckFailed".to_string(), item)
218                    }
219                    DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
220                    _ => ("InternalError".to_string(), None),
221                };
222                cancellation_reasons.push(CancellationReason {
223                    code,
224                    message,
225                    item,
226                });
227            }
228        }
229    }
230
231    if has_failure {
232        let codes: Vec<&str> = cancellation_reasons
233            .iter()
234            .map(|r| r.code.as_str())
235            .collect();
236        let message = format!(
237            "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
238            codes.join(", ")
239        );
240        return Err(DynoxideError::TransactionCanceledException(
241            message,
242            cancellation_reasons,
243        ));
244    }
245
246    Ok(())
247}
248
249fn execute_single_action(storage: &Storage, item: &TransactWriteItem) -> Result<()> {
250    if let Some(ref put) = item.put {
251        execute_put(storage, put)
252    } else if let Some(ref update) = item.update {
253        execute_update(storage, update)
254    } else if let Some(ref delete) = item.delete {
255        execute_delete(storage, delete)
256    } else if let Some(ref check) = item.condition_check {
257        execute_condition_check(storage, check)
258    } else {
259        Err(DynoxideError::ValidationException(
260            "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
261                .to_string(),
262        ))
263    }
264}
265
266fn execute_put(storage: &Storage, put: &TransactPut) -> Result<()> {
267    crate::validation::validate_table_name(&put.table_name)?;
268    let meta = helpers::require_table_for_item_op(storage, &put.table_name)?;
269    let key_schema = helpers::parse_key_schema(&meta)?;
270
271    helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
272    crate::validation::validate_item_attribute_values(&put.item)?;
273
274    // Deduplicate sets - need a mutable copy since put is borrowed immutably
275    let mut item = put.item.clone();
276    crate::validation::normalize_item_sets(&mut item);
277
278    let size = types::item_size(&item);
279    if size > types::MAX_ITEM_SIZE {
280        return Err(DynoxideError::ValidationException(
281            "Item size has exceeded the maximum allowed size".to_string(),
282        ));
283    }
284
285    let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
286
287    let tracker = crate::expressions::TrackedExpressionAttributes::new(
288        &put.expression_attribute_names,
289        &put.expression_attribute_values,
290    );
291
292    // Pre-register references statically before runtime evaluation
293    if let Some(ref cond_expr) = put.condition_expression {
294        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
295            tracker.track_condition_expr(&parsed);
296        }
297    }
298
299    // Evaluate condition if present
300    if let Some(ref cond_expr) = put.condition_expression {
301        let existing_json = storage.get_item(&put.table_name, &pk, &sk)?;
302        let existing_item: Item = existing_json
303            .as_ref()
304            .and_then(|j| serde_json::from_str(j).ok())
305            .unwrap_or_default();
306
307        let return_item = if put.return_values_on_condition_check_failure.as_deref()
308            == Some("ALL_OLD")
309            && !existing_item.is_empty()
310        {
311            Some(existing_item.clone())
312        } else {
313            None
314        };
315        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
316    }
317
318    tracker.check_unused()?;
319
320    let item_json = serde_json::to_string(&item)
321        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
322    let hash_prefix = item
323        .get(&key_schema.partition_key)
324        .map(crate::storage::compute_hash_prefix)
325        .unwrap_or_default();
326    let old_json =
327        storage.put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)?;
328
329    let _ = super::gsi::maintain_gsis_after_write(
330        storage,
331        &put.table_name,
332        &meta,
333        &pk,
334        &sk,
335        &item,
336        &key_schema.partition_key,
337        key_schema.sort_key.as_deref(),
338    )?;
339
340    super::lsi::maintain_lsis_after_write(
341        storage,
342        &put.table_name,
343        &meta,
344        &pk,
345        &sk,
346        &item,
347        &key_schema.partition_key,
348        key_schema.sort_key.as_deref(),
349    )?;
350
351    // Record stream event
352    let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
353    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
354
355    Ok(())
356}
357
358fn execute_update(storage: &Storage, update: &TransactUpdate) -> Result<()> {
359    crate::validation::validate_table_name(&update.table_name)?;
360    let meta = helpers::require_table_for_item_op(storage, &update.table_name)?;
361    let key_schema = helpers::parse_key_schema(&meta)?;
362
363    helpers::validate_key_only(&update.key, &key_schema)?;
364    let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
365
366    let existing_json = storage.get_item(&update.table_name, &pk, &sk)?;
367    let mut item: Item = existing_json
368        .as_ref()
369        .and_then(|j| serde_json::from_str(j).ok())
370        .unwrap_or_default();
371
372    // If new item (upsert), populate key attrs
373    if existing_json.is_none() {
374        for (k, v) in &update.key {
375            item.insert(k.clone(), v.clone());
376        }
377    }
378
379    let tracker = crate::expressions::TrackedExpressionAttributes::new(
380        &update.expression_attribute_names,
381        &update.expression_attribute_values,
382    );
383
384    // Pre-register references statically before runtime evaluation
385    if let Some(ref cond_expr) = update.condition_expression {
386        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
387            tracker.track_condition_expr(&parsed);
388        }
389    }
390    if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
391        tracker.track_update_expr(&parsed);
392    }
393
394    // Evaluate condition if present
395    if let Some(ref cond_expr) = update.condition_expression {
396        let return_item = if update.return_values_on_condition_check_failure.as_deref()
397            == Some("ALL_OLD")
398            && existing_json.is_some()
399        {
400            Some(item.clone())
401        } else {
402            None
403        };
404        check_condition_tracked(cond_expr, &item, &tracker, return_item)?;
405    }
406
407    // Apply update expression
408    let parsed = crate::expressions::update::parse(&update.update_expression)
409        .map_err(DynoxideError::ValidationException)?;
410    crate::expressions::update::apply(&mut item, &parsed, &tracker)
411        .map_err(DynoxideError::ValidationException)?;
412
413    tracker.check_unused()?;
414
415    // Validate attribute values after update expression applied
416    crate::validation::validate_item_attribute_values(&item)?;
417    crate::validation::normalize_item_sets(&mut item);
418
419    let size = types::item_size(&item);
420    if size > types::MAX_ITEM_SIZE {
421        return Err(DynoxideError::ValidationException(
422            "Item size has exceeded the maximum allowed size".to_string(),
423        ));
424    }
425
426    // Save old item reference for streams
427    let old_for_stream = existing_json.clone();
428
429    let item_json = serde_json::to_string(&item)
430        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
431    let hash_prefix = update
432        .key
433        .get(&key_schema.partition_key)
434        .map(crate::storage::compute_hash_prefix)
435        .unwrap_or_default();
436    storage.put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)?;
437
438    let _ = super::gsi::maintain_gsis_after_write(
439        storage,
440        &update.table_name,
441        &meta,
442        &pk,
443        &sk,
444        &item,
445        &key_schema.partition_key,
446        key_schema.sort_key.as_deref(),
447    )?;
448
449    super::lsi::maintain_lsis_after_write(
450        storage,
451        &update.table_name,
452        &meta,
453        &pk,
454        &sk,
455        &item,
456        &key_schema.partition_key,
457        key_schema.sort_key.as_deref(),
458    )?;
459
460    // Record stream event
461    let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
462    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item))?;
463
464    Ok(())
465}
466
467fn execute_delete(storage: &Storage, delete: &TransactDelete) -> Result<()> {
468    crate::validation::validate_table_name(&delete.table_name)?;
469    let meta = helpers::require_table_for_item_op(storage, &delete.table_name)?;
470    let key_schema = helpers::parse_key_schema(&meta)?;
471
472    helpers::validate_key_only(&delete.key, &key_schema)?;
473    let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
474
475    let tracker = crate::expressions::TrackedExpressionAttributes::new(
476        &delete.expression_attribute_names,
477        &delete.expression_attribute_values,
478    );
479
480    // Pre-register references statically before runtime evaluation
481    if let Some(ref cond_expr) = delete.condition_expression {
482        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
483            tracker.track_condition_expr(&parsed);
484        }
485    }
486
487    // Evaluate condition if present
488    if let Some(ref cond_expr) = delete.condition_expression {
489        let existing_json = storage.get_item(&delete.table_name, &pk, &sk)?;
490        let existing_item: Item = existing_json
491            .as_ref()
492            .and_then(|j| serde_json::from_str(j).ok())
493            .unwrap_or_default();
494
495        let return_item = if delete.return_values_on_condition_check_failure.as_deref()
496            == Some("ALL_OLD")
497            && !existing_item.is_empty()
498        {
499            Some(existing_item.clone())
500        } else {
501            None
502        };
503        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
504    }
505
506    tracker.check_unused()?;
507
508    let old_json = storage.delete_item(&delete.table_name, &pk, &sk)?;
509    let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)?;
510    super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)?;
511
512    // Record stream event
513    let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
514    if old_item.is_some() {
515        crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
516    }
517
518    Ok(())
519}
520
521fn execute_condition_check(storage: &Storage, check: &TransactConditionCheck) -> Result<()> {
522    crate::validation::validate_table_name(&check.table_name)?;
523    let meta = helpers::require_table_for_item_op(storage, &check.table_name)?;
524    let key_schema = helpers::parse_key_schema(&meta)?;
525
526    helpers::validate_key_only(&check.key, &key_schema)?;
527    let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
528
529    let existing_json = storage.get_item(&check.table_name, &pk, &sk)?;
530    let existing_item: Item = existing_json
531        .as_ref()
532        .and_then(|j| serde_json::from_str(j).ok())
533        .unwrap_or_default();
534
535    let tracker = crate::expressions::TrackedExpressionAttributes::new(
536        &check.expression_attribute_names,
537        &check.expression_attribute_values,
538    );
539
540    // Pre-register references statically before runtime evaluation
541    if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
542        tracker.track_condition_expr(&parsed);
543    }
544
545    let return_item = if check.return_values_on_condition_check_failure.as_deref()
546        == Some("ALL_OLD")
547        && !existing_item.is_empty()
548    {
549        Some(existing_item.clone())
550    } else {
551        None
552    };
553    check_condition_tracked(
554        &check.condition_expression,
555        &existing_item,
556        &tracker,
557        return_item,
558    )?;
559
560    tracker.check_unused()?;
561    Ok(())
562}
563
564fn check_condition_tracked(
565    expression: &str,
566    item: &Item,
567    tracker: &crate::expressions::TrackedExpressionAttributes,
568    return_item_on_failure: Option<Item>,
569) -> Result<()> {
570    let parsed = crate::expressions::condition::parse(expression)
571        .map_err(DynoxideError::ValidationException)?;
572    let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
573        .map_err(DynoxideError::ValidationException)?;
574    if !result {
575        return Err(DynoxideError::ConditionalCheckFailedException(
576            "The conditional request failed".to_string(),
577            return_item_on_failure,
578        ));
579    }
580    Ok(())
581}
582
583/// Get table name and estimated item size for an action.
584///
585/// For Put, uses the full item size. For Update, includes both the key size
586/// and the expression attribute values size (a better approximation of the
587/// request payload contribution). For Delete and ConditionCheck, uses key size.
588fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
589    if let Some(ref put) = item.put {
590        (put.table_name.clone(), types::item_size(&put.item))
591    } else if let Some(ref update) = item.update {
592        let key_size = types::item_size(&update.key);
593        let eav_size = update
594            .expression_attribute_values
595            .as_ref()
596            .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
597            .unwrap_or(0);
598        (update.table_name.clone(), key_size + eav_size)
599    } else if let Some(ref delete) = item.delete {
600        (delete.table_name.clone(), types::item_size(&delete.key))
601    } else if let Some(ref check) = item.condition_check {
602        (check.table_name.clone(), types::item_size(&check.key))
603    } else {
604        (String::new(), 0)
605    }
606}
607
608/// Get a unique target key (table + pk + sk) for duplicate detection.
609fn get_item_target(storage: &Storage, item: &TransactWriteItem) -> Result<String> {
610    if let Some(ref put) = item.put {
611        crate::validation::validate_table_name(&put.table_name)?;
612        let meta = helpers::require_table_for_item_op(storage, &put.table_name)?;
613        let key_schema = helpers::parse_key_schema(&meta)?;
614        let (pk, sk) = helpers::extract_key_strings(&put.item, &key_schema)?;
615        Ok(format!("{}#{}#{}", put.table_name, pk, sk))
616    } else if let Some(ref update) = item.update {
617        crate::validation::validate_table_name(&update.table_name)?;
618        let meta = helpers::require_table_for_item_op(storage, &update.table_name)?;
619        let key_schema = helpers::parse_key_schema(&meta)?;
620        let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
621        Ok(format!("{}#{}#{}", update.table_name, pk, sk))
622    } else if let Some(ref delete) = item.delete {
623        crate::validation::validate_table_name(&delete.table_name)?;
624        let meta = helpers::require_table_for_item_op(storage, &delete.table_name)?;
625        let key_schema = helpers::parse_key_schema(&meta)?;
626        let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
627        Ok(format!("{}#{}#{}", delete.table_name, pk, sk))
628    } else if let Some(ref check) = item.condition_check {
629        crate::validation::validate_table_name(&check.table_name)?;
630        let meta = helpers::require_table_for_item_op(storage, &check.table_name)?;
631        let key_schema = helpers::parse_key_schema(&meta)?;
632        let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
633        Ok(format!("{}#{}#{}", check.table_name, pk, sk))
634    } else {
635        Err(DynoxideError::ValidationException(
636            "TransactItem must contain exactly one action".to_string(),
637        ))
638    }
639}