1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Default, Deserialize)]
9pub struct BatchWriteItemRequest {
10 #[serde(rename = "RequestItems")]
11 pub request_items: HashMap<String, Vec<WriteRequest>>,
12 #[serde(rename = "ReturnConsumedCapacity", default)]
13 pub return_consumed_capacity: Option<String>,
14 #[serde(rename = "ReturnItemCollectionMetrics", default)]
15 pub return_item_collection_metrics: Option<String>,
16}
17
18#[derive(Debug, Default, Deserialize)]
19pub struct WriteRequest {
20 #[serde(rename = "PutRequest", default)]
21 pub put_request: Option<PutRequest>,
22 #[serde(rename = "DeleteRequest", default)]
23 pub delete_request: Option<DeleteRequest>,
24}
25
26#[derive(Debug, Default, Deserialize)]
27pub struct PutRequest {
28 #[serde(rename = "Item")]
29 pub item: Item,
30}
31
32#[derive(Debug, Default, Deserialize)]
33pub struct DeleteRequest {
34 #[serde(rename = "Key")]
35 pub key: HashMap<String, AttributeValue>,
36}
37
38#[derive(Debug, Default, Serialize)]
39pub struct BatchWriteItemResponse {
40 #[serde(rename = "UnprocessedItems")]
41 pub unprocessed_items: HashMap<String, serde_json::Value>,
42 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
43 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
44 #[serde(
45 rename = "ItemCollectionMetrics",
46 skip_serializing_if = "Option::is_none"
47 )]
48 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
49}
50
51pub fn execute(
52 storage: &Storage,
53 mut request: BatchWriteItemRequest,
54) -> Result<BatchWriteItemResponse> {
55 const MAX_REQUEST_SIZE: usize = 16 * 1024 * 1024; if request.request_items.is_empty() {
59 return Err(DynoxideError::ValidationException(
60 "1 validation error detected: Value at 'requestItems' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string(),
61 ));
62 }
63
64 for (table_name, wrs) in &request.request_items {
66 if wrs.is_empty() {
67 return Err(DynoxideError::ValidationException(format!(
68 "1 validation error detected: Value at 'requestItems.{table_name}.member' failed to satisfy constraint: Member must have length greater than or equal to 1"
69 )));
70 }
71 }
72
73 for table_name in request.request_items.keys() {
75 crate::validation::validate_table_name(table_name)?;
76 }
77
78 let total_requests: usize = request.request_items.values().map(|v| v.len()).sum();
80 if total_requests > 25 {
81 return Err(DynoxideError::ValidationException(
82 "Too many items requested for the BatchWriteItem call".to_string(),
83 ));
84 }
85
86 for write_requests in request.request_items.values() {
90 for wr in write_requests {
91 if wr.put_request.is_none() && wr.delete_request.is_none() {
92 return Err(DynoxideError::ValidationException(
93 "Supplied AttributeValue has more than one datatypes set, must contain exactly one of the supported datatypes".to_string(),
94 ));
95 }
96 if let Some(ref put_req) = wr.put_request {
97 crate::validation::validate_item_attribute_values(&put_req.item)?;
99
100 let size = types::item_size(&put_req.item);
102 if size > types::MAX_ITEM_SIZE {
103 return Err(DynoxideError::ValidationException(
104 "Item size has exceeded the maximum allowed size".to_string(),
105 ));
106 }
107 }
108 if let Some(ref del_req) = wr.delete_request {
109 crate::validation::validate_item_attribute_values(&del_req.key)?;
110 }
111 }
112 }
113
114 let total_size: usize = request
116 .request_items
117 .values()
118 .flat_map(|wrs| wrs.iter())
119 .map(|wr| {
120 if let Some(ref put_req) = wr.put_request {
121 types::item_size(&put_req.item)
122 } else if let Some(ref del_req) = wr.delete_request {
123 types::item_size(&del_req.key)
124 } else {
125 0
126 }
127 })
128 .sum();
129 if total_size > MAX_REQUEST_SIZE {
130 return Err(DynoxideError::ValidationException(
131 "Item collection too large: aggregate size of items in BatchWriteItem exceeds 16MB limit".to_string(),
132 ));
133 }
134
135 {
137 let mut seen_keys: std::collections::HashSet<(String, String, String)> =
138 std::collections::HashSet::new();
139 for (table_name, write_requests) in &request.request_items {
140 let meta = helpers::require_table_for_item_op(storage, table_name)?;
141 let key_schema = helpers::parse_key_schema(&meta)?;
142 for wr in write_requests {
143 let key_item = if let Some(ref put) = wr.put_request {
144 &put.item
145 } else if let Some(ref del) = wr.delete_request {
146 &del.key
147 } else {
148 continue;
149 };
150 let (pk, sk) = helpers::extract_key_strings(key_item, &key_schema)?;
151 let key = (table_name.clone(), pk, sk);
152 if !seen_keys.insert(key) {
153 return Err(DynoxideError::ValidationException(
154 "Provided list of item keys contains duplicates".to_string(),
155 ));
156 }
157 }
158 }
159 }
160
161 let mut table_gsi_units: HashMap<String, HashMap<String, f64>> = HashMap::new();
163 let mut table_wcu: HashMap<String, f64> = HashMap::new();
165 let mut affected_partitions: Vec<(String, String, String, AttributeValue)> = Vec::new();
167
168 for (table_name, write_requests) in &mut request.request_items {
175 let meta = helpers::require_table_for_item_op(storage, table_name)?;
176 let key_schema = helpers::parse_key_schema(&meta)?;
177
178 for wr in write_requests {
179 if let Some(ref mut put_req) = wr.put_request {
180 helpers::validate_item_keys(&put_req.item, &key_schema, &meta)?;
182
183 crate::validation::validate_item_attribute_values(&put_req.item)?;
185
186 crate::validation::normalize_item_sets(&mut put_req.item);
188
189 let size = types::item_size(&put_req.item);
191 if size > types::MAX_ITEM_SIZE {
192 return Err(DynoxideError::ValidationException(
193 "Item size has exceeded the maximum allowed size".to_string(),
194 ));
195 }
196
197 let (pk, sk) = helpers::extract_key_strings(&put_req.item, &key_schema)?;
198 let item_json = serde_json::to_string(&put_req.item)
199 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
200 let hash_prefix = put_req
201 .item
202 .get(&key_schema.partition_key)
203 .map(crate::storage::compute_hash_prefix)
204 .unwrap_or_default();
205 let old_json = storage.put_item_with_hash(
206 table_name,
207 &pk,
208 &sk,
209 &item_json,
210 size,
211 &hash_prefix,
212 )?;
213
214 *table_wcu.entry(table_name.clone()).or_insert(0.0) +=
216 types::write_capacity_units(size);
217
218 let gsi_units = super::gsi::maintain_gsis_after_write(
220 storage,
221 table_name,
222 &meta,
223 &pk,
224 &sk,
225 &put_req.item,
226 &key_schema.partition_key,
227 key_schema.sort_key.as_deref(),
228 )?;
229
230 let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
232 for (gsi_name, units) in &gsi_units {
233 *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
234 }
235
236 super::lsi::maintain_lsis_after_write(
238 storage,
239 table_name,
240 &meta,
241 &pk,
242 &sk,
243 &put_req.item,
244 &key_schema.partition_key,
245 key_schema.sort_key.as_deref(),
246 )?;
247
248 if let Some(pk_val) = put_req.item.get(&key_schema.partition_key) {
250 affected_partitions.push((
251 table_name.clone(),
252 pk.clone(),
253 key_schema.partition_key.clone(),
254 pk_val.clone(),
255 ));
256 }
257
258 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
260 crate::streams::record_stream_event(
261 storage,
262 &meta,
263 old_item.as_ref(),
264 Some(&put_req.item),
265 )?;
266 } else if let Some(ref del_req) = wr.delete_request {
267 helpers::validate_key_only(&del_req.key, &key_schema)?;
268 let (pk, sk) = helpers::extract_key_strings(&del_req.key, &key_schema)?;
269 let old_json = storage.delete_item(table_name, &pk, &sk)?;
270
271 let old_item: Option<Item> =
273 old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
274 let delete_wcu = if let Some(ref old) = old_item {
275 types::write_capacity_units(types::item_size(old))
276 } else {
277 1.0
278 };
279 *table_wcu.entry(table_name.clone()).or_insert(0.0) += delete_wcu;
280
281 let gsi_units =
283 super::gsi::maintain_gsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
284
285 let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
287 for (gsi_name, units) in &gsi_units {
288 *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
289 }
290
291 super::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
293
294 if let Some(pk_val) = del_req.key.get(&key_schema.partition_key) {
296 affected_partitions.push((
297 table_name.clone(),
298 pk.clone(),
299 key_schema.partition_key.clone(),
300 pk_val.clone(),
301 ));
302 }
303
304 if old_item.is_some() {
306 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
307 }
308 } else {
309 return Err(DynoxideError::ValidationException(
310 "WriteRequest must contain either PutRequest or DeleteRequest".to_string(),
311 ));
312 }
313 }
314 }
315
316 let consumed_capacity = if matches!(
318 request.return_consumed_capacity.as_deref(),
319 Some("TOTAL") | Some("INDEXES")
320 ) {
321 let mut caps = Vec::new();
322 for table_name in request.request_items.keys() {
323 let total_wcu = table_wcu.get(table_name).copied().unwrap_or(0.0);
324 let gsi_units = table_gsi_units.get(table_name).cloned().unwrap_or_default();
325 if let Some(cc) = crate::types::consumed_capacity_with_indexes(
326 table_name,
327 total_wcu,
328 &gsi_units,
329 &request.return_consumed_capacity,
330 ) {
331 caps.push(cc);
332 }
333 }
334 Some(caps)
335 } else {
336 None
337 };
338
339 let mut all_item_collection_metrics: HashMap<String, Vec<crate::types::ItemCollectionMetrics>> =
341 HashMap::new();
342 if matches!(
343 request.return_item_collection_metrics.as_deref(),
344 Some("SIZE")
345 ) {
346 let mut seen = std::collections::HashSet::new();
348 for (tbl, pk_str, pk_attr, pk_val) in &affected_partitions {
349 let key = (tbl.as_str(), pk_str.as_str());
350 if !seen.insert(key) {
351 continue;
352 }
353 let meta = helpers::require_table(storage, tbl)?;
354 if let Some(icm) = helpers::build_item_collection_metrics(
355 storage,
356 &meta,
357 tbl,
358 pk_str,
359 pk_attr,
360 pk_val,
361 &request.return_item_collection_metrics,
362 )? {
363 all_item_collection_metrics
364 .entry(tbl.clone())
365 .or_default()
366 .push(icm);
367 }
368 }
369 }
370 let item_collection_metrics = if all_item_collection_metrics.is_empty() {
371 None
372 } else {
373 Some(all_item_collection_metrics)
374 };
375
376 Ok(BatchWriteItemResponse {
377 unprocessed_items: HashMap::new(),
378 consumed_capacity,
379 item_collection_metrics,
380 })
381}