1use std::num::NonZeroU32;
14use std::ops::ControlFlow;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::time::sleep;
19
20use crate::errors::InvocationError;
21
/// Strategy that decides whether a failed invocation should be attempted again.
///
/// The `Send + Sync + 'static` bounds let a policy be shared across threads
/// behind an `Arc<dyn RetryPolicy>`, which is how `RetryLoop` stores it.
pub trait RetryPolicy: Send + Sync + 'static {
    /// Inspects the failure described by `ctx` and returns the verdict.
    ///
    /// * `ControlFlow::Continue(delay)`: retry after waiting `delay`.
    /// * `ControlFlow::Break(())`: give up; the error is handed back to the caller.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}
35
/// Snapshot of the retry state handed to [`RetryPolicy::should_retry`].
pub struct RetryContext {
    /// Failure counter. `RetryLoop` starts it at 1 and increments it
    /// (saturating) after every retry sleep.
    pub fail_count: NonZeroU32,
    /// Sum of the delays `RetryLoop` has slept through so far.
    pub slept_so_far: Duration,
    /// The error recorded for the most recent failed invocation.
    pub error: InvocationError,
}
45
46pub struct NoRetries;
50
51impl RetryPolicy for NoRetries {
52 fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
53 ControlFlow::Break(())
54 }
55}
56
/// Policy that sleeps through short server-requested waits and retries a
/// transient I/O error once.
pub struct AutoSleep {
    /// Longest server-requested wait this policy is willing to sleep through.
    /// Only the whole-second part is compared against the requested wait.
    pub threshold: Duration,

    /// Delay used before retrying an I/O error on the first failure.
    /// `None` disables retrying I/O errors.
    pub io_errors_as_flood_of: Option<Duration>,
}

impl Default for AutoSleep {
    /// Defaults: a 60-second wait threshold and a 1-second I/O retry delay.
    fn default() -> Self {
        let threshold = Duration::from_secs(60);
        let io_errors_as_flood_of = Some(Duration::from_secs(1));
        Self {
            threshold,
            io_errors_as_flood_of,
        }
    }
}
87
/// Shifts `base` by a deterministic pseudo-random offset of at most
/// `max_jitter_secs` seconds in either direction.
///
/// The offset is derived purely from `seed`, so the same arguments always
/// produce the same result. The outcome has millisecond resolution, never goes
/// below zero, and saturates at `u64::MAX` milliseconds.
///
/// All arithmetic is carried out in `u128`, so neither a very large `base`
/// nor a very large `max_jitter_secs` can overflow or be truncated.
fn jitter_duration(base: Duration, seed: u32, max_jitter_secs: u64) -> Duration {
    // Scramble the seed with the mixing steps and constants of the SplitMix64
    // finalizer so that consecutive seeds give unrelated offsets.
    let h = {
        let mut v = u64::from(seed) ^ 0x9e37_79b9_7f4a_7c15;
        v ^= v >> 30;
        v = v.wrapping_mul(0xbf58_476d_1ce4_e5b9);
        v ^= v >> 27;
        v = v.wrapping_mul(0x94d0_49bb_1331_11eb);
        v ^= v >> 31;
        v
    };

    let max_jitter_ms = u128::from(max_jitter_secs) * 1000;
    // `offset_ms` lies in `0..=2 * max_jitter_ms`; subtracting `max_jitter_ms`
    // afterwards re-centres it onto `-max_jitter_ms..=max_jitter_ms`.
    let range_ms = 2 * max_jitter_ms + 1;
    let offset_ms = u128::from(h) % range_ms;

    // Add before subtracting so the intermediate value stays non-negative;
    // `saturating_sub` then clamps a negative total to zero.
    let shifted_ms = (base.as_millis() + offset_ms).saturating_sub(max_jitter_ms);

    // Clamp to what `Duration::from_millis` accepts; the cast is lossless
    // after the `min`.
    let final_ms = shifted_ms.min(u128::from(u64::MAX)) as u64;
    Duration::from_millis(final_ms)
}
111
112impl RetryPolicy for AutoSleep {
113 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
114 match &ctx.error {
115 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
119 let secs = rpc.value.unwrap_or(0) as u64;
120 if secs <= self.threshold.as_secs() {
121 let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
122 tracing::debug!(
123 "[ferogram::retry] FLOOD_WAIT_{secs}: sleeping {delay:?} before retrying"
124 );
125 ControlFlow::Continue(delay)
126 } else {
127 ControlFlow::Break(())
128 }
129 }
130
131 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
134 let secs = rpc.value.unwrap_or(0) as u64;
135 if secs <= self.threshold.as_secs() {
136 let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
137 tracing::debug!(
138 "[ferogram::retry] SLOWMODE_WAIT_{secs}: sleeping {delay:?} before retrying"
139 );
140 ControlFlow::Continue(delay)
141 } else {
142 ControlFlow::Break(())
143 }
144 }
145
146 InvocationError::Io(_) if ctx.fail_count.get() <= 1 => {
148 if let Some(d) = self.io_errors_as_flood_of {
149 tracing::debug!(
150 "[ferogram::retry] transient I/O error (attempt {}): sleeping {d:?} before retrying",
151 ctx.fail_count.get()
152 );
153 ControlFlow::Continue(d)
154 } else {
155 ControlFlow::Break(())
156 }
157 }
158
159 _ => ControlFlow::Break(()),
160 }
161 }
162}
163
164pub struct RetryLoop {
187 policy: Arc<dyn RetryPolicy>,
188 ctx: RetryContext,
189}
190
191impl RetryLoop {
192 pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
193 Self {
194 policy,
195 ctx: RetryContext {
196 fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
197 slept_so_far: Duration::default(),
198 error: InvocationError::Dropped,
199 },
200 }
201 }
202
203 pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
207 self.ctx.error = err;
208 match self.policy.should_retry(&self.ctx) {
209 ControlFlow::Continue(delay) => {
210 sleep(delay).await;
211 self.ctx.slept_so_far += delay;
212 self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
214 Ok(())
215 }
216 ControlFlow::Break(()) => {
217 Err(std::mem::replace(
219 &mut self.ctx.error,
220 InvocationError::Dropped,
221 ))
222 }
223 }
224 }
225}
226
/// Internal state of a [`CircuitBreaker`].
#[derive(Debug)]
enum CbState {
    /// Retries are being allowed; counts the failures seen in this state.
    Closed { consecutive_failures: u32 },
    /// Retries are being rejected; remembers when the breaker tripped.
    Open { tripped_at: std::time::Instant },
}

