Skip to main content

dynoxide/actions/
transact_get_items.rs

1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, HashSet};
8
9#[derive(Debug, Default, Deserialize)]
10pub struct TransactGetItemsRequest {
11    #[serde(rename = "TransactItems")]
12    pub transact_items: Vec<TransactGetItem>,
13    #[serde(rename = "ReturnConsumedCapacity", default)]
14    pub return_consumed_capacity: Option<String>,
15}
16
17#[derive(Debug, Default, Deserialize)]
18pub struct TransactGetItem {
19    #[serde(rename = "Get")]
20    pub get: TransactGet,
21}
22
23#[derive(Debug, Default, Deserialize)]
24pub struct TransactGet {
25    #[serde(rename = "TableName")]
26    pub table_name: String,
27    #[serde(rename = "Key")]
28    pub key: HashMap<String, AttributeValue>,
29    #[serde(rename = "ProjectionExpression", default)]
30    pub projection_expression: Option<String>,
31    #[serde(rename = "ExpressionAttributeNames", default)]
32    pub expression_attribute_names: Option<HashMap<String, String>>,
33}
34
35#[derive(Debug, Default, Serialize)]
36pub struct TransactGetItemsResponse {
37    #[serde(rename = "Responses")]
38    pub responses: Vec<TransactGetResponse>,
39    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
40    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
41}
42
43#[derive(Debug, Default, Serialize)]
44pub struct TransactGetResponse {
45    #[serde(rename = "Item", skip_serializing_if = "Option::is_none")]
46    pub item: Option<Item>,
47}
48
49pub async fn execute<S: StorageBackend>(
50    storage: &S,
51    request: TransactGetItemsRequest,
52) -> Result<TransactGetItemsResponse> {
53    // Validate: at least 1 action
54    if request.transact_items.is_empty() {
55        return Err(DynoxideError::ValidationException(
56            "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
57        ));
58    }
59
60    // Validate: up to 100 actions.
61    // AWS surfaces this as the standard "1 validation error detected" envelope
62    // around `Value '[<dump>]' at 'transactItems'`. The conformance suite
63    // anchors a regex on the envelope and constraint phrase but leaves the
64    // dump body unconstrained.
65    if request.transact_items.len() > 100 {
66        let dump = format!("{:?}", request.transact_items);
67        return Err(DynoxideError::ValidationException(format!(
68            "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
69        )));
70    }
71
72    // Per-action validation pass.
73    //
74    // AWS surfaces per-action validation failures (empty Key, schema mismatch,
75    // etc.) through the cancellation channel rather than as a request-level
76    // ValidationException, so we collect a CancellationReason for each action
77    // up-front. Validation here must run BEFORE any call to
78    // helpers::extract_key_strings: that helper returns InternalServerError
79    // for a missing partition or sort key, which would leak as HTTP 500
80    // instead of a per-action ValidationError. validate_key_only is the
81    // ValidationException-returning equivalent.
82    let mut reasons: Vec<CancellationReason> = Vec::with_capacity(request.transact_items.len());
83    let mut validated_schemas: Vec<Option<helpers::KeySchema>> =
84        Vec::with_capacity(request.transact_items.len());
85    let mut has_failure = false;
86
87    for transact_item in &request.transact_items {
88        let get = &transact_item.get;
89        match validate_action(storage, get).await {
90            Ok(schema) => {
91                reasons.push(CancellationReason {
92                    code: "None".to_string(),
93                    message: None,
94                    item: None,
95                });
96                validated_schemas.push(Some(schema));
97            }
98            // Group KeyEmptyValueValidation with ValidationException so an
99            // empty-value key stays a per-action ValidationError cancellation
100            // reason here. Unlike TransactWriteItems, a transact read surfaces
101            // per-action key validation through the cancellation channel rather
102            // than as a top-level error (captured AWS behaviour).
103            Err(DynoxideError::ValidationException(msg))
104            | Err(DynoxideError::KeyEmptyValueValidation(msg)) => {
105                has_failure = true;
106                reasons.push(CancellationReason {
107                    code: "ValidationError".to_string(),
108                    message: Some(msg),
109                    item: None,
110                });
111                validated_schemas.push(None);
112            }
113            Err(DynoxideError::ResourceNotFoundException(msg)) => {
114                // Resource-not-found at the request level is the existing AWS
115                // behaviour (mirrors transact-get's pre-fix path); preserve it.
116                return Err(DynoxideError::ResourceNotFoundException(msg));
117            }
118            Err(other) => return Err(other),
119        }
120    }
121
122    if has_failure {
123        let codes: Vec<&str> = reasons.iter().map(|r| r.code.as_str()).collect();
124        let message = format!(
125            "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
126            codes.join(", ")
127        );
128        return Err(DynoxideError::TransactionCanceledException(
129            message, reasons,
130        ));
131    }
132
133    // Validate: no duplicate item targets.
134    // Safe to call extract_key_strings here because validate_key_only has
135    // already passed for every action.
136    let mut seen_targets = HashSet::new();
137    for (transact_item, schema) in request.transact_items.iter().zip(validated_schemas.iter()) {
138        let get = &transact_item.get;
139        let key_schema = schema.as_ref().expect("validated above");
140        // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
141        let (pk, sk) = helpers::extract_key_strings(&get.key, key_schema)?;
142        let target = format!("{}#{}#{}", get.table_name, pk, sk);
143        if !seen_targets.insert(target) {
144            return Err(DynoxideError::ValidationException(
145                "Transaction request cannot include multiple operations on one item".to_string(),
146            ));
147        }
148    }
149
150    let mut responses = Vec::with_capacity(request.transact_items.len());
151
152    for (transact_item, schema) in request.transact_items.iter().zip(validated_schemas.iter()) {
153        let get = &transact_item.get;
154        let key_schema = schema.as_ref().expect("validated above");
155
156        // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
157        let (pk, sk) = helpers::extract_key_strings(&get.key, key_schema)?;
158
159        let item_json = storage.get_item(&get.table_name, &pk, &sk).await?;
160
161        let item: Option<Item> = item_json.and_then(|j| serde_json::from_str(&j).ok());
162
163        // Apply projection if present
164        let tracker = crate::expressions::TrackedExpressionAttributes::new(
165            &get.expression_attribute_names,
166            &None, // TransactGet has no ExpressionAttributeValues
167        );
168
169        let item = if let Some(proj_expr) = &get.projection_expression {
170            let projection = expressions::projection::parse(proj_expr)
171                .map_err(DynoxideError::ValidationException)?;
172            tracker.track_projection_expr(&projection);
173
174            if let Some(item) = item {
175                let mut key_attrs = vec![key_schema.partition_key.clone()];
176                if let Some(ref sk) = key_schema.sort_key {
177                    key_attrs.push(sk.clone());
178                }
179
180                // AWS omits `Item` entirely when a ProjectionExpression matches
181                // no attribute on an otherwise-present item. `projection::apply`
182                // always re-injects the key attributes, so its result is never
183                // literally empty. Apply the projection without those keys to
184                // see whether any path actually resolved, then return the
185                // key-bearing result only when one did.
186                let matched = expressions::projection::apply(&item, &projection, &tracker, &[])
187                    .map_err(DynoxideError::ValidationException)?;
188                if matched.is_empty() {
189                    None
190                } else {
191                    let projected =
192                        expressions::projection::apply(&item, &projection, &tracker, &key_attrs)
193                            .map_err(DynoxideError::ValidationException)?;
194                    Some(projected)
195                }
196            } else {
197                None
198            }
199        } else {
200            item
201        };
202
203        tracker.check_unused()?;
204
205        responses.push(TransactGetResponse { item });
206    }
207
208    // Build consumed capacity per table
209    let consumed_capacity = if matches!(
210        request.return_consumed_capacity.as_deref(),
211        Some("TOTAL") | Some("INDEXES")
212    ) {
213        // AWS charges 2 RCU per requested item for a transactional read,
214        // including items that turned out to be missing (a missing item has
215        // size 0, which still rounds up to 1 RCU before the 2x factor). Round
216        // each item up to whole read units first, then double, then sum per
217        // table, so a boundary-straddling item is not undercharged.
218        let mut table_units: std::collections::HashMap<String, f64> =
219            std::collections::HashMap::new();
220        for (resp, req_item) in responses.iter().zip(request.transact_items.iter()) {
221            let size = resp.item.as_ref().map(crate::types::item_size).unwrap_or(0);
222            *table_units
223                .entry(req_item.get.table_name.clone())
224                .or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
225                * crate::types::read_capacity_units_with_consistency(size, true);
226        }
227        let caps: Vec<_> = table_units
228            .iter()
229            .filter_map(|(table, &units)| {
230                crate::types::transactional_read_capacity(
231                    table,
232                    units,
233                    &request.return_consumed_capacity,
234                )
235            })
236            .collect();
237        Some(caps)
238    } else {
239        None
240    };
241
242    Ok(TransactGetItemsResponse {
243        responses,
244        consumed_capacity,
245    })
246}
247
248/// Run the validation that AWS treats as per-action (and therefore reportable
249/// through the cancellation channel as ValidationError) for a single
250/// TransactGet action: table-name shape, table existence, parsed key schema,
251/// and key shape against that schema. Returns the resolved KeySchema so the
252/// caller can avoid re-parsing it before extract_key_strings.
253async fn validate_action<S: StorageBackend>(
254    storage: &S,
255    get: &TransactGet,
256) -> Result<helpers::KeySchema> {
257    crate::validation::validate_table_name(&get.table_name)?;
258    let meta = helpers::require_table_for_item_op(storage, &get.table_name).await?;
259    let key_schema = helpers::parse_key_schema(&meta)?;
260    helpers::validate_key_only(&get.key, &key_schema)?;
261    Ok(key_schema)
262}