use crate::{
errors::{DanubeError, Result},
DanubeClient,
};
use rand::{rng, Rng};
use std::time::Duration;
use tonic::{transport::Uri, Code, Status};
/// Retry count that `RetryManager::new` substitutes when the caller passes `0`.
const DEFAULT_MAX_RETRIES: usize = 5;
/// Base backoff, in milliseconds, that `RetryManager::new` substitutes when the caller passes `0`.
const DEFAULT_BASE_BACKOFF_MS: u64 = 200;
/// Backoff ceiling, in milliseconds, that `RetryManager::new` substitutes when the caller passes `0`.
const DEFAULT_MAX_BACKOFF_MS: u64 = 5_000;
/// Retry settings: an attempt limit plus the bounds used to compute backoff delays.
#[derive(Debug, Clone)]
// NOTE(review): the reason for suppressing dead-code warnings is not visible from this file — confirm it is still needed.
#[allow(dead_code)]
pub struct RetryManager {
/// Value returned by `max_retries()`; never `0` when built through `new`.
max_retries: usize,
/// Per-attempt backoff increment in milliseconds; never `0` when built through `new`.
base_backoff_ms: u64,
/// Upper bound, in milliseconds, applied to the backoff before jitter; never `0` when built through `new`.
max_backoff_ms: u64,
}
impl RetryManager {
    /// Builds a retry manager from the given limits.
    ///
    /// Passing `0` for any argument selects the matching default
    /// (`DEFAULT_MAX_RETRIES`, `DEFAULT_BASE_BACKOFF_MS`, `DEFAULT_MAX_BACKOFF_MS`).
    pub fn new(max_retries: usize, base_backoff_ms: u64, max_backoff_ms: u64) -> Self {
        Self {
            max_retries: match max_retries {
                0 => DEFAULT_MAX_RETRIES,
                n => n,
            },
            base_backoff_ms: match base_backoff_ms {
                0 => DEFAULT_BASE_BACKOFF_MS,
                ms => ms,
            },
            max_backoff_ms: match max_backoff_ms {
                0 => DEFAULT_MAX_BACKOFF_MS,
                ms => ms,
            },
        }
    }

    /// Returns the configured retry limit.
    pub fn max_retries(&self) -> usize {
        self.max_retries
    }

    /// Asks the client's auth service to attach a token to `request` if one is needed.
    ///
    /// The API key from the client's connection options (if any) and the target
    /// `addr` are forwarded to `insert_token_if_needed`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `insert_token_if_needed` reports.
    pub async fn insert_auth_token<T>(
        client: &DanubeClient,
        request: &mut tonic::Request<T>,
        addr: &Uri,
    ) -> Result<()> {
        let api_key = client.cnx_manager.connection_options.api_key.as_deref();
        client
            .auth_service
            .insert_token_if_needed(api_key, request, addr)
            .await
    }

    /// Adds the `x-danube-broker-url` metadata entry to `request` when `proxy` is true.
    ///
    /// Does nothing when `proxy` is false. If `broker_url` cannot be converted
    /// into a metadata value, the header is skipped without reporting an error.
    pub fn insert_proxy_header<T>(request: &mut tonic::Request<T>, broker_url: &Uri, proxy: bool) {
        if !proxy {
            return;
        }
        // Best effort: a URL that does not parse as a metadata value is dropped.
        if let Ok(value) = broker_url.to_string().parse() {
            request.metadata_mut().insert("x-danube-broker-url", value);
        }
    }

    /// Reports whether `error` is one this manager treats as retryable.
    ///
    /// Only `DanubeError::FromStatus` carrying `Unavailable`, `DeadlineExceeded`
    /// or `ResourceExhausted` qualifies; every other error returns `false`.
    pub fn is_retryable_error(&self, error: &DanubeError) -> bool {
        match error {
            DanubeError::FromStatus(status) => matches!(
                status.code(),
                Code::Unavailable | Code::DeadlineExceeded | Code::ResourceExhausted
            ),
            _ => false,
        }
    }

    /// Computes the delay to wait before the retry that follows `attempt`.
    ///
    /// The delay grows linearly: `base_backoff_ms * (attempt + 1)`, capped at
    /// `max_backoff_ms`. The result is then drawn at random from the inclusive
    /// range `[backoff / 2, backoff]`.
    pub fn calculate_backoff(&self, attempt: usize) -> Duration {
        // Saturate rather than compute `attempt as u64 + 1` directly: that sum
        // overflows when `attempt` is `usize::MAX` on 64-bit targets, panicking
        // in debug builds and wrapping to a zero delay in release builds.
        let multiplier = (attempt as u64).saturating_add(1);
        let backoff = self
            .base_backoff_ms
            .saturating_mul(multiplier)
            .min(self.max_backoff_ms);
        let jittered_ms = rng().random_range(backoff / 2..=backoff);
        Duration::from_millis(jittered_ms)
    }
}
/// Wraps a gRPC `Status` in the `DanubeError::FromStatus` variant.
pub fn status_to_danube_error(status: Status) -> DanubeError {
DanubeError::FromStatus(status)
}