Skip to main content

dynoxide/actions/
batch_write_item.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Default, Deserialize)]
9pub struct BatchWriteItemRequest {
10    #[serde(rename = "RequestItems")]
11    pub request_items: HashMap<String, Vec<WriteRequest>>,
12    #[serde(rename = "ReturnConsumedCapacity", default)]
13    pub return_consumed_capacity: Option<String>,
14    #[serde(rename = "ReturnItemCollectionMetrics", default)]
15    pub return_item_collection_metrics: Option<String>,
16}
17
18#[derive(Debug, Default, Deserialize)]
19pub struct WriteRequest {
20    #[serde(rename = "PutRequest", default)]
21    pub put_request: Option<PutRequest>,
22    #[serde(rename = "DeleteRequest", default)]
23    pub delete_request: Option<DeleteRequest>,
24}
25
26#[derive(Debug, Default, Deserialize)]
27pub struct PutRequest {
28    #[serde(rename = "Item")]
29    pub item: Item,
30}
31
32#[derive(Debug, Default, Deserialize)]
33pub struct DeleteRequest {
34    #[serde(rename = "Key")]
35    pub key: HashMap<String, AttributeValue>,
36}
37
38#[derive(Debug, Default, Serialize)]
39pub struct BatchWriteItemResponse {
40    #[serde(rename = "UnprocessedItems")]
41    pub unprocessed_items: HashMap<String, serde_json::Value>,
42    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
43    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
44    #[serde(
45        rename = "ItemCollectionMetrics",
46        skip_serializing_if = "Option::is_none"
47    )]
48    pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
49}
50
51pub fn execute(
52    storage: &Storage,
53    mut request: BatchWriteItemRequest,
54) -> Result<BatchWriteItemResponse> {
55    const MAX_REQUEST_SIZE: usize = 16 * 1024 * 1024; // 16MB
56
57    // Validate RequestItems is not empty.
58    // AWS routes the empty-map case through a separate parameter-required path
59    // rather than the standard "N validation errors detected" envelope.
60    if request.request_items.is_empty() {
61        return Err(DynoxideError::ValidationException(
62            "The requestItems parameter is required for BatchWriteItem".to_string(),
63        ));
64    }
65
66    // Validate each table entry has at least one write request
67    for (table_name, wrs) in &request.request_items {
68        if wrs.is_empty() {
69            return Err(DynoxideError::ValidationException(format!(
70                "1 validation error detected: Value at 'requestItems.{table_name}.member' failed to satisfy constraint: Member must have length greater than or equal to 1"
71            )));
72        }
73    }
74
75    // Validate table name format for all tables before checking existence
76    for table_name in request.request_items.keys() {
77        crate::validation::validate_table_name(table_name)?;
78    }
79
80    // Validate total request count.
81    // AWS surfaces this as the standard "1 validation error detected" envelope
82    // and echoes the WriteRequest list inside `Value '{<table>=[<dump>]}'`. The
83    // conformance suite anchors a regex around the envelope and the constraint
84    // phrase but leaves the dump body unconstrained (because the AWS SDK's
85    // Java-toString shape adds new AttributeValue fields over time). We emit
86    // the table name verbatim and a Rust Debug dump of the WriteRequests so
87    // the envelope matches without coupling to a specific SDK version. If a
88    // future suite tightens the regex to pin the dump exactly, this site
89    // will need a follow-up change.
90    let total_requests: usize = request.request_items.values().map(|v| v.len()).sum();
91    if total_requests > 25 {
92        let empty: Vec<WriteRequest> = Vec::new();
93        let (table_name, requests) = request
94            .request_items
95            .iter()
96            .max_by_key(|(_, v)| v.len())
97            .map(|(name, v)| (name.as_str(), v))
98            .unwrap_or(("", &empty));
99        let dump = format!("{requests:?}");
100        return Err(DynoxideError::ValidationException(format!(
101            "1 validation error detected: Value '{{{table_name}=[{dump}]}}' at 'requestItems' failed to satisfy constraint: Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1]"
102        )));
103    }
104
105    // --- Pre-table validations ---
106    // DynamoDB validates attribute values, item size, and empty write requests
107    // BEFORE checking table existence.
108    for write_requests in request.request_items.values() {
109        for wr in write_requests {
110            if wr.put_request.is_none() && wr.delete_request.is_none() {
111                return Err(DynoxideError::ValidationException(
112                    "Supplied AttributeValue has more than one datatypes set, must contain exactly one of the supported datatypes".to_string(),
113                ));
114            }
115            if let Some(ref put_req) = wr.put_request {
116                // Validate attribute values (empty strings, empty sets, invalid numbers)
117                crate::validation::validate_item_attribute_values(&put_req.item)?;
118
119                // Validate item size before table lookup
120                let size = types::item_size(&put_req.item);
121                if size > types::MAX_ITEM_SIZE {
122                    return Err(DynoxideError::ValidationException(
123                        "Item size has exceeded the maximum allowed size".to_string(),
124                    ));
125                }
126            }
127            if let Some(ref del_req) = wr.delete_request {
128                crate::validation::validate_item_attribute_values(&del_req.key)?;
129            }
130        }
131    }
132
133    // Validate aggregate request size
134    let total_size: usize = request
135        .request_items
136        .values()
137        .flat_map(|wrs| wrs.iter())
138        .map(|wr| {
139            if let Some(ref put_req) = wr.put_request {
140                types::item_size(&put_req.item)
141            } else if let Some(ref del_req) = wr.delete_request {
142                types::item_size(&del_req.key)
143            } else {
144                0
145            }
146        })
147        .sum();
148    if total_size > MAX_REQUEST_SIZE {
149        return Err(DynoxideError::ValidationException(
150            "Item collection too large: aggregate size of items in BatchWriteItem exceeds 16MB limit".to_string(),
151        ));
152    }
153
154    // Validate: no duplicate keys across all operations
155    {
156        let mut seen_keys: std::collections::HashSet<(String, String, String)> =
157            std::collections::HashSet::new();
158        for (table_name, write_requests) in &request.request_items {
159            let meta = helpers::require_table_for_item_op(storage, table_name)?;
160            let key_schema = helpers::parse_key_schema(&meta)?;
161            for wr in write_requests {
162                let key_item = if let Some(ref put) = wr.put_request {
163                    &put.item
164                } else if let Some(ref del) = wr.delete_request {
165                    &del.key
166                } else {
167                    continue;
168                };
169                // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
170                let (pk, sk) = helpers::extract_key_strings(key_item, &key_schema)?;
171                let key = (table_name.clone(), pk, sk);
172                if !seen_keys.insert(key) {
173                    return Err(DynoxideError::ValidationException(
174                        "Provided list of item keys contains duplicates".to_string(),
175                    ));
176                }
177            }
178        }
179    }
180
181    // Track per-table GSI capacity and affected partition keys for deferred metrics
182    let mut table_gsi_units: HashMap<String, HashMap<String, f64>> = HashMap::new();
183    // Track per-table WCU (table-level, excludes GSI)
184    let mut table_wcu: HashMap<String, f64> = HashMap::new();
185    // Collect unique (table, pk_str, pk_attr, pk_value) for deferred metrics computation
186    let mut affected_partitions: Vec<(String, String, String, AttributeValue)> = Vec::new();
187
188    // OPTIMISATION: maintain_gsis_after_write/maintain_lsis_after_write each
189    // deserialise GSI/LSI definitions from JSON on every call. For batch writes
190    // of 25 items against one table, that's 50 redundant deserialise calls.
191    // A future improvement would hoist parse_gsi_defs/parse_lsi_defs to this
192    // level and pass pre-parsed defs into the maintenance functions.
193
194    for (table_name, write_requests) in &mut request.request_items {
195        let meta = helpers::require_table_for_item_op(storage, table_name)?;
196        let key_schema = helpers::parse_key_schema(&meta)?;
197
198        for wr in write_requests {
199            if let Some(ref mut put_req) = wr.put_request {
200                // Validate keys
201                helpers::validate_item_keys(&put_req.item, &key_schema, &meta)?;
202
203                // Validate attribute values (empty strings, empty sets)
204                crate::validation::validate_item_attribute_values(&put_req.item)?;
205
206                // Normalize sets (deduplication)
207                crate::validation::normalize_item_sets(&mut put_req.item);
208
209                // Validate item size
210                let size = types::item_size(&put_req.item);
211                if size > types::MAX_ITEM_SIZE {
212                    return Err(DynoxideError::ValidationException(
213                        "Item size has exceeded the maximum allowed size".to_string(),
214                    ));
215                }
216
217                // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
218                let (pk, sk) = helpers::extract_key_strings(&put_req.item, &key_schema)?;
219                let item_json = serde_json::to_string(&put_req.item)
220                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
221                let hash_prefix = put_req
222                    .item
223                    .get(&key_schema.partition_key)
224                    .map(crate::storage::compute_hash_prefix)
225                    .unwrap_or_default();
226                let old_json = storage.put_item_with_hash(
227                    table_name,
228                    &pk,
229                    &sk,
230                    &item_json,
231                    size,
232                    &hash_prefix,
233                )?;
234
235                // Accumulate WCU based on item size
236                *table_wcu.entry(table_name.clone()).or_insert(0.0) +=
237                    types::write_capacity_units(size);
238
239                // Maintain GSI tables
240                let gsi_units = super::gsi::maintain_gsis_after_write(
241                    storage,
242                    table_name,
243                    &meta,
244                    &pk,
245                    &sk,
246                    &put_req.item,
247                    &key_schema.partition_key,
248                    key_schema.sort_key.as_deref(),
249                )?;
250
251                // Accumulate GSI units per table
252                let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
253                for (gsi_name, units) in &gsi_units {
254                    *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
255                }
256
257                // Maintain LSI tables
258                super::lsi::maintain_lsis_after_write(
259                    storage,
260                    table_name,
261                    &meta,
262                    &pk,
263                    &sk,
264                    &put_req.item,
265                    &key_schema.partition_key,
266                    key_schema.sort_key.as_deref(),
267                )?;
268
269                // Track affected partition for deferred metrics
270                if let Some(pk_val) = put_req.item.get(&key_schema.partition_key) {
271                    affected_partitions.push((
272                        table_name.clone(),
273                        pk.clone(),
274                        key_schema.partition_key.clone(),
275                        pk_val.clone(),
276                    ));
277                }
278
279                // Record stream event
280                let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
281                crate::streams::record_stream_event(
282                    storage,
283                    &meta,
284                    old_item.as_ref(),
285                    Some(&put_req.item),
286                )?;
287            } else if let Some(ref del_req) = wr.delete_request {
288                helpers::validate_key_only(&del_req.key, &key_schema)?;
289                // TODO: validation must precede this call -- if reaching this line, caller has already validated keys.
290                let (pk, sk) = helpers::extract_key_strings(&del_req.key, &key_schema)?;
291                let old_json = storage.delete_item(table_name, &pk, &sk)?;
292
293                // Accumulate WCU: based on old item size if it existed, else 1 WCU
294                let old_item: Option<Item> =
295                    old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
296                let delete_wcu = if let Some(ref old) = old_item {
297                    types::write_capacity_units(types::item_size(old))
298                } else {
299                    1.0
300                };
301                *table_wcu.entry(table_name.clone()).or_insert(0.0) += delete_wcu;
302
303                // Maintain GSI tables
304                let gsi_units =
305                    super::gsi::maintain_gsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
306
307                // Accumulate GSI units per table
308                let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
309                for (gsi_name, units) in &gsi_units {
310                    *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
311                }
312
313                // Maintain LSI tables
314                super::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
315
316                // Track affected partition for deferred metrics
317                if let Some(pk_val) = del_req.key.get(&key_schema.partition_key) {
318                    affected_partitions.push((
319                        table_name.clone(),
320                        pk.clone(),
321                        key_schema.partition_key.clone(),
322                        pk_val.clone(),
323                    ));
324                }
325
326                // Record stream event (old_item already parsed above)
327                if old_item.is_some() {
328                    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
329                }
330            } else {
331                return Err(DynoxideError::ValidationException(
332                    "WriteRequest must contain either PutRequest or DeleteRequest".to_string(),
333                ));
334            }
335        }
336    }
337
338    // Build consumed capacity per table using pre-tracked WCU
339    let consumed_capacity = if matches!(
340        request.return_consumed_capacity.as_deref(),
341        Some("TOTAL") | Some("INDEXES")
342    ) {
343        let mut caps = Vec::new();
344        for table_name in request.request_items.keys() {
345            let total_wcu = table_wcu.get(table_name).copied().unwrap_or(0.0);
346            let gsi_units = table_gsi_units.get(table_name).cloned().unwrap_or_default();
347            if let Some(cc) = crate::types::consumed_capacity_with_indexes(
348                table_name,
349                total_wcu,
350                &gsi_units,
351                &request.return_consumed_capacity,
352            ) {
353                caps.push(cc);
354            }
355        }
356        Some(caps)
357    } else {
358        None
359    };
360
361    // Compute item collection metrics once per unique (table, pk) — deferred from the write loop
362    let mut all_item_collection_metrics: HashMap<String, Vec<crate::types::ItemCollectionMetrics>> =
363        HashMap::new();
364    if matches!(
365        request.return_item_collection_metrics.as_deref(),
366        Some("SIZE")
367    ) {
368        // Deduplicate by (table, pk) to avoid redundant queries
369        let mut seen = std::collections::HashSet::new();
370        for (tbl, pk_str, pk_attr, pk_val) in &affected_partitions {
371            let key = (tbl.as_str(), pk_str.as_str());
372            if !seen.insert(key) {
373                continue;
374            }
375            let meta = helpers::require_table(storage, tbl)?;
376            if let Some(icm) = helpers::build_item_collection_metrics(
377                storage,
378                &meta,
379                tbl,
380                pk_str,
381                pk_attr,
382                pk_val,
383                &request.return_item_collection_metrics,
384            )? {
385                all_item_collection_metrics
386                    .entry(tbl.clone())
387                    .or_default()
388                    .push(icm);
389            }
390        }
391    }
392    let item_collection_metrics = if all_item_collection_metrics.is_empty() {
393        None
394    } else {
395        Some(all_item_collection_metrics)
396    };
397
398    Ok(BatchWriteItemResponse {
399        unprocessed_items: HashMap::new(),
400        consumed_capacity,
401        item_collection_metrics,
402    })
403}