// dapr_durabletask/worker/reconnect_policy.rs

use std::time::Duration;

/// Exponential-backoff policy used by [`TaskHubGrpcWorker`](super::TaskHubGrpcWorker) when the sidecar
/// is unavailable.
///
/// The worker applies this policy for both the initial connection attempt and
/// every subsequent reconnect after the gRPC stream drops. On a successful
/// connection the delay is reset to [`initial_delay`](Self::initial_delay).
///
/// # Delay calculation
///
/// ```text
/// delay[0] = initial_delay
/// delay[n] = min(delay[n-1] * multiplier, max_delay) ± jitter
/// ```
///
/// `delay[n-1]` is the previous delay *before* jitter: jitter only perturbs
/// the value that is actually waited on and never compounds into later
/// delays. `delay[0]` is `initial_delay` exactly as configured, even if it is
/// larger than `max_delay`.
///
/// Jitter (when enabled) adds a uniformly-distributed ±10 % random offset to
/// each delay, which reduces "thundering herd" reconnects in environments
/// with many workers.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use dapr_durabletask::worker::ReconnectPolicy;
///
/// // Fast reconnect for development (no jitter, 2 max attempts):
/// let policy = ReconnectPolicy::new()
///     .with_initial_delay(Duration::from_millis(200))
///     .with_max_delay(Duration::from_secs(5))
///     .with_multiplier(2.0)
///     .with_max_attempts(2)
///     .with_jitter(false);
///
/// // Production policy with jitter:
/// let policy = ReconnectPolicy::new()
///     .with_initial_delay(Duration::from_secs(1))
///     .with_max_delay(Duration::from_secs(60))
///     .with_multiplier(1.5)
///     .with_jitter(true);
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct ReconnectPolicy {
    /// Delay before the first reconnect attempt. Defaults to 1 s.
    pub initial_delay: Duration,

    /// Upper bound on the delay between reconnect attempts. Defaults to 30 s.
    pub max_delay: Duration,

    /// Multiplier applied to the current delay after each failed attempt.
    /// Must be ≥ 1.0. Defaults to 1.5.
    ///
    /// [`with_multiplier`](Self::with_multiplier) clamps smaller values to
    /// 1.0. Assigning this public field directly bypasses that clamp.
    pub multiplier: f64,

    /// Maximum number of reconnect attempts before `start()` returns an error.
    /// `None` means retry indefinitely. Defaults to `None`.
    pub max_attempts: Option<u32>,

    /// When `true`, add a uniformly-distributed ±10 % random offset to each
    /// delay. Defaults to `true`.
    pub jitter: bool,
}

impl Default for ReconnectPolicy {
    /// 1 s initial delay, 30 s cap, ×1.5 growth, unlimited attempts, jitter on.
    fn default() -> Self {
        Self {
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(30),
            multiplier: 1.5,
            max_attempts: None,
            jitter: true,
        }
    }
}

impl ReconnectPolicy {
    /// Create a policy with default values (identical to [`Default::default`]).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the initial delay before the first reconnect attempt.
    #[must_use]
    pub fn with_initial_delay(mut self, delay: Duration) -> Self {
        self.initial_delay = delay;
        self
    }

    /// Set the maximum delay between reconnect attempts.
    #[must_use]
    pub fn with_max_delay(mut self, max: Duration) -> Self {
        self.max_delay = max;
        self
    }

    /// Set the backoff multiplier.
    ///
    /// Values below 1.0 are clamped to 1.0 so delays never shrink. A NaN
    /// argument also results in 1.0, because [`f64::max`] returns the
    /// non-NaN operand.
    #[must_use]
    pub fn with_multiplier(mut self, mult: f64) -> Self {
        self.multiplier = mult.max(1.0);
        self
    }

    /// Limit the number of reconnect attempts to `n`.
    ///
    /// If this is never called, `max_attempts` keeps its default of `None`
    /// (retry indefinitely). To restore unlimited retries on an existing
    /// policy, assign `policy.max_attempts = None` directly.
    #[must_use]
    pub fn with_max_attempts(mut self, n: u32) -> Self {
        self.max_attempts = Some(n);
        self
    }

