use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use reqwest::{Client, Request, Response};
use tokio::{
sync::{oneshot, Mutex},
time::sleep,
};
pub struct EventualRequest<T: Sync + Send + Sized> {
id: Option<usize>,
request: Option<Request>,
response: Option<Response>,
queued: Option<Instant>, retries: usize, requested: Option<Instant>, responsed: Option<Instant>,
in_queue: Duration,
latency: Duration,
pub metadata: Option<T>, }
impl<T: Sync + Send + Sized> EventualRequest<T> {
pub fn new(request: impl Into<Request>, metadata: Option<T>) -> Self {
EventualRequest {
id: None,
request: Some(request.into()),
response: None,
queued: None,
retries: 0,
requested: None,
responsed: None,
in_queue: Duration::ZERO,
latency: Duration::ZERO,
metadata,
}
}
pub async fn register(
&mut self,
id: usize,
ch: Arc<Mutex<HashMap<usize, oneshot::Sender<EventualRequest<T>>>>>,
) -> oneshot::Receiver<EventualRequest<T>> {
self.id = Some(id);
let (sender, receiver) = oneshot::channel();
ch.lock().await.insert(id, sender);
receiver
}
pub fn get_id(&self) -> Option<usize> {
self.id
}
pub fn request(&self) -> Option<&Request> {
self.request.as_ref()
}
pub fn time_queue_start(&mut self) {
if self.queued.is_some() {
self.time_queue_stop();
}
self.queued = Some(Instant::now())
}
pub fn time_queue_stop(&mut self) {
self.in_queue += Instant::now().duration_since(self.queued.unwrap());
self.queued = None;
}
pub fn time_request_start(&mut self) {
if self.requested.is_some() {
self.time_request_stop();
}
self.requested = Some(Instant::now())
}
pub fn time_request_stop(&mut self) {
self.responsed = Some(Instant::now());
self.latency += self
.responsed
.unwrap()
.duration_since(self.requested.unwrap());
}
fn extract_retry_delay(&self, response: &Response) -> Option<Duration> {
if let Some(retry_after) = response.headers().get("retry-after") {
if let Ok(retry_str) = retry_after.to_str() {
if let Ok(retry_secs) = retry_str.parse::<u64>() {
if retry_secs >= 1 {
return Some(Duration::from_secs(retry_secs));
}
}
}
}
None
}
fn calculate_backoff_delay(&self) -> Duration {
let delay_ms = (50.0 * 1.2_f64.powi(self.retries as i32)).min(10000.0) as u64;
Duration::from_millis(delay_ms)
}
pub async fn execute(&mut self, client: &Client) -> EventualRequestResult {
let Some(request) = self.request.take() else {
return EventualRequestResult::Error("Missing Request".to_string());
};
self.request = request.try_clone();
match client.execute(request).await {
Ok(response) if response.status() == 429 => {
self.retries += 1;
let delay = self
.extract_retry_delay(&response)
.unwrap_or_else(|| self.calculate_backoff_delay());
self.response = Some(response);
eprintln!(
"Received 429, retrying... attempt {} (delay: {:?})",
self.retries, delay
);
sleep(delay).await;
EventualRequestResult::Retry
}
Ok(response) if response.status().is_server_error() => {
self.retries += 1;
let status = response.status();
let delay = self.calculate_backoff_delay();
self.response = Some(response);
sleep(delay).await;
eprintln!(
"Received 5xx error ({}), retrying... attempt {} (delay: {:?})",
status, self.retries, delay
);
EventualRequestResult::Retry
}
Ok(response) if response.status().is_success() => {
self.response = Some(response);
EventualRequestResult::Success
}
Ok(response) => {
let status = response.status();
let error = format!("Status {} returned", status);
self.response = Some(response);
EventualRequestResult::Error(error)
}
Err(err) => {
self.retries += 1;
let delay = self.calculate_backoff_delay();
sleep(delay).await;
eprintln!(
"Network error: {}, retrying... attempt {} (delay: {:?})",
err, self.retries, delay
);
EventualRequestResult::Retry
}
}
}
pub fn response(&mut self) -> Option<Response> {
self.response.take()
}
}
pub enum EventualRequestResult {
Success,
Retry,
Error(String),
}