// ferogram/retry.rs
1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3//
4// ferogram: async Telegram MTProto client in Rust
5// https://github.com/ankit-chaubey/ferogram
6//
7// Based on layer: https://github.com/ankit-chaubey/layer
8// Follows official Telegram client behaviour (tdesktop, TDLib).
9//
10// If you use or modify this code, keep this notice at the top of your file
11// and include the LICENSE-MIT or LICENSE-APACHE file from this repository:
12// https://github.com/ankit-chaubey/ferogram
13
14//! Retry policies for handling `FLOOD_WAIT`, transient I/O errors, and DC-migration redirects.
15
16use std::num::NonZeroU32;
17use std::ops::ControlFlow;
18use std::sync::Arc;
19use std::time::Duration;
20
21use tokio::time::sleep;
22
23use crate::errors::InvocationError;
24
25/// Extension methods on [`crate::errors::RpcError`] for routing decisions.
26impl crate::errors::RpcError {
27    /// If this is a DC-migration redirect (code 303), returns the target DC id.
28    ///
29    /// Telegram sends these for:
30    /// - `PHONE_MIGRATE_X`  : user's home DC during auth
31    /// - `NETWORK_MIGRATE_X`: general redirect
32    /// - `FILE_MIGRATE_X`   : file download/upload DC
33    /// - `USER_MIGRATE_X`   : account migration
34    ///
35    /// All have `code == 303` and a numeric suffix that is the DC id.
36    pub fn migrate_dc_id(&self) -> Option<i32> {
37        if self.code != 303 {
38            return None;
39        }
40        //  pattern: any *_MIGRATE_* name with a numeric value
41        let is_migrate = self.name == "PHONE_MIGRATE"
42            || self.name == "NETWORK_MIGRATE"
43            || self.name == "FILE_MIGRATE"
44            || self.name == "USER_MIGRATE"
45            || self.name.ends_with("_MIGRATE");
46        if is_migrate {
47            // value is the DC id; fall back to DC 2 (Amsterdam) if missing
48            Some(self.value.unwrap_or(2) as i32)
49        } else {
50            None
51        }
52    }
53}
54
55/// Extension on [`InvocationError`] for migrate detection.
56impl InvocationError {
57    /// If this error is a DC-migration redirect, returns the target DC id.
58    pub fn migrate_dc_id(&self) -> Option<i32> {
59        match self {
60            Self::Rpc(r) => r.migrate_dc_id(),
61            _ => None,
62        }
63    }
64}
65
// RetryPolicy trait

/// Controls how the client reacts when an RPC call fails.
///
/// Implement this trait to provide custom flood-wait handling, circuit
/// breakers, or exponential back-off.
pub trait RetryPolicy: Send + Sync + 'static {
    /// Decide whether to retry the failed request.
    ///
    /// Return `ControlFlow::Continue(delay)` to sleep `delay` and retry.
    /// Return `ControlFlow::Break(())` to propagate `ctx.error` to the caller.
    ///
    /// Called once per failure; implementations can inspect `ctx.fail_count`
    /// and `ctx.slept_so_far` to build back-off or give-up logic.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}
79
/// Context passed to [`RetryPolicy::should_retry`] on each failure.
pub struct RetryContext {
    /// Number of times this request has failed. Starts at 1, so a policy
    /// testing `fail_count.get() == 1` is looking at the very first failure.
    pub fail_count: NonZeroU32,
    /// Total time already slept for this request across all prior retries.
    /// Does not include the delay a policy is about to return.
    pub slept_so_far: Duration,
    /// The most recent error.
    pub error: InvocationError,
}
89
90// Built-in policies
91
92/// Never retry: propagate every error immediately.
93pub struct NoRetries;
94
95impl RetryPolicy for NoRetries {
96    fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
97        ControlFlow::Break(())
98    }
99}
100
/// Automatically sleep on `FLOOD_WAIT` and retry once on transient I/O errors.
///
/// Default retry policy. Sleeps on `FLOOD_WAIT` (and `SLOWMODE_WAIT`), backs
/// off on I/O errors.
///
/// ```rust
/// # use ferogram::retry::AutoSleep;
/// let policy = AutoSleep {
///     threshold: std::time::Duration::from_secs(60),
///     io_errors_as_flood_of: Some(std::time::Duration::from_secs(1)),
/// };
/// ```
pub struct AutoSleep {
    /// Maximum flood-wait the library will automatically sleep through.
    ///
    /// If Telegram asks us to wait longer than this, the error is propagated.
    pub threshold: Duration,

