1use std::num::NonZeroU32;
14use std::ops::ControlFlow;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::time::sleep;
19
20use crate::errors::InvocationError;
21
22pub trait RetryPolicy: Send + Sync + 'static {
29 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
34}
35
36pub struct RetryContext {
38 pub fail_count: NonZeroU32,
40 pub slept_so_far: Duration,
42 pub error: InvocationError,
44}
45
46pub struct NoRetries;
50
51impl RetryPolicy for NoRetries {
52 fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
53 ControlFlow::Break(())
54 }
55}
56
/// Retry policy that sleeps through server-requested waits and, optionally,
/// retries a single transient I/O error.
pub struct AutoSleep {
    /// Longest server-requested wait (compared in whole seconds) that the
    /// policy is willing to sleep through; longer waits are handed back to
    /// the caller as errors.
    pub threshold: Duration,

    /// Delay applied before retrying an I/O error on the first failure only.
    /// `None` disables I/O retries entirely.
    pub io_errors_as_flood_of: Option<Duration>,
}

impl Default for AutoSleep {
    /// Sleeps through waits of up to 60 s and retries an I/O error once after 1 s.
    fn default() -> Self {
        const DEFAULT_THRESHOLD: Duration = Duration::from_secs(60);
        const DEFAULT_IO_RETRY_DELAY: Duration = Duration::from_secs(1);

        Self {
            io_errors_as_flood_of: Some(DEFAULT_IO_RETRY_DELAY),
            threshold: DEFAULT_THRESHOLD,
        }
    }
}
87
/// Returns `base` shifted by a pseudo-random offset in
/// `[-max_jitter_secs, +max_jitter_secs]` (millisecond resolution), clamped so
/// the result is never below zero.
///
/// The offset is a pure function of `seed`: the seed is scrambled with the
/// SplitMix64 output mixer, so equal seeds always give equal results while
/// neighbouring seeds give unrelated offsets.
///
/// All arithmetic saturates, so arbitrarily large `base` or `max_jitter_secs`
/// values cannot overflow. The result is capped at `u64::MAX` milliseconds.
fn jitter_duration(base: Duration, seed: u32, max_jitter_secs: u64) -> Duration {
    let h = {
        let mut v = u64::from(seed) ^ 0x9e37_79b9_7f4a_7c15;
        v ^= v >> 30;
        v = v.wrapping_mul(0xbf58_476d_1ce4_e5b9);
        v ^= v >> 27;
        v = v.wrapping_mul(0x94d0_49bb_1331_11eb);
        v ^= v >> 31;
        v
    };

    let max_jitter_ms = max_jitter_secs.saturating_mul(1000);
    // Count of distinct offsets in [-max_jitter_ms, +max_jitter_ms]; always >= 1,
    // so the modulo below can never divide by zero.
    let range_ms = max_jitter_ms.saturating_mul(2).saturating_add(1);
    // i128 holds every intermediate value here (|x| <= ~1.8e22), so no
    // truncating `as` casts are needed.
    let jitter_ms = i128::from(h % range_ms) - i128::from(max_jitter_ms);
    let base_ms = i128::try_from(base.as_millis()).unwrap_or(i128::MAX);
    let final_ms = base_ms.saturating_add(jitter_ms).max(0);

    Duration::from_millis(u64::try_from(final_ms).unwrap_or(u64::MAX))
}
111
112impl RetryPolicy for AutoSleep {
113 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
114 match &ctx.error {
115 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
119 let secs = rpc.value.unwrap_or(0) as u64;
120 if secs <= self.threshold.as_secs() {
121 let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
122 tracing::debug!(
123 "[ferogram::retry] FLOOD_WAIT_{secs}: sleeping {delay:?} before retrying"
124 );
125 ControlFlow::Continue(delay)
126 } else {
127 ControlFlow::Break(())
128 }
129 }
130
131 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
134 let secs = rpc.value.unwrap_or(0) as u64;
135 if secs <= self.threshold.as_secs() {
136 let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
137 tracing::debug!(
138 "[ferogram::retry] SLOWMODE_WAIT_{secs}: sleeping {delay:?} before retrying"
139 );
140 ControlFlow::Continue(delay)
141 } else {
142 ControlFlow::Break(())
143 }
144 }
145
146 InvocationError::Io(_) if ctx.fail_count.get() <= 1 => {
148 if let Some(d) = self.io_errors_as_flood_of {
149 tracing::debug!(
150 "[ferogram::retry] transient I/O error (attempt {}): sleeping {d:?} before retrying",
151 ctx.fail_count.get()
152 );
153 ControlFlow::Continue(d)
154 } else {
155 ControlFlow::Break(())
156 }
157 }
158
159 _ => ControlFlow::Break(()),
160 }
161 }
162}
163
164pub struct RetryLoop {
187 policy: Arc<dyn RetryPolicy>,
188 ctx: RetryContext,
189}
190
191impl RetryLoop {
192 pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
195 Self {
196 policy,
197 ctx: RetryContext {
198 fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
199 slept_so_far: Duration::default(),
200 error: InvocationError::Dropped,
201 },
202 }
203 }
204
205 pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
209 self.ctx.error = err;
210 match self.policy.should_retry(&self.ctx) {
211 ControlFlow::Continue(delay) => {
212 sleep(delay).await;
213 self.ctx.slept_so_far += delay;
214 self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
216 Ok(())
217 }
218 ControlFlow::Break(()) => {
219 Err(std::mem::replace(
221 &mut self.ctx.error,
222 InvocationError::Dropped,
223 ))
224 }
225 }
226 }
227}
228
/// Internal state machine of a [`CircuitBreaker`].
#[derive(Debug)]
enum CbState {
    /// Retries are allowed; counts failures seen since the breaker last closed.
    Closed { consecutive_failures: u32 },
    /// The breaker tripped at `tripped_at`; nothing is retried until the
    /// cooldown has elapsed.
    Open { tripped_at: std::time::Instant },
}

