Skip to main content

layer_client/
retry.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// NOTE:
5// The "Layer" project is no longer maintained or supported.
6// Its original purpose for personal SDK/APK experimentation and learning
7// has been fulfilled.
8//
9// Please use Ferogram instead:
10// https://github.com/ankit-chaubey/ferogram
11// Ferogram will receive future updates and development, although progress
12// may be slower.
13//
14// Ferogram is an async Telegram MTProto client library written in Rust.
15// Its implementation follows the behaviour of the official Telegram clients,
16// particularly Telegram Desktop and TDLib, and aims to provide a clean and
17// modern async interface for building Telegram clients and tools.
18
19//! Retry policies for handling `FLOOD_WAIT`, transient I/O errors, and DC-migration redirects.
20
21use std::num::NonZeroU32;
22use std::ops::ControlFlow;
23use std::sync::Arc;
24use std::time::Duration;
25
26use tokio::time::sleep;
27
28use crate::errors::InvocationError;
29
30/// Extension methods on [`crate::errors::RpcError`] for routing decisions.
31impl crate::errors::RpcError {
32    /// If this is a DC-migration redirect (code 303), returns the target DC id.
33    ///
34    /// Telegram sends these for:
35    /// - `PHONE_MIGRATE_X`  : user's home DC during auth
36    /// - `NETWORK_MIGRATE_X`: general redirect
37    /// - `FILE_MIGRATE_X`   : file download/upload DC
38    /// - `USER_MIGRATE_X`   : account migration
39    ///
40    /// All have `code == 303` and a numeric suffix that is the DC id.
41    pub fn migrate_dc_id(&self) -> Option<i32> {
42        if self.code != 303 {
43            return None;
44        }
45        //  pattern: any *_MIGRATE_* name with a numeric value
46        let is_migrate = self.name == "PHONE_MIGRATE"
47            || self.name == "NETWORK_MIGRATE"
48            || self.name == "FILE_MIGRATE"
49            || self.name == "USER_MIGRATE"
50            || self.name.ends_with("_MIGRATE");
51        if is_migrate {
52            // value is the DC id; fall back to DC 2 (Amsterdam) if missing
53            Some(self.value.unwrap_or(2) as i32)
54        } else {
55            None
56        }
57    }
58}
59
60/// Extension on [`InvocationError`] for migrate detection.
61impl InvocationError {
62    /// If this error is a DC-migration redirect, returns the target DC id.
63    pub fn migrate_dc_id(&self) -> Option<i32> {
64        match self {
65            Self::Rpc(r) => r.migrate_dc_id(),
66            _ => None,
67        }
68    }
69}
70
71// RetryPolicy trait
72
/// Controls how the client reacts when an RPC call fails.
///
/// Implement this trait to provide custom flood-wait handling, circuit
/// breakers, or exponential back-off.
///
/// The `Send + Sync + 'static` bounds let a policy be shared across tasks
/// behind an `Arc<dyn RetryPolicy>` (see [`RetryLoop::new`]).
pub trait RetryPolicy: Send + Sync + 'static {
    /// Decide whether to retry the failed request.
    ///
    /// Return `ControlFlow::Continue(delay)` to sleep `delay` and retry.
    /// Return `ControlFlow::Break(())` to propagate `ctx.error` to the caller.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}
84
/// Context passed to [`RetryPolicy::should_retry`] on each failure.
pub struct RetryContext {
    /// Number of times this request has failed (starts at 1).
    ///
    /// `NonZeroU32` makes "zero failures" unrepresentable: a context only
    /// exists once at least one failure has occurred.
    pub fail_count: NonZeroU32,
    /// Total time already slept for this request across all prior retries.
    pub slept_so_far: Duration,
    /// The most recent error.
    pub error: InvocationError,
}
94
95// Built-in policies
96
97/// Never retry: propagate every error immediately.
98pub struct NoRetries;
99
100impl RetryPolicy for NoRetries {
101    fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
102        ControlFlow::Break(())
103    }
104}
105
/// Automatically sleep on `FLOOD_WAIT` and retry once on transient I/O errors.
///
/// Default retry policy. Sleeps on `FLOOD_WAIT`, backs off on I/O errors.
///
/// ```rust
/// # use layer_client::retry::AutoSleep;
/// let policy = AutoSleep {
///     threshold: std::time::Duration::from_secs(60),
///     io_errors_as_flood_of: Some(std::time::Duration::from_secs(1)),
/// };
/// ```
pub struct AutoSleep {
    /// Maximum flood-wait the library will automatically sleep through.
    ///
    /// If Telegram asks us to wait longer than this, the error is propagated.
    pub threshold: Duration,

