// dynoxide/actions/batch_get_item.rs

use crate::actions::helpers;
use crate::errors::{DynoxideError, Result};
use crate::expressions;
use crate::storage::Storage;
use crate::types::{AttributeValue, Item};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
8
9#[derive(Debug, Default, Deserialize)]
10pub struct BatchGetItemRequest {
11    #[serde(rename = "RequestItems")]
12    pub request_items: HashMap<String, KeysAndAttributes>,
13    #[serde(rename = "ReturnConsumedCapacity", default)]
14    pub return_consumed_capacity: Option<String>,
15}
16
17#[derive(Debug, Default, Deserialize)]
18pub struct KeysAndAttributes {
19    #[serde(rename = "Keys")]
20    pub keys: Vec<HashMap<String, AttributeValue>>,
21    #[serde(rename = "ProjectionExpression", default)]
22    pub projection_expression: Option<String>,
23    #[serde(rename = "ExpressionAttributeNames", default)]
24    pub expression_attribute_names: Option<HashMap<String, String>>,
25    #[serde(rename = "ConsistentRead", default)]
26    pub consistent_read: Option<bool>,
27    #[serde(rename = "AttributesToGet", default)]
28    pub attributes_to_get: Option<Vec<String>>,
29}
30
31#[derive(Debug, Default, Serialize)]
32pub struct BatchGetItemResponse {
33    #[serde(rename = "Responses")]
34    pub responses: HashMap<String, Vec<Item>>,
35    #[serde(rename = "UnprocessedKeys")]
36    pub unprocessed_keys: HashMap<String, serde_json::Value>,
37    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
38    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
39}
40
41pub fn execute(storage: &Storage, request: BatchGetItemRequest) -> Result<BatchGetItemResponse> {
42    // Validate RequestItems is not empty
43    if request.request_items.is_empty() {
44        return Err(DynoxideError::ValidationException(
45            "1 validation error detected: Value '{}' at 'requestItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
46        ));
47    }
48
49    // Validate each table entry has at least one key
50    for (table_name, ka) in &request.request_items {
51        if ka.keys.is_empty() {
52            return Err(DynoxideError::ValidationException(format!(
53                "1 validation error detected: Value at 'requestItems.{table_name}.member.keys' failed to satisfy constraint: Member must have length greater than or equal to 1"
54            )));
55        }
56    }
57
58    // Validate table name format for all tables before checking existence
59    for table_name in request.request_items.keys() {
60        crate::validation::validate_table_name(table_name)?;
61    }
62
63    // Validate total key count
64    let total_keys: usize = request.request_items.values().map(|ka| ka.keys.len()).sum();
65    if total_keys > 100 {
66        return Err(DynoxideError::ValidationException(
67            "Too many items requested for the BatchGetItem call".to_string(),
68        ));
69    }
70
71    // --- Pre-table validations ---
72    // DynamoDB validates expression attributes, key values, projections, and duplicates
73    // BEFORE checking table existence. Perform these checks first.
74    for keys_and_attrs in request.request_items.values() {
75        // Check AttributesToGet + expression conflict
76        let has_attributes_to_get = keys_and_attrs.attributes_to_get.is_some();
77        let has_projection_expr = keys_and_attrs.projection_expression.is_some();
78        let has_expr_attr_names = keys_and_attrs.expression_attribute_names.is_some();
79
80        if has_attributes_to_get && has_projection_expr {
81            return Err(DynoxideError::ValidationException(
82                "Can not use both expression and non-expression parameters in the same request: Non-expression parameters: {AttributesToGet} Expression parameters: {ProjectionExpression}".to_string(),
83            ));
84        }
85
86        // ExpressionAttributeNames without expression
87        if has_expr_attr_names && !has_projection_expr {
88            return Err(DynoxideError::ValidationException(
89                "ExpressionAttributeNames can only be specified when using expressions".to_string(),
90            ));
91        }
92
93        // Empty ExpressionAttributeNames
94        if let Some(ref ean) = keys_and_attrs.expression_attribute_names {
95            if ean.is_empty() {
96                return Err(DynoxideError::ValidationException(
97                    "ExpressionAttributeNames must not be empty".to_string(),
98                ));
99            }
100            // Invalid EAN keys (must start with #)
101            for key in ean.keys() {
102                if !key.starts_with('#') {
103                    return Err(DynoxideError::ValidationException(format!(
104                        "ExpressionAttributeNames contains invalid key: Syntax error; key: \"{key}\""
105                    )));
106                }
107            }
108        }
109
110        // Empty ProjectionExpression
111        if let Some(ref pe) = keys_and_attrs.projection_expression {
112            if pe.is_empty() {
113                return Err(DynoxideError::ValidationException(
114                    "Invalid ProjectionExpression: The expression can not be empty;".to_string(),
115                ));
116            }
117        }
118
119        // Duplicate AttributesToGet check (must come before duplicate keys check)
120        if let Some(ref atg) = keys_and_attrs.attributes_to_get {
121            let mut seen = std::collections::HashSet::new();
122            for attr in atg {
123                if !seen.insert(attr.as_str()) {
124                    return Err(DynoxideError::ValidationException(format!(
125                        "One or more parameter values were invalid: Duplicate value in attribute name: {attr}"
126                    )));
127                }
128            }
129        }
130
131        // Validate key attribute values (empty attrs, invalid numbers, etc.)
132        for key in &keys_and_attrs.keys {
133            crate::validation::validate_item_attribute_values(key)?;
134        }
135
136        // Duplicate keys check
137        if keys_and_attrs.keys.len() > 1 {
138            let serialised: Vec<String> = keys_and_attrs
139                .keys
140                .iter()
141                .map(|k| {
142                    let mut pairs: Vec<_> = k.iter().map(|(k, v)| format!("{k}={v:?}")).collect();
143                    pairs.sort();
144                    pairs.join(",")
145                })
146                .collect();
147            let mut seen = std::collections::HashSet::new();
148            for s in &serialised {
149                if !seen.insert(s) {
150                    return Err(DynoxideError::ValidationException(
151                        "Provided list of item keys contains duplicates".to_string(),
152                    ));
153                }
154            }
155        }
156    }
157
158    const MAX_RESPONSE_SIZE: usize = 16 * 1024 * 1024; // 16MB
159
160    let mut responses: HashMap<String, Vec<Item>> = HashMap::new();
161    let mut unprocessed_keys: HashMap<String, serde_json::Value> = HashMap::new();
162    let mut cumulative_size: usize = 0;
163    let mut size_limit_reached = false;
164    // Track per-key RCU for ConsumedCapacity (uses full item size, not projected)
165    let mut table_rcu: HashMap<String, f64> = HashMap::new();
166
167    for (table_name, keys_and_attrs) in &request.request_items {
168        let meta = helpers::require_table_for_item_op(storage, table_name)?;
169        let key_schema = helpers::parse_key_schema(&meta)?;
170
171        // Parse projection if present; also handle legacy AttributesToGet
172        let projection = if let Some(ref expr) = keys_and_attrs.projection_expression {
173            Some(expressions::projection::parse(expr).map_err(DynoxideError::ValidationException)?)
174        } else {
175            keys_and_attrs
176                .attributes_to_get
177                .as_ref()
178                .map(|attrs| crate::actions::helpers::attributes_to_get_to_projection(attrs))
179        };
180
181        let tracker = crate::expressions::TrackedExpressionAttributes::new(
182            &keys_and_attrs.expression_attribute_names,
183            &None, // BatchGetItem has no ExpressionAttributeValues
184        );
185
186        // Pre-register projection expression references
187        if let Some(ref proj) = projection {
188            tracker.track_projection_expr(proj);
189        }
190
191        // BatchGetItem does NOT automatically include key attributes in projections.
192        let key_attrs = Vec::new();
193
194        let consistent = keys_and_attrs.consistent_read.unwrap_or(false);
195        let mut table_items = Vec::new();
196        let mut remaining_keys: Vec<HashMap<String, AttributeValue>> = Vec::new();
197        let mut per_table_rcu: f64 = 0.0;
198
199        for key in &keys_and_attrs.keys {
200            if size_limit_reached {
201                remaining_keys.push(key.clone());
202                continue;
203            }
204
205            helpers::validate_key_only(key, &key_schema)?;
206            let (pk, sk) = helpers::extract_key_strings(key, &key_schema)?;
207
208            if let Some(item_json) = storage.get_item(table_name, &pk, &sk)? {
209                let item: Item = serde_json::from_str(&item_json).map_err(|e| {
210                    DynoxideError::InternalServerError(format!("Bad item JSON: {e}"))
211                })?;
212
213                // Use full item size for both capacity and response limit
214                let item_size = crate::types::item_size(&item);
215
216                if cumulative_size + item_size > MAX_RESPONSE_SIZE {
217                    size_limit_reached = true;
218                    remaining_keys.push(key.clone());
219                    continue;
220                }
221
222                cumulative_size += item_size;
223
224                // RCU is based on full item size, not projected size
225                per_table_rcu +=
226                    crate::types::read_capacity_units_with_consistency(item_size, consistent);
227
228                let result_item = if let Some(ref proj) = projection {
229                    expressions::projection::apply(&item, proj, &tracker, &key_attrs)
230                        .map_err(DynoxideError::ValidationException)?
231                } else {
232                    item
233                };
234
235                table_items.push(result_item);
236            } else {
237                // DynamoDB charges for the read attempt even if the item is not found
238                per_table_rcu += crate::types::read_capacity_units_with_consistency(0, consistent);
239            }
240        }
241
242        // Check for unused expression attribute names
243        tracker.check_unused()?;
244
245        table_rcu.insert(table_name.clone(), per_table_rcu);
246        responses.insert(table_name.clone(), table_items);
247
248        if !remaining_keys.is_empty() {
249            let mut unprocessed = serde_json::json!({
250                "Keys": remaining_keys,
251            });
252            // Preserve original request settings so the caller can retry
253            // without losing projection or consistency configuration.
254            if let Some(ref pe) = keys_and_attrs.projection_expression {
255                unprocessed["ProjectionExpression"] = serde_json::json!(pe);
256            }
257            if let Some(ref ean) = keys_and_attrs.expression_attribute_names {
258                unprocessed["ExpressionAttributeNames"] = serde_json::json!(ean);
259            }
260            if let Some(cr) = keys_and_attrs.consistent_read {
261                unprocessed["ConsistentRead"] = serde_json::json!(cr);
262            }
263            unprocessed_keys.insert(table_name.clone(), unprocessed);
264        }
265    }
266
267    // Build consumed capacity per table
268    let consumed_capacity = if matches!(
269        request.return_consumed_capacity.as_deref(),
270        Some("TOTAL") | Some("INDEXES")
271    ) {
272        let mut caps = Vec::new();
273        for table_name in request.request_items.keys() {
274            let total_rcu = table_rcu.get(table_name).copied().unwrap_or(0.0);
275            if let Some(cc) = crate::types::consumed_capacity(
276                table_name,
277                total_rcu,
278                &request.return_consumed_capacity,
279            ) {
280                caps.push(cc);
281            }
282        }
283        Some(caps)
284    } else {
285        None
286    };
287
288    Ok(BatchGetItemResponse {
289        responses,
290        unprocessed_keys,
291        consumed_capacity,
292    })
293}