dynoxide/actions/
transact_get_items.rs1use crate::actions::helpers;
2use crate::errors::{CancellationReason, DynoxideError, Result};
3use crate::expressions;
4use crate::storage::Storage;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, HashSet};
8
9#[derive(Debug, Default, Deserialize)]
10pub struct TransactGetItemsRequest {
11 #[serde(rename = "TransactItems")]
12 pub transact_items: Vec<TransactGetItem>,
13 #[serde(rename = "ReturnConsumedCapacity", default)]
14 pub return_consumed_capacity: Option<String>,
15}
16
17#[derive(Debug, Default, Deserialize)]
18pub struct TransactGetItem {
19 #[serde(rename = "Get")]
20 pub get: TransactGet,
21}
22
23#[derive(Debug, Default, Deserialize)]
24pub struct TransactGet {
25 #[serde(rename = "TableName")]
26 pub table_name: String,
27 #[serde(rename = "Key")]
28 pub key: HashMap<String, AttributeValue>,
29 #[serde(rename = "ProjectionExpression", default)]
30 pub projection_expression: Option<String>,
31 #[serde(rename = "ExpressionAttributeNames", default)]
32 pub expression_attribute_names: Option<HashMap<String, String>>,
33}
34
35#[derive(Debug, Default, Serialize)]
36pub struct TransactGetItemsResponse {
37 #[serde(rename = "Responses")]
38 pub responses: Vec<TransactGetResponse>,
39 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
40 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
41}
42
43#[derive(Debug, Default, Serialize)]
44pub struct TransactGetResponse {
45 #[serde(rename = "Item", skip_serializing_if = "Option::is_none")]
46 pub item: Option<Item>,
47}
48
49pub fn execute(
50 storage: &Storage,
51 request: TransactGetItemsRequest,
52) -> Result<TransactGetItemsResponse> {
53 if request.transact_items.is_empty() {
55 return Err(DynoxideError::ValidationException(
56 "1 validation error detected: Value '[]' at 'transactItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
57 ));
58 }
59
60 if request.transact_items.len() > 100 {
66 let dump = format!("{:?}", request.transact_items);
67 return Err(DynoxideError::ValidationException(format!(
68 "1 validation error detected: Value '[{dump}]' at 'transactItems' failed to satisfy constraint: Member must have length less than or equal to 100"
69 )));
70 }
71
72 let mut reasons: Vec<CancellationReason> = Vec::with_capacity(request.transact_items.len());
83 let mut validated_schemas: Vec<Option<helpers::KeySchema>> =
84 Vec::with_capacity(request.transact_items.len());
85 let mut has_failure = false;
86
87 for transact_item in &request.transact_items {
88 let get = &transact_item.get;
89 match validate_action(storage, get) {
90 Ok(schema) => {
91 reasons.push(CancellationReason {
92 code: "None".to_string(),
93 message: None,
94 item: None,
95 });
96 validated_schemas.push(Some(schema));
97 }
98 Err(DynoxideError::ValidationException(msg)) => {
99 has_failure = true;
100 reasons.push(CancellationReason {
101 code: "ValidationError".to_string(),
102 message: Some(msg),
103 item: None,
104 });
105 validated_schemas.push(None);
106 }
107 Err(DynoxideError::ResourceNotFoundException(msg)) => {
108 return Err(DynoxideError::ResourceNotFoundException(msg));
111 }
112 Err(other) => return Err(other),
113 }
114 }
115
116 if has_failure {
117 let codes: Vec<&str> = reasons.iter().map(|r| r.code.as_str()).collect();
118 let message = format!(
119 "Transaction cancelled, please refer cancellation reasons for specific reasons [{}]",
120 codes.join(", ")
121 );
122 return Err(DynoxideError::TransactionCanceledException(
123 message, reasons,
124 ));
125 }
126
127 let mut seen_targets = HashSet::new();
131 for (transact_item, schema) in request.transact_items.iter().zip(validated_schemas.iter()) {
132 let get = &transact_item.get;
133 let key_schema = schema.as_ref().expect("validated above");
134 let (pk, sk) = helpers::extract_key_strings(&get.key, key_schema)?;
136 let target = format!("{}#{}#{}", get.table_name, pk, sk);
137 if !seen_targets.insert(target) {
138 return Err(DynoxideError::ValidationException(
139 "Transaction request cannot include multiple operations on one item".to_string(),
140 ));
141 }
142 }
143
144 let mut responses = Vec::with_capacity(request.transact_items.len());
145
146 for (transact_item, schema) in request.transact_items.iter().zip(validated_schemas.iter()) {
147 let get = &transact_item.get;
148 let key_schema = schema.as_ref().expect("validated above");
149
150 let (pk, sk) = helpers::extract_key_strings(&get.key, key_schema)?;
152
153 let item_json = storage.get_item(&get.table_name, &pk, &sk)?;
154
155 let item: Option<Item> = item_json.and_then(|j| serde_json::from_str(&j).ok());
156
157 let tracker = crate::expressions::TrackedExpressionAttributes::new(
159 &get.expression_attribute_names,
160 &None, );
162
163 let item = if let Some(proj_expr) = &get.projection_expression {
164 let projection = expressions::projection::parse(proj_expr)
165 .map_err(DynoxideError::ValidationException)?;
166 tracker.track_projection_expr(&projection);
167
168 if let Some(item) = item {
169 let mut key_attrs = vec![key_schema.partition_key.clone()];
170 if let Some(ref sk) = key_schema.sort_key {
171 key_attrs.push(sk.clone());
172 }
173
174 let projected =
175 expressions::projection::apply(&item, &projection, &tracker, &key_attrs)
176 .map_err(DynoxideError::ValidationException)?;
177 Some(projected)
178 } else {
179 None
180 }
181 } else {
182 item
183 };
184
185 tracker.check_unused()?;
186
187 responses.push(TransactGetResponse { item });
188 }
189
190 let consumed_capacity = if matches!(
192 request.return_consumed_capacity.as_deref(),
193 Some("TOTAL") | Some("INDEXES")
194 ) {
195 let mut table_sizes: std::collections::HashMap<String, usize> =
196 std::collections::HashMap::new();
197 for (resp, req_item) in responses.iter().zip(request.transact_items.iter()) {
198 let size = resp.item.as_ref().map(crate::types::item_size).unwrap_or(0);
199 *table_sizes
200 .entry(req_item.get.table_name.clone())
201 .or_default() += size;
202 }
203 let caps: Vec<_> = table_sizes
204 .iter()
205 .filter_map(|(table, &size)| {
206 crate::types::consumed_capacity(
207 table,
208 crate::types::read_capacity_units_with_consistency(size, true),
209 &request.return_consumed_capacity,
210 )
211 })
212 .collect();
213 Some(caps)
214 } else {
215 None
216 };
217
218 Ok(TransactGetItemsResponse {
219 responses,
220 consumed_capacity,
221 })
222}
223
224fn validate_action(storage: &Storage, get: &TransactGet) -> Result<helpers::KeySchema> {
230 crate::validation::validate_table_name(&get.table_name)?;
231 let meta = helpers::require_table_for_item_op(storage, &get.table_name)?;
232 let key_schema = helpers::parse_key_schema(&meta)?;
233 helpers::validate_key_only(&get.key, &key_schema)?;
234 Ok(key_schema)
235}