    /// If `Some(d)`, treat the first I/O error as a `d`-second flood wait
    /// and retry once.  `None` propagates I/O errors immediately.
    pub io_errors_as_flood_of: Option<Duration>,
}
127
128impl Default for AutoSleep {
129    fn default() -> Self {
130        Self {
131            threshold: Duration::from_secs(60),
132            io_errors_as_flood_of: Some(Duration::from_secs(1)),
133        }
134    }
135}
136
137impl RetryPolicy for AutoSleep {
138    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
139        match &ctx.error {
140            // FLOOD_WAIT: sleep exactly as long as Telegram asks, for every
141            // occurrence up to threshold. Removing the fail_count==1 guard
142            // means a second consecutive FLOOD_WAIT is also honoured rather
143            // than propagated immediately.
144            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
145                let secs = rpc.value.unwrap_or(0) as u64;
146                if secs <= self.threshold.as_secs() {
147                    tracing::info!("FLOOD_WAIT_{secs}: sleeping before retry");
148                    ControlFlow::Continue(Duration::from_secs(secs))
149                } else {
150                    ControlFlow::Break(())
151                }
152            }
153
154            // SLOWMODE_WAIT: same semantics as FLOOD_WAIT; very common in
155            // group bots that send messages faster than the channel's slowmode.
156            InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
157                let secs = rpc.value.unwrap_or(0) as u64;
158                if secs <= self.threshold.as_secs() {
159                    tracing::info!("SLOWMODE_WAIT_{secs}: sleeping before retry");
160                    ControlFlow::Continue(Duration::from_secs(secs))
161                } else {
162                    ControlFlow::Break(())
163                }
164            }
165
166            // Transient I/O errors: back off briefly and retry once.
167            InvocationError::Io(_) if ctx.fail_count.get() == 1 => {
168                if let Some(d) = self.io_errors_as_flood_of {
169                    tracing::info!("I/O error: sleeping {d:?} before retry");
170                    ControlFlow::Continue(d)
171                } else {
172                    ControlFlow::Break(())
173                }
174            }
175
176            _ => ControlFlow::Break(()),
177        }
178    }
179}
180
181// RetryLoop
182
/// Drives the retry loop for a single RPC call.
///
/// Create one per call, then call `advance` after every failure.
///
/// ```rust,ignore
/// let mut rl = RetryLoop::new(Arc::clone(&self.inner.retry_policy));
/// loop {
///     match self.do_rpc_call(req).await {
///         Ok(body) => return Ok(body),
///         Err(e)   => rl.advance(e).await?,
///     }
/// }
/// ```
///
/// `advance` either:
/// - sleeps the required duration and returns `Ok(())` → caller should retry, or
/// - returns `Err(e)` → caller should propagate.
///
/// This is the single source of truth; previously the same loop was
/// copy-pasted into `rpc_call_raw`, `rpc_write`, and the reconnect path.
pub struct RetryLoop {
    /// Shared policy consulted after every failure.
    policy: Arc<dyn RetryPolicy>,
    /// Cumulative per-call state: fail count, total slept time, last error.
    ctx: RetryContext,
}
207
208impl RetryLoop {
209    pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
210        Self {
211            policy,
212            ctx: RetryContext {
213                fail_count: NonZeroU32::new(1).unwrap(),
214                slept_so_far: Duration::default(),
215                error: InvocationError::Dropped,
216            },
217        }
218    }
219
220    /// Record a failure and either sleep+return-Ok (retry) or return-Err (give up).
221    ///
222    /// Mutates `self` to track cumulative state across retries.
223    pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
224        self.ctx.error = err;
225        match self.policy.should_retry(&self.ctx) {
226            ControlFlow::Continue(delay) => {
227                sleep(delay).await;
228                self.ctx.slept_so_far += delay;
229                // saturating_add: if somehow we overflow NonZeroU32, clamp at MAX
230                self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
231                Ok(())
232            }
233            ControlFlow::Break(()) => {
234                // Move the error out so the caller doesn't have to clone it
235                Err(std::mem::replace(
236                    &mut self.ctx.error,
237                    InvocationError::Dropped,
238                ))
239            }
240        }
241    }
242}
243
244// Tests
245
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;

    /// Build a `FLOOD_WAIT` RPC error asking us to wait `secs` seconds.
    fn flood(secs: u32) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(secs),
        })
    }

    /// A transient connection-reset I/O error.
    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// Build an arbitrary RPC error.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code,
            name: name.into(),
            value,
        })
    }

    // NoRetries

    #[test]
    fn no_retries_always_breaks() {
        let policy = NoRetries;
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(10),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    // AutoSleep

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(30)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let policy = AutoSleep::default(); // threshold = 60s
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(120),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_repeated_flood() {
        let policy = AutoSleep::default();
        // AutoSleep deliberately has no fail_count guard for flood waits
        // (the impl honours every FLOOD_WAIT up to threshold), so a second
        // consecutive flood at fail_count == 2 must still be retried.
        // The previous version of this test asserted Break here, which
        // contradicted the implementation and failed.
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).unwrap(),
            slept_so_far: Duration::from_secs(30),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(30)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_retries_slowmode_under_threshold() {
        // SLOWMODE_WAIT shares FLOOD_WAIT's semantics; cover it explicitly.
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(420, "SLOWMODE_WAIT", Some(10)),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(10)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_retries_io_once() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: io_err(),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_no_second_io_retry() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).unwrap(),
            slept_so_far: Duration::from_secs(1),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(400, "BAD_REQUEST", None),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    // RpcError::migrate_dc_id

    #[test]
    fn migrate_dc_id_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "NETWORK_MIGRATE".into(),
            value: Some(3),
        };
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "FILE_MIGRATE".into(),
            value: Some(4),
        };
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        let e = crate::errors::RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(30),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: None,
        };
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    // RetryLoop

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let err = rpc(400, "SOMETHING_WRONG", None);
        let result = rl.advance(err).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        // First I/O failure retries (fail_count 1 -> 2) ...
        assert!(rl.advance(io_err()).await.is_ok());
        // ... second one is propagated (AutoSleep retries I/O only once).
        assert!(rl.advance(io_err()).await.is_err());
    }
}