1use crate::actions::helpers;
2use crate::errors::{DynoxideError, Result};
3use crate::storage::Storage;
4use crate::types::{self, AttributeValue, Item};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8#[derive(Debug, Default, Deserialize)]
9pub struct BatchWriteItemRequest {
10 #[serde(rename = "RequestItems")]
11 pub request_items: HashMap<String, Vec<WriteRequest>>,
12 #[serde(rename = "ReturnConsumedCapacity", default)]
13 pub return_consumed_capacity: Option<String>,
14 #[serde(rename = "ReturnItemCollectionMetrics", default)]
15 pub return_item_collection_metrics: Option<String>,
16}
17
18#[derive(Debug, Default, Deserialize)]
19pub struct WriteRequest {
20 #[serde(rename = "PutRequest", default)]
21 pub put_request: Option<PutRequest>,
22 #[serde(rename = "DeleteRequest", default)]
23 pub delete_request: Option<DeleteRequest>,
24}
25
26#[derive(Debug, Default, Deserialize)]
27pub struct PutRequest {
28 #[serde(rename = "Item")]
29 pub item: Item,
30}
31
32#[derive(Debug, Default, Deserialize)]
33pub struct DeleteRequest {
34 #[serde(rename = "Key")]
35 pub key: HashMap<String, AttributeValue>,
36}
37
38#[derive(Debug, Default, Serialize)]
39pub struct BatchWriteItemResponse {
40 #[serde(rename = "UnprocessedItems")]
41 pub unprocessed_items: HashMap<String, serde_json::Value>,
42 #[serde(rename = "ConsumedCapacity", skip_serializing_if = "Option::is_none")]
43 pub consumed_capacity: Option<Vec<crate::types::ConsumedCapacity>>,
44 #[serde(
45 rename = "ItemCollectionMetrics",
46 skip_serializing_if = "Option::is_none"
47 )]
48 pub item_collection_metrics: Option<HashMap<String, Vec<crate::types::ItemCollectionMetrics>>>,
49}
50
51pub fn execute(
52 storage: &Storage,
53 mut request: BatchWriteItemRequest,
54) -> Result<BatchWriteItemResponse> {
55 const MAX_REQUEST_SIZE: usize = 16 * 1024 * 1024; if request.request_items.is_empty() {
61 return Err(DynoxideError::ValidationException(
62 "The requestItems parameter is required for BatchWriteItem".to_string(),
63 ));
64 }
65
66 for (table_name, wrs) in &request.request_items {
68 if wrs.is_empty() {
69 return Err(DynoxideError::ValidationException(format!(
70 "1 validation error detected: Value at 'requestItems.{table_name}.member' failed to satisfy constraint: Member must have length greater than or equal to 1"
71 )));
72 }
73 }
74
75 for table_name in request.request_items.keys() {
77 crate::validation::validate_table_name(table_name)?;
78 }
79
80 let total_requests: usize = request.request_items.values().map(|v| v.len()).sum();
91 if total_requests > 25 {
92 let empty: Vec<WriteRequest> = Vec::new();
93 let (table_name, requests) = request
94 .request_items
95 .iter()
96 .max_by_key(|(_, v)| v.len())
97 .map(|(name, v)| (name.as_str(), v))
98 .unwrap_or(("", &empty));
99 let dump = format!("{requests:?}");
100 return Err(DynoxideError::ValidationException(format!(
101 "1 validation error detected: Value '{{{table_name}=[{dump}]}}' at 'requestItems' failed to satisfy constraint: Map value must satisfy constraint: [Member must have length less than or equal to 25, Member must have length greater than or equal to 1]"
102 )));
103 }
104
105 for write_requests in request.request_items.values() {
109 for wr in write_requests {
110 if wr.put_request.is_none() && wr.delete_request.is_none() {
111 return Err(DynoxideError::ValidationException(
112 "Supplied AttributeValue has more than one datatypes set, must contain exactly one of the supported datatypes".to_string(),
113 ));
114 }
115 if let Some(ref put_req) = wr.put_request {
116 crate::validation::validate_item_attribute_values(&put_req.item)?;
118
119 let size = types::item_size(&put_req.item);
121 if size > types::MAX_ITEM_SIZE {
122 return Err(DynoxideError::ValidationException(
123 "Item size has exceeded the maximum allowed size".to_string(),
124 ));
125 }
126 }
127 if let Some(ref del_req) = wr.delete_request {
128 crate::validation::validate_item_attribute_values(&del_req.key)?;
129 }
130 }
131 }
132
133 let total_size: usize = request
135 .request_items
136 .values()
137 .flat_map(|wrs| wrs.iter())
138 .map(|wr| {
139 if let Some(ref put_req) = wr.put_request {
140 types::item_size(&put_req.item)
141 } else if let Some(ref del_req) = wr.delete_request {
142 types::item_size(&del_req.key)
143 } else {
144 0
145 }
146 })
147 .sum();
148 if total_size > MAX_REQUEST_SIZE {
149 return Err(DynoxideError::ValidationException(
150 "Item collection too large: aggregate size of items in BatchWriteItem exceeds 16MB limit".to_string(),
151 ));
152 }
153
154 {
156 let mut seen_keys: std::collections::HashSet<(String, String, String)> =
157 std::collections::HashSet::new();
158 for (table_name, write_requests) in &request.request_items {
159 let meta = helpers::require_table_for_item_op(storage, table_name)?;
160 let key_schema = helpers::parse_key_schema(&meta)?;
161 for wr in write_requests {
162 let key_item = if let Some(ref put) = wr.put_request {
163 &put.item
164 } else if let Some(ref del) = wr.delete_request {
165 &del.key
166 } else {
167 continue;
168 };
169 let (pk, sk) = helpers::extract_key_strings(key_item, &key_schema)?;
171 let key = (table_name.clone(), pk, sk);
172 if !seen_keys.insert(key) {
173 return Err(DynoxideError::ValidationException(
174 "Provided list of item keys contains duplicates".to_string(),
175 ));
176 }
177 }
178 }
179 }
180
181 let mut table_gsi_units: HashMap<String, HashMap<String, f64>> = HashMap::new();
183 let mut table_wcu: HashMap<String, f64> = HashMap::new();
185 let mut affected_partitions: Vec<(String, String, String, AttributeValue)> = Vec::new();
187
188 for (table_name, write_requests) in &mut request.request_items {
195 let meta = helpers::require_table_for_item_op(storage, table_name)?;
196 let key_schema = helpers::parse_key_schema(&meta)?;
197
198 for wr in write_requests {
199 if let Some(ref mut put_req) = wr.put_request {
200 helpers::validate_item_keys(&put_req.item, &key_schema, &meta)?;
202
203 crate::validation::validate_item_attribute_values(&put_req.item)?;
205
206 crate::validation::normalize_item_sets(&mut put_req.item);
208
209 let size = types::item_size(&put_req.item);
211 if size > types::MAX_ITEM_SIZE {
212 return Err(DynoxideError::ValidationException(
213 "Item size has exceeded the maximum allowed size".to_string(),
214 ));
215 }
216
217 let (pk, sk) = helpers::extract_key_strings(&put_req.item, &key_schema)?;
219 let item_json = serde_json::to_string(&put_req.item)
220 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
221 let hash_prefix = put_req
222 .item
223 .get(&key_schema.partition_key)
224 .map(crate::storage::compute_hash_prefix)
225 .unwrap_or_default();
226 let old_json = storage.put_item_with_hash(
227 table_name,
228 &pk,
229 &sk,
230 &item_json,
231 size,
232 &hash_prefix,
233 )?;
234
235 *table_wcu.entry(table_name.clone()).or_insert(0.0) +=
237 types::write_capacity_units(size);
238
239 let gsi_units = super::gsi::maintain_gsis_after_write(
241 storage,
242 table_name,
243 &meta,
244 &pk,
245 &sk,
246 &put_req.item,
247 &key_schema.partition_key,
248 key_schema.sort_key.as_deref(),
249 )?;
250
251 let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
253 for (gsi_name, units) in &gsi_units {
254 *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
255 }
256
257 super::lsi::maintain_lsis_after_write(
259 storage,
260 table_name,
261 &meta,
262 &pk,
263 &sk,
264 &put_req.item,
265 &key_schema.partition_key,
266 key_schema.sort_key.as_deref(),
267 )?;
268
269 if let Some(pk_val) = put_req.item.get(&key_schema.partition_key) {
271 affected_partitions.push((
272 table_name.clone(),
273 pk.clone(),
274 key_schema.partition_key.clone(),
275 pk_val.clone(),
276 ));
277 }
278
279 let old_item: Option<Item> = old_json.and_then(|j| serde_json::from_str(&j).ok());
281 crate::streams::record_stream_event(
282 storage,
283 &meta,
284 old_item.as_ref(),
285 Some(&put_req.item),
286 )?;
287 } else if let Some(ref del_req) = wr.delete_request {
288 helpers::validate_key_only(&del_req.key, &key_schema)?;
289 let (pk, sk) = helpers::extract_key_strings(&del_req.key, &key_schema)?;
291 let old_json = storage.delete_item(table_name, &pk, &sk)?;
292
293 let old_item: Option<Item> =
295 old_json.as_ref().and_then(|j| serde_json::from_str(j).ok());
296 let delete_wcu = if let Some(ref old) = old_item {
297 types::write_capacity_units(types::item_size(old))
298 } else {
299 1.0
300 };
301 *table_wcu.entry(table_name.clone()).or_insert(0.0) += delete_wcu;
302
303 let gsi_units =
305 super::gsi::maintain_gsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
306
307 let table_entry = table_gsi_units.entry(table_name.clone()).or_default();
309 for (gsi_name, units) in &gsi_units {
310 *table_entry.entry(gsi_name.clone()).or_insert(0.0) += units;
311 }
312
313 super::lsi::maintain_lsis_after_delete(storage, table_name, &meta, &pk, &sk)?;
315
316 if let Some(pk_val) = del_req.key.get(&key_schema.partition_key) {
318 affected_partitions.push((
319 table_name.clone(),
320 pk.clone(),
321 key_schema.partition_key.clone(),
322 pk_val.clone(),
323 ));
324 }
325
326 if old_item.is_some() {
328 crate::streams::record_stream_event(storage, &meta, old_item.as_ref(), None)?;
329 }
330 } else {
331 return Err(DynoxideError::ValidationException(
332 "WriteRequest must contain either PutRequest or DeleteRequest".to_string(),
333 ));
334 }
335 }
336 }
337
338 let consumed_capacity = if matches!(
340 request.return_consumed_capacity.as_deref(),
341 Some("TOTAL") | Some("INDEXES")
342 ) {
343 let mut caps = Vec::new();
344 for table_name in request.request_items.keys() {
345 let total_wcu = table_wcu.get(table_name).copied().unwrap_or(0.0);
346 let gsi_units = table_gsi_units.get(table_name).cloned().unwrap_or_default();
347 if let Some(cc) = crate::types::consumed_capacity_with_indexes(
348 table_name,
349 total_wcu,
350 &gsi_units,
351 &request.return_consumed_capacity,
352 ) {
353 caps.push(cc);
354 }
355 }
356 Some(caps)
357 } else {
358 None
359 };
360
361 let mut all_item_collection_metrics: HashMap<String, Vec<crate::types::ItemCollectionMetrics>> =
363 HashMap::new();
364 if matches!(
365 request.return_item_collection_metrics.as_deref(),
366 Some("SIZE")
367 ) {
368 let mut seen = std::collections::HashSet::new();
370 for (tbl, pk_str, pk_attr, pk_val) in &affected_partitions {
371 let key = (tbl.as_str(), pk_str.as_str());
372 if !seen.insert(key) {
373 continue;
374 }
375 let meta = helpers::require_table(storage, tbl)?;
376 if let Some(icm) = helpers::build_item_collection_metrics(
377 storage,
378 &meta,
379 tbl,
380 pk_str,
381 pk_attr,
382 pk_val,
383 &request.return_item_collection_metrics,
384 )? {
385 all_item_collection_metrics
386 .entry(tbl.clone())
387 .or_default()
388 .push(icm);
389 }
390 }
391 }
392 let item_collection_metrics = if all_item_collection_metrics.is_empty() {
393 None
394 } else {
395 Some(all_item_collection_metrics)
396 };
397
398 Ok(BatchWriteItemResponse {
399 unprocessed_items: HashMap::new(),
400 consumed_capacity,
401 item_collection_metrics,
402 })
403}