use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use tokio::time::sleep;

use crate::errors::InvocationError;
/// Decides whether a failed invocation should be retried, and if so, after
/// how long.
pub trait RetryPolicy: Send + Sync + 'static {
    /// Returns `ControlFlow::Continue(delay)` to sleep for `delay` and retry,
    /// or `ControlFlow::Break(())` to give up and surface the error.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}

/// Everything a [`RetryPolicy`] can inspect when making its decision.
pub struct RetryContext {
    /// How many times the invocation has failed so far (starts at 1).
    pub fail_count: NonZeroU32,
    /// Total time already spent sleeping between retries.
    pub slept_so_far: Duration,
    /// The most recent error.
    pub error: InvocationError,
}
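
// A minimal sketch of a custom policy built on the trait above. `FixedDelay`
// and its fields are illustrative names, not part of this module's API: it
// allows up to `max_attempts` failed attempts, sleeping a constant delay
// between them, ignoring the error kind entirely.
#[allow(dead_code)]
struct FixedDelay {
    max_attempts: u32,
    delay: Duration,
}

impl RetryPolicy for FixedDelay {
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
        if ctx.fail_count.get() < self.max_attempts {
            ControlFlow::Continue(self.delay)
        } else {
            ControlFlow::Break(())
        }
    }
}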

/// A policy that never retries: every error is surfaced immediately.
pub struct NoRetries;

impl RetryPolicy for NoRetries {
    fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
        ControlFlow::Break(())
    }
}

/// A policy that honours server-requested waits (`FLOOD_WAIT` and
/// `SLOWMODE_WAIT`) up to a configurable threshold, and optionally retries
/// the first I/O error after a short fixed sleep.
pub struct AutoSleep {
    /// Longest server-requested wait that is still honoured; anything above
    /// this gives up instead of sleeping.
    pub threshold: Duration,

    /// If set, the first I/O error is treated like a flood wait of this
    /// duration; `None` disables I/O retries.
    pub io_errors_as_flood_of: Option<Duration>,
}

impl Default for AutoSleep {
    fn default() -> Self {
        Self {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_secs(1)),
        }
    }
}

/// Applies deterministic jitter of up to ±`max_jitter_secs` around `base`.
///
/// The seed (typically the fail count) is scrambled with a SplitMix64-style
/// bit mixer so that repeated retries do not all land on the same instant,
/// without pulling in a random-number dependency.
fn jitter_duration(base: Duration, seed: u32, max_jitter_secs: u64) -> Duration {
    let h = {
        let mut v = seed as u64 ^ 0x9e37_79b9_7f4a_7c15;
        v ^= v >> 30;
        v = v.wrapping_mul(0xbf58_476d_1ce4_e5b9);
        v ^= v >> 27;
        v = v.wrapping_mul(0x94d0_49bb_1331_11eb);
        v ^= v >> 31;
        v
    };
    // Map the hash onto [-max_jitter, +max_jitter] milliseconds and clamp the
    // result at zero.
    let range_ms = max_jitter_secs * 1000 * 2 + 1;
    let jitter_ms = (h % range_ms) as i64 - (max_jitter_secs * 1000) as i64;
    let base_ms = base.as_millis() as i64;
    let final_ms = (base_ms + jitter_ms).max(0) as u64;
    Duration::from_millis(final_ms)
}

impl RetryPolicy for AutoSleep {
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
        match &ctx.error {
            // FLOOD_WAIT and SLOWMODE_WAIT share the same handling: honour
            // the server-requested wait (with a little jitter) as long as it
            // stays at or below the configured threshold.
            InvocationError::Rpc(rpc)
                if rpc.code == 420
                    && (rpc.name == "FLOOD_WAIT" || rpc.name == "SLOWMODE_WAIT") =>
            {
                let secs = rpc.value.unwrap_or(0) as u64;
                if secs <= self.threshold.as_secs() {
                    let delay =
                        jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
                    tracing::info!("{}_{secs}: sleeping {delay:?} before retry", rpc.name);
                    ControlFlow::Continue(delay)
                } else {
                    ControlFlow::Break(())
                }
            }

            // Only the very first I/O error is retried, and only when the
            // policy is configured to do so.
            InvocationError::Io(_) if ctx.fail_count.get() <= 1 => {
                if let Some(d) = self.io_errors_as_flood_of {
                    tracing::info!(
                        "I/O error (attempt {}): sleeping {d:?} before retry",
                        ctx.fail_count.get()
                    );
                    ControlFlow::Continue(d)
                } else {
                    ControlFlow::Break(())
                }
            }

            _ => ControlFlow::Break(()),
        }
    }
}

/// Book-keeping that drives a [`RetryPolicy`] across successive failures of
/// the same invocation.
pub struct RetryLoop {
    policy: Arc<dyn RetryPolicy>,
    ctx: RetryContext,
}

impl RetryLoop {
    pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
        Self {
            policy,
            ctx: RetryContext {
                fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
                slept_so_far: Duration::default(),
                error: InvocationError::Dropped,
            },
        }
    }

    /// Records `err` and consults the policy: if a retry is allowed, sleeps
    /// for the requested delay and returns `Ok(())`; otherwise hands the
    /// error back so the caller can surface it.
    pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
        self.ctx.error = err;
        match self.policy.should_retry(&self.ctx) {
            ControlFlow::Continue(delay) => {
                sleep(delay).await;
                self.ctx.slept_so_far += delay;
                self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
                Ok(())
            }
            // Take the error out, leaving a cheap placeholder behind.
            ControlFlow::Break(()) => Err(std::mem::replace(
                &mut self.ctx.error,
                InvocationError::Dropped,
            )),
        }
    }
}
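
// A hedged usage sketch (not public API): how a caller might drive
// `RetryLoop` around a fallible async operation. `flaky_request` is a
// stand-in for a real call, used here only to keep the sketch self-contained.
#[allow(dead_code)]
async fn retry_loop_usage_sketch() -> Result<(), InvocationError> {
    async fn flaky_request() -> Result<(), InvocationError> {
        // Stand-in for a real invocation.
        Err(InvocationError::Dropped)
    }

    let mut rl = RetryLoop::new(Arc::new(AutoSleep::default()));
    loop {
        match flaky_request().await {
            Ok(value) => return Ok(value),
            // `advance` sleeps and yields `Ok(())` when the policy allows a
            // retry, or gives the error back when it does not.
            Err(e) => rl.advance(e).await?,
        }
    }
}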

#[derive(Debug)]
enum CbState {
    /// Failures are flowing through normally; we only count them.
    Closed { consecutive_failures: u32 },
    /// Too many consecutive failures: refuse retries until the cooldown
    /// elapses.
    Open { tripped_at: Instant },
}

/// A policy that gives up entirely once `threshold` consecutive failures are
/// observed, then allows a single probe retry after `cooldown` has passed.
pub struct CircuitBreaker {
    threshold: u32,
    cooldown: Duration,
    state: Mutex<CbState>,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        assert!(threshold >= 1, "CircuitBreaker threshold must be at least 1");
        Self {
            threshold,
            cooldown,
            state: Mutex::new(CbState::Closed {
                consecutive_failures: 0,
            }),
        }
    }
}

impl RetryPolicy for CircuitBreaker {
    fn should_retry(&self, _ctx: &RetryContext) -> ControlFlow<(), Duration> {
        let mut state = self.state.lock().expect("lock poisoned");
        match &*state {
            CbState::Open { tripped_at } => {
                if tripped_at.elapsed() >= self.cooldown {
                    // Cooldown over: close the circuit again, counting this
                    // failure as the first of a fresh run, and allow a probe.
                    *state = CbState::Closed {
                        consecutive_failures: 1,
                    };
                    ControlFlow::Continue(Duration::from_millis(200))
                } else {
                    ControlFlow::Break(())
                }
            }
            CbState::Closed {
                consecutive_failures,
            } => {
                let new_count = consecutive_failures + 1;
                if new_count >= self.threshold {
                    tracing::warn!(
                        "[ferogram] CircuitBreaker tripped after {new_count} consecutive failures"
                    );
                    *state = CbState::Open {
                        tripped_at: Instant::now(),
                    };
                    ControlFlow::Break(())
                } else {
                    // Exponential backoff: 200ms doubled per failure, with the
                    // shift capped at 4 (at most 3.2s).
                    let backoff_ms = 200u64 * (1u64 << new_count.saturating_sub(1).min(4));
                    *state = CbState::Closed {
                        consecutive_failures: new_count,
                    };
                    ControlFlow::Continue(Duration::from_millis(backoff_ms))
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::errors::RpcError;
    use std::io;

    fn flood(secs: u32) -> InvocationError {
        InvocationError::Rpc(RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(secs),
        })
    }

    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(RpcError {
            code,
            name: name.into(),
            value,
        })
    }
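
    // Sanity check for `jitter_duration`: the SplitMix64-style mix is
    // deterministic for a given seed, and the result stays within
    // ±`max_jitter_secs` of the base duration.
    #[test]
    fn jitter_is_deterministic_and_bounded() {
        let base = Duration::from_secs(30);
        for seed in 0..100 {
            let a = jitter_duration(base, seed, 2);
            let b = jitter_duration(base, seed, 2);
            assert_eq!(a, b, "same seed must give the same delay");
            let secs = a.as_secs_f64();
            assert!(
                (28.0..=32.0).contains(&secs),
                "seed {seed}: expected 28-32s, got {secs:.3}s"
            );
        }
    }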

    #[test]
    fn no_retries_always_breaks() {
        let policy = NoRetries;
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: flood(10),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    secs >= 28.0 && secs <= 32.0,
                    "expected 28-32s delay (jitter), got {secs:.3}s"
                );
            }
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: flood(120),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_second_flood_retry_is_honoured() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).expect("2 is nonzero"),
            slept_so_far: Duration::from_secs(30),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    secs >= 28.0 && secs <= 32.0,
                    "expected 28-32s on second FLOOD_WAIT, got {secs:.3}s"
                );
            }
            other => panic!("expected Continue on second FLOOD_WAIT, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_retries_io_once() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: io_err(),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_no_io_retry_after_first() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(4).expect("4 is nonzero"),
            slept_so_far: Duration::from_secs(3),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: rpc(400, "BAD_REQUEST", None),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }
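
    // `SLOWMODE_WAIT` takes the same path as `FLOOD_WAIT` in `AutoSleep`, so
    // it should also be honoured when under the threshold.
    #[test]
    fn autosleep_retries_slowmode_under_threshold() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: rpc(420, "SLOWMODE_WAIT", Some(30)),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    secs >= 28.0 && secs <= 32.0,
                    "expected 28-32s delay (jitter), got {secs:.3}s"
                );
            }
            other => panic!("expected Continue on SLOWMODE_WAIT, got {other:?}"),
        }
    }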

    #[test]
    fn migrate_dc_id_detected() {
        let e = RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = RpcError {
            code: 303,
            name: "NETWORK_MIGRATE".into(),
            value: Some(3),
        };
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = RpcError {
            code: 303,
            name: "FILE_MIGRATE".into(),
            value: Some(4),
        };
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        let e = RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(30),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: None,
        };
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let err = rpc(400, "SOMETHING_WRONG", None);
        let result = rl.advance(err).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        // The first I/O error is retried; the second exceeds the single
        // allowed I/O retry and is surfaced.
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }

    #[test]
    fn circuit_breaker_trips_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        let ctx = |n: u32| RetryContext {
            fail_count: NonZeroU32::new(n).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(500, "INTERNAL", None),
        };
        // Two failures back off; the third trips the breaker, and everything
        // after that is refused until the cooldown elapses.
        assert!(matches!(cb.should_retry(&ctx(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&ctx(2)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&ctx(3)), ControlFlow::Break(())));
        assert!(matches!(cb.should_retry(&ctx(4)), ControlFlow::Break(())));
    }

    #[test]
    fn circuit_breaker_resets_after_cooldown() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(10));
        let ctx = |n: u32| RetryContext {
            fail_count: NonZeroU32::new(n).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(500, "INTERNAL", None),
        };
        assert!(matches!(cb.should_retry(&ctx(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&ctx(2)), ControlFlow::Break(())));
        // Wait out the cooldown; the breaker should close and allow a probe.
        std::thread::sleep(Duration::from_millis(20));
        assert!(matches!(cb.should_retry(&ctx(1)), ControlFlow::Continue(_)));
    }
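
    // After the cooldown probe, failures count as a fresh run, so the breaker
    // trips again once the threshold is reached.
    #[test]
    fn circuit_breaker_trips_again_after_reset() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(10));
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: rpc(500, "INTERNAL", None),
        };
        assert!(matches!(cb.should_retry(&ctx), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&ctx), ControlFlow::Break(())));
        std::thread::sleep(Duration::from_millis(20));
        // Cooldown elapsed: the probe is allowed and counted as failure #1...
        assert!(matches!(cb.should_retry(&ctx), ControlFlow::Continue(_)));
        // ...and the next failure trips the breaker again.
        assert!(matches!(cb.should_retry(&ctx), ControlFlow::Break(())));
    }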
}