use std::time::Duration;
use d_engine_core::BackoffPolicy;
use d_engine_core::NetworkError;
use d_engine_core::Result;
use tokio::time::sleep;
use tokio::time::timeout;
use tracing::debug;
use tracing::error;
use tracing::warn;
/// Runs `task` repeatedly with a per-attempt timeout and exponential backoff
/// between attempts.
///
/// Each attempt is bounded by `policy.timeout_ms`. After a failed or timed-out
/// attempt the delay starts at `policy.base_delay_ms`, doubles on every retry,
/// and is capped at `policy.max_delay_ms`. At most `policy.max_retries`
/// attempts are made.
///
/// Returns the first successful result, or the error produced by the last
/// attempt once all retries are exhausted. With `max_retries == 0` no attempt
/// is made and a generic `TaskBackoffFailed` error is returned immediately.
pub(crate) async fn task_with_timeout_and_exponential_backoff<F, T, P>(
    task: F,
    policy: BackoffPolicy,
) -> Result<P>
where
    F: Fn() -> T,
    T: std::future::Future<Output = Result<P>>,
{
    let timeout_duration = Duration::from_millis(policy.timeout_ms);
    let max_delay = Duration::from_millis(policy.max_delay_ms);
    let max_retries = policy.max_retries;
    let mut current_delay = Duration::from_millis(policy.base_delay_ms);
    // Error surfaced if every attempt fails (or none is made).
    let mut last_error =
        NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
    let mut retries = 0;
    while retries < max_retries {
        match timeout(timeout_duration, task()).await {
            Ok(Ok(r)) => return Ok(r),
            Ok(Err(error)) => {
                warn!(?error, "failed with error.");
                last_error =
                    NetworkError::TaskBackoffFailed(format!("failed with error: {:?}", &error));
            }
            Err(error) => {
                warn!(?timeout_duration, ?error, "Task timed out");
                last_error = NetworkError::RetryTimeoutError(timeout_duration);
            }
        }
        retries += 1;
        // Sleep only when another attempt remains. Comparing after the
        // increment avoids the previous `max_retries - 1`, which depended on
        // the loop guard to keep an unsigned subtraction from underflowing.
        if retries < max_retries {
            debug!("Retrying in {:?}...", current_delay);
            sleep(current_delay).await;
            current_delay = (current_delay * 2).min(max_delay);
        }
    }
    // Single terminal log. The old code warned twice on the final failure:
    // once inside the loop with an off-by-one count (`max_retries - 1`) and
    // once here with the correct total.
    warn!("Task failed after {} retries", max_retries);
    Err(last_error.into())
}
pub(crate) async fn spawn_task<F, Fut>(
name: &str,
task_fn: F,
handles: Option<&mut Vec<tokio::task::JoinHandle<()>>>,
) where
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<()>> + Send + 'static,
{
let name = name.to_string();
let handle = tokio::spawn(async move {
if let Err(e) = task_fn().await {
error!(
"spawned task: {name} stopped or encountered an error: {:?}",
e
);
}
});
if let Some(h) = handles {
h.push(handle);
}
}