Skip to main content

dynamodb_facade/operations/
batch.rs

1use super::*;
2
3use aws_sdk_dynamodb::types::{
4    WriteRequest,
5    builders::{DeleteRequestBuilder, PutRequestBuilder},
6};
7use tracing::Instrument;
8
9/// Entry points for building DynamoDB `BatchWriteItem` write requests.
10///
11/// This trait is **blanket-implemented** for every type that implements
12/// [`DynamoDBItemOp<TD>`]. You never implement it manually.
13///
14/// Each method returns a [`WriteRequest`] that can be collected into a `Vec`
15/// and passed to [`dynamodb_batch_write`] for execution. Batch writes are
16/// limited to put and delete operations — batch updates are not supported
17/// by the DynamoDB API.
18///
19/// # Examples
20///
21/// ```no_run
22/// # use dynamodb_facade::test_fixtures::*;
23/// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write, KeyId};
24///
25/// # async fn example(
26/// #     client: aws_sdk_dynamodb::Client,
27/// #     enrollments: Vec<Enrollment>,
28/// # ) -> dynamodb_facade::Result<()> {
29/// // Batch put a collection of enrollments
30/// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_put()).collect();
31/// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
32/// # Ok(())
33/// # }
34/// ```
35pub trait DynamoDBItemBatchOp<TD: TableDefinition>: DynamoDBItemOp<TD> {
36    /// Creates a `PutRequest` [`WriteRequest`] for this item.
37    ///
38    /// Serializes `self` into a DynamoDB item map and wraps it in a
39    /// `WriteRequest::PutRequest`. Pass the result to
40    /// [`dynamodb_batch_write`] for execution.
41    ///
42    /// # Panics
43    ///
44    /// Panics if serializing `self` via [`DynamoDBItem::to_item`] fails. See
45    /// [`DynamoDBItem::to_item`] for the conditions under which this can
46    /// happen — it is the caller's responsibility to provide a compatible
47    /// [`Serialize`] implementation.
48    ///
49    /// # Examples
50    ///
51    /// ```no_run
52    /// # use dynamodb_facade::test_fixtures::*;
53    /// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write};
54    ///
55    /// # async fn example(
56    /// #     client: aws_sdk_dynamodb::Client,
57    /// #     enrollments: Vec<Enrollment>,
58    /// # ) -> dynamodb_facade::Result<()> {
59    /// // enrollments: Vec<Enrollment>
60    /// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_put()).collect();
61    /// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
62    /// # Ok(())
63    /// # }
64    /// ```
65    fn batch_put(&self) -> WriteRequest
66    where
67        Self: Serialize,
68    {
69        batch_put(self.to_item())
70    }
71
72    /// Creates a `DeleteRequest` [`WriteRequest`] for this item's key.
73    ///
74    /// Extracts the key from `self` and wraps it in a
75    /// `WriteRequest::DeleteRequest`. Pass the result to
76    /// [`dynamodb_batch_write`] for execution.
77    ///
78    /// # Examples
79    ///
80    /// ```no_run
81    /// # use dynamodb_facade::test_fixtures::*;
82    /// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write};
83    ///
84    /// # async fn example(
85    /// #     client: aws_sdk_dynamodb::Client,
86    /// #     enrollments: Vec<Enrollment>,
87    /// # ) -> dynamodb_facade::Result<()> {
88    /// // enrollments: Vec<Enrollment>
89    /// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_delete()).collect();
90    /// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
91    /// # Ok(())
92    /// # }
93    /// ```
94    fn batch_delete(&self) -> WriteRequest {
95        batch_delete(self.get_key())
96    }
97
98    /// Creates a `DeleteRequest` [`WriteRequest`] from a key ID, without loading the item.
99    ///
100    /// Builds the key from `key_id` using the type's `HasAttribute` impl and
101    /// wraps it in a `WriteRequest::DeleteRequest`. Use this when you have the
102    /// key components but not the full item.
103    ///
104    /// # Examples
105    ///
106    /// ```no_run
107    /// # use dynamodb_facade::test_fixtures::*;
108    /// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write, KeyId};
109    ///
110    /// # async fn example(
111    /// #     client: aws_sdk_dynamodb::Client,
112    /// #     user_ids: Vec<String>,
113    /// # ) -> dynamodb_facade::Result<()> {
114    /// // user_ids: Vec<String>
115    /// let requests: Vec<_> = user_ids
116    ///     .iter()
117    ///     .map(|id| User::batch_delete_by_id(KeyId::pk(id)))
118    ///     .collect();
119    /// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
120    /// # Ok(())
121    /// # }
122    /// ```
123    fn batch_delete_by_id(key_id: Self::KeyId<'_>) -> WriteRequest {
124        batch_delete(Self::get_key_from_id(key_id))
125    }
126}
127impl<TD: TableDefinition, DBI: DynamoDBItemOp<TD>> DynamoDBItemBatchOp<TD> for DBI {}
128
129/// Creates a `PutRequest` [`WriteRequest`] from a raw [`Item`].
130///
131/// Low-level counterpart to [`DynamoDBItemBatchOp::batch_put`]. Use this
132/// when you already have an [`Item<TD>`].
133///
134/// # Examples
135///
136/// ```no_run
137/// # use dynamodb_facade::test_fixtures::*;
138/// use dynamodb_facade::{batch_put, dynamodb_batch_write};
139///
140/// # async fn example(
141/// #     client: aws_sdk_dynamodb::Client,
142/// # ) -> dynamodb_facade::Result<()> {
143/// let item /* : Item<PlatformTable> */ = sample_user_item();
144/// let request = batch_put(item);
145/// dynamodb_batch_write::<PlatformTable>(client, vec![request]).await?;
146/// # Ok(())
147/// # }
148/// ```
149#[tracing::instrument(level = "debug")]
150pub fn batch_put(item: Item<impl TableDefinition>) -> WriteRequest {
151    WriteRequest::builder()
152        .put_request(
153            PutRequestBuilder::default()
154                .set_item(Some(item.into_inner()))
155                .build()
156                .expect("item is set"),
157        )
158        .build()
159}
160
161/// Creates a `DeleteRequest` [`WriteRequest`] from a raw [`Key`].
162///
163/// Low-level counterpart to [`DynamoDBItemBatchOp::batch_delete`]. Use this
164/// when you already have a [`Key<TD>`].
165///
166/// # Examples
167///
168/// ```no_run
169/// # use dynamodb_facade::test_fixtures::*;
170/// use dynamodb_facade::{batch_delete, dynamodb_batch_write};
171///
172/// # async fn example(
173/// #     client: aws_sdk_dynamodb::Client,
174/// # ) -> dynamodb_facade::Result<()> {
175/// let key = sample_user_item().into_key_only();
176/// let request = batch_delete(key);
177/// dynamodb_batch_write::<PlatformTable>(client, vec![request]).await?;
178/// # Ok(())
179/// # }
180/// ```
181#[tracing::instrument(level = "debug")]
182pub fn batch_delete(key: Key<impl TableDefinition>) -> WriteRequest {
183    WriteRequest::builder()
184        .delete_request(
185            DeleteRequestBuilder::default()
186                .set_key(Some(key.into_inner()))
187                .build()
188                .expect("key is set"),
189        )
190        .build()
191}
192
193/// Executes a batch of `WriteRequest`s against a DynamoDB table.
194///
195/// Handles all the complexity of the DynamoDB batch write API:
196///
197/// - **Chunking** — automatically splits the input into chunks of 25 items
198///   (the DynamoDB maximum per `BatchWriteItem` call).
199/// - **Parallelism** — each chunk is sent concurrently via
200///   [`tokio::spawn`].
201/// - **Retry** — any unprocessed items returned by DynamoDB are retried up
202///   to 3 times total. If items remain unprocessed after all attempts, the
203///   function returns [`Error::FailedBatchWrite`](crate::Error::FailedBatchWrite)
204///   containing the unprocessed [`WriteRequest`]s.
205///
206/// Build `WriteRequest` values using [`DynamoDBItemBatchOp::batch_put`],
207/// [`DynamoDBItemBatchOp::batch_delete`], [`batch_put`], or [`batch_delete`].
208///
209/// # Errors
210///
211/// - Returns [`Error::FailedBatchWrite`](crate::Error::FailedBatchWrite) if
212///   items remain unprocessed after 3 retry attempts.
213/// - Returns [`Error::DynamoDB`](crate::Error::DynamoDB) if any individual
214///   `BatchWriteItem` SDK call fails with a non-retryable error.
215///
216/// # Examples
217///
218/// ```no_run
219/// # use dynamodb_facade::test_fixtures::*;
220/// use dynamodb_facade::{DynamoDBItemBatchOp, dynamodb_batch_write};
221///
222/// # async fn example(
223/// #     client: aws_sdk_dynamodb::Client,
224/// #     enrollments: Vec<Enrollment>,
225/// # ) -> dynamodb_facade::Result<()> {
226/// // Batch put a large collection — chunking and retries are handled automatically
227/// let requests: Vec<_> = enrollments.iter().map(|e| e.batch_put()).collect();
228/// dynamodb_batch_write::<PlatformTable>(client, requests).await?;
229/// # Ok(())
230/// # }
231/// ```
232#[tracing::instrument(level = "debug", skip(client))]
233pub async fn dynamodb_batch_write<TD: TableDefinition>(
234    client: aws_sdk_dynamodb::Client,
235    mut batch_write_requests: Vec<WriteRequest>,
236) -> Result<()> {
237    const MAX_RETRY: usize = 3;
238
239    let table_name = TD::table_name();
240    // Process the Batch(es) in massively parallel fashion
241    // Because Rust.
242    tracing::debug!("putting {} items...", batch_write_requests.len());
243    let mut retry = 0;
244    while !batch_write_requests.is_empty() && retry < MAX_RETRY {
245        retry += 1;
246        tracing::debug!("Try #{retry}/{MAX_RETRY}");
247        let handles = batch_write_requests
248            .chunks(25)
249            .enumerate()
250            .map(|(index, chunk)| {
251                let chunk = chunk.to_vec();
252                let cclient = client.clone();
253                let ctable_name = table_name.clone();
254                tokio::spawn(
255                    async move {
256                        tracing::debug!("Sending BatchWriteItem for chunk #{index}...");
257                        let result = cclient
258                            .batch_write_item()
259                            .set_request_items(Some([(ctable_name, chunk)].into()))
260                            .send()
261                            .await;
262                        tracing::debug!("BatchWriteItem finished for chunk #{index}");
263                        result
264                    }
265                    .instrument(tracing::info_span!("batch_write_chunk", %index, try=retry)),
266                )
267            })
268            .collect::<Vec<_>>();
269        let mut unprocess_vec = Vec::default();
270
271        for h in handles {
272            let batch_output = h.await.expect("batch write task panicked")?;
273            if let Some(unproccessed) = batch_output.unprocessed_items {
274                if !unproccessed.is_empty() {
275                    unprocess_vec.extend(unproccessed.into_iter().flat_map(|e| e.1));
276                }
277            }
278        }
279
280        batch_write_requests = unprocess_vec;
281
282        tracing::debug!("{} items were unprocessed", batch_write_requests.len());
283    }
284
285    if batch_write_requests.is_empty() {
286        Ok(())
287    } else {
288        Err(crate::Error::FailedBatchWrite(batch_write_requests))
289    }
290}