Skip to main content

ferogram_mtsender/
retry.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use std::num::NonZeroU32;
14use std::ops::ControlFlow;
15use std::sync::Arc;
16use std::time::Duration;
17
18use tokio::time::sleep;
19
20use crate::errors::InvocationError;
21
22// RetryPolicy trait
23
/// Controls how the client reacts when an RPC call fails.
///
/// Implement this trait to provide custom flood-wait handling, circuit
/// breakers, or exponential back-off.
///
/// A policy is held as `Arc<dyn RetryPolicy>` by [`RetryLoop`], hence the
/// `Send + Sync + 'static` bound. `should_retry` takes `&self`, so a policy
/// that needs to remember anything between calls must use interior
/// mutability (see [`CircuitBreaker`], which keeps its state in a mutex).
pub trait RetryPolicy: Send + Sync + 'static {
    /// Decide whether to retry the failed request.
    ///
    /// `ctx` describes the failure: how many times the request has failed so
    /// far, how long has already been slept for it, and the latest error.
    ///
    /// Return `ControlFlow::Continue(delay)` to sleep `delay` and retry.
    /// Return `ControlFlow::Break(())` to propagate `ctx.error` to the caller.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}
35
/// Context passed to [`RetryPolicy::should_retry`] on each failure.
///
/// [`RetryLoop`] owns one of these per request and updates it after every
/// failure it is told about.
pub struct RetryContext {
    /// Number of times this request has failed (starts at 1).
    ///
    /// [`RetryLoop`] bumps this by one (saturating) after each retry it sleeps for.
    pub fail_count: NonZeroU32,
    /// Total time already slept for this request across all prior retries.
    ///
    /// This is the sum of the delays the policy returned, not measured wall time.
    pub slept_so_far: Duration,
    /// The most recent error.
    ///
    /// [`RetryLoop`] stores `InvocationError::Dropped` here as a placeholder
    /// until the first real failure is recorded.
    pub error: InvocationError,
}
45
46// Built-in policies
47
/// Never retry: propagate every error immediately.
///
/// A stateless unit type, so it is freely copyable and printable.
#[derive(Debug, Clone, Copy)]
pub struct NoRetries;
50
51impl RetryPolicy for NoRetries {
52    fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
53        ControlFlow::Break(())
54    }
55}
56
/// Default retry policy: sleep through short server-requested waits and retry
/// once after a transient I/O error.
///
/// - `FLOOD_WAIT` and `SLOWMODE_WAIT` (RPC code 420) are slept through, with a
///   small jitter, as long as the requested wait does not exceed
///   [`threshold`](Self::threshold).
/// - The first I/O error of a request is retried after
///   [`io_errors_as_flood_of`](Self::io_errors_as_flood_of), when that is set.
/// - Every other error is propagated immediately.
///
/// ```rust
/// # use ferogram_mtsender::AutoSleep;
/// let policy = AutoSleep {
///     threshold: std::time::Duration::from_secs(60),
///     io_errors_as_flood_of: Some(std::time::Duration::from_secs(1)),
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AutoSleep {
    /// Maximum flood-wait the library will automatically sleep through.
    ///
    /// If Telegram asks us to wait longer than this, the error is propagated.
    pub threshold: Duration,

    /// If `Some(d)`, sleep for `d` after the first I/O error of a request and
    /// retry once. `None` propagates I/O errors immediately.
    pub io_errors_as_flood_of: Option<Duration>,
}

