1use std::num::NonZeroU32;
17use std::ops::ControlFlow;
18use std::sync::Arc;
19use std::time::Duration;
20
21use tokio::time::sleep;
22
23use crate::errors::InvocationError;
24
25impl crate::errors::RpcError {
27 pub fn migrate_dc_id(&self) -> Option<i32> {
37 if self.code != 303 {
38 return None;
39 }
40 let is_migrate = self.name == "PHONE_MIGRATE"
42 || self.name == "NETWORK_MIGRATE"
43 || self.name == "FILE_MIGRATE"
44 || self.name == "USER_MIGRATE"
45 || self.name.ends_with("_MIGRATE");
46 if is_migrate {
47 Some(self.value.unwrap_or(2) as i32)
49 } else {
50 None
51 }
52 }
53}
54
55impl InvocationError {
57 pub fn migrate_dc_id(&self) -> Option<i32> {
59 match self {
60 Self::Rpc(r) => r.migrate_dc_id(),
61 _ => None,
62 }
63 }
64}
65
/// Decides whether a failed invocation should be retried, and after how long.
pub trait RetryPolicy: Send + Sync + 'static {
    /// Returns `ControlFlow::Continue(delay)` to sleep `delay` and retry, or
    /// `ControlFlow::Break(())` to give up and surface the error.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}
79
/// Snapshot of retry state handed to [`RetryPolicy::should_retry`].
pub struct RetryContext {
    /// Number of failed attempts so far, including the current one
    /// (starts at 1 for the first failure).
    pub fail_count: NonZeroU32,
    /// Total time already spent sleeping between retries.
    pub slept_so_far: Duration,
    /// The error produced by the most recent attempt.
    pub error: InvocationError,
}
89
90pub struct NoRetries;
94
95impl RetryPolicy for NoRetries {
96 fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
97 ControlFlow::Break(())
98 }
99}
100
/// A policy that automatically sleeps through short, server-requested waits
/// (FLOOD_WAIT / SLOWMODE_WAIT) and can retry a transient I/O error once.
pub struct AutoSleep {
    /// Longest server-requested wait this policy is willing to sleep through;
    /// anything longer gives up instead.
    pub threshold: Duration,

    /// When `Some(d)`, the first I/O error is retried after sleeping `d`;
    /// `None` disables I/O-error retries entirely.
    pub io_errors_as_flood_of: Option<Duration>,
}

