camel_component_api/network_retry.rs
1//! Shared reconnection/backoff policy for networked components.
2//!
3//! Provides [`NetworkRetryPolicy`] (config struct), [`retry_async`] (execution helper),
4//! and [`retry_async_cancelable`] (cancellation-aware variant). Components that
5//! supervise external processes (JMS, xj, xslt) should use only
6//! [`NetworkRetryPolicy::delay_for`] inside their own supervision loops.
7//!
8//! Both [`retry_async`] and [`retry_async_cancelable`] accept an optional
9//! `label` for component identity in retry logs:
10//!
11//! ```rust,ignore
12//! use camel_component_api::retry_async;
13//!
14//! retry_async(&config.reconnect, Some("ws-producer"), op, is_retryable).await?;
15//! ```
16//!
17//! When a label is set, log messages include `"ws-producer: transient error
18//! — retrying"` with a `component` structured field that operators can filter
19//! with `component=ws-producer`.
20//!
21//! For location-specific context (URLs, endpoints), wrap the retry call in a
22//! [`tracing::span`](https://docs.rs/tracing/latest/tracing/macro.span.html)
23//! whose fields are inherited by all log events inside the retry loop:
24//!
25//! ```rust,ignore
26//! let span = tracing::info_span!("ws_connect", url = %url);
27//! let _guard = span.enter();
28//! retry_async(&config.reconnect, Some("ws-producer"), op, is_retryable).await?;
29//! ```
30
31use std::{future::Future, time::Duration};
32
33use rand::RngExt;
34use rand::distr::Uniform;
35use serde::{Deserialize, Serialize};
36use tokio::time::sleep;
37use tokio_util::sync::CancellationToken;
38
39use crate::CamelError;
40
41// Default-value helpers — must return the field type (Duration), not u64,
42// because serde calls these when the key is absent and deserialize_with is
43// NOT invoked in that case; the value must already be the target type.
/// Serde fallback for `NetworkRetryPolicy::enabled` when the key is absent:
/// reconnection is switched on.
fn default_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
/// Serde fallback for `NetworkRetryPolicy::max_attempts` when the key is
/// absent: ten attempts.
fn default_max_attempts() -> u32 {
    const MAX_ATTEMPTS: u32 = 10;
    MAX_ATTEMPTS
}
/// Serde fallback for `NetworkRetryPolicy::initial_delay` when
/// `initial_delay_ms` is absent: 100 milliseconds.
///
/// Returns a `Duration` (not a raw integer) because serde uses this value
/// directly as the field value when the key is missing.
fn default_initial_delay() -> Duration {
    const INITIAL_DELAY_MS: u64 = 100;
    Duration::from_millis(INITIAL_DELAY_MS)
}
/// Serde fallback for `NetworkRetryPolicy::multiplier` when the key is
/// absent: the delay doubles on each successive attempt.
fn default_multiplier() -> f64 {
    const BACKOFF_MULTIPLIER: f64 = 2.0;
    BACKOFF_MULTIPLIER
}
/// Serde fallback for `NetworkRetryPolicy::max_delay` when `max_delay_ms`
/// is absent: 30 seconds (30 000 ms).
///
/// Returns a `Duration` (not a raw integer) because serde uses this value
/// directly as the field value when the key is missing.
fn default_max_delay() -> Duration {
    const MAX_DELAY_SECS: u64 = 30;
    Duration::from_secs(MAX_DELAY_SECS)
}
/// Serde fallback for `NetworkRetryPolicy::jitter_factor` when the key is
/// absent: 0.2.
fn default_jitter_factor() -> f64 {
    const JITTER_FACTOR: f64 = 0.2;
    JITTER_FACTOR
}
62
63fn deserialize_duration_ms<'de, D>(d: D) -> Result<Duration, D::Error>
64where
65 D: serde::Deserializer<'de>,
66{
67 let ms = u64::deserialize(d)?;
68 Ok(Duration::from_millis(ms))
69}
70
71/// Reconnection and backoff policy for networked components.
72///
73/// Used in component config structs as a `reconnect` field:
74/// ```toml
75/// [default.components.redis.reconnect]
76/// max_attempts = 10
77/// initial_delay_ms = 100
78/// max_delay_ms = 30000
79/// ```
80// Derive Serialize as well: several component configs (e.g. Kafka) derive
81// Serialize, and a Deserialize-only nested struct breaks the derive on the
82// host struct.
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
84#[must_use]
85pub struct NetworkRetryPolicy {
86 /// Whether reconnection is enabled at all.
87 #[serde(default = "default_enabled")]
88 pub enabled: bool,
89
90 /// Maximum number of attempts before giving up. 0 means unlimited.
91 #[serde(default = "default_max_attempts")]
92 pub max_attempts: u32,
93
94 /// Base delay for the first retry.
95 #[serde(
96 default = "default_initial_delay",
97 rename = "initial_delay_ms",
98 deserialize_with = "deserialize_duration_ms"
99 )]
100 pub initial_delay: Duration,
101
102 /// Exponential backoff multiplier applied to each successive attempt.
103 #[serde(default = "default_multiplier")]
104 pub multiplier: f64,
105
106 /// Maximum delay cap regardless of computed backoff.
107 #[serde(
108 default = "default_max_delay",
109 rename = "max_delay_ms",
110 deserialize_with = "deserialize_duration_ms"
111 )]
112 pub max_delay: Duration,
113
114 /// Jitter factor in [0.0, 1.0]. Actual delay is `base ± (base * jitter_factor / 2)`.
115 #[serde(default = "default_jitter_factor")]
116 pub jitter_factor: f64,
117}
118
119impl Default for NetworkRetryPolicy {
120 fn default() -> Self {
121 Self {
122 enabled: default_enabled(),
123 max_attempts: default_max_attempts(),
124 initial_delay: default_initial_delay(),
125 multiplier: default_multiplier(),
126 max_delay: default_max_delay(),
127 jitter_factor: default_jitter_factor(),
128 }
129 }
130}
131
132impl NetworkRetryPolicy {
133 /// Returns a disabled policy — no retries will be attempted.
134 pub fn disabled() -> Self {
135 Self {
136 enabled: false,
137 ..Self::default()
138 }
139 }
140
141 /// Computes the sleep duration for a given zero-based attempt number.
142 ///
143 /// Formula: `clamp(initial * multiplier^attempt, 0, max_delay)` with random jitter.
144 #[must_use]
145 pub fn delay_for(&self, attempt: u32) -> Duration {
146 let base_ms = self.initial_delay.as_millis() as f64;
147 let exp = self.multiplier.powi(attempt as i32);
148 let computed_ms = (base_ms * exp).min(self.max_delay.as_millis() as f64);
149
150 // Apply random jitter: uniform in [-jitter_range/2, +jitter_range/2].
151 // Uses rand to avoid thundering herd when multiple consumers reconnect.
152 let jitter_range = computed_ms * self.jitter_factor;
153 let jitter = if jitter_range > 0.0 {
154 let mut rng = rand::rng();
155 let lo = -jitter_range / 2.0;
156 let hi = jitter_range / 2.0;
157 debug_assert!(lo < hi, "jitter bounds are valid when jitter_range > 0");
158 let dist = Uniform::new(lo, hi).unwrap(); // allow-unwrap
159 rng.sample(dist)
160 } else {
161 0.0
162 };
163
164 let final_ms = (computed_ms + jitter).max(0.0) as u64;
165 let max_delay_ms = u64::try_from(self.max_delay.as_millis()).unwrap_or(u64::MAX);
166 Duration::from_millis(final_ms.min(max_delay_ms))
167 }
168
169 /// Returns `true` if another retry should be attempted.
170 ///
171 /// `attempt` is zero-based: 0 = first attempt, 1 = first retry, etc.
172 #[must_use]
173 pub fn should_retry(&self, attempt: u32) -> bool {
174 self.enabled && (self.max_attempts == 0 || attempt < self.max_attempts)
175 }
176}
177
178/// Executes `op` with reconnect/backoff according to `policy`.
179///
180/// `label`, if `Some`, is emitted as a structured `component` tracing field
181/// and in the retry log message text so operators can identify which component
182/// is retrying (e.g., `ws-producer: transient error — retrying`). Pass `None`
183/// for the pre-0.14 backwards-compatible unlabeled path.
184///
185/// `is_retryable` classifies errors: retryable errors are retried, permanent
186/// errors are not.
187///
188/// # Security note: error Display is logged at WARN level
189///
190/// On every retry, the error's [`Display`](std::fmt::Display) representation
191/// is emitted via `tracing::warn!` for operator visibility during connection
192/// retries. Callers MUST sanitize errors before returning them from `op` if
193/// they may contain sensitive content such as connection strings, embedded
194/// credentials, or host‑port pairs. Sanitization belongs at the source — in
195/// the IO call whose error is wrapped — not here.
196///
197/// This log call is intentional and should not be removed: it provides the
198/// only diagnostic signal that a networked component is retrying and why.
199///
200/// # Example
201/// ```rust,ignore
202/// let result = retry_async(
203/// &config.reconnect,
204/// Some("ws-producer"),
205/// || async move { connect_to_server().await },
206/// |err: &CamelError| matches!(err, CamelError::Io(_)),
207/// ).await?;
208/// ```
209pub async fn retry_async<T, Op, Fut, IsRetryable, E>(
210 policy: &NetworkRetryPolicy,
211 label: Option<&'static str>,
212 op: Op,
213 is_retryable: IsRetryable,
214) -> Result<T, E>
215where
216 Op: FnMut() -> Fut,
217 Fut: Future<Output = Result<T, E>>,
218 IsRetryable: Fn(&E) -> bool,
219 E: std::fmt::Display,
220{
221 retry_async_inner(policy, op, is_retryable, None, label).await
222}
223
224/// Shared private implementation used by both [`retry_async`] and
225/// [`retry_async_cancelable`].
226async fn retry_async_inner<T, Op, Fut, IsRetryable, E>(
227 policy: &NetworkRetryPolicy,
228 mut op: Op,
229 is_retryable: IsRetryable,
230 cancel: Option<&CancellationToken>,
231 label: Option<&'static str>,
232) -> Result<T, E>
233where
234 Op: FnMut() -> Fut,
235 Fut: Future<Output = Result<T, E>>,
236 IsRetryable: Fn(&E) -> bool,
237 E: std::fmt::Display,
238{
239 let mut attempt = 0u32;
240 loop {
241 match op().await {
242 Ok(val) => return Ok(val),
243 Err(err) => {
244 if !is_retryable(&err) || !policy.should_retry(attempt + 1) {
245 return Err(err);
246 }
247 let delay = policy.delay_for(attempt);
248 if let Some(component) = label {
249 tracing::warn!(
250 component,
251 attempt,
252 delay_ms = delay.as_millis(),
253 error = %err,
254 "{component}: transient error — retrying"
255 );
256 } else {
257 tracing::warn!(
258 attempt,
259 delay_ms = delay.as_millis(),
260 error = %err,
261 "transient error — retrying"
262 );
263 }
264 // Honour cancellation only during inter-retry sleep, not
265 // during the operation itself (that is the caller's
266 // responsibility).
267 if let Some(token) = cancel {
268 tokio::select! {
269 biased;
270 _ = token.cancelled() => return Err(err),
271 _ = sleep(delay) => {}
272 }
273 } else {
274 sleep(delay).await;
275 }
276 attempt += 1;
277 }
278 }
279 }
280}
281
282/// Like [`retry_async`] but honours a [`CancellationToken`] during inter-retry sleep.
283///
284/// Same semantics as [`retry_async`] except that if `cancel` fires while waiting
285/// between attempts, the function returns the last operation error immediately.
286/// Cancellation is **not** checked during the operation itself — the caller is
287/// responsible for making the operation itself cancellation-aware if needed.
288///
289/// `label`, if `Some`, is emitted as a structured `component` tracing field
290/// (see [`retry_async`] for details).
291///
292/// # Example
293/// ```rust,ignore
294/// let cancel = CancellationToken::new();
295/// let result = retry_async_cancelable(
296/// &config.reconnect,
297/// Some("container-events"),
298/// || async move { make_network_call().await },
299/// |err| is_transient(err),
300/// &cancel,
301/// ).await;
302/// ```
303pub async fn retry_async_cancelable<T, Op, Fut, IsRetryable, E>(
304 policy: &NetworkRetryPolicy,
305 label: Option<&'static str>,
306 op: Op,
307 is_retryable: IsRetryable,
308 cancel: &CancellationToken,
309) -> Result<T, E>
310where
311 Op: FnMut() -> Fut,
312 Fut: Future<Output = Result<T, E>>,
313 IsRetryable: Fn(&E) -> bool,
314 E: std::fmt::Display,
315{
316 retry_async_inner(policy, op, is_retryable, Some(cancel), label).await
317}
318
319/// Classify a [`CamelError`] as retryable (transient network/IO errors).
320///
321/// Retryable variants:
322/// - [`CamelError::Io`] — I/O errors (connection refused, DNS, etc.)
323/// - [`CamelError::ProcessorError`] whose message contains the literal
324/// `[TRANSIENT]` marker (used by gRPC and other components to flag
325/// retryable-by-classification errors).
326/// - [`CamelError::ProcessorErrorWithSource`] whose message contains the
327/// `[TRANSIENT]` marker (same semantics, preserves source error chain).
328///
329/// Non-retryable variants:
330/// - [`CamelError::Config`], [`CamelError::TypeConversionFailed`],
331/// [`CamelError::Stopped`], [`CamelError::EndpointCreationFailed`],
332/// [`CamelError::ChannelClosed`] — permanent failures
333pub fn is_retryable_camel_error(err: &CamelError) -> bool {
334 matches!(err, CamelError::Io(_))
335 || matches!(err, CamelError::ProcessorError(s) if s.contains("[TRANSIENT]"))
336 || matches!(err, CamelError::ProcessorErrorWithSource(s, _) if s.contains("[TRANSIENT]"))
337}
338
339#[cfg(test)]
340#[path = "network_retry_tests.rs"]
341mod tests;