dynoxide/actions/
batch_get_item.rs1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::expressions;
4use crate::storage_backend::StorageBackend;
5use crate::types::{AttributeValue, Item};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9#[derive(Debug, Default, Deserialize)]
10pub struct BatchGetItemRequest {
11 #[serde(rename = "RequestItems")]
12 pub request_items: HashMap<String, KeysAndAttributes>,
13 #[serde(rename = "ReturnConsumedCapacity", default)]
14 pub return_consumed_capacity: Option<String>,
15}
16
17#[derive(Debug, Default, Deserialize)]
18pub struct KeysAndAttributes {
19 #[serde(rename = "Keys")]
20 pub keys: Vec<HashMap<String, AttributeValue>>,
21 #[serde(rename = "ProjectionExpression", default)]
22 pub projection_expression: Option<String>,
23 #[serde(rename = "ExpressionAttributeNames", default)]
24 pub expression_attribute_names: Option<HashMap<String, String>>,
25 #[serde(rename = "ConsistentRead", default)]
26 pub consistent_read: Option<bool>,
27 #[serde(rename = "AttributesToGet", default)]
28 pub attributes_to_get: Option<Vec<String>>,
29}
30
31#[derive(Debug, Default, Serialize)]
32pub struct BatchGetItemResponse {
33 #[serde(rename = "Responses")]
34 pub responses: HashMap<String, Vec<Item>>,
35 #[serde(rename = "UnprocessedKeys")]
36 pub unprocessed_keys: HashMap<String, serde_json::Value>,
37 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
38 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
39}
40
41pub async fn execute<S: StorageBackend>(
42 storage: &S,
43 request: BatchGetItemRequest,
44) -> Result<BatchGetItemResponse> {
45 if request.request_items.is_empty() {
49 return Err(DynoxideError::ValidationException(
50 "The requestItems parameter is required for BatchGetItem".to_string(),
51 ));
52 }
53
54 for (table_name, ka) in &request.request_items {
56 if ka.keys.is_empty() {
57 return Err(DynoxideError::ValidationException(format!(
58 "1 validation error detected: Value at 'requestItems.{table_name}.member.keys' failed to satisfy constraint: Member must have length greater than or equal to 1"
59 )));
60 }
61 }
62
63 for table_name in request.request_items.keys() {
65 crate::validation::validate_table_name(table_name)?;
66 }
67
68 let total_keys: usize = request.request_items.values().map(|ka| ka.keys.len()).sum();
76 if total_keys > 100 {
77 let table_name = request
78 .request_items
79 .iter()
80 .max_by_key(|(_, ka)| ka.keys.len())
81 .map(|(name, _)| name.as_str())
82 .unwrap_or("");
83 return Err(DynoxideError::ValidationException(format!(
84 "1 validation error detected: Value at 'RequestItems.{table_name}.member.Keys' failed to satisfy constraint: Member must have length less than or equal to 100"
85 )));
86 }
87
88 for keys_and_attrs in request.request_items.values() {
92 let has_attributes_to_get = keys_and_attrs.attributes_to_get.is_some();
94 let has_projection_expr = keys_and_attrs.projection_expression.is_some();
95 let has_expr_attr_names = keys_and_attrs.expression_attribute_names.is_some();
96
97 if has_attributes_to_get && has_projection_expr {
98 return Err(DynoxideError::ValidationException(
99 "Can not use both expression and non-expression parameters in the same request: Non-expression parameters: {AttributesToGet} Expression parameters: {ProjectionExpression}".to_string(),
100 ));
101 }
102
103 if has_expr_attr_names && !has_projection_expr {
105 return Err(DynoxideError::ValidationException(
106 "ExpressionAttributeNames can only be specified when using expressions".to_string(),
107 ));
108 }
109
110 if let Some(ref ean) = keys_and_attrs.expression_attribute_names {
112 if ean.is_empty() {
113 return Err(DynoxideError::ValidationException(
114 "ExpressionAttributeNames must not be empty".to_string(),
115 ));
116 }
117 for key in ean.keys() {
119 if !key.starts_with('#') {
120 return Err(DynoxideError::ValidationException(format!(
121 "ExpressionAttributeNames contains invalid key: Syntax error; key: \"{key}\""
122 )));
123 }
124 }
125 }
126
127 if let Some(ref pe) = keys_and_attrs.projection_expression {
129 if pe.is_empty() {
130 return Err(DynoxideError::ValidationException(
131 "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
132 ));
133 }
134 }
135
136 if let Some(ref atg) = keys_and_attrs.attributes_to_get {
138 let mut seen = std::collections::HashSet::new();
139 for attr in atg {
140 if !seen.insert(attr.as_str()) {
141 return Err(DynoxideError::ValidationException(format!(
142 "One or more parameter values were invalid: Duplicate value in attribute name: {attr}"
143 )));
144 }
145 }
146 }
147
148 for key in &keys_and_attrs.keys {
150 crate::validation::validate_item_attribute_values(key)?;
151 }
152
153 if keys_and_attrs.keys.len() > 1 {
155 let serialised: Vec<String> = keys_and_attrs
156 .keys
157 .iter()
158 .map(|k| {
159 let mut pairs: Vec<_> = k.iter().map(|(k, v)| format!("{k}={v:?}")).collect();
160 pairs.sort();
161 pairs.join(",")
162 })
163 .collect();
164 let mut seen = std::collections::HashSet::new();
165 for s in &serialised {
166 if !seen.insert(s) {
167 return Err(DynoxideError::ValidationException(
168 "Provided list of item keys contains duplicates".to_string(),
169 ));
170 }
171 }
172 }
173 }
174
175 const MAX_RESPONSE_SIZE: usize = 16 * 1024 * 1024; let mut responses: HashMap<String, Vec<Item>> = HashMap::new();
178 let mut unprocessed_keys: HashMap<String, serde_json::Value> = HashMap::new();
179 let mut cumulative_size: usize = 0;
180 let mut size_limit_reached = false;
181 let mut table_rcu: HashMap<String, f64> = HashMap::new();
183
184 for (table_name, keys_and_attrs) in &request.request_items {
185 let meta = helpers::require_table_for_item_op(storage, table_name).await?;
186 let key_schema = helpers::parse_key_schema(&meta)?;
187
188 let projection = if let Some(ref expr) = keys_and_attrs.projection_expression {
190 Some(expressions::projection::parse(expr).map_err(DynoxideError::ValidationException)?)
191 } else {
192 keys_and_attrs
193 .attributes_to_get
194 .as_ref()
195 .map(|attrs| crate::actions::helpers::attributes_to_get_to_projection(attrs))
196 };
197
198 let tracker = crate::expressions::TrackedExpressionAttributes::new(
199 &keys_and_attrs.expression_attribute_names,
200 &None, );
202
203 if let Some(ref proj) = projection {
205 tracker.track_projection_expr(proj);
206 }
207
208 let key_attrs = Vec::new();
210
211 let consistent = keys_and_attrs.consistent_read.unwrap_or(false);
212 let mut table_items = Vec::new();
213 let mut remaining_keys: Vec<HashMap<String, AttributeValue>> = Vec::new();
214 let mut per_table_rcu: f64 = 0.0;
215
216 for key in &keys_and_attrs.keys {
217 if size_limit_reached {
218 remaining_keys.push(key.clone());
219 continue;
220 }
221
222 helpers::validate_key_only(key, &key_schema)?;
223 let (pk, sk) = helpers::extract_key_strings(key, &key_schema)?;
225
226 if let Some(item_json) = storage.get_item(table_name, &pk, &sk).await? {
227 let item: Item = serde_json::from_str(&item_json).map_err(|e| {
228 DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
229 })?;
230
231 let item_size = crate::types::item_size(&item);
233
234 if cumulative_size + item_size > MAX_RESPONSE_SIZE {
235 size_limit_reached = true;
236 remaining_keys.push(key.clone());
237 continue;
238 }
239
240 cumulative_size += item_size;
241
242 per_table_rcu +=
244 crate::types::read_capacity_units_with_consistency(item_size, consistent);
245
246 let result_item = if let Some(ref proj) = projection {
247 expressions::projection::apply(&item, proj, &tracker, &key_attrs)
248 .map_err(DynoxideError::ValidationException)?
249 } else {
250 item
251 };
252
253 table_items.push(result_item);
254 } else {
255 per_table_rcu += crate::types::read_capacity_units_with_consistency(0, consistent);
257 }
258 }
259
260 tracker.check_unused()?;
262
263 table_rcu.insert(table_name.clone(), per_table_rcu);
264 responses.insert(table_name.clone(), table_items);
265
266 if !remaining_keys.is_empty() {
267 let mut unprocessed = serde_json::json!({
268 "Keys": remaining_keys,
269 });
270 if let Some(ref pe) = keys_and_attrs.projection_expression {
273 unprocessed["ProjectionExpression"] = serde_json::json!(pe);
274 }
275 if let Some(ref ean) = keys_and_attrs.expression_attribute_names {
276 unprocessed["ExpressionAttributeNames"] = serde_json::json!(ean);
277 }
278 if let Some(cr) = keys_and_attrs.consistent_read {
279 unprocessed["ConsistentRead"] = serde_json::json!(cr);
280 }
281 unprocessed_keys.insert(table_name.clone(), unprocessed);
282 }
283 }
284
285 let consumed_capacity = if matches!(
287 request.return_consumed_capacity.as_deref(),
288 Some("TOTAL") | Some("INDEXES")
289 ) {
290 let mut caps = Vec::new();
291 for table_name in request.request_items.keys() {
292 let total_rcu = table_rcu.get(table_name).copied().unwrap_or(0.0);
293 if let Some(cc) = crate::types::consumed_capacity(
294 table_name,
295 total_rcu,
296 &request.return_consumed_capacity,
297 ) {
298 caps.push(cc);
299 }
300 }
301 Some(caps)
302 } else {
303 None
304 };
305
306 Ok(BatchGetItemResponse {
307 responses,
308 unprocessed_keys,
309 consumed_capacity,
310 })
311}