Skip to main content

ferogram_mtsender/
retry.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;

use tokio::time::sleep;

use crate::errors::InvocationError;
21
22// RetryPolicy trait
23
24/// Controls how the client reacts when an RPC call fails.
25///
26/// Implement this trait to provide custom flood-wait handling, circuit
27/// breakers, or exponential back-off.
28pub trait RetryPolicy: Send + Sync + 'static {
29    /// Decide whether to retry the failed request.
30    ///
31    /// Return `ControlFlow::Continue(delay)` to sleep `delay` and retry.
32    /// Return `ControlFlow::Break(())` to propagate `ctx.error` to the caller.
33    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
34}
35
36/// Context passed to [`RetryPolicy::should_retry`] on each failure.
37pub struct RetryContext {
38    /// Number of times this request has failed (starts at 1).
39    pub fail_count: NonZeroU32,
40    /// Total time already slept for this request across all prior retries.
41    pub slept_so_far: Duration,
42    /// The most recent error.
43    pub error: InvocationError,
44}
45
46// Built-in policies
47
48/// Never retry: propagate every error immediately.
49pub struct NoRetries;
50
51impl RetryPolicy for NoRetries {
52    fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
53        ControlFlow::Break(())
54    }
55}
56
/// Default retry policy: sleeps through `FLOOD_WAIT` / `SLOWMODE_WAIT` errors
/// and retries once after a transient I/O error.
///
/// ```rust
/// # use ferogram_mtsender::AutoSleep;
/// let policy = AutoSleep {
///     threshold: std::time::Duration::from_secs(60),
///     io_errors_as_flood_of: Some(std::time::Duration::from_secs(1)),
/// };
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AutoSleep {
    /// Longest `FLOOD_WAIT` / `SLOWMODE_WAIT` the policy sleeps through
    /// automatically (compared in whole seconds).
    ///
    /// If Telegram asks us to wait longer than this, the error is propagated.
    pub threshold: Duration,

    /// If `Some(d)`, an I/O error that is the request's first failure is
    /// retried once after sleeping `d`; later I/O errors are propagated.
    /// `None` propagates I/O errors immediately.
    pub io_errors_as_flood_of: Option<Duration>,
}
78
79impl Default for AutoSleep {
80    fn default() -> Self {
81        Self {
82            threshold: Duration::from_secs(60),
83            io_errors_as_flood_of: Some(Duration::from_secs(1)),
84        }
85    }
86}
87
/// Add pseudo-random ±`max_jitter_secs` jitter to `base`, with millisecond resolution.
///
/// `seed` (the request's fail count) is hashed with a freshly created
/// [`RandomState`]. `RandomState` keys are random, and different instances are
/// unlikely to hash the same value identically. Clients that hit the same
/// `FLOOD_WAIT` at the same moment share the same `seed`, yet they still land
/// on different delays. A purely seed-derived offset would give every client
/// the *same* delay on its first failure, which defeats the point of jitter.
/// No `rand` dependency is needed.
///
/// The result is floored at zero and the arithmetic cannot overflow. Any
/// sub-millisecond part of `base` is dropped; with `max_jitter_secs == 0` the
/// result is `base` truncated to whole milliseconds.
fn jitter_duration(base: Duration, seed: u32, max_jitter_secs: u64) -> Duration {
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u32(seed);
    let h = hasher.finish();

    // Work in u128 milliseconds so neither the window nor the sum can overflow.
    let max_ms = u128::from(max_jitter_secs) * 1000;
    let window = max_ms * 2 + 1; // offsets 0..=2*max_ms
    let offset = u128::from(h) % window;
    // base + offset - max_ms == base shifted by a value in [-max_ms, +max_ms], floored at 0.
    let shifted_ms = (base.as_millis() + offset).saturating_sub(max_ms);
    Duration::from_millis(u64::try_from(shifted_ms).unwrap_or(u64::MAX))
}
111
112impl RetryPolicy for AutoSleep {
113    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
114        match &ctx.error {
115            // FLOOD_WAIT: sleep as long as Telegram asks, plus ±2 s jitter.
116            // Jitter spreads retries across clients that all hit the same limit
117            // simultaneously (e.g. after a server-side rate-limit window resets).
118            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
119                let secs = rpc.value.unwrap_or(0) as u64;
120                if secs <= self.threshold.as_secs() {
121                    let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
122                    tracing::debug!(
123                        "[ferogram::retry] FLOOD_WAIT_{secs}: sleeping {delay:?} before retrying"
124                    );
125                    ControlFlow::Continue(delay)
126                } else {
127                    ControlFlow::Break(())
128                }
129            }
130
131            // SLOWMODE_WAIT: same semantics as FLOOD_WAIT; very common in
132            // group bots that send messages faster than the channel's slowmode.
133            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
134                let secs = rpc.value.unwrap_or(0) as u64;
135                if secs <= self.threshold.as_secs() {
136                    let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
137                    tracing::debug!(
138                        "[ferogram::retry] SLOWMODE_WAIT_{secs}: sleeping {delay:?} before retrying"
139                    );
140                    ControlFlow::Continue(delay)
141                } else {
142                    ControlFlow::Break(())
143                }
144            }
145
146            // Transient I/O errors: back off briefly and retry once.
147            InvocationError::Io(_) if ctx.fail_count.get() <= 1 => {
148                if let Some(d) = self.io_errors_as_flood_of {
149                    tracing::debug!(
150                        "[ferogram::retry] transient I/O error (attempt {}): sleeping {d:?} before retrying",
151                        ctx.fail_count.get()
152                    );
153                    ControlFlow::Continue(d)
154                } else {
155                    ControlFlow::Break(())
156                }
157            }
158
159            _ => ControlFlow::Break(()),
160        }
161    }
162}
163
164// RetryLoop
165
166/// Drives the retry loop for a single RPC call.
167///
168/// Create one per call, then call `advance` after every failure.
169///
170/// ```rust,ignore
171/// let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
172/// loop {
173/// match self.do_rpc_call(req).await {
174///     Ok(body) => return Ok(body),
175///     Err(e)   => rl.advance(e).await?,
176/// }
177/// }
178/// ```
179///
180/// `advance` either:
181/// - sleeps the required duration and returns `Ok(())` → caller should retry, or
182/// - returns `Err(e)` → caller should propagate.
183///
184/// This is the single source of truth; previously the same loop was
185/// copy-pasted into `rpc_call_raw`, `rpc_write`, and the reconnect path.
186pub struct RetryLoop {
187    policy: Arc<dyn RetryPolicy>,
188    ctx: RetryContext,
189}
190
191impl RetryLoop {
192    /// Start a fresh retry loop against `policy`, with the failure count
193    /// and accumulated sleep time both reset.
194    pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
195        Self {
196            policy,
197            ctx: RetryContext {
198                fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
199                slept_so_far: Duration::default(),
200                error: InvocationError::Dropped,
201            },
202        }
203    }
204
205    /// Record a failure and either sleep+return-Ok (retry) or return-Err (give up).
206    ///
207    /// Mutates `self` to track cumulative state across retries.
208    pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
209        self.ctx.error = err;
210        match self.policy.should_retry(&self.ctx) {
211            ControlFlow::Continue(delay) => {
212                sleep(delay).await;
213                self.ctx.slept_so_far += delay;
214                // saturating_add: if somehow we overflow NonZeroU32, clamp at MAX
215                self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
216                Ok(())
217            }
218            ControlFlow::Break(()) => {
219                // Move the error out so the caller doesn't have to clone it
220                Err(std::mem::replace(
221                    &mut self.ctx.error,
222                    InvocationError::Dropped,
223                ))
224            }
225        }
226    }
227}
228
229// CircuitBreaker
230
/// Where a `CircuitBreaker` currently stands.
#[derive(Debug)]
enum CbState {
    /// Closed: failures are retried while being counted.
    Closed { consecutive_failures: u32 },
    /// Tripped at `tripped_at`: failures are not retried until the cooldown passes.
    Open { tripped_at: std::time::Instant },
}
239
240/// A [`RetryPolicy`] that stops retrying after `threshold` consecutive
241/// failures and stays silent for a `cooldown` window before resetting.
242///
243/// # States
244/// - **Closed** (normal): forwards calls, increments a failure counter on
245///   each error, and applies an exponential back-off up to `threshold − 1`
246///   attempts.  On the `threshold`-th consecutive failure the breaker trips.
247/// - **Open** (tripped): rejects every call immediately (`Break`) for the
248///   duration of `cooldown`.
249/// - **Reset**: once `cooldown` has elapsed the breaker closes again and
250///   the failure counter resets to zero.
251///
252/// Because [`RetryPolicy`] has no success callback the breaker cannot
253/// distinguish a successful probe from a clean run; the counter simply
254/// resets when the cooldown expires.  For a full half-open probe you can
255/// wrap `CircuitBreaker` in a custom `RetryPolicy`.
256///
257/// # Example
258/// ```rust
259/// use ferogram_mtsender::CircuitBreaker;
260/// use std::time::Duration;
261///
262/// // Trip after 5 consecutive errors; stay open for 30 s.
263/// let policy = CircuitBreaker::new(5, Duration::from_secs(30));
264/// ```
265pub struct CircuitBreaker {
266    /// Number of consecutive failures before the breaker trips.
267    threshold: u32,
268    /// How long the breaker stays open before resetting.
269    cooldown: Duration,
270    state: std::sync::Mutex<CbState>,
271}
272
273impl CircuitBreaker {
274    /// Create a new `CircuitBreaker`.
275    ///
276    /// - `threshold`: failures before the breaker trips (minimum 1).
277    /// - `cooldown`: how long the breaker stays open.
278    pub fn new(threshold: u32, cooldown: Duration) -> Self {
279        assert!(
280            threshold >= 1,
281            "CircuitBreaker threshold must be at least 1"
282        );
283        Self {
284            threshold,
285            cooldown,
286            state: std::sync::Mutex::new(CbState::Closed {
287                consecutive_failures: 0,
288            }),
289        }
290    }
291}
292
293impl RetryPolicy for CircuitBreaker {
294    fn should_retry(&self, _ctx: &RetryContext) -> ControlFlow<(), Duration> {
295        let mut state = self.state.lock().expect("lock poisoned");
296        match &*state {
297            CbState::Open { tripped_at } => {
298                if tripped_at.elapsed() >= self.cooldown {
299                    // Cooldown expired: reset to Closed, allow retry with small delay.
300                    *state = CbState::Closed {
301                        consecutive_failures: 1,
302                    };
303                    ControlFlow::Continue(Duration::from_millis(200))
304                } else {
305                    // Still open: reject immediately.
306                    ControlFlow::Break(())
307                }
308            }
309            CbState::Closed {
310                consecutive_failures,
311            } => {
312                let new_count = consecutive_failures + 1;
313                if new_count >= self.threshold {
314                    tracing::warn!(
315                        "[ferogram::retry] circuit breaker tripped after {new_count} consecutive failures; rejecting requests for {:?}",
316                        self.cooldown
317                    );
318                    *state = CbState::Open {
319                        tripped_at: std::time::Instant::now(),
320                    };
321                    ControlFlow::Break(())
322                } else {
323                    // Exponential back-off: 200 ms × 2^(n-1), capped at ~3 s.
324                    let backoff_ms = 200u64 * (1u64 << new_count.saturating_sub(1).min(4));
325                    *state = CbState::Closed {
326                        consecutive_failures: new_count,
327                    };
328                    ControlFlow::Continue(Duration::from_millis(backoff_ms))
329                }
330            }
331        }
332    }
333}
334
335// Tests
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::errors::RpcError;
    use std::io;

    // Error and context builders

    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(RpcError {
            code,
            name: name.into(),
            value,
        })
    }

    fn flood(secs: u32) -> InvocationError {
        rpc(420, "FLOOD_WAIT", Some(secs))
    }

    fn slowmode(secs: u32) -> InvocationError {
        rpc(420, "SLOWMODE_WAIT", Some(secs))
    }

    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// Context for the `fail_count`-th failure of a request.
    fn ctx(fail_count: u32, slept_so_far: Duration, error: InvocationError) -> RetryContext {
        RetryContext {
            fail_count: NonZeroU32::new(fail_count).expect("fail_count must be nonzero"),
            slept_so_far,
            error,
        }
    }

    /// Context for the `n`-th generic server failure (used by breaker tests).
    fn internal_failure(n: u32) -> RetryContext {
        ctx(n, Duration::ZERO, rpc(500, "INTERNAL", None))
    }

    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Asserts that `flow` is `Continue(d)` with `lo <= d <= hi`.
    fn assert_delay_within(flow: ControlFlow<(), Duration>, lo: Duration, hi: Duration) {
        match flow {
            ControlFlow::Continue(d) => assert!(
                lo <= d && d <= hi,
                "expected a delay in {lo:?}..={hi:?}, got {d:?}"
            ),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    // NoRetries

    #[test]
    fn no_retries_always_breaks() {
        let flow = NoRetries.should_retry(&ctx(1, Duration::ZERO, flood(10)));
        assert_eq!(flow, ControlFlow::Break(()));
    }

    // AutoSleep

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let policy = AutoSleep::default(); // threshold = 60 s
        // Any jitter stays within 2 s of the requested wait.
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, flood(30)));
        assert_delay_within(flow, secs(28), secs(32));
    }

    #[test]
    fn autosleep_retries_flood_at_threshold() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, flood(60)));
        assert_delay_within(flow, secs(58), secs(62));
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, flood(120)));
        assert_eq!(flow, ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_second_flood_retry_is_honoured() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(2, secs(30), flood(30)));
        assert_delay_within(flow, secs(28), secs(32));
    }

    #[test]
    fn autosleep_flood_without_value_retries_promptly() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, rpc(420, "FLOOD_WAIT", None)));
        assert_delay_within(flow, Duration::ZERO, secs(2));
    }

    #[test]
    fn autosleep_retries_slowmode_under_threshold() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, slowmode(30)));
        assert_delay_within(flow, secs(28), secs(32));
    }

    #[test]
    fn autosleep_breaks_slowmode_over_threshold() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, slowmode(120)));
        assert_eq!(flow, ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_retries_io_once() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, io_err()));
        assert_eq!(flow, ControlFlow::Continue(secs(1)));
    }

    #[test]
    fn autosleep_no_io_retry_after_first() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(4, secs(3), io_err()));
        assert_eq!(flow, ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_io_retry_can_be_disabled() {
        let policy = AutoSleep {
            threshold: secs(60),
            io_errors_as_flood_of: None,
        };
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, io_err()));
        assert_eq!(flow, ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let policy = AutoSleep::default();
        let flow = policy.should_retry(&ctx(1, Duration::ZERO, rpc(400, "BAD_REQUEST", None)));
        assert_eq!(flow, ControlFlow::Break(()));
    }

    // jitter_duration

    #[test]
    fn jitter_stays_within_window() {
        for seed in 1..=64u32 {
            let d = jitter_duration(secs(30), seed, 2);
            assert!(secs(28) <= d && d <= secs(32), "seed {seed}: got {d:?}");
            assert!(jitter_duration(Duration::ZERO, seed, 2) <= secs(2));
        }
    }

    // RpcError::migrate_dc_id

    #[test]
    fn migrate_dc_id_detected() {
        let e = RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = RpcError {
            code: 303,
            name: "NETWORK_MIGRATE".into(),
            value: Some(3),
        };
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = RpcError {
            code: 303,
            name: "FILE_MIGRATE".into(),
            value: Some(4),
        };
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        let e = RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(30),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: None,
        };
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    // RetryLoop

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let result = rl.advance(rpc(400, "SOMETHING_WRONG", None)).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_returns_the_failing_error() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let result = rl.advance(rpc(400, "SOMETHING_WRONG", None)).await;
        assert!(matches!(
            result,
            Err(InvocationError::Rpc(ref e)) if e.code == 400 && e.name == "SOMETHING_WRONG"
        ));
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }

    #[tokio::test]
    async fn retry_loop_tracks_sleep_and_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        assert!(rl.advance(io_err()).await.is_ok());
        assert_eq!(rl.ctx.fail_count.get(), 2);
        assert_eq!(rl.ctx.slept_so_far, Duration::from_millis(1));
    }

    // CircuitBreaker

    #[test]
    fn circuit_breaker_trips_after_threshold() {
        let cb = CircuitBreaker::new(3, secs(60));
        // First two failures: Continue (back-off).
        assert!(matches!(cb.should_retry(&internal_failure(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&internal_failure(2)), ControlFlow::Continue(_)));
        // Third failure trips the breaker.
        assert_eq!(cb.should_retry(&internal_failure(3)), ControlFlow::Break(()));
        // While open, every failure breaks immediately.
        assert_eq!(cb.should_retry(&internal_failure(4)), ControlFlow::Break(()));
    }

    #[test]
    fn circuit_breaker_backoff_doubles_then_caps() {
        let cb = CircuitBreaker::new(10, secs(60));
        let expected_ms = [200, 400, 800, 1600, 3200, 3200];
        for (n, ms) in (1..).zip(expected_ms) {
            assert_eq!(
                cb.should_retry(&internal_failure(n)),
                ControlFlow::Continue(Duration::from_millis(ms)),
                "failure #{n}"
            );
        }
    }

    #[test]
    fn circuit_breaker_threshold_one_trips_on_first_failure() {
        let cb = CircuitBreaker::new(1, secs(60));
        assert_eq!(cb.should_retry(&internal_failure(1)), ControlFlow::Break(()));
    }

    #[test]
    #[should_panic(expected = "threshold must be at least 1")]
    fn circuit_breaker_rejects_zero_threshold() {
        let _ = CircuitBreaker::new(0, secs(1));
    }

    #[test]
    fn circuit_breaker_resets_after_cooldown() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(10));
        // Trip the breaker.
        assert!(matches!(cb.should_retry(&internal_failure(1)), ControlFlow::Continue(_)));
        assert_eq!(cb.should_retry(&internal_failure(2)), ControlFlow::Break(()));
        // Wait out the cooldown.
        std::thread::sleep(Duration::from_millis(20));
        // After the cooldown the breaker re-closes and allows a retry.
        assert!(matches!(cb.should_retry(&internal_failure(1)), ControlFlow::Continue(_)));
    }
}