1use std::num::NonZeroU32;
4use std::ops::ControlFlow;
5use std::sync::Arc;
6use std::time::Duration;
7
8use tokio::time::sleep;
9
10use crate::errors::InvocationError;
11
12impl crate::errors::RpcError {
14 pub fn migrate_dc_id(&self) -> Option<i32> {
24 if self.code != 303 {
25 return None;
26 }
27 let is_migrate = self.name == "PHONE_MIGRATE"
29 || self.name == "NETWORK_MIGRATE"
30 || self.name == "FILE_MIGRATE"
31 || self.name == "USER_MIGRATE"
32 || self.name.ends_with("_MIGRATE");
33 if is_migrate {
34 Some(self.value.unwrap_or(2) as i32)
36 } else {
37 None
38 }
39 }
40}
41
42impl InvocationError {
44 pub fn migrate_dc_id(&self) -> Option<i32> {
46 match self {
47 Self::Rpc(r) => r.migrate_dc_id(),
48 _ => None,
49 }
50 }
51}
52
53pub trait RetryPolicy: Send + Sync + 'static {
60 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
65}
66
67pub struct RetryContext {
69 pub fail_count: NonZeroU32,
71 pub slept_so_far: Duration,
73 pub error: InvocationError,
75}
76
77pub struct NoRetries;
81
82impl RetryPolicy for NoRetries {
83 fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
84 ControlFlow::Break(())
85 }
86}
87
/// A [`RetryPolicy`] that sleeps through short server-imposed waits and,
/// optionally, a first I/O error.
///
/// Every field is `Copy`, so the config derives the common traits. This
/// allows it to be logged, cloned and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AutoSleep {
    /// Longest `FLOOD_WAIT` / `SLOWMODE_WAIT` (inclusive, whole seconds)
    /// that will be slept through. Longer waits are returned to the caller
    /// as errors.
    pub threshold: Duration,

    /// Delay to sleep before retrying after an I/O error on the first
    /// failed attempt, or `None` to never retry I/O errors.
    pub io_errors_as_flood_of: Option<Duration>,
}

