use std::{cmp::max, time::Duration};
use time::OffsetDateTime;
use tokio::sync::mpsc::{self, error::TrySendError};
pub struct TrySendWithRetryConfig {
pub max_sleep_ms: u32,
pub max_retry_duration_ms: Option<u32>,
pub max_retry_attempts: Option<u32>,
}
impl Default for TrySendWithRetryConfig {
fn default() -> Self {
Self {
max_sleep_ms: 30,
max_retry_duration_ms: Some(1_000),
max_retry_attempts: Some(5),
}
}
}
pub async fn try_send_with_retry<T: Clone>(tx: &mpsc::Sender<T>, message: T, config: TrySendWithRetryConfig) -> Result<(), TrySendError<T>> {
let abort_time = config
.max_retry_duration_ms
.map(|max_duration_ms| OffsetDateTime::now_utc().unix_timestamp_nanos() + (max_duration_ms * 1_000_000) as i128);
let mut counter = 0;
loop {
counter += 1;
match tx.try_send(message.clone()) {
Ok(_) => break Ok(()),
Err(TrySendError::Full(_)) => {
let sleep_time = max(
counter * 5, config.max_sleep_ms,
);
if let Some(max_attempts) = config.max_retry_attempts {
if counter >= max_attempts {
break Err(TrySendError::Full(message));
}
}
if let Some(stop_ns) = abort_time {
if OffsetDateTime::now_utc().unix_timestamp_nanos() >= stop_ns {
break Err(TrySendError::Full(message));
}
}
tokio::time::sleep(Duration::from_millis(sleep_time as u64)).await;
}
Err(e) => break Err(e),
}
}
}