Skip to main content

dapr_durabletask/worker/
reconnect_policy.rs

1use std::time::Duration;
2
/// Exponential-backoff policy used by [`TaskHubGrpcWorker`](super::TaskHubGrpcWorker) when the sidecar
/// is unavailable.
///
/// The worker applies this policy for both the initial connection attempt and
/// every subsequent reconnect after the gRPC stream drops. On a successful
/// connection the delay is reset to [`initial_delay`](Self::initial_delay).
///
/// # Delay calculation
///
/// ```text
/// base[0]  = initial_delay
/// base[n]  = min(base[n-1] * multiplier, max_delay)
/// delay[n] = base[n] ± jitter        (jitter only when enabled)
/// ```
///
/// Jitter (when enabled) adds a uniformly-distributed ±10 % random offset to
/// each delay, which reduces "thundering herd" reconnects in environments
/// with many workers. The offset is applied to the returned delay only and is
/// never fed back into the base sequence, so it does not compound across
/// attempts. Because of this, a jittered delay can be up to 10 % above
/// `max_delay`. Note also that `initial_delay` itself is used as-is for the
/// first delay and is not clamped to `max_delay`.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use dapr_durabletask::worker::ReconnectPolicy;
///
/// // Fast reconnect for development (no jitter, 2 max attempts):
/// let policy = ReconnectPolicy::new()
///     .with_initial_delay(Duration::from_millis(200))
///     .with_max_delay(Duration::from_secs(5))
///     .with_multiplier(2.0)
///     .with_max_attempts(2)
///     .with_jitter(false);
///
/// // Production policy with jitter:
/// let policy = ReconnectPolicy::new()
///     .with_initial_delay(Duration::from_secs(1))
///     .with_max_delay(Duration::from_secs(60))
///     .with_multiplier(1.5)
///     .with_jitter(true);
/// ```
#[derive(Debug, Clone)]
pub struct ReconnectPolicy {
    /// Delay before the first reconnect attempt. Defaults to 1 s.
    ///
    /// Used as-is for the first delay (not clamped to `max_delay`).
    pub initial_delay: Duration,

    /// Upper bound on the un-jittered delay between reconnect attempts.
    /// Defaults to 30 s.
    pub max_delay: Duration,

    /// Multiplier applied to the current delay after each failed attempt.
    /// Must be ≥ 1.0. [`with_multiplier`](Self::with_multiplier) clamps smaller
    /// values to 1.0. Setting this field directly bypasses that clamp.
    /// Defaults to 1.5.
    pub multiplier: f64,

    /// Maximum number of reconnect attempts before `start()` returns an error.
    /// `None` means retry indefinitely. Defaults to `None`.
    pub max_attempts: Option<u32>,

