use crate::jobs::config::{BackoffStrategy, RetryConfig};
use crate::jobs::error::JobsError;
use backon::{ConstantBuilder, ExponentialBuilder, FibonacciBuilder, Retryable};
use std::future::Future;
use std::time::Duration;
pub struct RetryExecutor {
config: RetryConfig,
}
impl RetryExecutor {
pub fn new(config: RetryConfig) -> Self {
Self { config }
}
pub fn default_config() -> Self {
Self::new(RetryConfig::default())
}
pub async fn execute<F, Fut, T, E>(&self, operation: F) -> Result<T, JobsError>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
match self.config.backoff {
BackoffStrategy::Constant => self.execute_constant(operation).await,
BackoffStrategy::Linear => self.execute_linear(operation).await,
BackoffStrategy::Exponential => self.execute_exponential(operation).await,
}
}
async fn execute_constant<F, Fut, T, E>(&self, operation: F) -> Result<T, JobsError>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let mut builder = ConstantBuilder::default()
.with_delay(self.config.initial_delay)
.with_max_times(self.config.max_attempts.saturating_sub(1) as usize);
if self.config.jitter {
builder = builder.with_jitter();
}
operation
.retry(builder)
.await
.map_err(|e| JobsError::ExecutionFailed {
name: "retry".to_string(),
reason: e.to_string(),
})
}
async fn execute_linear<F, Fut, T, E>(&self, operation: F) -> Result<T, JobsError>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let mut builder = FibonacciBuilder::default()
.with_min_delay(self.config.initial_delay)
.with_max_delay(self.config.max_delay)
.with_max_times(self.config.max_attempts.saturating_sub(1) as usize);
if self.config.jitter {
builder = builder.with_jitter();
}
operation
.retry(builder)
.await
.map_err(|e| JobsError::ExecutionFailed {
name: "retry".to_string(),
reason: e.to_string(),
})
}
async fn execute_exponential<F, Fut, T, E>(&self, operation: F) -> Result<T, JobsError>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let mut builder = ExponentialBuilder::default()
.with_min_delay(self.config.initial_delay)
.with_max_delay(self.config.max_delay)
.with_max_times(self.config.max_attempts.saturating_sub(1) as usize)
.with_factor(2.0);
if self.config.jitter {
builder = builder.with_jitter();
}
operation
.retry(builder)
.await
.map_err(|e| JobsError::ExecutionFailed {
name: "retry".to_string(),
reason: e.to_string(),
})
}
pub fn max_attempts(&self) -> u32 {
self.config.max_attempts
}
pub fn strategy(&self) -> BackoffStrategy {
self.config.backoff
}
pub fn has_jitter(&self) -> bool {
self.config.jitter
}
}
pub fn calculate_delay(config: &RetryConfig, attempt: u32) -> Duration {
if attempt == 0 {
return Duration::ZERO;
}
let base_delay = match config.backoff {
BackoffStrategy::Constant => config.initial_delay,
BackoffStrategy::Linear => {
Duration::from_millis(config.initial_delay.as_millis() as u64 * attempt as u64)
}
BackoffStrategy::Exponential => {
let multiplier = 2u64.saturating_pow(attempt - 1);
Duration::from_millis(config.initial_delay.as_millis() as u64 * multiplier)
}
};
base_delay.min(config.max_delay)
}
pub fn is_retryable_error(error: &JobsError) -> bool {
match error {
JobsError::InvalidJobDefinition { .. } => false,
JobsError::DuplicateJobName { .. } => false,
JobsError::InvalidCronExpression { .. } => false,
JobsError::TriggerConfigError { .. } => false,
JobsError::InvalidNotificationConfig { .. } => false,
JobsError::ConfigParseError { .. } => false,
JobsError::InvalidConfig { .. } => false,
JobsError::WorkflowNotFound { .. } => false,
JobsError::JobNotFound { .. } => false,
JobsError::ConfigNotFound { .. } => false,
JobsError::DaemonStartFailed { .. } => true,
JobsError::ExecutionFailed { .. } => true,
JobsError::ExecutionTimeout { .. } => true,
JobsError::WebhookServerError { .. } => true,
JobsError::DatabaseError { .. } => true,
JobsError::NotificationFailed { .. } => true,
JobsError::IoError { .. } => true,
JobsError::FileSystemError { .. } => true,
JobsError::MaxRetriesExceeded { .. } => false, JobsError::JobAlreadyRunning { .. } => false, JobsError::DaemonAlreadyRunning { .. } => false,
JobsError::DaemonNotRunning => false,
JobsError::CannotCancelRunning { .. } => false,
JobsError::ChannelClosed => false,
JobsError::DatabaseOpenFailed { .. } => true,
JobsError::SerializationError { .. } => false,
JobsError::MigrationError { .. } => false,
JobsError::PidFileError { .. } => true,
JobsError::WatchPathError { .. } => true,
JobsError::ExecutionNotFound { .. } => false,
JobsError::DaemonStopFailed { .. } => true,
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
#[test]
fn test_calculate_delay_constant() {
let config = RetryConfig {
backoff: BackoffStrategy::Constant,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
max_attempts: 3,
jitter: false,
};
assert_eq!(calculate_delay(&config, 0), Duration::ZERO);
assert_eq!(calculate_delay(&config, 1), Duration::from_millis(100));
assert_eq!(calculate_delay(&config, 2), Duration::from_millis(100));
assert_eq!(calculate_delay(&config, 3), Duration::from_millis(100));
}
#[test]
fn test_calculate_delay_linear() {
let config = RetryConfig {
backoff: BackoffStrategy::Linear,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
max_attempts: 5,
jitter: false,
};
assert_eq!(calculate_delay(&config, 0), Duration::ZERO);
assert_eq!(calculate_delay(&config, 1), Duration::from_millis(100));
assert_eq!(calculate_delay(&config, 2), Duration::from_millis(200));
assert_eq!(calculate_delay(&config, 3), Duration::from_millis(300));
}
#[test]
fn test_calculate_delay_exponential() {
let config = RetryConfig {
backoff: BackoffStrategy::Exponential,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
max_attempts: 5,
jitter: false,
};
assert_eq!(calculate_delay(&config, 0), Duration::ZERO);
assert_eq!(calculate_delay(&config, 1), Duration::from_millis(100));
assert_eq!(calculate_delay(&config, 2), Duration::from_millis(200));
assert_eq!(calculate_delay(&config, 3), Duration::from_millis(400));
assert_eq!(calculate_delay(&config, 4), Duration::from_millis(800));
}
#[test]
fn test_calculate_delay_respects_max() {
let config = RetryConfig {
backoff: BackoffStrategy::Exponential,
initial_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(5),
max_attempts: 10,
jitter: false,
};
assert_eq!(calculate_delay(&config, 10), Duration::from_secs(5));
}
#[test]
fn test_is_retryable_error_transient() {
assert!(is_retryable_error(&JobsError::ExecutionFailed {
name: "test".to_string(),
reason: "timeout".to_string(),
}));
assert!(is_retryable_error(&JobsError::IoError {
reason: "connection reset".to_string(),
}));
}
#[test]
fn test_is_retryable_error_permanent() {
assert!(!is_retryable_error(&JobsError::JobNotFound {
name: "test".to_string(),
}));
assert!(!is_retryable_error(&JobsError::InvalidCronExpression {
expression: "bad".to_string(),
}));
}
#[tokio::test]
async fn test_retry_executor_success_first_try() {
let executor = RetryExecutor::new(RetryConfig {
max_attempts: 3,
backoff: BackoffStrategy::Constant,
initial_delay: Duration::from_millis(10),
max_delay: Duration::from_secs(1),
jitter: false,
});
let result: Result<i32, JobsError> =
executor.execute(|| async { Ok::<_, String>(42) }).await;
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_retry_executor_success_after_retries() {
let executor = RetryExecutor::new(RetryConfig {
max_attempts: 3,
backoff: BackoffStrategy::Constant,
initial_delay: Duration::from_millis(1),
max_delay: Duration::from_secs(1),
jitter: false,
});
let attempts = Arc::new(AtomicU32::new(0));
let attempts_clone = Arc::clone(&attempts);
let result: Result<i32, JobsError> = executor
.execute(|| {
let attempts = Arc::clone(&attempts_clone);
async move {
let current = attempts.fetch_add(1, Ordering::SeqCst);
if current < 2 {
Err("not yet".to_string())
} else {
Ok(42)
}
}
})
.await;
assert_eq!(result.unwrap(), 42);
assert_eq!(attempts.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn test_retry_executor_exhausted() {
let executor = RetryExecutor::new(RetryConfig {
max_attempts: 2,
backoff: BackoffStrategy::Constant,
initial_delay: Duration::from_millis(1),
max_delay: Duration::from_secs(1),
jitter: false,
});
let attempts = Arc::new(AtomicU32::new(0));
let attempts_clone = Arc::clone(&attempts);
let result: Result<i32, JobsError> = executor
.execute(|| {
let attempts = Arc::clone(&attempts_clone);
async move {
attempts.fetch_add(1, Ordering::SeqCst);
Err::<i32, _>("always fails".to_string())
}
})
.await;
assert!(result.is_err());
assert_eq!(attempts.load(Ordering::SeqCst), 2);
}
#[test]
fn test_retry_executor_config_accessors() {
let config = RetryConfig {
max_attempts: 5,
backoff: BackoffStrategy::Linear,
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
jitter: true,
};
let executor = RetryExecutor::new(config);
assert_eq!(executor.max_attempts(), 5);
assert_eq!(executor.strategy(), BackoffStrategy::Linear);
assert!(executor.has_jitter());
}
}