use std::time::Duration;
/// Default retry budget for a single DynamoDB operation.
///
/// NOTE(review): not referenced in this part of the file; presumably callers
/// pass it as the `$max_retries` argument of `retry_operation!` — confirm.
pub const MAX_RETRIES: u32 = 3;
/// Upper bound on the number of attempts `retry_batch_operation` makes before
/// it gives up on the write requests that are still outstanding.
pub const MAX_BATCH_RETRIES: u32 = 5;
/// Base delay in milliseconds that `exponential_backoff` scales by a power of two.
const BASE_DELAY_MS: u64 = 100;
/// Reports whether an error message names a failure that the retry helpers
/// in this module treat as transient.
///
/// The check is a case-sensitive substring search for a fixed set of error
/// names; any single match makes the error retryable.
pub fn is_retryable_error(error_msg: &str) -> bool {
    // Error names that mark a failure as worth retrying.
    const RETRYABLE_MARKERS: [&str; 6] = [
        "ProvisionedThroughputExceededException",
        "ThrottlingException",
        "ServiceUnavailable",
        "InternalServerError",
        "ServiceError",
        "RequestLimitExceeded",
    ];

    RETRYABLE_MARKERS
        .iter()
        .any(|marker| error_msg.contains(*marker))
}
/// Computes the delay to wait before the next retry.
///
/// The delay is `BASE_DELAY_MS * 2^retry_count` milliseconds. The exponent is
/// capped at 10, so every `retry_count` of 10 or more yields the same delay.
pub fn exponential_backoff(retry_count: u32) -> Duration {
    // Cap the exponent so the delay stops growing after ten doublings.
    let exponent = std::cmp::min(retry_count, 10);
    let multiplier = 2u64.pow(exponent);
    Duration::from_millis(BASE_DELAY_MS * multiplier)
}
/// Awaits a fallible async operation, retrying it with exponential backoff
/// while the error is classified as retryable.
///
/// Arguments:
/// - `$op`: expression producing the future to await. It is re-evaluated on
///   every attempt, so each retry awaits a fresh future.
/// - `$op_name`, `$table`, `$key`: operation name, table name, and optional
///   key, used for log lines and passed to `format_dynamodb_error`.
/// - `$max_retries`: number of retries allowed after the first attempt.
/// - `$err_conv`: callable that turns the detailed error `String` into the
///   caller's error type.
///
/// Expands to an expression that evaluates to `Ok(result)` on success or
/// `Err($err_conv(detailed_message))` once the retry budget is used up or the
/// error is not retryable. It must be used where `.await` is allowed, and the
/// expansion refers to `log::debug!`, `log::error!`, and `tokio::time::sleep`
/// by path.
#[macro_export]
macro_rules! retry_operation {
    ($op:expr, $op_name:expr, $table:expr, $key:expr, $max_retries:expr, $err_conv:expr) => {{
        use $crate::storage::dynamodb_utils::{
            exponential_backoff, format_dynamodb_error, is_retryable_error,
        };

        let mut retries_used = 0;
        loop {
            match $op.await {
                Ok(value) => break Ok(value),
                Err(failure) => {
                    let message = failure.to_string();

                    // Only the first failure is logged in full; later ones
                    // show up through the retry log line below.
                    if retries_used == 0 {
                        log::debug!(
                            "DynamoDB {} error (attempt {}): {}",
                            $op_name,
                            retries_used + 1,
                            message
                        );
                        log::debug!("Full error details: {:?}", failure);
                    }

                    // An exhausted budget ends the loop regardless of how
                    // the error would otherwise be classified.
                    let budget_spent = retries_used >= $max_retries;
                    if !budget_spent && is_retryable_error(&message) {
                        let pause = exponential_backoff(retries_used);
                        log::debug!(
                            "Retrying {} after {:?} delay (attempt {}/{})",
                            $op_name,
                            pause,
                            retries_used + 1,
                            $max_retries
                        );
                        tokio::time::sleep(pause).await;
                        retries_used += 1;
                        continue;
                    }

                    let detailed = format_dynamodb_error($op_name, $table, $key, &message);
                    if budget_spent {
                        log::error!(
                            "DynamoDB {} failed after {} retries: {}",
                            $op_name,
                            retries_used + 1,
                            detailed
                        );
                    } else {
                        log::error!(
                            "DynamoDB {} failed with non-retryable error: {}",
                            $op_name,
                            detailed
                        );
                    }
                    break Err($err_conv(detailed));
                }
            }
        }
    }};
}
/// Builds a human-readable description of a failed DynamoDB operation.
///
/// The message always names the operation and the table. The key is included
/// only when `key` is `Some`. `error` is rendered through its `Display`
/// implementation and appended after a colon.
pub fn format_dynamodb_error(
    operation: &str,
    table_name: &str,
    key: Option<&str>,
    error: impl std::fmt::Display,
) -> String {
    match key {
        Some(item_key) => format!(
            "DynamoDB {} failed for table '{}', key '{}': {}",
            operation, table_name, item_key, error
        ),
        None => format!(
            "DynamoDB {} failed for table '{}': {}",
            operation, table_name, error
        ),
    }
}
/// Runs a DynamoDB batch write, re-submitting unprocessed items and retrying
/// retryable errors with exponential backoff.
///
/// `batch_operation` is called with the write requests that are still
/// outstanding. After each successful call, the requests listed under
/// `table_name` in the response's `unprocessed_items` become the next batch.
/// At most `MAX_BATCH_RETRIES` attempts are made, with an
/// `exponential_backoff` delay before every attempt except the first. No
/// delay is spent after the final attempt.
///
/// Returns `Ok(())` immediately when `initial_requests` is empty.
///
/// # Errors
///
/// - A non-retryable error (per `is_retryable_error`) is returned right away,
///   formatted with `format_dynamodb_error`.
/// - If requests are still outstanding after the last attempt, the error
///   states how many remain; when that last attempt itself failed, its error
///   text is appended so the cause is not lost.
///
/// NOTE(review): retry classification matches on `e.to_string()`. Depending on
/// the AWS SDK version, `SdkError`'s `Display` output may not contain the
/// service exception name — confirm that retryable errors are recognised.
pub async fn retry_batch_operation<F>(
    mut batch_operation: F,
    table_name: &str,
    initial_requests: Vec<aws_sdk_dynamodb::types::WriteRequest>,
) -> Result<(), String>
where
    F: FnMut(
        &[aws_sdk_dynamodb::types::WriteRequest],
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<
                        aws_sdk_dynamodb::operation::batch_write_item::BatchWriteItemOutput,
                        aws_sdk_dynamodb::error::SdkError<
                            aws_sdk_dynamodb::operation::batch_write_item::BatchWriteItemError,
                        >,
                    >,
                > + Send,
        >,
    >,
{
    let mut remaining_requests = initial_requests;
    if remaining_requests.is_empty() {
        return Ok(());
    }

    // Error text of the most recent attempt, kept only while that attempt is
    // the latest one, so the final message reports the actual cause.
    let mut last_error: Option<String> = None;

    for attempt in 0..MAX_BATCH_RETRIES {
        // Back off before every attempt except the first; this also avoids a
        // pointless sleep after the final attempt.
        if attempt > 0 {
            tokio::time::sleep(exponential_backoff(attempt - 1)).await;
        }

        match batch_operation(&remaining_requests).await {
            Ok(response) => {
                last_error = None;
                // The response is owned, so the unprocessed requests can be
                // moved out of the map instead of cloned.
                remaining_requests = response
                    .unprocessed_items
                    .and_then(|mut items| items.remove(table_name))
                    .unwrap_or_default();
                if remaining_requests.is_empty() {
                    return Ok(());
                }
            }
            Err(e) => {
                let error_str = e.to_string();
                if !is_retryable_error(&error_str) {
                    return Err(format_dynamodb_error(
                        "batch_write_item",
                        table_name,
                        None,
                        &error_str,
                    ));
                }
                last_error = Some(error_str);
            }
        }
    }

    let mut message = format!(
        "Failed to process {} items in table '{}' after {} retries",
        remaining_requests.len(),
        table_name,
        MAX_BATCH_RETRIES
    );
    if let Some(cause) = last_error {
        message.push_str("; last error: ");
        message.push_str(&cause);
    }
    Err(message)
}