Skip to main content

dynoxide/actions/
transact_write_items.rs

1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::storage_backend::StorageBackend;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct TransactWriteItemsRequest {
10    #[serde(rename = "TransactItems")]
11    pub transact_items: Vec<TransactWriteItem>,
12    #[serde(rename = "ClientRequestToken", default)]
13    pub client_request_token: Option<String>,
14    #[serde(rename = "ReturnConsumedCapacity", default)]
15    pub return_consumed_capacity: Option<String>,
16    #[serde(rename = "ReturnItemCollectionMetrics", default)]
17    pub return_item_collection_metrics: Option<String>,
18}
19
20#[derive(Debug, Clone, Default, Deserialize, Serialize)]
21pub struct TransactWriteItem {
22    #[serde(rename = "Put", default)]
23    pub put: Option<TransactPut>,
24    #[serde(rename = "Update", default)]
25    pub update: Option<TransactUpdate>,
26    #[serde(rename = "Delete", default)]
27    pub delete: Option<TransactDelete>,
28    #[serde(rename = "ConditionCheck", default)]
29    pub condition_check: Option<TransactConditionCheck>,
30}
31
32#[derive(Debug, Clone, Default, Deserialize, Serialize)]
33pub struct TransactPut {
34    #[serde(rename = "TableName")]
35    pub table_name: String,
36    #[serde(rename = "Item")]
37    pub item: Item,
38    #[serde(rename = "ConditionExpression", default)]
39    pub condition_expression: Option<String>,
40    #[serde(rename = "ExpressionAttributeNames", default)]
41    pub expression_attribute_names: Option<HashMap<String, String>>,
42    #[serde(rename = "ExpressionAttributeValues", default)]
43    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
44    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
45    pub return_values_on_condition_check_failure: Option<String>,
46}
47
48#[derive(Debug, Clone, Default, Deserialize, Serialize)]
49pub struct TransactUpdate {
50    #[serde(rename = "TableName")]
51    pub table_name: String,
52    #[serde(rename = "Key")]
53    pub key: HashMap<String, AttributeValue>,
54    #[serde(rename = "UpdateExpression")]
55    pub update_expression: String,
56    #[serde(rename = "ConditionExpression", default)]
57    pub condition_expression: Option<String>,
58    #[serde(rename = "ExpressionAttributeNames", default)]
59    pub expression_attribute_names: Option<HashMap<String, String>>,
60    #[serde(rename = "ExpressionAttributeValues", default)]
61    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
62    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
63    pub return_values_on_condition_check_failure: Option<String>,
64}
65
66#[derive(Debug, Clone, Default, Deserialize, Serialize)]
67pub struct TransactDelete {
68    #[serde(rename = "TableName")]
69    pub table_name: String,
70    #[serde(rename = "Key")]
71    pub key: HashMap<String, AttributeValue>,
72    #[serde(rename = "ConditionExpression", default)]
73    pub condition_expression: Option<String>,
74    #[serde(rename = "ExpressionAttributeNames", default)]
75    pub expression_attribute_names: Option<HashMap<String, String>>,
76    #[serde(rename = "ExpressionAttributeValues", default)]
77    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
78    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
79    pub return_values_on_condition_check_failure: Option<String>,
80}
81
82#[derive(Debug, Clone, Default, Deserialize, Serialize)]
83pub struct TransactConditionCheck {
84    #[serde(rename = "TableName")]
85    pub table_name: String,
86    #[serde(rename = "Key")]
87    pub key: HashMap<String, AttributeValue>,
88    #[serde(rename = "ConditionExpression")]
89    pub condition_expression: String,
90    #[serde(rename = "ExpressionAttributeNames", default)]
91    pub expression_attribute_names: Option<HashMap<String, String>>,
92    #[serde(rename = "ExpressionAttributeValues", default)]
93    pub expression_attribute_values: Option<HashMap<String, AttributeValue>>,
94    #[serde(rename = "ReturnValuesOnConditionCheckFailure", default)]
95    pub return_values_on_condition_check_failure: Option<String>,
96}
97
98#[derive(Debug, Clone, Default, Serialize)]
99pub struct TransactWriteItemsResponse {
100    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
101    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
102    /// Item collection metrics per table. Currently always `None` — full metrics
103    /// computation for transactional writes is deferred to a future release.
104    #[serde(
105        rename = "ItemCollectionMetrics",
106        skip_serializing_if = "Option::is_none"
107    )]
108    pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
109}
110
111pub async fn execute<S: StorageBackend>(
112    storage: &S,
113    request: TransactWriteItemsRequest,
114) -> Result<TransactWriteItemsResponse> {
115    let items = &request.transact_items;
116
117    // Validate: at least 1 action
118    if items.is_empty() {
119        return Err(DynoxideError::ValidationException(
120            "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
121        ));
122    }
123
124    // Validate: up to 100 actions.
125    // AWS surfaces this as the standard "1 validation error detected" envelope
126    // around `Value '[<dump>]' at 'transactItems'`. The conformance suite
127    // anchors a regex on the envelope and constraint phrase but leaves the
128    // dump body unconstrained.
129    if items.len() > 100 {
130        let dump = format!("{items:?}");
131        return Err(DynoxideError::ValidationException(format!(
132            "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
133        )));
134    }
135
136    // Validate: no duplicate item targets. A key that can't be stringified (non-scalar
137    // or missing) is skipped here and reported by the in-loop validation instead (#95).
138    let mut seen_targets = HashSet::new();
139    for item in items {
140        if let Some(target) = get_item_target(storage, item).await? {
141            if !seen_targets.insert(target) {
142                return Err(DynoxideError::ValidationException(
143                    "Transaction request cannot include multiple operations on one item"
144                        .to_string(),
145                ));
146            }
147        }
148    }
149
150    // Validate: aggregate item size must not exceed 4MB
151    let total_size: usize = items.iter().map(|i| get_action_table_and_size(i).1).sum();
152    if total_size > 4 * 1024 * 1024 {
153        return Err(DynoxideError::ValidationException(
154            "Collection size of items exceeded, which can also be caused by the aggregate size of the items in the transaction exceeding the 4MB limit".to_string(),
155        ));
156    }
157
158    // All actions run inside one SQLite transaction (all-or-nothing).
159    helpers::with_write_transaction(storage, execute_within_transaction(storage, items)).await?;
160
161    // Build consumed capacity per table
162    let consumed_capacity = if matches!(
163        request.return_consumed_capacity.as_deref(),
164        Some("TOTAL") | Some("INDEXES")
165    ) {
166        // AWS charges 2 WCU per item for a transactional write: round each item
167        // up to whole write units first, then apply the 2x factor, then sum per
168        // table. Aggregating sizes before rounding would undercharge items that
169        // straddle a 1KB boundary.
170        let mut table_units: HashMap<String, f64> = HashMap::new();
171        for item in items {
172            let (table, size) = get_action_table_and_size(item);
173            *table_units.entry(table).or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
174                * crate::types::write_capacity_units(size);
175        }
176        let caps: Vec<_> = table_units
177            .iter()
178            .filter_map(|(table, &units)| {
179                crate::types::transactional_write_capacity(
180                    table,
181                    units,
182                    &request.return_consumed_capacity,
183                )
184            })
185            .collect();
186        Some(caps)
187    } else {
188        None
189    };
190    Ok(TransactWriteItemsResponse {
191        consumed_capacity,
192        item_collection_metrics: None,
193    })
194}
195
196async fn execute_within_transaction<S: StorageBackend>(
197    storage: &S,
198    items: &[TransactWriteItem],
199) -> Result<()> {
200    let mut cancellation_reasons: Vec<CancellationReason> = Vec::with_capacity(items.len());
201    let mut has_failure = false;
202
203    for item in items {
204        let reason = execute_single_action(storage, item).await;
205        match reason {
206            Ok(()) => {
207                cancellation_reasons.push(CancellationReason {
208                    code: "None".to_string(),
209                    message: None,
210                    item: None,
211                });
212            }
213            Err(e) => {
214                // An empty-value key (empty string or empty binary) surfaces top-level:
215                // returning here rolls the transaction back. Other errors become
216                // cancellation reasons below (#95).
217                if matches!(e, DynoxideError::KeyEmptyValueValidation(_)) {
218                    return Err(e);
219                }
220                has_failure = true;
221                let message = Some(e.to_string());
222                let (code, item) = match e {
223                    DynoxideError::ConditionalCheckFailedException(_, item) => {
224                        ("ConditionalCheckFailed".to_string(), item)
225                    }
226                    DynoxideError::ValidationException(_) => ("ValidationError".to_string(), None),
227                    _ => ("InternalError".to_string(), None),
228                };
229                cancellation_reasons.push(CancellationReason {
230                    code,
231                    message,
232                    item,
233                });
234            }
235        }
236    }
237
238    if has_failure {
239        let codes: Vec<&str> = cancellation_reasons
240            .iter()
241            .map(|r| r.code.as_str())
242            .collect();
243        let message = format!(
244            "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
245            codes.join(", ")
246        );
247        return Err(DynoxideError::TransactionCanceledException(
248            message,
249            cancellation_reasons,
250        ));
251    }
252
253    Ok(())
254}
255
256async fn execute_single_action<S: StorageBackend>(
257    storage: &S,
258    item: &TransactWriteItem,
259) -> Result<()> {
260    if let Some(ref put) = item.put {
261        execute_put(storage, put).await
262    } else if let Some(ref update) = item.update {
263        execute_update(storage, update).await
264    } else if let Some(ref delete) = item.delete {
265        execute_delete(storage, delete).await
266    } else if let Some(ref check) = item.condition_check {
267        execute_condition_check(storage, check).await
268    } else {
269        Err(DynoxideError::ValidationException(
270            "TransactItem must contain exactly one of Put, Update, Delete, or ConditionCheck"
271                .to_string(),
272        ))
273    }
274}
275
276async fn execute_put<S: StorageBackend>(storage: &S, put: &TransactPut) -> Result<()> {
277    crate::validation::validate_table_name(&put.table_name)?;
278    let meta = helpers::require_table_for_item_op(storage, &put.table_name).await?;
279    let key_schema = helpers::parse_key_schema(&meta)?;
280
281    helpers::validate_item_keys(&put.item, &key_schema, &meta)?;
282    crate::validation::validate_item_attribute_values(&put.item)?;
283
284    // Deduplicate sets - need a mutable copy since put is borrowed immutably
285    let mut item = put.item.clone();
286    crate::validation::normalize_item_sets(&mut item);
287
288    let size = types::item_size(&item);
289    if size > types::MAX_ITEM_SIZE {
290        return Err(DynoxideError::ValidationException(
291            "Item size has exceeded the maximum allowed size".to_string(),
292        ));
293    }
294
295    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
296    let (pk, sk) = helpers::extract_key_strings(&item, &key_schema)?;
297
298    let tracker = crate::expressions::TrackedExpressionAttributes::new(
299        &put.expression_attribute_names,
300        &put.expression_attribute_values,
301    );
302
303    // Pre-register references statically before runtime evaluation
304    if let Some(ref cond_expr) = put.condition_expression {
305        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
306            tracker.track_condition_expr(&parsed);
307        }
308    }
309
310    // Evaluate condition if present
311    if let Some(ref cond_expr) = put.condition_expression {
312        let existing_json = storage.get_item(&put.table_name, &pk, &sk).await?;
313        let existing_item: Item = existing_json
314            .as_ref()
315            .and_then(|j| serde_json::from_str(j).ok())
316            .unwrap_or_default();
317
318        let return_item = if put.return_values_on_condition_check_failure.as_deref()
319            == Some("ALL_OLD")
320            && !existing_item.is_empty()
321        {
322            Some(existing_item.clone())
323        } else {
324            None
325        };
326        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
327    }
328
329    tracker.check_unused()?;
330
331    let item_json = serde_json::to_string(&item)
332        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
333    let hash_prefix = item
334        .get(&key_schema.partition_key)
335        .map(crate::storage::compute_hash_prefix)
336        .unwrap_or_default();
337    let old_json = storage
338        .put_item_with_hash(&put.table_name, &pk, &sk, &item_json, size, &hash_prefix)
339        .await?;
340
341    let _ = super::gsi::maintain_gsis_after_write(
342        storage,
343        &put.table_name,
344        &meta,
345        &pk,
346        &sk,
347        &item,
348        &key_schema.partition_key,
349        key_schema.sort_key.as_deref(),
350    )
351    .await?;
352
353    super::lsi::maintain_lsis_after_write(
354        storage,
355        &put.table_name,
356        &meta,
357        &pk,
358        &sk,
359        &item,
360        &key_schema.partition_key,
361        key_schema.sort_key.as_deref(),
362    )
363    .await?;
364
365    // Record stream event
366    let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
367    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
368
369    Ok(())
370}
371
372async fn execute_update<S: StorageBackend>(storage: &S, update: &TransactUpdate) -> Result<()> {
373    crate::validation::validate_table_name(&update.table_name)?;
374    let meta = helpers::require_table_for_item_op(storage, &update.table_name).await?;
375    let key_schema = helpers::parse_key_schema(&meta)?;
376
377    helpers::validate_key_only(&update.key, &key_schema)?;
378    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
379    let (pk, sk) = helpers::extract_key_strings(&update.key, &key_schema)?;
380
381    let existing_json = storage.get_item(&update.table_name, &pk, &sk).await?;
382    let existing_item: Item = existing_json
383        .as_ref()
384        .and_then(|j| serde_json::from_str(j).ok())
385        .unwrap_or_default();
386
387    let tracker = crate::expressions::TrackedExpressionAttributes::new(
388        &update.expression_attribute_names,
389        &update.expression_attribute_values,
390    );
391
392    // Pre-register references statically before runtime evaluation
393    if let Some(ref cond_expr) = update.condition_expression {
394        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
395            tracker.track_condition_expr(&parsed);
396        }
397    }
398    if let Ok(parsed) = crate::expressions::update::parse(&update.update_expression) {
399        tracker.track_update_expr(&parsed);
400    }
401
402    // Evaluate condition against the original existing item BEFORE populating
403    // key attributes for upsert. Otherwise attribute_exists(PK) would always
404    // pass because the key was pre-populated.
405    if let Some(ref cond_expr) = update.condition_expression {
406        let return_item = if update.return_values_on_condition_check_failure.as_deref()
407            == Some("ALL_OLD")
408            && existing_json.is_some()
409        {
410            Some(existing_item.clone())
411        } else {
412            None
413        };
414        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
415    }
416
417    // Build the mutable item for the update expression.
418    // If new item (upsert), populate key attrs.
419    let mut item = existing_item;
420    if existing_json.is_none() {
421        for (k, v) in &update.key {
422            item.insert(k.clone(), v.clone());
423        }
424    }
425    let before_item = item.clone();
426
427    // Apply update expression
428    let parsed = crate::expressions::update::parse(&update.update_expression)
429        .map_err(DynoxideError::ValidationException)?;
430    crate::expressions::update::apply(&mut item, &parsed, &tracker)
431        .map_err(DynoxideError::ValidationException)?;
432
433    tracker.check_unused()?;
434
435    // Validate attribute values after update expression applied
436    crate::validation::validate_item_attribute_values(&item)?;
437    crate::validation::normalize_item_sets(&mut item);
438
439    // Reject an index key this update set to an invalid value (see helpers).
440    helpers::validate_updated_index_keys(&before_item, &item, &meta)?;
441
442    let size = types::item_size(&item);
443    if size > types::MAX_ITEM_SIZE {
444        return Err(DynoxideError::ValidationException(
445            "Item size has exceeded the maximum allowed size".to_string(),
446        ));
447    }
448
449    // Save old item reference for streams
450    let old_for_stream = existing_json.clone();
451
452    let item_json = serde_json::to_string(&item)
453        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
454    let hash_prefix = update
455        .key
456        .get(&key_schema.partition_key)
457        .map(crate::storage::compute_hash_prefix)
458        .unwrap_or_default();
459    storage
460        .put_item_with_hash(&update.table_name, &pk, &sk, &item_json, size, &hash_prefix)
461        .await?;
462
463    let _ = super::gsi::maintain_gsis_after_write(
464        storage,
465        &update.table_name,
466        &meta,
467        &pk,
468        &sk,
469        &item,
470        &key_schema.partition_key,
471        key_schema.sort_key.as_deref(),
472    )
473    .await?;
474
475    super::lsi::maintain_lsis_after_write(
476        storage,
477        &update.table_name,
478        &meta,
479        &pk,
480        &sk,
481        &item,
482        &key_schema.partition_key,
483        key_schema.sort_key.as_deref(),
484    )
485    .await?;
486
487    // Record stream event
488    let old_item: Option<Item> = old_for_stream.and_then(|j| serde_json::from_str(&j).ok());
489    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), Some(&item)).await?;
490
491    Ok(())
492}
493
494async fn execute_delete<S: StorageBackend>(storage: &S, delete: &TransactDelete) -> Result<()> {
495    crate::validation::validate_table_name(&delete.table_name)?;
496    let meta = helpers::require_table_for_item_op(storage, &delete.table_name).await?;
497    let key_schema = helpers::parse_key_schema(&meta)?;
498
499    helpers::validate_key_only(&delete.key, &key_schema)?;
500    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
501    let (pk, sk) = helpers::extract_key_strings(&delete.key, &key_schema)?;
502
503    let tracker = crate::expressions::TrackedExpressionAttributes::new(
504        &delete.expression_attribute_names,
505        &delete.expression_attribute_values,
506    );
507
508    // Pre-register references statically before runtime evaluation
509    if let Some(ref cond_expr) = delete.condition_expression {
510        if let Ok(parsed) = crate::expressions::condition::parse(cond_expr) {
511            tracker.track_condition_expr(&parsed);
512        }
513    }
514
515    // Evaluate condition if present
516    if let Some(ref cond_expr) = delete.condition_expression {
517        let existing_json = storage.get_item(&delete.table_name, &pk, &sk).await?;
518        let existing_item: Item = existing_json
519            .as_ref()
520            .and_then(|j| serde_json::from_str(j).ok())
521            .unwrap_or_default();
522
523        let return_item = if delete.return_values_on_condition_check_failure.as_deref()
524            == Some("ALL_OLD")
525            && !existing_item.is_empty()
526        {
527            Some(existing_item.clone())
528        } else {
529            None
530        };
531        check_condition_tracked(cond_expr, &existing_item, &tracker, return_item)?;
532    }
533
534    tracker.check_unused()?;
535
536    let old_json = storage.delete_item(&delete.table_name, &pk, &sk).await?;
537    let _ = super::gsi::maintain_gsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk)
538        .await?;
539    super::lsi::maintain_lsis_after_delete(storage, &delete.table_name, &meta, &pk, &sk).await?;
540
541    // Record stream event
542    let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
543    if old_item.is_some() {
544        crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None).await?;
545    }
546
547    Ok(())
548}
549
550async fn execute_condition_check<S: StorageBackend>(
551    storage: &S,
552    check: &TransactConditionCheck,
553) -> Result<()> {
554    crate::validation::validate_table_name(&check.table_name)?;
555    let meta = helpers::require_table_for_item_op(storage, &check.table_name).await?;
556    let key_schema = helpers::parse_key_schema(&meta)?;
557
558    helpers::validate_key_only(&check.key, &key_schema)?;
559    // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
560    let (pk, sk) = helpers::extract_key_strings(&check.key, &key_schema)?;
561
562    let existing_json = storage.get_item(&check.table_name, &pk, &sk).await?;
563    let existing_item: Item = existing_json
564        .as_ref()
565        .and_then(|j| serde_json::from_str(j).ok())
566        .unwrap_or_default();
567
568    let tracker = crate::expressions::TrackedExpressionAttributes::new(
569        &check.expression_attribute_names,
570        &check.expression_attribute_values,
571    );
572
573    // Pre-register references statically before runtime evaluation
574    if let Ok(parsed) = crate::expressions::condition::parse(&check.condition_expression) {
575        tracker.track_condition_expr(&parsed);
576    }
577
578    let return_item = if check.return_values_on_condition_check_failure.as_deref()
579        == Some("ALL_OLD")
580        && !existing_item.is_empty()
581    {
582        Some(existing_item.clone())
583    } else {
584        None
585    };
586    check_condition_tracked(
587        &check.condition_expression,
588        &existing_item,
589        &tracker,
590        return_item,
591    )?;
592
593    tracker.check_unused()?;
594    Ok(())
595}
596
597fn check_condition_tracked(
598    expression: &str,
599    item: &Item,
600    tracker: &crate::expressions::TrackedExpressionAttributes,
601    return_item_on_failure: Option<Item>,
602) -> Result<()> {
603    let parsed = crate::expressions::condition::parse(expression)
604        .map_err(DynoxideError::ValidationException)?;
605    let result = crate::expressions::condition::evaluate(&parsed, item, tracker)
606        .map_err(DynoxideError::ValidationException)?;
607    if !result {
608        return Err(DynoxideError::ConditionalCheckFailedException(
609            "The conditional request failed".to_string(),
610            return_item_on_failure,
611        ));
612    }
613    Ok(())
614}
615
616/// Get table name and estimated item size for an action.
617///
618/// For Put, uses the full item size. For Update, includes both the key size
619/// and the expression attribute values size (a better approximation of the
620/// request payload contribution). For Delete and ConditionCheck, uses key size.
621fn get_action_table_and_size(item: &TransactWriteItem) -> (String, usize) {
622    if let Some(ref put) = item.put {
623        (put.table_name.clone(), types::item_size(&put.item))
624    } else if let Some(ref update) = item.update {
625        let key_size = types::item_size(&update.key);
626        let eav_size = update
627            .expression_attribute_values
628            .as_ref()
629            .map(|vals| vals.values().map(|v| v.size()).sum::<usize>())
630            .unwrap_or(0);
631        (update.table_name.clone(), key_size + eav_size)
632    } else if let Some(ref delete) = item.delete {
633        (delete.table_name.clone(), types::item_size(&delete.key))
634    } else if let Some(ref check) = item.condition_check {
635        (check.table_name.clone(), types::item_size(&check.key))
636    } else {
637        (String::new(), 0)
638    }
639}
640
641/// Compute the dedup target (table + pk + sk) for one action's key source, or `None`
642/// when the key can't be stringified (non-scalar or missing). Table name and existence
643/// are still validated, so a bad name or missing table surfaces up front.
644async fn target_for<S: StorageBackend>(
645    storage: &S,
646    table_name: &str,
647    key_source: &HashMap<String, AttributeValue>,
648) -> Result<Option<String>> {
649    crate::validation::validate_table_name(table_name)?;
650    let meta = helpers::require_table_for_item_op(storage, table_name).await?;
651    let key_schema = helpers::parse_key_schema(&meta)?;
652    match helpers::extract_key_strings(key_source, &key_schema) {
653        Ok((pk, sk)) => Ok(Some(format!("{table_name}#{pk}#{sk}"))),
654        Err(_) => Ok(None),
655    }
656}
657
658/// Get a unique target key (table + pk + sk) for duplicate detection, or `None` when
659/// the action's key can't form one (see [`target_for`]).
660async fn get_item_target<S: StorageBackend>(
661    storage: &S,
662    item: &TransactWriteItem,
663) -> Result<Option<String>> {
664    if let Some(ref put) = item.put {
665        target_for(storage, &put.table_name, &put.item).await
666    } else if let Some(ref update) = item.update {
667        target_for(storage, &update.table_name, &update.key).await
668    } else if let Some(ref delete) = item.delete {
669        target_for(storage, &delete.table_name, &delete.key).await
670    } else if let Some(ref check) = item.condition_check {
671        target_for(storage, &check.table_name, &check.key).await
672    } else {
673        Err(DynoxideError::ValidationException(
674            "TransactItem must contain exactly one action".to_string(),
675        ))
676    }
677}