// dapr_durabletask/worker/reconnect_policy.rs
use std::time::Duration;
2
/// Backoff settings for reconnect attempts.
///
/// The delay starts at `initial_delay`, is scaled by `multiplier` after each
/// attempt, and is bounded by `max_delay`. `PartialEq` is derived so that
/// policies can be compared directly (for example in tests or config checks).
#[derive(Debug, Clone, PartialEq)]
pub struct ReconnectPolicy {
    /// Delay handed out for the first attempt.
    pub initial_delay: Duration,

    /// Upper bound that the growing delay is capped at.
    pub max_delay: Duration,

    /// Factor the delay is multiplied by after each attempt.
    pub multiplier: f64,

    /// Maximum number of attempts; `None` means there is no limit.
    pub max_attempts: Option<u32>,

    /// Whether each delay is randomly perturbed before it is returned.
    pub jitter: bool,
}
61
62impl Default for ReconnectPolicy {
63 fn default() -> Self {
64 Self {
65 initial_delay: Duration::from_secs(1),
66 max_delay: Duration::from_secs(30),
67 multiplier: 1.5,
68 max_attempts: None,
69 jitter: true,
70 }
71 }
72}
73
74impl ReconnectPolicy {
75 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn with_initial_delay(mut self, delay: Duration) -> Self {
82 self.initial_delay = delay;
83 self
84 }
85
86 pub fn with_max_delay(mut self, max: Duration) -> Self {
88 self.max_delay = max;
89 self
90 }
91
92 pub fn with_multiplier(mut self, mult: f64) -> Self {
94 self.multiplier = mult.max(1.0);
95 self
96 }
97
98 pub fn with_max_attempts(mut self, n: u32) -> Self {
101 self.max_attempts = Some(n);
102 self
103 }
104
105 pub fn with_jitter(mut self, jitter: bool) -> Self {
107 self.jitter = jitter;
108 self
109 }
110}
111
112pub(crate) struct BackoffIter<'a> {
116 policy: &'a ReconnectPolicy,
117 current: Duration,
118 attempts: u32,
119}
120
121impl<'a> BackoffIter<'a> {
122 pub(crate) fn new(policy: &'a ReconnectPolicy) -> Self {
123 Self {
124 policy,
125 current: policy.initial_delay,
126 attempts: 0,
127 }
128 }
129
130 pub(crate) fn next_delay(&mut self) -> Option<Duration> {
132 if let Some(max) = self.policy.max_attempts {
133 if self.attempts >= max {
134 return None;
135 }
136 }
137 self.attempts += 1;
138
139 let delay = self.current;
140
141 let next_secs = (self.current.as_secs_f64() * self.policy.multiplier)
142 .min(self.policy.max_delay.as_secs_f64());
143 self.current = Duration::from_secs_f64(next_secs);
144
145 if self.policy.jitter {
146 Some(apply_jitter(delay))
147 } else {
148 Some(delay)
149 }
150 }
151
152 pub(crate) fn reset(&mut self) {
154 self.current = self.policy.initial_delay;
155 self.attempts = 0;
156 }
157}
158
/// Scales `d` by a pseudo-random factor in `[0.9, 1.1]` (steps of 0.001).
///
/// The randomness comes from a process-wide counter that is advanced by the
/// sub-second nanoseconds of the wall clock on every call and then passed
/// through one linear-congruential step. It is only meant to de-synchronise
/// callers, not to be statistically strong.
///
/// A result too large for `Duration` saturates to `Duration::MAX` rather than
/// panicking.
fn apply_jitter(d: Duration) -> Duration {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEED: AtomicU64 = AtomicU64::new(0);

    // Falls back to a fixed increment if the clock reads earlier than the epoch.
    let clock_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| u64::from(since_epoch.subsec_nanos()))
        .unwrap_or(12345);

    // `fetch_add` returns the value *before* the addition. Add the increment
    // again so this call's clock reading feeds its own draw; otherwise the
    // first call in every process would always see seed 0 and yield the same
    // factor.
    let seed = SEED
        .fetch_add(clock_nanos, Ordering::Relaxed)
        .wrapping_add(clock_nanos);

    // One LCG step (Knuth's MMIX multiplier and increment) to spread the bits.
    let r = seed
        .wrapping_mul(6_364_136_223_846_793_005)
        .wrapping_add(1_442_695_040_888_963_407);

    // r % 201 is in 0..=200, so the factor is in -0.1..=0.1.
    let factor = (r % 201) as f64 / 1000.0 - 0.1;
    let adjusted = (d.as_secs_f64() * (1.0 + factor)).max(0.0);

    // `adjusted` is non-negative, so the only possible failure is overflow.
    Duration::try_from_secs_f64(adjusted).unwrap_or(Duration::MAX)
}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the millisecond durations used throughout these tests.
    fn ms(value: u64) -> Duration {
        Duration::from_millis(value)
    }

    /// With jitter off the delay doubles each step and then sits at `max_delay`.
    #[test]
    fn test_backoff_grows_to_max() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(ms(100))
            .with_max_delay(ms(500))
            .with_multiplier(2.0)
            .with_jitter(false);
        let mut backoff = BackoffIter::new(&policy);

        // 100 -> 200 -> 400, then held at the 500 ms ceiling.
        for &expected in &[100u64, 200, 400, 500, 500] {
            assert_eq!(backoff.next_delay(), Some(ms(expected)));
        }
    }

    /// Exactly `max_attempts` delays are produced before `None`.
    #[test]
    fn test_max_attempts_exhausted() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(ms(10))
            .with_max_attempts(3)
            .with_jitter(false);
        let mut backoff = BackoffIter::new(&policy);

        for _ in 0..3 {
            assert!(backoff.next_delay().is_some());
        }
        assert_eq!(backoff.next_delay(), None);
    }

    /// `reset` rewinds the sequence to the initial delay.
    #[test]
    fn test_reset_restarts_delay() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(ms(100))
            .with_multiplier(2.0)
            .with_max_attempts(10)
            .with_jitter(false);
        let mut backoff = BackoffIter::new(&policy);

        // Advance twice so there is state to discard.
        let _ = backoff.next_delay();
        let _ = backoff.next_delay();
        backoff.reset();

        assert_eq!(backoff.next_delay(), Some(ms(100)));
    }

    /// A multiplier of 1.0 keeps the base at 1 s, so every jittered sample must
    /// land near it.
    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(ms(1000))
            .with_max_delay(ms(2000))
            .with_multiplier(1.0)
            .with_jitter(true);
        let mut backoff = BackoffIter::new(&policy);

        let mut remaining = 50;
        while remaining > 0 {
            let d = backoff.next_delay().unwrap().as_secs_f64();
            assert!(d >= 0.9 && d <= 1.11, "jitter out of range: {d}");
            remaining -= 1;
        }
    }

    /// A multiplier below 1.0 is raised to 1.0, so the delay never shrinks.
    #[test]
    fn test_multiplier_below_one_clamped() {
        let policy = ReconnectPolicy::new()
            .with_initial_delay(ms(100))
            .with_multiplier(0.5)
            .with_jitter(false);
        let mut backoff = BackoffIter::new(&policy);

        for _ in 0..2 {
            assert_eq!(backoff.next_delay(), Some(ms(100)));
        }
    }
}