    /// When `true`, add a uniformly-distributed ±10 % random offset to each
    /// delay. Defaults to `true`.
    pub jitter: bool,
}
61
62impl Default for ReconnectPolicy {
63    fn default() -> Self {
64        Self {
65            initial_delay: Duration::from_secs(1),
66            max_delay: Duration::from_secs(30),
67            multiplier: 1.5,
68            max_attempts: None,
69            jitter: true,
70        }
71    }
72}
73
74impl ReconnectPolicy {
75    /// Create a policy with default values.
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Set the initial delay before the first reconnect attempt.
81    pub fn with_initial_delay(mut self, delay: Duration) -> Self {
82        self.initial_delay = delay;
83        self
84    }
85
86    /// Set the maximum delay between reconnect attempts.
87    pub fn with_max_delay(mut self, max: Duration) -> Self {
88        self.max_delay = max;
89        self
90    }
91
92    /// Set the backoff multiplier (must be ≥ 1.0).
93    pub fn with_multiplier(mut self, mult: f64) -> Self {
94        self.multiplier = mult.max(1.0);
95        self
96    }
97
98    /// Set the maximum number of reconnect attempts.
99    /// `None` retries indefinitely.
100    pub fn with_max_attempts(mut self, n: u32) -> Self {
101        self.max_attempts = Some(n);
102        self
103    }
104
105    /// Enable or disable ±10 % random jitter on each delay.
106    pub fn with_jitter(mut self, jitter: bool) -> Self {
107        self.jitter = jitter;
108        self
109    }
110}
111
112// ── Internal backoff iterator ─────────────────────────────────────────────────
113
114/// Stateful iterator that computes successive backoff delays.
115pub(crate) struct BackoffIter<'a> {
116    policy: &'a ReconnectPolicy,
117    current: Duration,
118    attempts: u32,
119}
120
121impl<'a> BackoffIter<'a> {
122    pub(crate) fn new(policy: &'a ReconnectPolicy) -> Self {
123        Self {
124            policy,
125            current: policy.initial_delay,
126            attempts: 0,
127        }
128    }
129
130    /// Returns the next delay, or `None` if `max_attempts` has been reached.
131    pub(crate) fn next_delay(&mut self) -> Option<Duration> {
132        if let Some(max) = self.policy.max_attempts {
133            if self.attempts >= max {
134                return None;
135            }
136        }
137        self.attempts += 1;
138
139        let delay = self.current;
140
141        let next_secs = (self.current.as_secs_f64() * self.policy.multiplier)
142            .min(self.policy.max_delay.as_secs_f64());
143        self.current = Duration::from_secs_f64(next_secs);
144
145        if self.policy.jitter {
146            Some(apply_jitter(delay))
147        } else {
148            Some(delay)
149        }
150    }
151
152    /// Reset the delay back to `initial_delay` and clear the attempt counter.
153    pub(crate) fn reset(&mut self) {
154        self.current = self.policy.initial_delay;
155        self.attempts = 0;
156    }
157}
158
/// Add a uniformly-distributed ±10 % offset to `d`.
///
/// The factor is one of 201 evenly spaced values in `[-0.1, +0.1]` (steps of
/// 0.001). Randomness comes from the standard library's `RandomState`, so no
/// `rand` crate is needed. `RandomState` is initialised with random keys, and
/// hashers built from distinct instances are unlikely to agree. As a result,
/// workers started at the same instant still pick different offsets, which is
/// the point of jitter.
///
/// Never panics. If the jittered value would exceed [`Duration::MAX`], the
/// result saturates to `Duration::MAX`.
fn apply_jitter(d: Duration) -> Duration {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // The previous implementation hashed the value *returned* by
    // `AtomicU64::fetch_add`. That value is always 0 on the first call, so every
    // process produced the same first jitter. A freshly keyed hasher avoids
    // this; the clock reading is mixed in only as extra entropy.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|t| u64::from(t.subsec_nanos()))
        .unwrap_or(12345);
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(nanos);
    let r = hasher.finish();

    // Map to [-0.1, +0.1].
    let factor = (r % 201) as f64 / 1000.0 - 0.1;
    let adjusted = (d.as_secs_f64() * (1.0 + factor)).max(0.0);

    // `u64::MAX as f64` rounds up to 2^64 s, which already exceeds
    // `Duration::MAX`; `Duration::from_secs_f64` panics at or above it.
    if adjusted >= u64::MAX as f64 {
        Duration::MAX
    } else {
        Duration::from_secs_f64(adjusted)
    }
}
180
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backoff_grows_to_max() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_max_delay(Duration::from_millis(500))
            .with_multiplier(2.0)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);

        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(200)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(400)));
        // Capped at max_delay.
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(500)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(500)));
    }

    #[test]
    fn test_max_attempts_exhausted() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(10))
            .with_max_attempts(3)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);

        assert!(iter.next_delay().is_some());
        assert!(iter.next_delay().is_some());
        assert!(iter.next_delay().is_some());
        assert_eq!(iter.next_delay(), None);
    }

    #[test]
    fn test_zero_max_attempts_yields_nothing() {
        let policy = ReconnectPolicy::new()
            .with_max_attempts(0)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);

        assert_eq!(iter.next_delay(), None);
    }

    #[test]
    fn test_reset_restarts_delay() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(2.0)
            .with_max_attempts(2)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(200)));
        // The attempt budget is now exhausted.
        assert_eq!(iter.next_delay(), None);

        iter.reset();

        // After reset, delay starts again from initial_delay AND the attempt
        // counter is cleared, so the full budget is available again.
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(200)));
        assert_eq!(iter.next_delay(), None);
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(1000))
            .with_max_delay(Duration::from_millis(2000))
            .with_multiplier(1.0) // keep delay constant at 1000 ms
            .with_jitter(true);

        let mut iter = BackoffIter::new(&policy);
        for _ in 0..50 {
            let d = iter.next_delay().unwrap().as_secs_f64();
            // ±10 % around 1000 ms → [900, 1100] ms, with a tiny tolerance for
            // float → Duration rounding.
            assert!(
                (0.9 - 1e-9..=1.1 + 1e-9).contains(&d),
                "jitter out of range: {d}"
            );
        }
    }

    #[test]
    fn test_multiplier_below_one_clamped() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(0.5) // clamped to 1.0
            .with_jitter(false);
        let mut iter = BackoffIter::new(&policy);
        // With multiplier clamped to 1.0 the delay stays constant.
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
    }

    #[test]
    fn test_nan_multiplier_clamped() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(f64::NAN) // f64::max ignores NaN → 1.0
            .with_jitter(false);
        assert_eq!(policy.multiplier, 1.0);
        let mut iter = BackoffIter::new(&policy);
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
    }

    #[test]
    fn test_default_values_match_docs() {
        let policy = ReconnectPolicy::default();
        assert_eq!(policy.initial_delay, Duration::from_secs(1));
        assert_eq!(policy.max_delay, Duration::from_secs(30));
        assert_eq!(policy.multiplier, 1.5);
        assert_eq!(policy.max_attempts, None);
        assert!(policy.jitter);
    }
}