1use std::num::NonZeroU32;
22use std::ops::ControlFlow;
23use std::sync::Arc;
24use std::time::Duration;
25
26use tokio::time::sleep;
27
28use crate::errors::InvocationError;
29
30impl crate::errors::RpcError {
32 pub fn migrate_dc_id(&self) -> Option<i32> {
42 if self.code != 303 {
43 return None;
44 }
45 let is_migrate = self.name == "PHONE_MIGRATE"
47 || self.name == "NETWORK_MIGRATE"
48 || self.name == "FILE_MIGRATE"
49 || self.name == "USER_MIGRATE"
50 || self.name.ends_with("_MIGRATE");
51 if is_migrate {
52 Some(self.value.unwrap_or(2) as i32)
54 } else {
55 None
56 }
57 }
58}
59
60impl InvocationError {
62 pub fn migrate_dc_id(&self) -> Option<i32> {
64 match self {
65 Self::Rpc(r) => r.migrate_dc_id(),
66 _ => None,
67 }
68 }
69}
70
/// Decides whether a failed invocation should be retried.
///
/// Implementations inspect the [`RetryContext`] and either return
/// `ControlFlow::Continue(delay)` — sleep `delay`, then retry — or
/// `ControlFlow::Break(())` — give up and surface the error to the caller.
pub trait RetryPolicy: Send + Sync + 'static {
    /// Returns `Continue(delay)` to retry after sleeping `delay`, or
    /// `Break(())` to stop retrying.
    fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration>;
}
84
/// Information a [`RetryPolicy`] uses to decide whether to retry.
pub struct RetryContext {
    /// Number of consecutive failures so far; starts at 1 on the first failure.
    pub fail_count: NonZeroU32,

    /// Total time already spent sleeping between retries of this invocation.
    pub slept_so_far: Duration,

    /// The error produced by the most recent failed attempt.
    pub error: InvocationError,
}
94
/// A policy that never retries: every failure is surfaced immediately.
pub struct NoRetries;
99
100impl RetryPolicy for NoRetries {
101 fn should_retry(&self, _: &RetryContext) -> ControlFlow<(), Duration> {
102 ControlFlow::Break(())
103 }
104}
105
/// A policy that sleeps through server-requested waits
/// (`FLOOD_WAIT`/`SLOWMODE_WAIT`) and, optionally, a first I/O error.
pub struct AutoSleep {
    /// Maximum server-requested wait that will be honored; requests longer
    /// than this make the policy give up instead of sleeping.
    pub threshold: Duration,

