danube_client/
retry_manager.rs1use crate::{
2 errors::{DanubeError, Result},
3 DanubeClient,
4};
5use rand::{rng, Rng};
6use std::time::Duration;
7use tonic::{transport::Uri, Code, Status};
8
/// Retry limit substituted when `RetryManager::new` is given `0` for `max_retries`.
const DEFAULT_MAX_RETRIES: usize = 5;

/// Base backoff, in milliseconds, substituted when `RetryManager::new` is given `0`
/// for `base_backoff_ms`.
const DEFAULT_BASE_BACKOFF_MS: u64 = 200;

/// Backoff ceiling, in milliseconds, substituted when `RetryManager::new` is given `0`
/// for `max_backoff_ms`.
const DEFAULT_MAX_BACKOFF_MS: u64 = 5_000;
15
/// Retry policy settings: a retry limit plus the bounds used to compute backoff delays.
///
/// Values are normally produced by `RetryManager::new`, which replaces zero arguments
/// with the module defaults.
// NOTE(review): the reason for `allow(dead_code)` is not visible in this file; kept as-is.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct RetryManager {
    /// Retry limit exposed through `max_retries()`; this file does not enforce it itself.
    max_retries: usize,
    /// Backoff step in milliseconds; multiplied by the attempt number in `calculate_backoff`.
    base_backoff_ms: u64,
    /// Upper bound, in milliseconds, applied to the computed backoff before jitter.
    max_backoff_ms: u64,
}
24
25impl RetryManager {
26 pub fn new(max_retries: usize, base_backoff_ms: u64, max_backoff_ms: u64) -> Self {
27 Self {
28 max_retries: if max_retries == 0 {
29 DEFAULT_MAX_RETRIES
30 } else {
31 max_retries
32 },
33 base_backoff_ms: if base_backoff_ms == 0 {
34 DEFAULT_BASE_BACKOFF_MS
35 } else {
36 base_backoff_ms
37 },
38 max_backoff_ms: if max_backoff_ms == 0 {
39 DEFAULT_MAX_BACKOFF_MS
40 } else {
41 max_backoff_ms
42 },
43 }
44 }
45
46 pub fn max_retries(&self) -> usize {
48 self.max_retries
49 }
50
51 pub async fn insert_auth_token<T>(
54 client: &DanubeClient,
55 request: &mut tonic::Request<T>,
56 addr: &Uri,
57 ) -> Result<()> {
58 client
59 .auth_service
60 .insert_token_if_needed(
61 client.cnx_manager.connection_options.resolve_token(),
62 request,
63 addr,
64 )
65 .await
66 }
67
68 pub fn insert_proxy_header<T>(request: &mut tonic::Request<T>, broker_url: &Uri, proxy: bool) {
71 if proxy {
72 if let Ok(value) = broker_url.to_string().parse() {
73 request.metadata_mut().insert("x-danube-broker-url", value);
74 }
75 }
76 }
77
78 pub fn is_retryable_error(&self, error: &DanubeError) -> bool {
85 match error {
86 DanubeError::FromStatus(status) => matches!(
87 status.code(),
88 Code::Unavailable | Code::DeadlineExceeded | Code::ResourceExhausted
89 ),
90 _ => false,
91 }
92 }
93
94 pub fn calculate_backoff(&self, attempt: usize) -> Duration {
96 let linear = self.base_backoff_ms.saturating_mul(attempt as u64 + 1);
98 let backoff = linear.min(self.max_backoff_ms);
99 let jitter = rng().random_range(backoff / 2..=backoff); Duration::from_millis(jitter)
101 }
102}
103
104pub fn status_to_danube_error(status: Status) -> DanubeError {
106 DanubeError::FromStatus(status)
107}