Skip to main content

danube_client/
retry_manager.rs

1use crate::{
2    errors::{DanubeError, Result},
3    DanubeClient,
4};
5use rand::{rng, Rng};
6use std::time::Duration;
7use tonic::{transport::Uri, Code, Status};
8
// Fallback values substituted by `RetryManager::new` whenever a caller passes `0`.

/// Retry attempts allowed before giving up, used when the caller requests zero.
const DEFAULT_MAX_RETRIES: usize = 5;

/// Linear backoff step in milliseconds: attempt `n` backs off `base * (n + 1)` ms
/// before capping and jitter.
const DEFAULT_BASE_BACKOFF_MS: u64 = 200;

/// Cap, in milliseconds (five seconds), on the computed backoff delay.
const DEFAULT_MAX_BACKOFF_MS: u64 = 5 * 1_000;
15
/// Centralized retry and reconnection policy.
///
/// Holds how many attempts to make and how long to back off between them (linear
/// growth, capped, with random jitter), alongside helpers for attaching authentication
/// and proxy-routing metadata to outgoing gRPC requests.
#[derive(Debug, Clone)]
// NOTE(review): every field below is read by the methods in this file; this `allow`
// may be stale — confirm no build configuration still needs it before removing.
#[allow(dead_code)]
pub struct RetryManager {
    /// Maximum number of retry attempts (non-zero when built through `new`).
    max_retries: usize,
    /// Linear backoff step, in milliseconds.
    base_backoff_ms: u64,
    /// Upper bound, in milliseconds, applied to the computed backoff.
    max_backoff_ms: u64,
}
24
25impl RetryManager {
26    pub fn new(max_retries: usize, base_backoff_ms: u64, max_backoff_ms: u64) -> Self {
27        Self {
28            max_retries: if max_retries == 0 {
29                DEFAULT_MAX_RETRIES
30            } else {
31                max_retries
32            },
33            base_backoff_ms: if base_backoff_ms == 0 {
34                DEFAULT_BASE_BACKOFF_MS
35            } else {
36                base_backoff_ms
37            },
38            max_backoff_ms: if max_backoff_ms == 0 {
39                DEFAULT_MAX_BACKOFF_MS
40            } else {
41                max_backoff_ms
42            },
43        }
44    }
45
46    /// Get the maximum number of retries
47    pub fn max_retries(&self) -> usize {
48        self.max_retries
49    }
50
51    /// Insert authentication token into request.
52    /// Delegates to [`AuthService::insert_token_if_needed`].
53    pub async fn insert_auth_token<T>(
54        client: &DanubeClient,
55        request: &mut tonic::Request<T>,
56        addr: &Uri,
57    ) -> Result<()> {
58        client
59            .auth_service
60            .insert_token_if_needed(
61                client.cnx_manager.connection_options.resolve_token(),
62                request,
63                addr,
64            )
65            .await
66    }
67
68    /// Insert proxy routing header into request when proxy mode is active.
69    /// The proxy uses this header to route the gRPC call to the correct broker.
70    pub fn insert_proxy_header<T>(request: &mut tonic::Request<T>, broker_url: &Uri, proxy: bool) {
71        if proxy {
72            if let Ok(value) = broker_url.to_string().parse() {
73                request.metadata_mut().insert("x-danube-broker-url", value);
74            }
75        }
76    }
77
78    /// Check if an error is retryable based on gRPC status codes.
79    ///
80    /// Retryable codes:
81    /// - `Unavailable` — broker not reachable or topic moved (ServiceNotReady)
82    /// - `DeadlineExceeded` — request timed out
83    /// - `ResourceExhausted` — rate limited / overloaded
84    pub fn is_retryable_error(&self, error: &DanubeError) -> bool {
85        match error {
86            DanubeError::FromStatus(status) => matches!(
87                status.code(),
88                Code::Unavailable | Code::DeadlineExceeded | Code::ResourceExhausted
89            ),
90            _ => false,
91        }
92    }
93
94    /// Calculate linear backoff with jitter
95    pub fn calculate_backoff(&self, attempt: usize) -> Duration {
96        // Linear backoff: base * (attempt + 1), capped at max
97        let linear = self.base_backoff_ms.saturating_mul(attempt as u64 + 1);
98        let backoff = linear.min(self.max_backoff_ms);
99        let jitter = rng().random_range(backoff / 2..=backoff); // 50-100% jitter
100        Duration::from_millis(jitter)
101    }
102}
103
104/// Convert gRPC Status to DanubeError
105pub fn status_to_danube_error(status: Status) -> DanubeError {
106    DanubeError::FromStatus(status)
107}