/// Retry policy that stops retrying once too many failures have been seen and
/// only allows retries again after a cooldown.
pub struct CircuitBreaker {
    /// Failure count at which the breaker trips.
    threshold: u32,
    /// How long the breaker stays open before it allows a retry again.
    cooldown: Duration,
    /// Current state, guarded by a mutex because `should_retry` takes `&self`.
    state: std::sync::Mutex<CbState>,
}

impl CircuitBreaker {
    /// Creates a closed breaker with no recorded failures.
    ///
    /// # Panics
    ///
    /// Panics when `threshold` is 0.
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        if threshold == 0 {
            panic!("CircuitBreaker threshold must be at least 1");
        }

        let initial = CbState::Closed {
            consecutive_failures: 0,
        };
        Self {
            threshold,
            cooldown,
            state: std::sync::Mutex::new(initial),
        }
    }
}
290
291impl RetryPolicy for CircuitBreaker {
292 fn should_retry(&self, _ctx: &RetryContext) -> ControlFlow<(), Duration> {
293 let mut state = self.state.lock().expect("lock poisoned");
294 match &*state {
295 CbState::Open { tripped_at } => {
296 if tripped_at.elapsed() >= self.cooldown {
297 *state = CbState::Closed {
299 consecutive_failures: 1,
300 };
301 ControlFlow::Continue(Duration::from_millis(200))
302 } else {
303 ControlFlow::Break(())
305 }
306 }
307 CbState::Closed {
308 consecutive_failures,
309 } => {
310 let new_count = consecutive_failures + 1;
311 if new_count >= self.threshold {
312 tracing::warn!(
313 "[ferogram::retry] circuit breaker tripped after {new_count} consecutive failures; rejecting requests for {:?}",
314 self.cooldown
315 );
316 *state = CbState::Open {
317 tripped_at: std::time::Instant::now(),
318 };
319 ControlFlow::Break(())
320 } else {
321 let backoff_ms = 200u64 * (1u64 << new_count.saturating_sub(1).min(4));
323 *state = CbState::Closed {
324 consecutive_failures: new_count,
325 };
326 ControlFlow::Continue(Duration::from_millis(backoff_ms))
327 }
328 }
329 }
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::errors::RpcError;
    use std::io;

    /// Builds a bare `RpcError` from its three fields.
    fn rpc_error(code: i32, name: &str, value: Option<u32>) -> RpcError {
        RpcError {
            code,
            name: name.into(),
            value,
        }
    }

    /// Wraps an arbitrary RPC error in `InvocationError::Rpc`.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(rpc_error(code, name, value))
    }

    /// A 420 `FLOOD_WAIT` asking for `secs` seconds.
    fn flood(secs: u32) -> InvocationError {
        rpc(420, "FLOOD_WAIT", Some(secs))
    }

    /// A connection-reset I/O failure.
    fn io_err() -> InvocationError {
        let reset = io::Error::new(io::ErrorKind::ConnectionReset, "reset");
        InvocationError::Io(reset)
    }

    /// Context for the `n`-th failure of a generic server-side error, used by
    /// the circuit-breaker tests.
    fn internal_failure(n: u32) -> RetryContext {
        RetryContext {
            fail_count: NonZeroU32::new(n).unwrap(),
            slept_so_far: Duration::ZERO,
            error: rpc(500, "INTERNAL", None),
        }
    }

    // --- NoRetries / AutoSleep -------------------------------------------

    #[test]
    fn no_retries_always_breaks() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::ZERO,
            error: flood(10),
        };
        assert!(NoRetries.should_retry(&ctx).is_break());
    }

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::ZERO,
            error: flood(30),
        };
        match AutoSleep::default().should_retry(&ctx) {
            ControlFlow::Continue(d) => {
                // 30 s requested, with up to 2 s of jitter either way.
                let secs = d.as_secs_f64();
                assert!(
                    (28.0..=32.0).contains(&secs),
                    "expected 28-32s delay (jitter), got {secs:.3}s"
                );
            }
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::ZERO,
            error: flood(120),
        };
        assert!(AutoSleep::default().should_retry(&ctx).is_break());
    }

    #[test]
    fn autosleep_second_flood_retry_is_honoured() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).expect("2 is nonzero"),
            slept_so_far: Duration::from_secs(30),
            error: flood(30),
        };
        match AutoSleep::default().should_retry(&ctx) {
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    (28.0..=32.0).contains(&secs),
                    "expected 28-32s on second FLOOD_WAIT, got {secs:.3}s"
                );
            }
            other => panic!("expected Continue on second FLOOD_WAIT, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_retries_io_once() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::ZERO,
            error: io_err(),
        };
        match AutoSleep::default().should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_no_io_retry_after_first() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(4).expect("4 is nonzero"),
            slept_so_far: Duration::from_secs(3),
            error: io_err(),
        };
        assert!(AutoSleep::default().should_retry(&ctx).is_break());
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::ZERO,
            error: rpc(400, "BAD_REQUEST", None),
        };
        assert!(AutoSleep::default().should_retry(&ctx).is_break());
    }

    // --- RpcError::migrate_dc_id -----------------------------------------

    #[test]
    fn migrate_dc_id_detected() {
        let e = rpc_error(303, "PHONE_MIGRATE", Some(5));
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = rpc_error(303, "NETWORK_MIGRATE", Some(3));
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = rpc_error(303, "FILE_MIGRATE", Some(4));
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        let e = rpc_error(420, "FLOOD_WAIT", Some(30));
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = rpc_error(303, "PHONE_MIGRATE", None);
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    // --- RetryLoop -------------------------------------------------------

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let outcome = rl.advance(rpc(400, "SOMETHING_WRONG", None)).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let policy = AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        };
        let mut rl = RetryLoop::new(Arc::new(policy));
        // The first I/O failure is retried; the second one sees a bumped
        // fail_count and is rejected.
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }

    // --- CircuitBreaker --------------------------------------------------

    #[test]
    fn circuit_breaker_trips_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        assert!(cb.should_retry(&internal_failure(1)).is_continue());
        assert!(cb.should_retry(&internal_failure(2)).is_continue());
        // The third failure reaches the threshold and trips the breaker.
        assert!(cb.should_retry(&internal_failure(3)).is_break());
        // While open (cooldown not elapsed) every call is rejected.
        assert!(cb.should_retry(&internal_failure(4)).is_break());
    }

    #[test]
    fn circuit_breaker_resets_after_cooldown() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(10));
        assert!(cb.should_retry(&internal_failure(1)).is_continue());
        assert!(cb.should_retry(&internal_failure(2)).is_break());
        // Wait out the 10 ms cooldown.
        std::thread::sleep(Duration::from_millis(20));
        assert!(cb.should_retry(&internal_failure(1)).is_continue());
    }
}