/// A [`RetryPolicy`] that backs off exponentially and, once `threshold`
/// failures have accumulated, refuses all retries for `cooldown`.
///
/// NOTE(review): the trait only reports failures, so nothing resets the
/// counter on success; "consecutive" effectively means "since the breaker
/// last closed".
pub struct CircuitBreaker {
    /// Failure count at which the breaker trips; always at least 1.
    threshold: u32,
    /// How long the breaker stays open before letting a retry through again.
    cooldown: Duration,
    /// Current state, behind a mutex because one policy instance is shared by
    /// concurrent requests (`RetryPolicy: Send + Sync`).
    state: std::sync::Mutex<CbState>,
}

impl CircuitBreaker {
    /// Creates a closed breaker with no recorded failures.
    ///
    /// # Panics
    ///
    /// Panics if `threshold` is 0.
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        assert!(
            threshold != 0,
            "CircuitBreaker threshold must be at least 1"
        );
        let initial = CbState::Closed {
            consecutive_failures: 0,
        };
        Self {
            state: std::sync::Mutex::new(initial),
            cooldown,
            threshold,
        }
    }
}
292
293impl RetryPolicy for CircuitBreaker {
294 fn should_retry(&self, _ctx: &RetryContext) -> ControlFlow<(), Duration> {
295 let mut state = self.state.lock().expect("lock poisoned");
296 match &*state {
297 CbState::Open { tripped_at } => {
298 if tripped_at.elapsed() >= self.cooldown {
299 *state = CbState::Closed {
301 consecutive_failures: 1,
302 };
303 ControlFlow::Continue(Duration::from_millis(200))
304 } else {
305 ControlFlow::Break(())
307 }
308 }
309 CbState::Closed {
310 consecutive_failures,
311 } => {
312 let new_count = consecutive_failures + 1;
313 if new_count >= self.threshold {
314 tracing::warn!(
315 "[ferogram::retry] circuit breaker tripped after {new_count} consecutive failures; rejecting requests for {:?}",
316 self.cooldown
317 );
318 *state = CbState::Open {
319 tripped_at: std::time::Instant::now(),
320 };
321 ControlFlow::Break(())
322 } else {
323 let backoff_ms = 200u64 * (1u64 << new_count.saturating_sub(1).min(4));
325 *state = CbState::Closed {
326 consecutive_failures: new_count,
327 };
328 ControlFlow::Continue(Duration::from_millis(backoff_ms))
329 }
330 }
331 }
332 }
333}
334
#[cfg(test)]
mod tests {
    // Unit tests for the retry policies, the retry loop and RpcError's
    // migrate-DC detection.
    use super::*;
    use crate::errors::RpcError;
    use std::io;

    // ---- fixtures ---------------------------------------------------------

    /// Raw `RpcError` with the given code, name and optional value.
    fn rpc_error(code: i32, name: &str, value: Option<u32>) -> RpcError {
        RpcError {
            code,
            name: name.into(),
            value,
        }
    }

