dynamodb_facade/operations/batch.rs
1use super::*;
2
3use aws_sdk_dynamodb::types::{
4 WriteRequest,
5 builders::{DeleteRequestBuilder, PutRequestBuilder},
6};
7use tracing::Instrument;
8
9/// Entry points for building DynamoDB `BatchWriteItem` write requests.
10///
11/// This trait is **blanket-implemented** for every type that implements
12/// [`DynamoDBItemOp<TD>`]. You never implement it manually.
13///
14/// Each method returns a [`WriteRequest`] that can be collected into a `Vec`
15/// and passed to [`dynamodb_batch_write`] for execution. Batch writes are
16/// limited to put and delete operations — batch updates are not supported
17/// by the DynamoDB API.
18///
19/// # Examples
20///
21/// ```no_run
22/// # use dynamodb_facade::test_fixtures::*;
23/// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write, KeyId};
24///
25/// # async fn example(
26/// # client: aws_sdk_dynamodb::Client,
27/// # enrollments: Vec<Enrollment>,
28/// # ) -> dynamodb_facade::Result<()> {
29/// // Batch put a collection of enrollments
30/// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_put()).collect();
31/// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
32/// # Ok(())
33/// # }
34/// ```
35pub trait DynamoDBItemBatchOp<TD: TableDefinition>: DynamoDBItemOp<TD> {
36 /// Creates a `PutRequest` [`WriteRequest`] for this item.
37 ///
38 /// Serializes `self` into a DynamoDB item map and wraps it in a
39 /// `WriteRequest::PutRequest`. Pass the result to
40 /// [`dynamodb_batch_write`] for execution.
41 ///
42 /// # Panics
43 ///
44 /// Panics if serializing `self` via [`DynamoDBItem::to_item`] fails. See
45 /// [`DynamoDBItem::to_item`] for the conditions under which this can
46 /// happen — it is the caller's responsibility to provide a compatible
47 /// [`Serialize`] implementation.
48 ///
49 /// # Examples
50 ///
51 /// ```no_run
52 /// # use dynamodb_facade::test_fixtures::*;
53 /// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write};
54 ///
55 /// # async fn example(
56 /// # client: aws_sdk_dynamodb::Client,
57 /// # enrollments: Vec<Enrollment>,
58 /// # ) -> dynamodb_facade::Result<()> {
59 /// // enrollments: Vec<Enrollment>
60 /// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_put()).collect();
61 /// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
62 /// # Ok(())
63 /// # }
64 /// ```
65 fn batch_put(&self) -> WriteRequest
66 where
67 Self: Serialize,
68 {
69 batch_put(self.to_item())
70 }
71
72 /// Creates a `DeleteRequest` [`WriteRequest`] for this item's key.
73 ///
74 /// Extracts the key from `self` and wraps it in a
75 /// `WriteRequest::DeleteRequest`. Pass the result to
76 /// [`dynamodb_batch_write`] for execution.
77 ///
78 /// # Examples
79 ///
80 /// ```no_run
81 /// # use dynamodb_facade::test_fixtures::*;
82 /// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write};
83 ///
84 /// # async fn example(
85 /// # client: aws_sdk_dynamodb::Client,
86 /// # enrollments: Vec<Enrollment>,
87 /// # ) -> dynamodb_facade::Result<()> {
88 /// // enrollments: Vec<Enrollment>
89 /// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_delete()).collect();
90 /// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
91 /// # Ok(())
92 /// # }
93 /// ```
94 fn batch_delete(&self) -> WriteRequest {
95 batch_delete(self.get_key())
96 }
97
98 /// Creates a `DeleteRequest` [`WriteRequest`] from a key ID, without loading the item.
99 ///
100 /// Builds the key from `key_id` using the type's `HasAttribute` impl and
101 /// wraps it in a `WriteRequest::DeleteRequest`. Use this when you have the
102 /// key components but not the full item.
103 ///
104 /// # Examples
105 ///
106 /// ```no_run
107 /// # use dynamodb_facade::test_fixtures::*;
108 /// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write, KeyId};
109 ///
110 /// # async fn example(
111 /// # client: aws_sdk_dynamodb::Client,
112 /// # user_ids: Vec<String>,
113 /// # ) -> dynamodb_facade::Result<()> {
114 /// // user_ids: Vec<String>
115 /// let requests: Vec<_> = user_ids
116 /// .iter()
117 /// .map(|id| User::batch_delete_by_id(KeyId::pk(id)))
118 /// .collect();
119 /// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
120 /// # Ok(())
121 /// # }
122 /// ```
123 fn batch_delete_by_id(key_id: Self::KeyId<'_>) -> WriteRequest {
124 batch_delete(Self::get_key_from_id(key_id))
125 }
126}
127impl<TD: TableDefinition, DBI: DynamoDBItemOp<TD>> DynamoDBItemBatchOp<TD> for DBI {}
128
129/// Creates a `PutRequest` [`WriteRequest`] from a raw [`Item`].
130///
131/// Low-level counterpart to [`DynamoDBItemBatchOp::batch_put`]. Use this
132/// when you already have an [`Item<TD>`].
133///
134/// # Examples
135///
136/// ```no_run
137/// # use dynamodb_facade::test_fixtures::*;
138/// use dynamodb_facade::{batch_put, dynamodb_batch_write};
139///
140/// # async fn example(
141/// # client: aws_sdk_dynamodb::Client,
142/// # ) -> dynamodb_facade::Result<()> {
143/// let item /* : Item<PlatformTable> */ = sample_user_item();
144/// let request = batch_put(item);
145/// dynamodb_batch_write::<PlatformTable>(client, vec![request]).await?;
146/// # Ok(())
147/// # }
148/// ```
149#[tracing::instrument(level = "debug")]
150pub fn batch_put(item: Item<impl TableDefinition>) -> WriteRequest {
151 WriteRequest::builder()
152 .put_request(
153 PutRequestBuilder::default()
154 .set_item(Some(item.into_inner()))
155 .build()
156 .expect("item is set"),
157 )
158 .build()
159}
160
161/// Creates a `DeleteRequest` [`WriteRequest`] from a raw [`Key`].
162///
163/// Low-level counterpart to [`DynamoDBItemBatchOp::batch_delete`]. Use this
164/// when you already have a [`Key<TD>`].
165///
166/// # Examples
167///
168/// ```no_run
169/// # use dynamodb_facade::test_fixtures::*;
170/// use dynamodb_facade::{batch_delete, dynamodb_batch_write};
171///
172/// # async fn example(
173/// # client: aws_sdk_dynamodb::Client,
174/// # ) -> dynamodb_facade::Result<()> {
175/// let key = sample_user_item().into_key_only();
176/// let request = batch_delete(key);
177/// dynamodb_batch_write::<PlatformTable>(client, vec![request]).await?;
178/// # Ok(())
179/// # }
180/// ```
181#[tracing::instrument(level = "debug")]
182pub fn batch_delete(key: Key<impl TableDefinition>) -> WriteRequest {
183 WriteRequest::builder()
184 .delete_request(
185 DeleteRequestBuilder::default()
186 .set_key(Some(key.into_inner()))
187 .build()
188 .expect("key is set"),
189 )
190 .build()
191}
192
193/// Executes a batch of `WriteRequest`s against a DynamoDB table.
194///
195/// Handles all the complexity of the DynamoDB batch write API:
196///
197/// - **Chunking** — automatically splits the input into chunks of 25 items
198/// (the DynamoDB maximum per `BatchWriteItem` call).
199/// - **Parallelism** — each chunk is sent concurrently via
200/// [`tokio::spawn`].
201/// - **Retry** — any unprocessed items returned by DynamoDB are retried up
202/// to 3 times total. If items remain unprocessed after all attempts, the
203/// function returns [`Error::FailedBatchWrite`](crate::Error::FailedBatchWrite)
204/// containing the unprocessed [`WriteRequest`]s.
205///
206/// Build `WriteRequest` values using [`DynamoDBItemBatchOp::batch_put`],
207/// [`DynamoDBItemBatchOp::batch_delete`], [`batch_put`], or [`batch_delete`].
208///
209/// # Errors
210///
211/// - Returns [`Error::FailedBatchWrite`](crate::Error::FailedBatchWrite) if
212/// items remain unprocessed after 3 retry attempts.
213/// - Returns [`Error::DynamoDB`](crate::Error::DynamoDB) if any individual
214/// `BatchWriteItem` SDK call fails with a non-retryable error.
215///
216/// # Examples
217///
218/// ```no_run
219/// # use dynamodb_facade::test_fixtures::*;
220/// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write};
221///
222/// # async fn example(
223/// # client: aws_sdk_dynamodb::Client,
224/// # enrollments: Vec<Enrollment>,
225/// # ) -> dynamodb_facade::Result<()> {
226/// // Batch put a large collection — chunking and retries are handled automatically
227/// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_put()).collect();
228/// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
229/// # Ok(())
230/// # }
231/// ```
232#[tracing::instrument(level = "debug", skip(client))]
233pub async fn dynamodb_batch_write<TD: TableDefinition>(
234 client: aws_sdk_dynamodb::Client,
235 mut batch_write_requests: Vec<WriteRequest>,
236) -> Result<()> {
237 const MAX_RETRY: usize = 3;
238
239 let table_name = TD::table_name();
240 // Process the Batch(es) in massively parallel fashion
241 // Because Rust.
242 tracing::debug!("putting {} items...", batch_write_requests.len());
243 let mut retry = 0;
244 while !batch_write_requests.is_empty() && retry < MAX_RETRY {
245 retry += 1;
246 tracing::debug!("Try #{retry}/{MAX_RETRY}");
247 let handles = batch_write_requests
248 .chunks(25)
249 .enumerate()
250 .map(|(index, chunk)| {
251 let chunk = chunk.to_vec();
252 let cclient = client.clone();
253 let ctable_name = table_name.clone();
254 tokio::spawn(
255 async move {
256 tracing::debug!("Sending BatchWriteItem for chunk #{index}...");
257 let result = cclient
258 .batch_write_item()
259 .set_request_items(Some([(ctable_name, chunk)].into()))
260 .send()
261 .await;
262 tracing::debug!("BatchWriteItem finished for chunk #{index}");
263 result
264 }
265 .instrument(tracing::info_span!("batch_write_chunk", %index, try=retry)),
266 )
267 })
268 .collect::<Vec<_>>();
269 let mut unprocess_vec = Vec::default();
270
271 for h in handles {
272 let batch_output = h.await.expect("batch write task panicked")?;
273 if let Some(unproccessed) = batch_output.unprocessed_items {
274 if !unproccessed.is_empty() {
275 unprocess_vec.extend(unproccessed.into_iter().flat_map(|e| e.1));
276 }
277 }
278 }
279
280 batch_write_requests = unprocess_vec;
281
282 tracing::debug!("{} items were unprocessed", batch_write_requests.len());
283 }
284
285 if batch_write_requests.is_empty() {
286 Ok(())
287 } else {
288 Err(crate::Error::FailedBatchWrite(batch_write_requests))
289 }
290}