    /// If `Some(d)`, treat the first I/O error as a `d`-second flood wait
    /// and retry once.  `None` propagates I/O errors immediately.
    pub io_errors_as_flood_of: Option<Duration>,
}
122
123impl Default for AutoSleep {
124    fn default() -> Self {
125        Self {
126            threshold: Duration::from_secs(60),
127            io_errors_as_flood_of: Some(Duration::from_secs(1)),
128        }
129    }
130}
131
132impl RetryPolicy for AutoSleep {
133    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
134        match &ctx.error {
135            // FLOOD_WAIT: sleep exactly as long as Telegram asks, for every
136            // occurrence up to threshold. Removing the fail_count==1 guard
137            // means a second consecutive FLOOD_WAIT is also honoured rather
138            // than propagated immediately.
139            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
140                let secs = rpc.value.unwrap_or(0) as u64;
141                if secs <= self.threshold.as_secs() {
142                    tracing::info!("FLOOD_WAIT_{secs}: sleeping before retry");
143                    ControlFlow::Continue(Duration::from_secs(secs))
144                } else {
145                    ControlFlow::Break(())
146                }
147            }
148
149            // SLOWMODE_WAIT: same semantics as FLOOD_WAIT; very common in
150            // group bots that send messages faster than the channel's slowmode.
151            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
152                let secs = rpc.value.unwrap_or(0) as u64;
153                if secs <= self.threshold.as_secs() {
154                    tracing::info!("SLOWMODE_WAIT_{secs}: sleeping before retry");
155                    ControlFlow::Continue(Duration::from_secs(secs))
156                } else {
157                    ControlFlow::Break(())
158                }
159            }
160
161            // Transient I/O errors: back off briefly and retry once.
162            InvocationError::Io(_) if ctx.fail_count.get() == 1 => {
163                if let Some(d) = self.io_errors_as_flood_of {
164                    tracing::info!("I/O error: sleeping {d:?} before retry");
165                    ControlFlow::Continue(d)
166                } else {
167                    ControlFlow::Break(())
168                }
169            }
170
171            _ => ControlFlow::Break(()),
172        }
173    }
174}
175
// RetryLoop

/// Drives the retry loop for a single RPC call.
///
/// Create one per call, then call `advance` after every failure.
///
/// ```rust,ignore
/// let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
/// loop {
///     match self.do_rpc_call(req).await {
///         Ok(body) => return Ok(body),
///         Err(e)   => rl.advance(e).await?,
///     }
/// }
/// ```
///
/// `advance` either:
/// - sleeps the required duration and returns `Ok(())` → caller should retry, or
/// - returns `Err(e)` → caller should propagate.
///
/// This is the single source of truth; previously the same loop was
/// copy-pasted into `rpc_call_raw`, `rpc_write`, and the reconnect path.
pub struct RetryLoop {
    // Pluggable decision logic shared with the rest of the client.
    policy: Arc<dyn RetryPolicy>,
    // Cumulative per-call state handed to the policy on every failure.
    ctx: RetryContext,
}
202
203impl RetryLoop {
204    pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
205        Self {
206            policy,
207            ctx: RetryContext {
208                fail_count: NonZeroU32::new(1).unwrap(),
209                slept_so_far: Duration::default(),
210                error: InvocationError::Dropped,
211            },
212        }
213    }
214
215    /// Record a failure and either sleep+return-Ok (retry) or return-Err (give up).
216    ///
217    /// Mutates `self` to track cumulative state across retries.
218    pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
219        self.ctx.error = err;
220        match self.policy.should_retry(&self.ctx) {
221            ControlFlow::Continue(delay) => {
222                sleep(delay).await;
223                self.ctx.slept_so_far += delay;
224                // saturating_add: if somehow we overflow NonZeroU32, clamp at MAX
225                self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
226                Ok(())
227            }
228            ControlFlow::Break(()) => {
229                // Move the error out so the caller doesn't have to clone it
230                Err(std::mem::replace(
231                    &mut self.ctx.error,
232                    InvocationError::Dropped,
233                ))
234            }
235        }
236    }
237}
238
239// Tests
240
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;

    /// Builds a `FLOOD_WAIT` RPC error asking for `secs` seconds.
    fn flood(secs: u32) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(secs),
        })
    }

    /// Builds a transient connection-reset I/O error.
    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// Builds an arbitrary RPC error.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code,
            name: name.into(),
            value,
        })
    }

    // NoRetries

    #[test]
    fn no_retries_always_breaks() {
        let policy = NoRetries;
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(10),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    // AutoSleep

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(30)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(120),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_honours_repeated_flood() {
        // The policy intentionally has no fail_count guard on flood waits
        // (see the comment in `AutoSleep::should_retry`), so a second
        // consecutive FLOOD_WAIT under the threshold is still slept through.
        // This test previously asserted Break, which contradicted the
        // implementation and failed; fixed to match the documented behaviour.
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).unwrap(),
            slept_so_far: Duration::from_secs(30),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(30)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_retries_slowmode() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(420, "SLOWMODE_WAIT", Some(5)),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(5)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_retries_io_once() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: io_err(),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_no_second_io_retry() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).unwrap(),
            slept_so_far: Duration::from_secs(1),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(400, "BAD_REQUEST", None),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    // RpcError::migrate_dc_id

    #[test]
    fn migrate_dc_id_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "NETWORK_MIGRATE".into(),
            value: Some(3),
        };
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "FILE_MIGRATE".into(),
            value: Some(4),
        };
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        let e = crate::errors::RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(30),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn code_303_without_migrate_name_is_none() {
        // Code alone is not enough: the name must carry the `_MIGRATE` suffix.
        let e = crate::errors::RpcError {
            code: 303,
            name: "SOMETHING_ELSE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: None,
        };
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    // RetryLoop

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let err = rpc(400, "SOMETHING_WRONG", None);
        let result = rl.advance(err).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        // First I/O error retries; second (fail_count == 2) propagates.
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }
}
425}