impl Default for AutoSleep {
    /// Uses a 60-second wait threshold and retries a first I/O error after
    /// 1 second.
    fn default() -> Self {
        Self {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_secs(1)),
        }
    }
}
118
119impl RetryPolicy for AutoSleep {
120 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
121 match &ctx.error {
122 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
127 let secs = rpc.value.unwrap_or(0) as u64;
128 if secs <= self.threshold.as_secs() {
129 tracing::info!("FLOOD_WAIT_{secs}: sleeping before retry");
130 ControlFlow::Continue(Duration::from_secs(secs))
131 } else {
132 ControlFlow::Break(())
133 }
134 }
135
136 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
139 let secs = rpc.value.unwrap_or(0) as u64;
140 if secs <= self.threshold.as_secs() {
141 tracing::info!("SLOWMODE_WAIT_{secs}: sleeping before retry");
142 ControlFlow::Continue(Duration::from_secs(secs))
143 } else {
144 ControlFlow::Break(())
145 }
146 }
147
148 InvocationError::Io(_) if ctx.fail_count.get() == 1 => {
150 if let Some(d) = self.io_errors_as_flood_of {
151 tracing::info!("I/O error: sleeping {d:?} before retry");
152 ControlFlow::Continue(d)
153 } else {
154 ControlFlow::Break(())
155 }
156 }
157
158 _ => ControlFlow::Break(()),
159 }
160 }
161}
162
163pub struct RetryLoop {
186 policy: Arc<dyn RetryPolicy>,
187 ctx: RetryContext,
188}
189
190impl RetryLoop {
191 pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
192 Self {
193 policy,
194 ctx: RetryContext {
195 fail_count: NonZeroU32::new(1).unwrap(),
196 slept_so_far: Duration::default(),
197 error: InvocationError::Dropped,
198 },
199 }
200 }
201
202 pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
206 self.ctx.error = err;
207 match self.policy.should_retry(&self.ctx) {
208 ControlFlow::Continue(delay) => {
209 sleep(delay).await;
210 self.ctx.slept_so_far += delay;
211 self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
213 Ok(())
214 }
215 ControlFlow::Break(()) => {
216 Err(std::mem::replace(
218 &mut self.ctx.error,
219 InvocationError::Dropped,
220 ))
221 }
222 }
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;

    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code,
            name: name.into(),
            value,
        })
    }

    fn flood(secs: u32) -> InvocationError {
        rpc(420, "FLOOD_WAIT", Some(secs))
    }

    fn slowmode(secs: u32) -> InvocationError {
        rpc(420, "SLOWMODE_WAIT", Some(secs))
    }

    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// Builds the context for the `fail_count`-th (1-based) failure.
    fn ctx(fail_count: u32, slept_so_far: Duration, error: InvocationError) -> RetryContext {
        RetryContext {
            fail_count: NonZeroU32::new(fail_count).expect("fail_count must be non-zero"),
            slept_so_far,
            error,
        }
    }

    fn rpc_error(code: i32, name: &str, value: Option<u32>) -> crate::errors::RpcError {
        crate::errors::RpcError {
            code,
            name: name.into(),
            value,
        }
    }

    #[test]
    fn no_retries_always_breaks() {
        for fail_count in [1, 2, 10] {
            let c = ctx(fail_count, Duration::ZERO, flood(10));
            assert_eq!(NoRetries.should_retry(&c), ControlFlow::Break(()));
        }
    }

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let c = ctx(1, Duration::ZERO, flood(30));
        assert_eq!(
            AutoSleep::default().should_retry(&c),
            ControlFlow::Continue(Duration::from_secs(30))
        );
    }

    #[test]
    fn autosleep_retries_flood_at_threshold() {
        // The threshold is inclusive.
        let c = ctx(1, Duration::ZERO, flood(60));
        assert_eq!(
            AutoSleep::default().should_retry(&c),
            ControlFlow::Continue(Duration::from_secs(60))
        );
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let c = ctx(1, Duration::ZERO, flood(120));
        assert_eq!(AutoSleep::default().should_retry(&c), ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_no_second_flood_retry() {
        let c = ctx(2, Duration::from_secs(30), flood(30));
        assert_eq!(AutoSleep::default().should_retry(&c), ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_retries_slowmode_under_threshold() {
        let c = ctx(1, Duration::ZERO, slowmode(10));
        assert_eq!(
            AutoSleep::default().should_retry(&c),
            ControlFlow::Continue(Duration::from_secs(10))
        );
    }

    #[test]
    fn autosleep_breaks_slowmode_over_threshold() {
        let c = ctx(1, Duration::ZERO, slowmode(300));
        assert_eq!(AutoSleep::default().should_retry(&c), ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_retries_io_once() {
        let c = ctx(1, Duration::ZERO, io_err());
        assert_eq!(
            AutoSleep::default().should_retry(&c),
            ControlFlow::Continue(Duration::from_secs(1))
        );
    }

    #[test]
    fn autosleep_no_second_io_retry() {
        let c = ctx(2, Duration::from_secs(1), io_err());
        assert_eq!(AutoSleep::default().should_retry(&c), ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_io_retry_can_be_disabled() {
        let policy = AutoSleep {
            io_errors_as_flood_of: None,
            ..AutoSleep::default()
        };
        let c = ctx(1, Duration::ZERO, io_err());
        assert_eq!(policy.should_retry(&c), ControlFlow::Break(()));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let c = ctx(1, Duration::ZERO, rpc(400, "BAD_REQUEST", None));
        assert_eq!(AutoSleep::default().should_retry(&c), ControlFlow::Break(()));
    }

    #[test]
    fn migrate_dc_id_detected() {
        assert_eq!(rpc_error(303, "PHONE_MIGRATE", Some(5)).migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        assert_eq!(rpc_error(303, "NETWORK_MIGRATE", Some(3)).migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        assert_eq!(rpc_error(303, "FILE_MIGRATE", Some(4)).migrate_dc_id(), Some(4));
    }

    #[test]
    fn any_migrate_suffix_detected() {
        assert_eq!(rpc_error(303, "USER_MIGRATE", Some(1)).migrate_dc_id(), Some(1));
        assert_eq!(rpc_error(303, "STATS_MIGRATE", Some(4)).migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        assert_eq!(rpc_error(420, "FLOOD_WAIT", Some(30)).migrate_dc_id(), None);
        assert_eq!(rpc_error(303, "SEE_OTHER", Some(2)).migrate_dc_id(), None);
    }

    #[test]
    fn migrate_requires_code_303() {
        assert_eq!(rpc_error(400, "PHONE_MIGRATE", Some(5)).migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        assert_eq!(rpc_error(303, "PHONE_MIGRATE", None).migrate_dc_id(), Some(2));
    }

    #[test]
    fn invocation_error_migrate_dc_id() {
        assert_eq!(rpc(303, "PHONE_MIGRATE", Some(5)).migrate_dc_id(), Some(5));
        assert_eq!(io_err().migrate_dc_id(), None);
        assert_eq!(InvocationError::Dropped.migrate_dc_id(), None);
    }

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let result = rl.advance(rpc(400, "SOMETHING_WRONG", None)).await;
        // The caller gets back the very error it passed in.
        assert!(matches!(
            result,
            Err(InvocationError::Rpc(ref e)) if e.name == "SOMETHING_WRONG"
        ));
        assert_eq!(rl.ctx.fail_count.get(), 1);
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        assert_eq!(rl.ctx.fail_count.get(), 1);

        assert!(rl.advance(io_err()).await.is_ok());
        assert_eq!(rl.ctx.fail_count.get(), 2);
        assert_eq!(rl.ctx.slept_so_far, Duration::from_millis(1));

        // AutoSleep only retries I/O errors on the first failure.
        assert!(matches!(
            rl.advance(io_err()).await,
            Err(InvocationError::Io(_))
        ));
        assert_eq!(rl.ctx.fail_count.get(), 2);
    }
}