impl Default for AutoSleep {
    /// Sleep through waits of up to 60 s and retry I/O errors once after 1 s.
    fn default() -> Self {
        Self {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_secs(1)),
        }
    }
}
87
/// Add up to ±`max_jitter_secs` of jitter to `base`.
///
/// The result lies in `[base - max_jitter_secs, base + max_jitter_secs]`
/// (millisecond resolution), clamped at zero on the low side and at
/// `u64::MAX` milliseconds on the high side.
///
/// `seed` (the fail count) is mixed through the standard library's randomly
/// keyed hasher, so no `rand` crate is needed. The fail count alone is not
/// enough entropy: it is 1 on every first retry, which would hand every
/// client the exact same offset and defeat the purpose of spreading
/// simultaneous FLOOD_WAIT retries.
fn jitter_duration(base: Duration, seed: u32, max_jitter_secs: u64) -> Duration {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    // Each `RandomState::new()` carries fresh random keys, so identical seeds
    // in different processes (or successive calls) hash to different values.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u32(seed);
    let h = hasher.finish();

    // All arithmetic is done in u128 milliseconds so that neither a huge
    // `base` nor a huge `max_jitter_secs` can wrap around.
    let half_range_ms = u128::from(max_jitter_secs) * 1000;
    // Number of distinct offsets in [-half_range_ms, +half_range_ms].
    let range_ms = half_range_ms * 2 + 1;
    // Offset shifted into [0, 2 * half_range_ms]; subtracting half_range_ms
    // below recentres it on zero without needing signed arithmetic.
    let shifted_offset_ms = u128::from(h) % range_ms;

    let final_ms = (base.as_millis() + shifted_offset_ms).saturating_sub(half_range_ms);
    Duration::from_millis(u64::try_from(final_ms).unwrap_or(u64::MAX))
}
111
112impl RetryPolicy for AutoSleep {
113    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
114        match &ctx.error {
115            // FLOOD_WAIT: sleep as long as Telegram asks, plus ±2 s jitter.
116            // Jitter spreads retries across clients that all hit the same limit
117            // simultaneously (e.g. after a server-side rate-limit window resets).
118            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
119                let secs = rpc.value.unwrap_or(0) as u64;
120                if secs <= self.threshold.as_secs() {
121                    let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
122                    tracing::debug!(
123                        "[ferogram::retry] FLOOD_WAIT_{secs}: sleeping {delay:?} before retrying"
124                    );
125                    ControlFlow::Continue(delay)
126                } else {
127                    ControlFlow::Break(())
128                }
129            }
130
131            // SLOWMODE_WAIT: same semantics as FLOOD_WAIT; very common in
132            // group bots that send messages faster than the channel's slowmode.
133            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
134                let secs = rpc.value.unwrap_or(0) as u64;
135                if secs <= self.threshold.as_secs() {
136                    let delay = jitter_duration(Duration::from_secs(secs), ctx.fail_count.get(), 2);
137                    tracing::debug!(
138                        "[ferogram::retry] SLOWMODE_WAIT_{secs}: sleeping {delay:?} before retrying"
139                    );
140                    ControlFlow::Continue(delay)
141                } else {
142                    ControlFlow::Break(())
143                }
144            }
145
146            // Transient I/O errors: back off briefly and retry once.
147            InvocationError::Io(_) if ctx.fail_count.get() <= 1 => {
148                if let Some(d) = self.io_errors_as_flood_of {
149                    tracing::debug!(
150                        "[ferogram::retry] transient I/O error (attempt {}): sleeping {d:?} before retrying",
151                        ctx.fail_count.get()
152                    );
153                    ControlFlow::Continue(d)
154                } else {
155                    ControlFlow::Break(())
156                }
157            }
158
159            _ => ControlFlow::Break(()),
160        }
161    }
162}
163
164// RetryLoop
165
/// Drives the retry loop for a single RPC call.
///
/// Create one per call, then call `advance` after every failure.
///
/// ```rust,ignore
/// let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
/// loop {
///     match self.do_rpc_call(req).await {
///         Ok(body) => return Ok(body),
///         Err(e) => rl.advance(e).await?,
///     }
/// }
/// ```
///
/// `advance` either:
/// - sleeps the required duration and returns `Ok(())` → caller should retry, or
/// - returns `Err(e)` → caller should propagate.
///
/// This is the single source of truth; previously the same loop was
/// copy-pasted into `rpc_call_raw`, `rpc_write`, and the reconnect path.
pub struct RetryLoop {
    /// Policy consulted on every failure; shared, so it may be used by other loops too.
    policy: Arc<dyn RetryPolicy>,
    /// Per-request bookkeeping handed to the policy (fail count, time slept, last error).
    ctx: RetryContext,
}
190
191impl RetryLoop {
192    pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
193        Self {
194            policy,
195            ctx: RetryContext {
196                fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
197                slept_so_far: Duration::default(),
198                error: InvocationError::Dropped,
199            },
200        }
201    }
202
203    /// Record a failure and either sleep+return-Ok (retry) or return-Err (give up).
204    ///
205    /// Mutates `self` to track cumulative state across retries.
206    pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
207        self.ctx.error = err;
208        match self.policy.should_retry(&self.ctx) {
209            ControlFlow::Continue(delay) => {
210                sleep(delay).await;
211                self.ctx.slept_so_far += delay;
212                // saturating_add: if somehow we overflow NonZeroU32, clamp at MAX
213                self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
214                Ok(())
215            }
216            ControlFlow::Break(()) => {
217                // Move the error out so the caller doesn't have to clone it
218                Err(std::mem::replace(
219                    &mut self.ctx.error,
220                    InvocationError::Dropped,
221                ))
222            }
223        }
224    }
225}
226
227// CircuitBreaker
228
/// Internal state of a [`CircuitBreaker`].
///
/// Stored inside the breaker's mutex and replaced wholesale on every transition.
#[derive(Debug)]
enum CbState {
    /// Normal operation: counting consecutive failures.
    Closed { consecutive_failures: u32 },
    /// Breaker tripped: all calls rejected until cooldown expires.
    ///
    /// `tripped_at` is the instant the breaker opened; the cooldown is
    /// measured from it.
    Open { tripped_at: std::time::Instant },
}
237
/// A [`RetryPolicy`] that stops retrying after `threshold` consecutive
/// failures and stays silent for a `cooldown` window before resetting.
///
/// # States
/// - **Closed** (normal): forwards calls, increments a failure counter on
///   each error, and applies an exponential back-off up to `threshold − 1`
///   attempts.  On the `threshold`-th consecutive failure the breaker trips.
/// - **Open** (tripped): rejects every call immediately (`Break`) for the
///   duration of `cooldown`.
/// - **Reset**: the first failure reported after `cooldown` has elapsed
///   closes the breaker again; that failure is counted as the first of a
///   new run.
///
/// Because [`RetryPolicy`] has no success callback the breaker cannot
/// distinguish a successful probe from a clean run; the counter simply
/// restarts when the cooldown expires.  For a full half-open probe you can
/// wrap `CircuitBreaker` in a custom `RetryPolicy`.
///
/// The counter lives in the policy itself, so every request that shares one
/// `CircuitBreaker` instance contributes to the same count.
///
/// # Example
/// ```rust
/// use ferogram_mtsender::CircuitBreaker;
/// use std::time::Duration;
///
/// // Trip after 5 consecutive errors; stay open for 30 s.
/// let policy = CircuitBreaker::new(5, Duration::from_secs(30));
/// ```
pub struct CircuitBreaker {
    /// Number of consecutive failures before the breaker trips.
    threshold: u32,
    /// How long the breaker stays open before resetting.
    cooldown: Duration,
    /// Current state; behind a mutex because `should_retry` only gets `&self`.
    state: std::sync::Mutex<CbState>,
}
270
271impl CircuitBreaker {
272    /// Create a new `CircuitBreaker`.
273    ///
274    /// - `threshold`: failures before the breaker trips (minimum 1).
275    /// - `cooldown`: how long the breaker stays open.
276    pub fn new(threshold: u32, cooldown: Duration) -> Self {
277        assert!(
278            threshold >= 1,
279            "CircuitBreaker threshold must be at least 1"
280        );
281        Self {
282            threshold,
283            cooldown,
284            state: std::sync::Mutex::new(CbState::Closed {
285                consecutive_failures: 0,
286            }),
287        }
288    }
289}
290
291impl RetryPolicy for CircuitBreaker {
292    fn should_retry(&self, _ctx: &RetryContext) -> ControlFlow<(), Duration> {
293        let mut state = self.state.lock().expect("lock poisoned");
294        match &*state {
295            CbState::Open { tripped_at } => {
296                if tripped_at.elapsed() >= self.cooldown {
297                    // Cooldown expired: reset to Closed, allow retry with small delay.
298                    *state = CbState::Closed {
299                        consecutive_failures: 1,
300                    };
301                    ControlFlow::Continue(Duration::from_millis(200))
302                } else {
303                    // Still open: reject immediately.
304                    ControlFlow::Break(())
305                }
306            }
307            CbState::Closed {
308                consecutive_failures,
309            } => {
310                let new_count = consecutive_failures + 1;
311                if new_count >= self.threshold {
312                    tracing::warn!(
313                        "[ferogram::retry] circuit breaker tripped after {new_count} consecutive failures; rejecting requests for {:?}",
314                        self.cooldown
315                    );
316                    *state = CbState::Open {
317                        tripped_at: std::time::Instant::now(),
318                    };
319                    ControlFlow::Break(())
320                } else {
321                    // Exponential back-off: 200 ms × 2^(n-1), capped at ~3 s.
322                    let backoff_ms = 200u64 * (1u64 << new_count.saturating_sub(1).min(4));
323                    *state = CbState::Closed {
324                        consecutive_failures: new_count,
325                    };
326                    ControlFlow::Continue(Duration::from_millis(backoff_ms))
327                }
328            }
329        }
330    }
331}
332
333// Tests
334
#[cfg(test)]
mod tests {
    //! Unit tests for the retry policies, `RetryLoop`, and the
    //! `RpcError::migrate_dc_id` helper they rely on.