    /// When `Some(d)`, the first I/O error is treated like a flood wait of
    /// `d` (sleep, then retry once); `None` disables I/O-error retries.
    pub io_errors_as_flood_of: Option<Duration>,
}
127
128impl Default for AutoSleep {
129 fn default() -> Self {
130 Self {
131 threshold: Duration::from_secs(60),
132 io_errors_as_flood_of: Some(Duration::from_secs(1)),
133 }
134 }
135}
136
137impl RetryPolicy for AutoSleep {
138 fn should_retry(&self, ctx: &RetryContext) -> ControlFlow<(), Duration> {
139 match &ctx.error {
140 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "FLOOD_WAIT" => {
145 let secs = rpc.value.unwrap_or(0) as u64;
146 if secs <= self.threshold.as_secs() {
147 tracing::info!("FLOOD_WAIT_{secs}: sleeping before retry");
148 ControlFlow::Continue(Duration::from_secs(secs))
149 } else {
150 ControlFlow::Break(())
151 }
152 }
153
154 InvocationError::Rpc(rpc) if rpc.code == 420 && rpc.name == "SLOWMODE_WAIT" => {
157 let secs = rpc.value.unwrap_or(0) as u64;
158 if secs <= self.threshold.as_secs() {
159 tracing::info!("SLOWMODE_WAIT_{secs}: sleeping before retry");
160 ControlFlow::Continue(Duration::from_secs(secs))
161 } else {
162 ControlFlow::Break(())
163 }
164 }
165
166 InvocationError::Io(_) if ctx.fail_count.get() == 1 => {
168 if let Some(d) = self.io_errors_as_flood_of {
169 tracing::info!("I/O error: sleeping {d:?} before retry");
170 ControlFlow::Continue(d)
171 } else {
172 ControlFlow::Break(())
173 }
174 }
175
176 _ => ControlFlow::Break(()),
177 }
178 }
179}
180
/// Stateful driver for retrying a single logical invocation.
///
/// Feed each failure to [`RetryLoop::advance`]; it consults the policy,
/// sleeps when a retry is allowed, and otherwise hands the error back.
pub struct RetryLoop {
    /// Policy consulted after every failed attempt.
    policy: Arc<dyn RetryPolicy>,
    /// Bookkeeping (failure count, time slept, last error) shown to the policy.
    ctx: RetryContext,
}
207
208impl RetryLoop {
209 pub fn new(policy: Arc<dyn RetryPolicy>) -> Self {
210 Self {
211 policy,
212 ctx: RetryContext {
213 fail_count: NonZeroU32::new(1).unwrap(),
214 slept_so_far: Duration::default(),
215 error: InvocationError::Dropped,
216 },
217 }
218 }
219
220 pub async fn advance(&mut self, err: InvocationError) -> Result<(), InvocationError> {
224 self.ctx.error = err;
225 match self.policy.should_retry(&self.ctx) {
226 ControlFlow::Continue(delay) => {
227 sleep(delay).await;
228 self.ctx.slept_so_far += delay;
229 self.ctx.fail_count = self.ctx.fail_count.saturating_add(1);
231 Ok(())
232 }
233 ControlFlow::Break(()) => {
234 Err(std::mem::replace(
236 &mut self.ctx.error,
237 InvocationError::Dropped,
238 ))
239 }
240 }
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;

    /// Builds a `FLOOD_WAIT` error requesting a wait of `secs` seconds.
    fn flood(secs: u32) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(secs),
        })
    }

    /// Builds a transient I/O (connection reset) error.
    fn io_err() -> InvocationError {
        InvocationError::Io(io::Error::new(io::ErrorKind::ConnectionReset, "reset"))
    }

    /// Builds an arbitrary RPC error from its parts.
    fn rpc(code: i32, name: &str, value: Option<u32>) -> InvocationError {
        InvocationError::Rpc(crate::errors::RpcError {
            code,
            name: name.into(),
            value,
        })
    }

    #[test]
    fn no_retries_always_breaks() {
        let policy = NoRetries;
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(10),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_flood_under_threshold() {
        // 30s is under the default 60s threshold, so the policy sleeps.
        // (Fixed: two statements were crammed onto one line here.)
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(30),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(30)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_breaks_flood_over_threshold() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: flood(120),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_no_second_flood_retry() {
        // A second consecutive flood (fail_count == 2) must not be retried.
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).unwrap(),
            slept_so_far: Duration::from_secs(30),
            error: flood(30),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_retries_io_once() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: io_err(),
        };
        match policy.should_retry(&ctx) {
            ControlFlow::Continue(d) => assert_eq!(d, Duration::from_secs(1)),
            other => panic!("expected Continue, got {other:?}"),
        }
    }

    #[test]
    fn autosleep_no_second_io_retry() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(2).unwrap(),
            slept_so_far: Duration::from_secs(1),
            error: io_err(),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn autosleep_breaks_other_rpc() {
        let policy = AutoSleep::default();
        let ctx = RetryContext {
            fail_count: NonZeroU32::new(1).unwrap(),
            slept_so_far: Duration::default(),
            error: rpc(400, "BAD_REQUEST", None),
        };
        assert!(matches!(policy.should_retry(&ctx), ControlFlow::Break(())));
    }

    #[test]
    fn migrate_dc_id_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: Some(5),
        };
        assert_eq!(e.migrate_dc_id(), Some(5));
    }

    #[test]
    fn network_migrate_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "NETWORK_MIGRATE".into(),
            value: Some(3),
        };
        assert_eq!(e.migrate_dc_id(), Some(3));
    }

    #[test]
    fn file_migrate_detected() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "FILE_MIGRATE".into(),
            value: Some(4),
        };
        assert_eq!(e.migrate_dc_id(), Some(4));
    }

    #[test]
    fn non_migrate_is_none() {
        // Wrong code (420) must never be treated as a migration request.
        let e = crate::errors::RpcError {
            code: 420,
            name: "FLOOD_WAIT".into(),
            value: Some(30),
        };
        assert_eq!(e.migrate_dc_id(), None);
    }

    #[test]
    fn migrate_falls_back_to_dc2_when_no_value() {
        let e = crate::errors::RpcError {
            code: 303,
            name: "PHONE_MIGRATE".into(),
            value: None,
        };
        assert_eq!(e.migrate_dc_id(), Some(2));
    }

    #[tokio::test]
    async fn retry_loop_gives_up_on_no_retries() {
        let mut rl = RetryLoop::new(Arc::new(NoRetries));
        let err = rpc(400, "SOMETHING_WRONG", None);
        let result = rl.advance(err).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn retry_loop_increments_fail_count() {
        // First I/O error is retried (Ok); the second must surface (Err).
        let mut rl = RetryLoop::new(Arc::new(AutoSleep {
            threshold: Duration::from_secs(60),
            io_errors_as_flood_of: Some(Duration::from_millis(1)),
        }));
        assert!(rl.advance(io_err()).await.is_ok());
        assert!(rl.advance(io_err()).await.is_err());
    }
}