Skip to main content

dynoxide/actions/
batch_write_item.rs

1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Default, Deserialize)]
9pub struct BatchWriteItemRequest {
10    #[serde(rename = "RequestItems")]
11    pub request_items: HashMap<String, Vec<WriteRequest>>,
12    #[serde(rename = "ReturnConsumedCapacity", default)]
13    pub return_consumed_capacity: Option<String>,
14    #[serde(rename = "ReturnItemCollectionMetrics", default)]
15    pub return_item_collection_metrics: Option<String>,
16}
17
18#[derive(Debug, Default, Deserialize)]
19pub struct WriteRequest {
20    #[serde(rename = "PutRequest", default)]
21    pub put_request: Option<PutRequest>,
22    #[serde(rename = "DeleteRequest", default)]
23    pub delete_request: Option<DeleteRequest>,
24}
25
26#[derive(Debug, Default, Deserialize)]
27pub struct PutRequest {
28    #[serde(rename = "Item")]
29    pub item: Item,
30}
31
32#[derive(Debug, Default, Deserialize)]
33pub struct DeleteRequest {
34    #[serde(rename = "Key")]
35    pub key: HashMap<String, AttributeValue>,
36}
37
38#[derive(Debug, Default, Serialize)]
39pub struct BatchWriteItemResponse {
40    #[serde(rename = "UnprocessedItems")]
41    pub unprocessed_items: HashMap<String, serde_json::Value>,
42    #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
43    pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
44    #[serde(
45        rename = "ItemCollectionMetrics",
46        skip_serializing_if = "Option::is_none"
47    )]
48    pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
49}
50
51pub fn execute(
52    storage: &Storage,
53    mut request: BatchWriteItemRequest,
54) -> Result<BatchWriteItemResponse> {
55    const MAX_REQUEST_SIZE: usize = 16 * 1024 * 1024; // 16MB
56
57    // Validate RequestItems is not empty
58    if request.request_items.is_empty() {
59        return Err(DynoxideError::ValidationException(
60            "1 validation error detected: Value at 'requestItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
61        ));
62    }
63
64    // Validate each table entry has at least one write request
65    for (table_name, wrs) in &request.request_items {
66        if wrs.is_empty() {
67            return Err(DynoxideError::ValidationException(format!(
68                "1 validation error detected: Value at 'requestItems.{table_name}.member' failed to satisfy constraint: Member must have length greater than or equal to 1"
69            )));
70        }
71    }
72
73    // Validate table name format for all tables before checking existence
74    for table_name in request.request_items.keys() {
75        crate::validation::validate_table_name(table_name)?;
76    }
77
78    // Validate total request count
79    let total_requests: usize = request.request_items.values().map(|v| v.len()).sum();
80    if total_requests > 25 {
81        return Err(DynoxideError::ValidationException(
82            "Too many items requested for the BatchWriteItem call".to_string(),
83        ));
84    }
85
86    // --- Pre-table validations ---
87    // DynamoDB validates attribute values, item size, and empty write requests
88    // BEFORE checking table existence.
89    for write_requests in request.request_items.values() {
90        for wr in write_requests {
91            if wr.put_request.is_none() && wr.delete_request.is_none() {
92                return Err(DynoxideError::ValidationException(
93                    "Supplied AttributeValue has more than one datatypes set, must contain exactly one of the supported datatypes".to_string(),
94                ));
95            }
96            if let Some(ref put_req) = wr.put_request {
97                // Validate attribute values (empty strings, empty sets, invalid numbers)
98                crate::validation::validate_item_attribute_values(&put_req.item)?;
99
100                // Validate item size before table lookup
101                let size = types::item_size(&put_req.item);
102                if size > types::MAX_ITEM_SIZE {
103                    return Err(DynoxideError::ValidationException(
104                        "Item size has exceeded the maximum allowed size".to_string(),
105                    ));
106                }
107            }
108            if let Some(ref del_req) = wr.delete_request {
109                crate::validation::validate_item_attribute_values(&del_req.key)?;
110            }
111        }
112    }
113
114    // Validate aggregate request size
115    let total_size: usize = request
116        .request_items
117        .values()
118        .flat_map(|wrs| wrs.iter())
119        .map(|wr| {
120            if let Some(ref put_req) = wr.put_request {
121                types::item_size(&put_req.item)
122            } else if let Some(ref del_req) = wr.delete_request {
123                types::item_size(&del_req.key)
124            } else {
125                0
126            }
127        })
128        .sum();
129    if total_size > MAX_REQUEST_SIZE {
130        return Err(DynoxideError::ValidationException(
131            "Item collection too large: aggregate size of items in BatchWriteItem exceeds 16MB limit".to_string(),
132        ));
133    }
134
135    // Validate: no duplicate keys across all operations
136    {
137        let mut seen_keys: std::collections::HashSet<(String, String, String)> =
138            std::collections::HashSet::new();
139        for (table_name, write_requests) in &request.request_items {
140            let meta = helpers::require_table_for_item_op(storage, table_name)?;
141            let key_schema = helpers::parse_key_schema(&meta)?;
142            for wr in write_requests {
143                let key_item = if let Some(ref put) = wr.put_request {
144                    &put.item
145                } else if let Some(ref del) = wr.delete_request {
146                    &del.key
147                } else {
148                    continue;
149                };
150                let (pk, sk) = helpers::extract_key_strings(key_item, &key_schema)?;
151                let key = (table_name.clone(), pk, sk);
152                if !seen_keys.insert(key) {
153                    return Err(DynoxideError::ValidationException(
154                        "Provided list of item keys contains duplicates".to_string(),
155                    ));
156                }
157            }
158        }
159    }
160
161    // Track per-table GSI capacity and affected partition keys for deferred metrics
162    let mut table_gsi_units: HashMap<String, HashMap<String, f64>> = HashMap::new();
163    // Track per-table WCU (table-level, excludes GSI)
164    let mut table_wcu: HashMap<String, f64> = HashMap::new();
165    // Collect unique (table, pk_str, pk_attr, pk_value) for deferred metrics computation
166    let mut affected_partitions: Vec<(String, String, String, AttributeValue)> = Vec::new();
167
168    // OPTIMISATION: maintain_gsis_after_write/maintain_lsis_after_write each
169    // deserialise GSI/LSI definitions from JSON on every call. For batch writes
170    // of 25 items against one table, that's 50 redundant deserialise calls.
171    // A future improvement would hoist parse_gsi_defs/parse_lsi_defs to this
172    // level and pass pre-parsed defs into the maintenance functions.
173
174    for (table_name, write_requests) in &mut request.request_items {
175        let meta = helpers::require_table_for_item_op(storage, table_name)?;
176        let key_schema = helpers::parse_key_schema(&meta)?;
177
178        for wr in write_requests {
179            if let Some(ref mut put_req) = wr.put_request {
180                // Validate keys
181                helpers::validate_item_keys(&put_req.item, &key_schema, &meta)?;
182
183                // Validate attribute values (empty strings, empty sets)
184                crate::validation::validate_item_attribute_values(&put_req.item)?;
185
186                // Normalize sets (deduplication)
187                crate::validation::normalize_item_sets(&mut put_req.item);
188
189                // Validate item size
190                let size = types::item_size(&put_req.item);
191                if size > types::MAX_ITEM_SIZE {
192                    return Err(DynoxideError::ValidationException(
193                        "Item size has exceeded the maximum allowed size".to_string(),
194                    ));
195                }
196
197                let (pk, sk) = helpers::extract_key_strings(&put_req.item, &key_schema)?;
198                let item_json = serde_json::to_string(&put_req.item)
199                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
200                let hash_prefix = put_req
201                    .item
202                    .get(&key_schema.partition_key)
203                    .map(crate::storage::compute_hash_prefix)
204                    .unwrap_or_default();
205                let old_json = storage.put_item_with_hash(
206                    table_name,
207                    &pk,
208                    &sk,
209                    &item_json,
210                    size,
211                    &hash_prefix,
212                )?;
213
214                // Accumulate WCU based on item size
215                *table_wcu.entry(table_name.clone()).or_insert(0.0) +=
216                    types::write_capacity_units(size);
217
218                // Maintain GSI tables
219                let gsi_units = super::gsi::maintain_gsis_after_write(
220                    storage,
221                    table_name,
222                    &meta,
223                    &pk,
224                    &sk,
225                    &put_req.item,
226                    &key_schema.partition_key,
227                    key_schema.sort_key.as_deref(),
228                )?;
229
230                // Accumulate GSI units per table
231                let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
232                for (gsi_name, units) in &gsi_units {
233                    *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
234                }
235
236                // Maintain LSI tables
237                super::lsi::maintain_lsis_after_write(
238                    storage,
239                    table_name,
240                    &meta,
241                    &pk,
242                    &sk,
243                    &put_req.item,
244                    &key_schema.partition_key,
245                    key_schema.sort_key.as_deref(),
246                )?;
247
248                // Track affected partition for deferred metrics
249                if let Some(pk_val) = put_req.item.get(&key_schema.partition_key) {
250                    affected_partitions.push((
251                        table_name.clone(),
252                        pk.clone(),
253                        key_schema.partition_key.clone(),
254                        pk_val.clone(),
255                    ));
256                }
257
258                // Record stream event
259                let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
260                crate::streams::record_stream_event(
261                    storage,
262                    &meta,
263                    old_item.as_ref(),
264                    Some(&put_req.item),
265                )?;
266            } else if let Some(ref del_req) = wr.delete_request {
267                helpers::validate_key_only(&del_req.key, &key_schema)?;
268                let (pk, sk) = helpers::extract_key_strings(&del_req.key, &key_schema)?;
269                let old_json = storage.delete_item(table_name, &pk, &sk)?;
270
271                // Accumulate WCU: based on old item size if it existed, else 1 WCU
272                let old_item: Option<Item> =
273                    old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
274                let delete_wcu = if let Some(ref old) = old_item {
275                    types::write_capacity_units(types::item_size(old))
276                } else {
277                    1.0
278                };
279                *table_wcu.entry(table_name.clone()).or_insert(0.0) += delete_wcu;
280
281                // Maintain GSI tables
282                let gsi_units =
283                    super::gsi::maintain_gsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
284
285                // Accumulate GSI units per table
286                let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
287                for (gsi_name, units) in &gsi_units {
288                    *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
289                }
290
291                // Maintain LSI tables
292                super::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
293
294                // Track affected partition for deferred metrics
295                if let Some(pk_val) = del_req.key.get(&key_schema.partition_key) {
296                    affected_partitions.push((
297                        table_name.clone(),
298                        pk.clone(),
299                        key_schema.partition_key.clone(),
300                        pk_val.clone(),
301                    ));
302                }
303
304                // Record stream event (old_item already parsed above)
305                if old_item.is_some() {
306                    crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
307                }
308            } else {
309                return Err(DynoxideError::ValidationException(
310                    "WriteRequest must contain either PutRequest or DeleteRequest".to_string(),
311                ));
312            }
313        }
314    }
315
316    // Build consumed capacity per table using pre-tracked WCU
317    let consumed_capacity = if matches!(
318        request.return_consumed_capacity.as_deref(),
319        Some("TOTAL") | Some("INDEXES")
320    ) {
321        let mut caps = Vec::new();
322        for table_name in request.request_items.keys() {
323            let total_wcu = table_wcu.get(table_name).copied().unwrap_or(0.0);
324            let gsi_units = table_gsi_units.get(table_name).cloned().unwrap_or_default();
325            if let Some(cc) = crate::types::consumed_capacity_with_indexes(
326                table_name,
327                total_wcu,
328                &gsi_units,
329                &request.return_consumed_capacity,
330            ) {
331                caps.push(cc);
332            }
333        }
334        Some(caps)
335    } else {
336        None
337    };
338
339    // Compute item collection metrics once per unique (table, pk) — deferred from the write loop
340    let mut all_item_collection_metrics: HashMap<String, Vec<crate::types::ItemCollectionMetrics>> =
341        HashMap::new();
342    if matches!(
343        request.return_item_collection_metrics.as_deref(),
344        Some("SIZE")
345    ) {
346        // Deduplicate by (table, pk) to avoid redundant queries
347        let mut seen = std::collections::HashSet::new();
348        for (tbl, pk_str, pk_attr, pk_val) in &affected_partitions {
349            let key = (tbl.as_str(), pk_str.as_str());
350            if !seen.insert(key) {
351                continue;
352            }
353            let meta = helpers::require_table(storage, tbl)?;
354            if let Some(icm) = helpers::build_item_collection_metrics(
355                storage,
356                &meta,
357                tbl,
358                pk_str,
359                pk_attr,
360                pk_val,
361                &request.return_item_collection_metrics,
362            )? {
363                all_item_collection_metrics
364                    .entry(tbl.clone())
365                    .or_default()
366                    .push(icm);
367            }
368        }
369    }
370    let item_collection_metrics = if all_item_collection_metrics.is_empty() {
371        None
372    } else {
373        Some(all_item_collection_metrics)
374    };
375
376    Ok(BatchWriteItemResponse {
377        unprocessed_items: HashMap::new(),
378        consumed_capacity,
379        item_collection_metrics,
380    })
381}