dynoxide/actions/
transact_get_items.rs1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, HashSet};
8
9#[derive(Debug, Default, Deserialize)]
10pub struct TransactGetItemsRequest {
11 #[serde(rename = "TransactItems")]
12 pub transact_items: Vec<TransactGetItem>,
13 #[serde(rename = "ReturnConsumedCapacity", default)]
14 pub return_consumed_capacity: Option<String>,
15}
16
17#[derive(Debug, Default, Deserialize)]
18pub struct TransactGetItem {
19 #[serde(rename = "Get")]
20 pub get: TransactGet,
21}
22
23#[derive(Debug, Default, Deserialize)]
24pub struct TransactGet {
25 #[serde(rename = "TableName")]
26 pub table_name: String,
27 #[serde(rename = "Key")]
28 pub key: HashMap<String, AttributeValue>,
29 #[serde(rename = "ProjectionExpression", default)]
30 pub projection_expression: Option<String>,
31 #[serde(rename = "ExpressionAttributeNames", default)]
32 pub expression_attribute_names: Option<HashMap<String, String>>,
33}
34
35#[derive(Debug, Default, Serialize)]
36pub struct TransactGetItemsResponse {
37 #[serde(rename = "Responses")]
38 pub responses: Vec<TransactGetResponse>,
39 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
40 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
41}
42
43#[derive(Debug, Default, Serialize)]
44pub struct TransactGetResponse {
45 #[serde(rename = "Item", skip_serializing_if = "Option::is_none")]
46 pub item: Option<Item>,
47}
48
49pub async fn execute<S: StorageBackend>(
50 storage: &S,
51 request: TransactGetItemsRequest,
52) -> Result<TransactGetItemsResponse> {
53 if request.transact_items.is_empty() {
55 return Err(DynoxideError::ValidationException(
56 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
57 ));
58 }
59
60 if request.transact_items.len() > 100 {
66 let dump = format!("{:?}", request.transact_items);
67 return Err(DynoxideError::ValidationException(format!(
68 "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
69 )));
70 }
71
72 let mut reasons: Vec<CancellationReason> = Vec::with_capacity(request.transact_items.len());
83 let mut validated_schemas: Vec<Option<helpers::KeySchema>> =
84 Vec::with_capacity(request.transact_items.len());
85 let mut has_failure = false;
86
87 for transact_item in &request.transact_items {
88 let get = &transact_item.get;
89 match validate_action(storage, get).await {
90 Ok(schema) => {
91 reasons.push(CancellationReason {
92 code: "None".to_string(),
93 message: None,
94 item: None,
95 });
96 validated_schemas.push(Some(schema));
97 }
98 Err(DynoxideError::ValidationException(msg))
104 | Err(DynoxideError::KeyEmptyValueValidation(msg)) => {
105 has_failure = true;
106 reasons.push(CancellationReason {
107 code: "ValidationError".to_string(),
108 message: Some(msg),
109 item: None,
110 });
111 validated_schemas.push(None);
112 }
113 Err(DynoxideError::ResourceNotFoundException(msg)) => {
114 return Err(DynoxideError::ResourceNotFoundException(msg));
117 }
118 Err(other) => return Err(other),
119 }
120 }
121
122 if has_failure {
123 let codes: Vec<&str> = reasons.iter().map(|r| r.code.as_str()).collect();
124 let message = format!(
125 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
126 codes.join(", ")
127 );
128 return Err(DynoxideError::TransactionCanceledException(
129 message, reasons,
130 ));
131 }
132
133 let mut seen_targets = HashSet::new();
137 for (transact_item, schema) in request.transact_items.iter().zip(validated_schemas.iter()) {
138 let get = &transact_item.get;
139 let key_schema = schema.as_ref().expect("validated above");
140 let (pk, sk) = helpers::extract_key_strings(&get.key, key_schema)?;
142 let target = format!("{}#{}#{}", get.table_name, pk, sk);
143 if !seen_targets.insert(target) {
144 return Err(DynoxideError::ValidationException(
145 "Transaction request cannot include multiple operations on one item".to_string(),
146 ));
147 }
148 }
149
150 let mut responses = Vec::with_capacity(request.transact_items.len());
151
152 for (transact_item, schema) in request.transact_items.iter().zip(validated_schemas.iter()) {
153 let get = &transact_item.get;
154 let key_schema = schema.as_ref().expect("validated above");
155
156 let (pk, sk) = helpers::extract_key_strings(&get.key, key_schema)?;
158
159 let item_json = storage.get_item(&get.table_name, &pk, &sk).await?;
160
161 let item: Option<Item> = item_json.and_then(|j| serde_json::from_str(&j).ok());
162
163 let tracker = crate::expressions::TrackedExpressionAttributes::new(
165 &get.expression_attribute_names,
166 &None, );
168
169 let item = if let Some(proj_expr) = &get.projection_expression {
170 let projection = expressions::projection::parse(proj_expr)
171 .map_err(DynoxideError::ValidationException)?;
172 tracker.track_projection_expr(&projection);
173
174 if let Some(item) = item {
175 let mut key_attrs = vec![key_schema.partition_key.clone()];
176 if let Some(ref sk) = key_schema.sort_key {
177 key_attrs.push(sk.clone());
178 }
179
180 let matched = expressions::projection::apply(&item, &projection, &tracker, &[])
187 .map_err(DynoxideError::ValidationException)?;
188 if matched.is_empty() {
189 None
190 } else {
191 let projected =
192 expressions::projection::apply(&item, &projection, &tracker, &key_attrs)
193 .map_err(DynoxideError::ValidationException)?;
194 Some(projected)
195 }
196 } else {
197 None
198 }
199 } else {
200 item
201 };
202
203 tracker.check_unused()?;
204
205 responses.push(TransactGetResponse { item });
206 }
207
208 let consumed_capacity = if matches!(
210 request.return_consumed_capacity.as_deref(),
211 Some("TOTAL") | Some("INDEXES")
212 ) {
213 let mut table_units: std::collections::HashMap<String, f64> =
219 std::collections::HashMap::new();
220 for (resp, req_item) in responses.iter().zip(request.transact_items.iter()) {
221 let size = resp.item.as_ref().map(crate::types::item_size).unwrap_or(0);
222 *table_units
223 .entry(req_item.get.table_name.clone())
224 .or_default() += crate::types::TRANSACTIONAL_CAPACITY_FACTOR
225 * crate::types::read_capacity_units_with_consistency(size, true);
226 }
227 let caps: Vec<_> = table_units
228 .iter()
229 .filter_map(|(table, &units)| {
230 crate::types::transactional_read_capacity(
231 table,
232 units,
233 &request.return_consumed_capacity,
234 )
235 })
236 .collect();
237 Some(caps)
238 } else {
239 None
240 };
241
242 Ok(TransactGetItemsResponse {
243 responses,
244 consumed_capacity,
245 })
246}
247
248async fn validate_action<S: StorageBackend>(
254 storage: &S,
255 get: &TransactGet,
256) -> Result<helpers::KeySchema> {
257 crate::validation::validate_table_name(&get.table_name)?;
258 let meta = helpers::require_table_for_item_op(storage, &get.table_name).await?;
259 let key_schema = helpers::parse_key_schema(&meta)?;
260 helpers::validate_key_only(&get.key, &key_schema)?;
261 Ok(key_schema)
262}