    use super::*;
    use crate::errors::RpcError;
    use std::io;

    /// A `FLOOD_WAIT_<secs>` RPC error.
    fn flood(secs: u32) -> InvocationError {
        InvocationError::Rpc(RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(secs),
        })
    }

    /// A transient-looking I/O error.
    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// An arbitrary RPC error.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(RpcError {
            code,
            name: name.into(),
            value,
        })
    }

    // NoRetries

    #[test]
    fn no_retries_always_breaks() {
        let policy = NoRetries;
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: flood(10),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    // AutoSleep

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            // Jitter of ±2s is applied; accept 28..=32 s.
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    secs >= 28.0 && secs <= 32.0,
                    "expected 28-32s delay (jitter), got {secs:.3}s"
                );
            }
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: flood(120),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_second_flood_retry_is_honoured() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).expect("2 is nonzero"),
            slept_so_far: Duration::from_secs(30),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            // Jitter of ±2s; accept 28..=32 s.
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    secs >= 28.0 && secs <= 32.0,
                    "expected 28-32s on second FLOOD_WAIT, got {secs:.3}s"
                );
            }
            other => panic!("expected Continue on second FLOOD_WAIT, got {other:?}"),
        }
    }

    /// The SLOWMODE_WAIT arm must behave like FLOOD_WAIT below the threshold.
    #[test]
    fn autosleep_retries_slowmode_under_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: rpc(420, "SLOWMODE_WAIT", Some(30)),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => {
                let secs = d.as_secs_f64();
                assert!(
                    (28.0..=32.0).contains(&secs),
                    "expected 28-32s on SLOWMODE_WAIT, got {secs:.3}s"
                );
            }
            other => panic!("expected Continue on SLOWMODE_WAIT, got {other:?}"),
        }
    }

    /// ... and give up above it.
    #[test]
    fn autosleep_breaks_slowmode_over_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: rpc(420, "SLOWMODE_WAIT", Some(120)),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_io_once() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: io_err(),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    /// Pins the exact boundary of "retry once": the second failure is final.
    #[test]
    fn autosleep_no_io_retry_on_second_failure() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).expect("2 is nonzero"),
            slept_so_far: Duration::from_secs(1),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_no_io_retry_after_first() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(4).expect("4 is nonzero"),
            slept_so_far: Duration::from_secs(3),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    /// With `io_errors_as_flood_of: None`, even the first I/O error is final.
    #[test]
    fn autosleep_io_retry_disabled() {
        let policy = AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: None,
        };
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).expect("1 is nonzero"),
            slept_so_far: Duration::default(),
            error: rpc(400, "BAD_REQUEST", None),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    // RpcError::migrate_dc_id

    #[test]
    fn migrate_dc_id_detected() {
        let e = RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = RpcError {
            code: 303,
            name: "NETWORK_MIGRATE".into(),
            value: Some(3),
        };
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = RpcError {
            code: 303,
            name: "FILE_MIGRATE".into(),
            value: Some(4),
        };
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        let e = RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(30),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: None,
        };
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    // RetryLoop

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let err = rpc(400, "SOMETHING_WRONG", None);
        let result = rl.advance(err).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        // First I/O error is retried; the second is only rejected if the
        // loop bumped the fail count in between.
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }

    // CircuitBreaker

    #[test]
    fn circuit_breaker_trips_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(60));
        let ctx = |n: u32| RetryContext {
            fail_count: NonZeroU32::new(n).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(500, "INTERNAL", None),
        };
        // First two failures: Continue (backoff)
        assert!(matches!(cb.should_retry(&ctx(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&ctx(2)), ControlFlow::Continue(_)));
        // Third: trips the breaker → Break
        assert!(matches!(cb.should_retry(&ctx(3)), ControlFlow::Break(())));
        // Subsequent calls while open → Break immediately
        assert!(matches!(cb.should_retry(&ctx(4)), ControlFlow::Break(())));
    }

    /// The back-off doubles from 200 ms and stops growing at 3.2 s.
    #[test]
    fn circuit_breaker_backoff_doubles_and_caps() {
        let cb = CircuitBreaker::new(10, Duration::from_secs(60));
        let ctx = |n: u32| RetryContext {
            fail_count: NonZeroU32::new(n).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(500, "INTERNAL", None),
        };
        let expected_ms = [200u64, 400, 800, 1600, 3200, 3200];
        for (attempt, ms) in (1u32..).zip(expected_ms) {
            match cb.should_retry(&ctx(attempt)) {
                ControlFlow::Continue(d) => assert_eq!(d, Duration::from_millis(ms)),
                other => panic!("expected Continue on failure {attempt}, got {other:?}"),
            }
        }
    }

    #[test]
    fn circuit_breaker_resets_after_cooldown() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(10));
        let ctx = |n: u32| RetryContext {
            fail_count: NonZeroU32::new(n).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(500, "INTERNAL", None),
        };
        // Trip the breaker
        assert!(matches!(cb.should_retry(&ctx(1)), ControlFlow::Continue(_)));
        assert!(matches!(cb.should_retry(&ctx(2)), ControlFlow::Break(())));
        // Wait for cooldown
        std::thread::sleep(Duration::from_millis(20));
        // After cooldown: breaker resets → Continue again
        assert!(matches!(cb.should_retry(&ctx(1)), ControlFlow::Continue(_)));
    }
}