Skip to main content

dynoxide/actions/
transact_write_items.rs

1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage_backend::StorageBackend;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10    #[serde(rename = "TransactItems")]
11    pub transact_items: Vec<TransactWriteItem>,
12    #[serde(rename = "ClientRequestToken", default)]
13    pub client_request_token: Option<String>,
14    #[serde(rename = "ReturnConsumedCapacity", default)]
15    pub return_consumed_capacity: Option<String>,
16    #[serde(rename = "ReturnItemCollectionMetrics", default)]
17    pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22    #[serde(rename = "Put", default)]
23    pub put: Option<TransactPut>,
24    #[serde(rename = "Update", default)]
25    pub update: Option<TransactUpdate>,
26    #[serde(rename = "Delete", default)]
27    pub delete: Option<TransactDelete>,
28    #[serde(rename = "ConditionCheck", default)]
29    pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34    #[serde(rename = "TableName")]
35    pub table_name: String,
36    #[serde(rename = "Item")]
37    pub item: Item,
38    #[serde(rename = "ConditionExpression", default)]
39    pub condition_expression: Option<String>,
40    #[serde(rename = "ExpressionAttributeNames", default)]
41    pub expression_attribute_names: Option<HashMap<String, String>>,
42    #[serde(rename = "ExpressionAttributeValues", default)]
43    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45    pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50    #[serde(rename = "TableName")]
51    pub table_name: String,
52    #[serde(rename = "Key")]
53    pub key: HashMap<String, AttributeValue>,
54    #[serde(rename = "UpdateExpression")]
55    pub update_expression: String,
56    #[serde(rename = "ConditionExpression", default)]
57    pub condition_expression: Option<String>,
58    #[serde(rename = "ExpressionAttributeNames", default)]
59    pub expression_attribute_names: Option<HashMap<String, String>>,
60    #[serde(rename = "ExpressionAttributeValues", default)]
61    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63    pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68    #[serde(rename = "TableName")]
69    pub table_name: String,
70    #[serde(rename = "Key")]
71    pub key: HashMap<String, AttributeValue>,
72    #[serde(rename = "ConditionExpression", default)]
73    pub condition_expression: Option<String>,
74    #[serde(rename = "ExpressionAttributeNames", default)]
75    pub expression_attribute_names: Option<HashMap<String, String>>,
76    #[serde(rename = "ExpressionAttributeValues", default)]
77    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79    pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84    #[serde(rename = "TableName")]
85    pub table_name: String,
86    #[serde(rename = "Key")]
87    pub key: HashMap<String, AttributeValue>,
88    #[serde(rename = "ConditionExpression")]
89    pub condition_expression: String,
90    #[serde(rename = "ExpressionAttributeNames", default)]
91    pub expression_attribute_names: Option<HashMap<String, String>>,
92    #[serde(rename = "ExpressionAttributeValues", default)]
93    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95    pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102    /// Item collection metrics per table. Currently always `None` — full metrics
103    /// computation for transactional writes is deferred to a future release.
104    #[serde(
105        rename = "ItemCollectionMetrics",
106        skip_serializing_if = "Option::is_none"
107    )]
108    pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub async fn execute<S: StorageBackend>(
112    storage: &S,
113    request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115    let items = &request.transact_items;
116
117    // Validate: at least 1 action
118    if items.is_empty() {
119        return Err(DynoxideError::ValidationException(
120            "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121        ));
122    }
123
124    // Validate: up to 100 actions.
125    // AWS surfaces this as the standard "1 validation error detected" envelope
126    // around `Value '[<dump>]' at 'transactItems'`. The conformance suite
127    // anchors a regex on the envelope and constraint phrase but leaves the
128    // dump body unconstrained.
129    if items.len() > 100 {
130        let dump = format!("{items:?}");
131        return Err(DynoxideError::ValidationException(format!(
132            "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
133        )));
134    }
135
136    // Validate: no duplicate item targets. A key that can't be stringified (non-scalar
137    // or missing) is skipped here and reported by the in-loop validation instead (#95).
138    let mut seen_targets = HashSet::new();
139    for item in items {
140        if let Some(target) = get_item_target(storage, item).await? {
141            if !seen_targets.insert(target) {
142                return Err(DynoxideError::ValidationException(
143                    "Transaction request cannot include multiple operations on one item"
144                        .to_string(),
145                ));
146            }
147        }
148    }
149
150    // Validate: aggregate item size must not exceed 4MB
151    let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
152    if total_size > 4 * 1024 * 1024 {
153        return Err(DynoxideError::ValidationException(
154            "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
155        ));
156    }
157
158    // All actions run inside one SQLite transaction (all-or-nothing).
159    helpers::with_write_transaction(storage, execute_within_transaction(storage, items)).await?;
160
161    // Build consumed capacity per table
162    let consumed_capacity = if matches!(
163        request.return_consumed_capacity.as_deref(),
164        Some("TOTAL") | Some("INDEXES")
165    ) {
166        // AWS charges 2 WCU per item for a transactional write: round each item
167        // up to whole write units first, then apply the 2x factor, then sum per
168        // table. Aggregating sizes before rounding would undercharge items that
169        // straddle a 1KB boundary.
170        let mut table_units: HashMap<String, f64> = HashMap::new();
171        for item in items {
172            let (table, size) = get_action_table_and_size(item);
173            *table_units.entry(table).or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
174                * crate::types::write_capacity_units(size);
175        }
176        let caps: Vec<_> = table_units
177            .iter()
178            .filter_map(|(table, &units)| {
179                crate::types::transactional_write_capacity(
180                    table,
181                    units,
182                    &request.return_consumed_capacity,
183                )
184            })
185            .collect();
186        Some(caps)
187    } else {
188        None
189    };
190    Ok(TransactWriteItemsResponse {
191        consumed_capacity,
192        item_collection_metrics: None,
193    })
194}
195
196async fn execute_within_transaction<S: StorageBackend>(
197    storage: &S,
198    items: &[TransactWriteItem],
199) -> Result<()> {
200    let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
201    let mut has_failure = false;
202
203    for item in items {
204        let reason = execute_single_action(storage, item).await;
205        match reason {
206            Ok(()) => {
207                cancellation_reasons.push(CancellationReason {
208                    code: "None".to_string(),
209                    message: None,
210                    item: None,
211                });
212            }
213            Err(e) => {
214                // An empty-value key (empty string or empty binary) surfaces top-level:
215                // returning here rolls the transaction back. Other errors become
216                // cancellation reasons below (#95).
217                if matches!(e, DynoxideError::KeyEmptyValueValidation(_)) {
218                    return Err(e);
219                }
220                has_failure = true;
221                let message = Some(e.to_string());
222                let (code, item) = match e {
223                    DynoxideError::ConditionalCheckFailedException(_, item) => {
224                        ("ConditionalCheckFailed".to_string(), item)
225                    }
226                    DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
227                    _ => ("InternalError".to_string(), None),
228                };
229                cancellation_reasons.push(CancellationReason {
230                    code,
231                    message,
232                    item,
233                });
234            }
235        }
236    }
237
238    if has_failure {
239        let codes: Vec<&str> = cancellation_reasons
240            .iter()
241            .map(|r| r.code.as_str())
242            .collect();
243        let message = format!(
244            "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
245            codes.join(", ")
246        );
247        return Err(DynoxideError::TransactionCanceledException(
248            message,
249            cancellation_reasons,
250        ));
251    }
252
253    Ok(())
254}
255
256async fn execute_single_action<S: StorageBackend>(
257    storage: &S,
258    item: &TransactWriteItem,
259) -> Result<()> {
260    if let Some(ref put) = item.put {
261        execute_put(storage, put).await
262    } else if let Some(ref update) = item.update {
263        execute_update(storage, update).await
264    } else if let Some(ref delete) = item.delete {
265        execute_delete(storage, delete).await
266    } else if let Some(ref check) = item.condition_check {
267        execute_condition_check(storage, check).await
268    } else {
269        Err(DynoxideError::ValidationException(
270            "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
271                .to_string(),
272        ))
273    }
274}
275
276/// Reject any ExpressionAttributeValue that nests deeper than DynamoDB allows.
277/// TransactWriteItems does not route through the shared expression-param helper, so
278/// the per-value nesting check is applied here for each sub-action's values.
279fn validate_eav_nesting(values: &Option<HashMap<String, AttributeValue>>) -> Result<()> {
280    if let Some(map) = values {
281        for value in map.values() {
282            crate::validation::validate_nesting_depth(value)?;
283        }
284    }
285    Ok(())
286}
287
288async fn execute_put<S: StorageBackend>(storage: &S, put: &TransactPut) -> Result<()> {
289    crate::validation::validate_table_name(&put.table_name)?;
290    let meta = helpers::require_table_for_item_op(storage, &put.table_name).await?;
291    let key_schema = helpers::parse_key_schema(&meta)?;
292
293    helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
294    crate::validation::validate_item_attribute_values(&put.item)?;
295
296    // Deduplicate sets - need a mutable copy since put is borrowed immutably
297    let mut item = put.item.clone();
298    crate::validation::normalize_item_sets(&mut item);
299
300    let size = types::item_size(&item);
301    if size > types::MAX_ITEM_SIZE {
302        return Err(DynoxideError::ValidationException(
303            "Item size has exceeded the maximum allowed size".to_string(),
304        ));
305    }
306
307    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
308    let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
309
310    validate_eav_nesting(&put.expression_attribute_values)?;
311
312    let tracker = crate::expressions::TrackedExpressionAttributes::new(
313        &put.expression_attribute_names,
314        &put.expression_attribute_values,
315    );
316
317    // Pre-register references statically before runtime evaluation
318    if let Some(ref cond_expr) = put.condition_expression {
319        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
320            tracker.track_condition_expr(&parsed);
321        }
322    }
323
324    // Evaluate condition if present
325    if let Some(ref cond_expr) = put.condition_expression {
326        let existing_json = storage.get_item(&put.table_name, &pk, &sk).await?;
327        let existing_item: Item = existing_json
328            .as_ref()
329            .and_then(|j| serde_json::from_str(j).ok())
330            .unwrap_or_default();
331
332        let return_item = if put.return_values_on_condition_check_failure.as_deref()
333            == Some("ALL_OLD")
334            && !existing_item.is_empty()
335        {
336            Some(existing_item.clone())
337        } else {
338            None
339        };
340        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
341    }
342
343    tracker.check_unused()?;
344
345    let item_json = serde_json::to_string(&item)
346        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
347    let hash_prefix = item
348        .get(&key_schema.partition_key)
349        .map(crate::storage::compute_hash_prefix)
350        .unwrap_or_default();
351    let old_json = storage
352        .put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)
353        .await?;
354
355    let _ = super::gsi::maintain_gsis_after_write(
356        storage,
357        &put.table_name,
358        &meta,
359        &pk,
360        &sk,
361        &item,
362        &key_schema.partition_key,
363        key_schema.sort_key.as_deref(),
364    )
365    .await?;
366
367    super::lsi::maintain_lsis_after_write(
368        storage,
369        &put.table_name,
370        &meta,
371        &pk,
372        &sk,
373        &item,
374        &key_schema.partition_key,
375        key_schema.sort_key.as_deref(),
376    )
377    .await?;
378
379    // Record stream event
380    let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
381    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
382
383    Ok(())
384}
385
386async fn execute_update<S: StorageBackend>(storage: &S, update: &TransactUpdate) -> Result<()> {
387    crate::validation::validate_table_name(&update.table_name)?;
388    let meta = helpers::require_table_for_item_op(storage, &update.table_name).await?;
389    let key_schema = helpers::parse_key_schema(&meta)?;
390
391    helpers::validate_key_only(&update.key, &key_schema)?;
392    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
393    let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
394
395    let existing_json = storage.get_item(&update.table_name, &pk, &sk).await?;
396    let existing_item: Item = existing_json
397        .as_ref()
398        .and_then(|j| serde_json::from_str(j).ok())
399        .unwrap_or_default();
400
401    validate_eav_nesting(&update.expression_attribute_values)?;
402
403    let tracker = crate::expressions::TrackedExpressionAttributes::new(
404        &update.expression_attribute_names,
405        &update.expression_attribute_values,
406    );
407
408    // Pre-register references statically before runtime evaluation
409    if let Some(ref cond_expr) = update.condition_expression {
410        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
411            tracker.track_condition_expr(&parsed);
412        }
413    }
414    if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
415        tracker.track_update_expr(&parsed);
416    }
417
418    // Evaluate condition against the original existing item BEFORE populating
419    // key attributes for upsert. Otherwise attribute_exists(PK) would always
420    // pass because the key was pre-populated.
421    if let Some(ref cond_expr) = update.condition_expression {
422        let return_item = if update.return_values_on_condition_check_failure.as_deref()
423            == Some("ALL_OLD")
424            && existing_json.is_some()
425        {
426            Some(existing_item.clone())
427        } else {
428            None
429        };
430        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
431    }
432
433    // Build the mutable item for the update expression.
434    // If new item (upsert), populate key attrs.
435    let mut item = existing_item;
436    if existing_json.is_none() {
437        for (k, v) in &update.key {
438            item.insert(k.clone(), v.clone());
439        }
440    }
441    let before_item = item.clone();
442
443    // Apply update expression
444    let parsed = crate::expressions::update::parse(&update.update_expression)
445        .map_err(DynoxideError::ValidationException)?;
446    crate::expressions::update::apply(&mut item, &parsed, &tracker)
447        .map_err(DynoxideError::ValidationException)?;
448
449    tracker.check_unused()?;
450
451    // Validate attribute values after update expression applied
452    crate::validation::validate_item_attribute_values(&item)?;
453    crate::validation::normalize_item_sets(&mut item);
454
455    // Reject an index key this update set to an invalid value (see helpers).
456    helpers::validate_updated_index_keys(&before_item, &item, &meta)?;
457
458    let size = types::item_size(&item);
459    if size > types::MAX_ITEM_SIZE {
460        return Err(DynoxideError::ValidationException(
461            "Item size has exceeded the maximum allowed size".to_string(),
462        ));
463    }
464
465    // Save old item reference for streams
466    let old_for_stream = existing_json.clone();
467
468    let item_json = serde_json::to_string(&item)
469        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
470    let hash_prefix = update
471        .key
472        .get(&key_schema.partition_key)
473        .map(crate::storage::compute_hash_prefix)
474        .unwrap_or_default();
475    storage
476        .put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)
477        .await?;
478
479    let _ = super::gsi::maintain_gsis_after_write(
480        storage,
481        &update.table_name,
482        &meta,
483        &pk,
484        &sk,
485        &item,
486        &key_schema.partition_key,
487        key_schema.sort_key.as_deref(),
488    )
489    .await?;
490
491    super::lsi::maintain_lsis_after_write(
492        storage,
493        &update.table_name,
494        &meta,
495        &pk,
496        &sk,
497        &item,
498        &key_schema.partition_key,
499        key_schema.sort_key.as_deref(),
500    )
501    .await?;
502
503    // Record stream event
504    let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
505    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
506
507    Ok(())
508}
509
510async fn execute_delete<S: StorageBackend>(storage: &S, delete: &TransactDelete) -> Result<()> {
511    crate::validation::validate_table_name(&delete.table_name)?;
512    let meta = helpers::require_table_for_item_op(storage, &delete.table_name).await?;
513    let key_schema = helpers::parse_key_schema(&meta)?;
514
515    helpers::validate_key_only(&delete.key, &key_schema)?;
516    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
517    let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
518
519    validate_eav_nesting(&delete.expression_attribute_values)?;
520
521    let tracker = crate::expressions::TrackedExpressionAttributes::new(
522        &delete.expression_attribute_names,
523        &delete.expression_attribute_values,
524    );
525
526    // Pre-register references statically before runtime evaluation
527    if let Some(ref cond_expr) = delete.condition_expression {
528        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
529            tracker.track_condition_expr(&parsed);
530        }
531    }
532
533    // Evaluate condition if present
534    if let Some(ref cond_expr) = delete.condition_expression {
535        let existing_json = storage.get_item(&delete.table_name, &pk, &sk).await?;
536        let existing_item: Item = existing_json
537            .as_ref()
538            .and_then(|j| serde_json::from_str(j).ok())
539            .unwrap_or_default();
540
541        let return_item = if delete.return_values_on_condition_check_failure.as_deref()
542            == Some("ALL_OLD")
543            && !existing_item.is_empty()
544        {
545            Some(existing_item.clone())
546        } else {
547            None
548        };
549        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
550    }
551
552    tracker.check_unused()?;
553
554    let old_json = storage.delete_item(&delete.table_name, &pk, &sk).await?;
555    let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)
556        .await?;
557    super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk).await?;
558
559    // Record stream event
560    let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
561    if old_item.is_some() {
562        crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
563    }
564
565    Ok(())
566}
567
568async fn execute_condition_check<S: StorageBackend>(
569    storage: &S,
570    check: &TransactConditionCheck,
571) -> Result<()> {
572    crate::validation::validate_table_name(&check.table_name)?;
573    let meta = helpers::require_table_for_item_op(storage, &check.table_name).await?;
574    let key_schema = helpers::parse_key_schema(&meta)?;
575
576    helpers::validate_key_only(&check.key, &key_schema)?;
577    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
578    let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
579
580    let existing_json = storage.get_item(&check.table_name, &pk, &sk).await?;
581    let existing_item: Item = existing_json
582        .as_ref()
583        .and_then(|j| serde_json::from_str(j).ok())
584        .unwrap_or_default();
585
586    validate_eav_nesting(&check.expression_attribute_values)?;
587
588    let tracker = crate::expressions::TrackedExpressionAttributes::new(
589        &check.expression_attribute_names,
590        &check.expression_attribute_values,
591    );
592
593    // Pre-register references statically before runtime evaluation
594    if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
595        tracker.track_condition_expr(&parsed);
596    }
597
598    let return_item = if check.return_values_on_condition_check_failure.as_deref()
599        == Some("ALL_OLD")
600        && !existing_item.is_empty()
601    {
602        Some(existing_item.clone())
603    } else {
604        None
605    };
606    check_condition_tracked(
607        &check.condition_expression,
608        &existing_item,
609        &tracker,
610        return_item,
611    )?;
612
613    tracker.check_unused()?;
614    Ok(())
615}
616
617fn check_condition_tracked(
618    expression: &str,
619    item: &Item,
620    tracker: &crate::expressions::TrackedExpressionAttributes,
621    return_item_on_failure: Option<Item>,
622) -> Result<()> {
623    let parsed = crate::expressions::condition::parse(expression)
624        .map_err(DynoxideError::ValidationException)?;
625    let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
626        .map_err(DynoxideError::ValidationException)?;
627    if !result {
628        return Err(DynoxideError::ConditionalCheckFailedException(
629            "The conditional request failed".to_string(),
630            return_item_on_failure,
631        ));
632    }
633    Ok(())
634}
635
636/// Get table name and estimated item size for an action.
637///
638/// For Put, uses the full item size. For Update, includes both the key size
639/// and the expression attribute values size (a better approximation of the
640/// request payload contribution). For Delete and ConditionCheck, uses key size.
641fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
642    if let Some(ref put) = item.put {
643        (put.table_name.clone(), types::item_size(&put.item))
644    } else if let Some(ref update) = item.update {
645        let key_size = types::item_size(&update.key);
646        let eav_size = update
647            .expression_attribute_values
648            .as_ref()
649            .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
650            .unwrap_or(0);
651        (update.table_name.clone(), key_size + eav_size)
652    } else if let Some(ref delete) = item.delete {
653        (delete.table_name.clone(), types::item_size(&delete.key))
654    } else if let Some(ref check) = item.condition_check {
655        (check.table_name.clone(), types::item_size(&check.key))
656    } else {
657        (String::new(), 0)
658    }
659}
660
661/// Compute the dedup target (table + pk + sk) for one action's key source, or `None`
662/// when the key can't be stringified (non-scalar or missing). Table name and existence
663/// are still validated, so a bad name or missing table surfaces up front.
664async fn target_for<S: StorageBackend>(
665    storage: &S,
666    table_name: &str,
667    key_source: &HashMap<String, AttributeValue>,
668) -> Result<Option<String>> {
669    crate::validation::validate_table_name(table_name)?;
670    let meta = helpers::require_table_for_item_op(storage, table_name).await?;
671    let key_schema = helpers::parse_key_schema(&meta)?;
672    match helpers::extract_key_strings(key_source, &key_schema) {
673        Ok((pk, sk)) => Ok(Some(format!("{table_name}#{pk}#{sk}"))),
674        Err(_) => Ok(None),
675    }
676}
677
678/// Get a unique target key (table + pk + sk) for duplicate detection, or `None` when
679/// the action's key can't form one (see [`target_for`]).
680async fn get_item_target<S: StorageBackend>(
681    storage: &S,
682    item: &TransactWriteItem,
683) -> Result<Option<String>> {
684    if let Some(ref put) = item.put {
685        target_for(storage, &put.table_name, &put.item).await
686    } else if let Some(ref update) = item.update {
687        target_for(storage, &update.table_name, &update.key).await
688    } else if let Some(ref delete) = item.delete {
689        target_for(storage, &delete.table_name, &delete.key).await
690    } else if let Some(ref check) = item.condition_check {
691        target_for(storage, &check.table_name, &check.key).await
692    } else {
693        Err(DynoxideError::ValidationException(
694            "TransactItem must contain exactly one action".to_string(),
695        ))
696    }
697}