1use crate::types::{LastFmError, RetryConfig, RetryResult};
2use crate::Result;
3use std::future::Future;
4use std::time::{Instant, SystemTime, UNIX_EPOCH};
5
6pub async fn retry_with_backoff<T, F, Fut, OnRateLimit, OnRateLimitEnd>(
21 config: RetryConfig,
22 operation_name: &str,
23 mut operation: F,
24 mut on_rate_limit: OnRateLimit,
25 mut on_rate_limit_end: OnRateLimitEnd,
26) -> Result<RetryResult<T>>
27where
28 F: FnMut() -> Fut,
29 Fut: Future<Output = Result<T>>,
30 OnRateLimit: FnMut(u64, u64, &str),
31 OnRateLimitEnd: FnMut(u64, &str),
32{
33 let mut retries = 0;
34 let mut total_retry_time = 0;
35 let mut rate_limit_start_time: Option<Instant> = None;
36
37 loop {
38 match operation().await {
39 Ok(result) => {
40 if let Some(start_time) = rate_limit_start_time {
42 let total_duration = start_time.elapsed().as_secs();
43 on_rate_limit_end(total_duration, operation_name);
44 }
45
46 return Ok(RetryResult {
47 result,
48 attempts_made: retries,
49 total_retry_time,
50 });
51 }
52 Err(LastFmError::RateLimit { retry_after }) => {
53 if rate_limit_start_time.is_none() {
55 rate_limit_start_time = Some(Instant::now());
56 }
57
58 if !config.enabled || retries >= config.max_retries {
59 if !config.enabled {
60 log::debug!("Retries disabled for {operation_name} operation");
61 } else {
62 log::warn!(
63 "Max retries ({}) exceeded for {operation_name} operation",
64 config.max_retries
65 );
66 }
67 return Err(LastFmError::RateLimit { retry_after });
68 }
69
70 let base_backoff = config.base_delay * 2_u64.pow(retries);
72 let delay = std::cmp::min(
73 std::cmp::min(retry_after + base_backoff, config.max_delay),
74 retry_after + (retries as u64 * 30), );
76
77 log::info!(
78 "{} rate limited. Waiting {} seconds before retry {} of {}",
79 operation_name,
80 delay,
81 retries + 1,
82 config.max_retries
83 );
84
85 let timestamp = SystemTime::now()
87 .duration_since(UNIX_EPOCH)
88 .unwrap_or_default()
89 .as_secs();
90 on_rate_limit(delay, timestamp, operation_name);
91
92 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
93 retries += 1;
94 total_retry_time += delay;
95 }
96 Err(other_error) => {
97 return Err(other_error);
98 }
99 }
100 }
101}
102
103pub async fn retry_operation<T, F, Fut>(
105 config: RetryConfig,
106 operation_name: &str,
107 operation: F,
108) -> Result<RetryResult<T>>
109where
110 F: FnMut() -> Fut,
111 Fut: Future<Output = Result<T>>,
112{
113 retry_with_backoff(
114 config,
115 operation_name,
116 operation,
117 |delay, timestamp, op_name| {
118 log::debug!(
119 "Rate limited during {op_name}: waiting {delay} seconds (at timestamp {timestamp})"
120 );
121 },
122 |duration, op_name| {
123 log::debug!("Rate limiting ended for {op_name} after {duration} seconds");
124 },
125 )
126 .await
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Builds an enabled retry configuration with a one-second base delay and a
    /// sixty-second delay cap; only the retry budget varies between tests.
    fn enabled_config(max_retries: u32) -> RetryConfig {
        RetryConfig {
            max_retries,
            base_delay: 1,
            max_delay: 60,
            enabled: true,
        }
    }

    /// Operation that fails with a one-second rate limit on every call.
    async fn always_rate_limited() -> Result<i32> {
        Err(LastFmError::RateLimit { retry_after: 1 })
    }

    /// Asserts that `result` is an error and that the error is a rate limit.
    fn assert_rate_limit_failure(result: Result<RetryResult<i32>>) {
        assert!(result.is_err());
        let other = result.unwrap_err();
        assert!(
            matches!(other, LastFmError::RateLimit { .. }),
            "Expected rate limit error, got: {other:?}"
        );
    }

    /// An operation that succeeds on the first call needs no retries.
    #[tokio::test]
    async fn test_successful_operation() {
        let always_ok = || async { Ok::<i32, LastFmError>(42) };

        let result = retry_operation(enabled_config(3), "test", always_ok).await;

        assert!(result.is_ok());
        let outcome = result.unwrap();
        assert_eq!(outcome.result, 42);
        assert_eq!(outcome.attempts_made, 0);
        assert_eq!(outcome.total_retry_time, 0);
    }

    /// Two rate-limited calls followed by a success count as two retries.
    #[tokio::test]
    async fn test_retry_on_rate_limit() {
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_operation = Arc::clone(&calls);

        // The first two calls are rate limited; every later call succeeds.
        let flaky_operation = move || {
            let previous_calls = calls_for_operation.fetch_add(1, Ordering::SeqCst);
            async move {
                match previous_calls {
                    0 | 1 => Err(LastFmError::RateLimit { retry_after: 1 }),
                    _ => Ok::<i32, LastFmError>(42),
                }
            }
        };

        let result = retry_operation(enabled_config(2), "test", flaky_operation).await;

        assert!(result.is_ok());
        let outcome = result.unwrap();
        assert_eq!(outcome.result, 42);
        assert_eq!(outcome.attempts_made, 2);
        assert!(outcome.total_retry_time >= 2);
    }

    /// Once the retry budget is spent, the rate limit error is surfaced.
    #[tokio::test]
    async fn test_max_retries_exceeded() {
        let result = retry_operation(enabled_config(1), "test", always_rate_limited).await;

        assert_rate_limit_failure(result);
    }

    /// With retries disabled, the first rate limit error is returned as-is.
    #[tokio::test]
    async fn test_retries_disabled() {
        let result =
            retry_operation(RetryConfig::disabled(), "test", always_rate_limited).await;

        assert_rate_limit_failure(result);
    }
}