use super::*;
use aws_sdk_dynamodb::types::{
WriteRequest,
builders::{DeleteRequestBuilder, PutRequestBuilder},
};
use tracing::Instrument;
pub trait DynamoDBItemBatchOp<TD: TableDefinition>: DynamoDBItemOp<TD> {
fn batch_put(&self) -> WriteRequest
where
Self: Serialize,
{
batch_put(self.to_item())
}
fn batch_delete(&self) -> WriteRequest {
batch_delete(self.get_key())
}
fn batch_delete_by_id(key_id: Self::KeyId<'_>) -> WriteRequest {
batch_delete(Self::get_key_from_id(key_id))
}
}
impl<TD: TableDefinition, DBI: DynamoDBItemOp<TD>> DynamoDBItemBatchOp<TD> for DBI {}
#[tracing::instrument(level = "debug")]
pub fn batch_put(item: Item<impl TableDefinition>) -> WriteRequest {
WriteRequest::builder()
.put_request(
PutRequestBuilder::default()
.set_item(Some(item.into_inner()))
.build()
.expect("item is set"),
)
.build()
}
#[tracing::instrument(level = "debug")]
pub fn batch_delete(key: Key<impl TableDefinition>) -> WriteRequest {
WriteRequest::builder()
.delete_request(
DeleteRequestBuilder::default()
.set_key(Some(key.into_inner()))
.build()
.expect("key is set"),
)
.build()
}
#[tracing::instrument(level = "debug", skip(client))]
pub async fn dynamodb_batch_write<TD: TableDefinition>(
client: aws_sdk_dynamodb::Client,
mut batch_write_requests: Vec<WriteRequest>,
) -> Result<()> {
const MAX_RETRY: usize = 3;
let table_name = TD::table_name();
tracing::debug!("putting {} items...", batch_write_requests.len());
let mut retry = 0;
while !batch_write_requests.is_empty() && retry < MAX_RETRY {
retry += 1;
tracing::debug!("Try #{retry}/{MAX_RETRY}");
let handles = batch_write_requests
.chunks(25)
.enumerate()
.map(|(index, chunk)| {
let chunk = chunk.to_vec();
let cclient = client.clone();
let ctable_name = table_name.clone();
tokio::spawn(
async move {
tracing::debug!("Sending BatchWriteItem for chunk #{index}...");
let result = cclient
.batch_write_item()
.set_request_items(Some([(ctable_name, chunk)].into()))
.send()
.await;
tracing::debug!("BatchWriteItem finished for chunk #{index}");
result
}
.instrument(tracing::info_span!("batch_write_chunk", %index, try=retry)),
)
})
.collect::<Vec<_>>();
let mut unprocess_vec = Vec::default();
for h in handles {
let batch_output = h.await.expect("batch write task panicked")?;
if let Some(unproccessed) = batch_output.unprocessed_items {
if !unproccessed.is_empty() {
unprocess_vec.extend(unproccessed.into_iter().flat_map(|e| e.1));
}
}
}
batch_write_requests = unprocess_vec;
tracing::debug!("{} items were unprocessed", batch_write_requests.len());
}
if batch_write_requests.is_empty() {
Ok(())
} else {
Err(crate::Error::FailedBatchWrite(batch_write_requests))
}
}