impl Default for AutoSleep {
    // Defaults: tolerate waits up to 60 s, retry one I/O error after 1 s.
    fn default() -> Self {
        Self {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_secs(1)),
        }
    }
}
131
132impl RetryPolicy for AutoSleep {
133 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
134 match &ctx.error {
135 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
140 let secs = rpc.value.unwrap_or(0) as u64;
141 if secs <= self.threshold.as_secs() {
142 tracing::info!("FLOOD_WAIT_{secs}: sleeping before retry");
143 ControlFlow::Continue(Duration::from_secs(secs))
144 } else {
145 ControlFlow::Break(())
146 }
147 }
148
149 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
152 let secs = rpc.value.unwrap_or(0) as u64;
153 if secs <= self.threshold.as_secs() {
154 tracing::info!("SLOWMODE_WAIT_{secs}: sleeping before retry");
155 ControlFlow::Continue(Duration::from_secs(secs))
156 } else {
157 ControlFlow::Break(())
158 }
159 }
160
161 InvocationError::Io(_) if ctx.fail_count.get() == 1 => {
163 if let Some(d) = self.io_errors_as_flood_of {
164 tracing::info!("I/O error: sleeping {d:?} before retry");
165 ControlFlow::Continue(d)
166 } else {
167 ControlFlow::Break(())
168 }
169 }
170
171 _ => ControlFlow::Break(()),
172 }
173 }
174}
175
/// Drives repeated invocation attempts against a [`RetryPolicy`].
///
/// Feed each failure to [`RetryLoop::advance`]: `Ok(())` means the sleep is
/// done and the caller should try again; `Err(e)` means the policy gave up.
pub struct RetryLoop {
    // Policy consulted after every failure.
    policy: Arc<dyn RetryPolicy>,
    // Running tally (fail count, slept time, last error) shown to the policy.
    ctx: RetryContext,
}
202
203impl RetryLoop {
204 pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
205 Self {
206 policy,
207 ctx: RetryContext {
208 fail_count: NonZeroU32::new(1).unwrap(),
209 slept_so_far: Duration::default(),
210 error: InvocationError::Dropped,
211 },
212 }
213 }
214
215 pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
219 self.ctx.error = err;
220 match self.policy.should_retry(&self.ctx) {
221 ControlFlow::Continue(delay) => {
222 sleep(delay).await;
223 self.ctx.slept_so_far += delay;
224 self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
226 Ok(())
227 }
228 ControlFlow::Break(()) => {
229 Err(std::mem::replace(
231 &mut self.ctx.error,
232 InvocationError::Dropped,
233 ))
234 }
235 }
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;

    /// Builds an arbitrary RPC invocation error.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(raw(code, name, value))
    }

    /// Builds a bare `RpcError` value (not wrapped in `InvocationError`).
    fn raw(code: i32, name: &str, value: Option<u32>) -> crate::errors::RpcError {
        crate::errors::RpcError {
            code,
            name: name.into(),
            value,
        }
    }

    /// Builds a 420 FLOOD_WAIT error requesting `secs` seconds.
    fn flood(secs: u32) -> InvocationError {
        rpc(420, "FLOOD_WAIT", Some(secs))
    }

    /// Builds a connection-reset I/O error.
    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// Builds a context representing `fails` failures with `slept` already slept.
    fn ctx(fails: u32, slept: Duration, error: InvocationError) -> RetryContext {
        RetryContext {
            fail_count: NonZeroU32::new(fails).unwrap(),
            slept_so_far: slept,
            error,
        }
    }

    #[test]
    fn no_retries_always_breaks() {
        let outcome = NoRetries.should_retry(&ctx(1, Duration::default(), flood(10)));
        assert!(matches!(outcome, ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        let outcome = AutoSleep::default().should_retry(&ctx(1, Duration::default(), flood(30)));
        match outcome {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(30)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let outcome = AutoSleep::default().should_retry(&ctx(1, Duration::default(), flood(120)));
        assert!(matches!(outcome, ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_no_second_flood_retry() {
        let outcome =
            AutoSleep::default().should_retry(&ctx(2, Duration::from_secs(30), flood(30)));
        assert!(matches!(outcome, ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_io_once() {
        let outcome = AutoSleep::default().should_retry(&ctx(1, Duration::default(), io_err()));
        match outcome {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_no_second_io_retry() {
        let outcome =
            AutoSleep::default().should_retry(&ctx(2, Duration::from_secs(1), io_err()));
        assert!(matches!(outcome, ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let outcome = AutoSleep::default()
            .should_retry(&ctx(1, Duration::default(), rpc(400, "BAD_REQUEST", None)));
        assert!(matches!(outcome, ControlFlow::Break(())));
    }

    #[test]
    fn migrate_dc_id_detected() {
        assert_eq!(raw(303, "PHONE_MIGRATE", Some(5)).migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        assert_eq!(raw(303, "NETWORK_MIGRATE", Some(3)).migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        assert_eq!(raw(303, "FILE_MIGRATE", Some(4)).migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        assert_eq!(raw(420, "FLOOD_WAIT", Some(30)).migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        assert_eq!(raw(303, "PHONE_MIGRATE", None).migrate_dc_id(), Some(2));
    }

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut looper = RetryLoop::new(Arc::new(NoRetries));
        let result = looper.advance(rpc(400, "SOMETHING_WRONG", None)).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        let policy = AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        };
        let mut looper = RetryLoop::new(Arc::new(policy));
        // First I/O error is retried; the second (fail_count now 2) is not.
        assert!(looper.advance(io_err()).await.is_ok());
        assert!(looper.advance(io_err()).await.is_err());
    }
}
425}