    /// Enable or disable ±10 % random jitter on each delay.
    #[must_use]
    pub fn with_jitter(mut self, jitter: bool) -> Self {
        self.jitter = jitter;
        self
    }
}

112// ── Internal backoff iterator ─────────────────────────────────────────────────
113
114/// Stateful iterator that computes successive backoff delays.
115pub(crate) struct BackoffIter<'a> {
116    policy: &'a ReconnectPolicy,
117    current: Duration,
118    attempts: u32,
119}
120
121impl<'a> BackoffIter<'a> {
122    pub(crate) fn new(policy: &'a ReconnectPolicy) -> Self {
123        Self {
124            policy,
125            current: policy.initial_delay,
126            attempts: 0,
127        }
128    }
129
130    /// Returns the next delay, or `None` if `max_attempts` has been reached.
131    pub(crate) fn next_delay(&mut self) -> Option<Duration> {
132        if let Some(max) = self.policy.max_attempts
133            && self.attempts >= max
134        {
135            return None;
136        }
137        self.attempts += 1;
138
139        let delay = self.current;
140
141        let next_secs = (self.current.as_secs_f64() * self.policy.multiplier)
142            .min(self.policy.max_delay.as_secs_f64());
143        self.current = Duration::from_secs_f64(next_secs);
144
145        if self.policy.jitter {
146            Some(apply_jitter(delay))
147        } else {
148            Some(delay)
149        }
150    }
151
152    /// Reset the delay back to `initial_delay` and clear the attempt counter.
153    pub(crate) fn reset(&mut self) {
154        self.current = self.policy.initial_delay;
155        self.attempts = 0;
156    }
157}
158
159/// Add a uniformly-distributed ±10 % offset to `d`.
160fn apply_jitter(d: Duration) -> Duration {
161    // ±10 % uniform jitter.
162    let factor = rand::random_range(-0.1..=0.1);
163    let adjusted = d.as_secs_f64() * (1.0 + factor);
164    Duration::from_secs_f64(adjusted.max(0.0))
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backoff_grows_to_max() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_max_delay(Duration::from_millis(500))
            .with_multiplier(2.0)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);

        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(200)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(400)));
        // Capped at max_delay.
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(500)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(500)));
    }

    #[test]
    fn test_max_attempts_exhausted() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(10))
            .with_max_attempts(3)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);

        assert!(iter.next_delay().is_some());
        assert!(iter.next_delay().is_some());
        assert!(iter.next_delay().is_some());
        assert_eq!(iter.next_delay(), None);
    }

    #[test]
    fn test_zero_max_attempts_yields_no_delay() {
        let policy = ReconnectPolicy::new().with_max_attempts(0);
        let mut iter = BackoffIter::new(&policy);
        assert_eq!(iter.next_delay(), None);
    }

    #[test]
    fn test_reset_restarts_delay() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(2.0)
            .with_max_attempts(10)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);
        iter.next_delay(); // 100 ms
        iter.next_delay(); // 200 ms
        iter.reset();

        // After reset, the delay starts again from initial_delay.
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
    }

    #[test]
    fn test_reset_clears_attempt_counter() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(10))
            .with_max_attempts(2)
            .with_jitter(false);

        let mut iter = BackoffIter::new(&policy);
        assert!(iter.next_delay().is_some());
        assert!(iter.next_delay().is_some());
        assert_eq!(iter.next_delay(), None);

        // Exhausted iterators become usable again after a reset.
        iter.reset();
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(10)));
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(1000))
            .with_max_delay(Duration::from_millis(2000))
            .with_multiplier(1.0) // keep delay constant at 1000 ms
            .with_jitter(true);

        let mut iter = BackoffIter::new(&policy);
        for _ in 0..50 {
            let d = iter.next_delay().unwrap().as_secs_f64();
            // ±10 % around 1000 ms → [900, 1100] ms.
            assert!((0.9..=1.1).contains(&d), "jitter out of range: {d}");
        }
    }

    #[test]
    fn test_multiplier_below_one_clamped() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(0.5) // clamped to 1.0
            .with_jitter(false);
        let mut iter = BackoffIter::new(&policy);
        // With multiplier clamped to 1.0 the delay stays constant.
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
        assert_eq!(iter.next_delay(), Some(Duration::from_millis(100)));
    }
}