    /// `rpc_error` wrapped as an `InvocationError`.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(rpc_error(code, name, value))
    }

    /// A `FLOOD_WAIT` asking for `secs` seconds.
    fn flood(secs: u32) -> InvocationError {
        rpc(420, "FLOOD_WAIT", Some(secs))
    }

    /// A transient connection reset.
    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// The context a policy sees on failure number `attempt`.
    fn ctx_at(attempt: u32, slept: Duration, error: InvocationError) -> RetryContext {
        RetryContext {
            fail_count: NonZeroU32::new(attempt).expect("attempt numbers are nonzero"),
            slept_so_far: slept,
            error,
        }
    }

    /// The context for a request that has just failed for the first time.
    fn first_failure(error: InvocationError) -> RetryContext {
        ctx_at(1, Duration::default(), error)
    }

    // ---- NoRetries --------------------------------------------------------

    #[test]
    fn no_retries_always_breaks() {
        let verdict = NoRetries.should_retry(&first_failure(flood(10)));
        assert!(matches!(verdict, ControlFlow::Break(())));
    }

    // ---- AutoSleep --------------------------------------------------------

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let verdict = AutoSleep::default().should_retry(&first_failure(flood(30)));
        if let ControlFlow::Continue(d) = verdict {
            let secs = d.as_secs_f64();
            assert!(
                (28.0..=32.0).contains(&secs),
                "expected 28-32s delay (jitter), got {secs:.3}s"
            );
        } else {
            panic!("expected Continue, got {verdict:?}");
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let verdict = AutoSleep::default().should_retry(&first_failure(flood(120)));
        assert!(matches!(verdict, ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_second_flood_retry_is_honoured() {
        let ctx = ctx_at(2, Duration::from_secs(30), flood(30));
        let verdict = AutoSleep::default().should_retry(&ctx);
        if let ControlFlow::Continue(d) = verdict {
            let secs = d.as_secs_f64();
            assert!(
                (28.0..=32.0).contains(&secs),
                "expected 28-32s on second FLOOD_WAIT, got {secs:.3}s"
            );
        } else {
            panic!("expected Continue on second FLOOD_WAIT, got {verdict:?}");
        }
    }

    #[test]
    fn autosleep_retries_io_once() {
        let verdict = AutoSleep::default().should_retry(&first_failure(io_err()));
        if let ControlFlow::Continue(d) = verdict {
            assert_eq!(d, Duration::from_secs(1));
        } else {
            panic!("expected Continue, got {verdict:?}");
        }
    }

    #[test]
    fn autosleep_no_io_retry_after_first() {
        let ctx = ctx_at(4, Duration::from_secs(3), io_err());
        assert!(matches!(
            AutoSleep::default().should_retry(&ctx),
            ControlFlow::Break(())
        ));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let ctx = first_failure(rpc(400, "BAD_REQUEST", None));
        assert!(matches!(
            AutoSleep::default().should_retry(&ctx),
            ControlFlow::Break(())
        ));
    }

    // ---- RpcError::migrate_dc_id -------------------------------------------

    #[test]
    fn migrate_dc_id_detected() {
        assert_eq!(rpc_error(303, "PHONE_MIGRATE", Some(5)).migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        assert_eq!(rpc_error(303, "NETWORK_MIGRATE", Some(3)).migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        assert_eq!(rpc_error(303, "FILE_MIGRATE", Some(4)).migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        assert_eq!(rpc_error(420, "FLOOD_WAIT", Some(30)).migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        assert_eq!(rpc_error(303, "PHONE_MIGRATE", None).migrate_dc_id(), Some(2));
    }

    // ---- RetryLoop ----------------------------------------------------------

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let outcome = rl.advance(rpc(400, "SOMETHING_WRONG", None)).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let policy = AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        };
        let mut rl = RetryLoop::new(Arc::new(policy));
        // First I/O failure is retried; the second one is not.
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }

    // ---- CircuitBreaker -----------------------------------------------------

    /// Context for failure number `n` carrying a generic server error.
    fn internal_at(n: u32) -> RetryContext {
        ctx_at(n, Duration::default(), rpc(500, "INTERNAL", None))
    }

    #[test]
    fn circuit_breaker_trips_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        assert!(matches!(cb.should_retry(&internal_at(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&internal_at(2)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&internal_at(3)), ControlFlow::Break(())));
        assert!(matches!(cb.should_retry(&internal_at(4)), ControlFlow::Break(())));
    }

    #[test]
    fn circuit_breaker_resets_after_cooldown() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(10));
        assert!(matches!(cb.should_retry(&internal_at(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&internal_at(2)), ControlFlow::Break(())));
        std::thread::sleep(Duration::from_millis(20));
        assert!(matches!(cb.should_retry(&internal_at(1)), ControlFlow::Continue(_)));
    }
}