use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
use actionqueue_core::run::state::RunState;
use actionqueue_executor_local::backoff::BackoffStrategy;
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct RetryPromotionResult {
promoted: Vec<RunInstance>,
still_waiting: Vec<RunInstance>,
}
impl RetryPromotionResult {
pub fn promoted(&self) -> &[RunInstance] {
&self.promoted
}
pub fn still_waiting(&self) -> &[RunInstance] {
&self.still_waiting
}
}
pub fn promote_retry_wait_to_ready(
retry_waiting: &[RunInstance],
current_time: u64,
strategy: &dyn BackoffStrategy,
) -> Result<RetryPromotionResult, RunInstanceError> {
let mut promoted = Vec::new();
let mut still_waiting = Vec::new();
for run in retry_waiting {
if run.state() != RunState::RetryWait {
return Err(RunInstanceError::InvalidTransition {
run_id: run.id(),
from: run.state(),
to: RunState::Ready,
});
}
let attempt_count = run.attempt_count();
let retry_wait_entered_at = run.last_state_change_at();
let ready_at = actionqueue_executor_local::backoff::retry_ready_at(
retry_wait_entered_at,
attempt_count,
strategy,
);
if ready_at <= current_time {
let mut ready_run = run.clone();
ready_run.transition_to(RunState::Ready)?;
promoted.push(ready_run);
} else {
still_waiting.push(run.clone());
}
}
Ok(RetryPromotionResult { promoted, still_waiting })
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use actionqueue_core::ids::TaskId;
use actionqueue_core::run::run_instance::RunInstance;
use actionqueue_core::run::state::RunState;
use actionqueue_executor_local::backoff::{ExponentialBackoff, FixedBackoff};
use super::*;
fn make_retry_wait_run_at(
task_id: TaskId,
attempt_count: u32,
retry_wait_entered_at: u64,
) -> RunInstance {
use actionqueue_core::ids::AttemptId;
let mut run = RunInstance::new_scheduled(task_id, 0, 0).expect("valid run");
run.transition_to(RunState::Ready).expect("valid transition");
run.transition_to(RunState::Leased).expect("valid transition");
run.transition_to(RunState::Running).expect("valid transition");
run.start_attempt(AttemptId::new()).expect("start attempt");
run.finish_attempt(run.current_attempt_id().unwrap()).expect("finish attempt");
run.transition_to(RunState::RetryWait).expect("valid transition");
for _ in 1..attempt_count {
run.transition_to(RunState::Ready).expect("valid transition");
run.transition_to(RunState::Leased).expect("valid transition");
run.transition_to(RunState::Running).expect("valid transition");
run.start_attempt(AttemptId::new()).expect("start attempt");
run.finish_attempt(run.current_attempt_id().unwrap()).expect("finish attempt");
run.transition_to(RunState::RetryWait).expect("valid transition");
}
run.record_state_change_at(retry_wait_entered_at);
run
}
#[test]
fn fixed_backoff_not_promoted_before_delay_elapses() {
let task_id = TaskId::new();
let run = make_retry_wait_run_at(task_id, 1, 970);
let strategy = FixedBackoff::new(Duration::from_secs(30));
let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), 999, &strategy)
.expect("promotion should succeed for valid RetryWait runs");
assert!(result.promoted().is_empty());
assert_eq!(result.still_waiting().len(), 1);
}
#[test]
fn fixed_backoff_promoted_after_delay_elapses() {
let task_id = TaskId::new();
let run = make_retry_wait_run_at(task_id, 1, 970);
let strategy = FixedBackoff::new(Duration::from_secs(30));
let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), 1000, &strategy)
.expect("promotion should succeed for valid RetryWait runs");
assert_eq!(result.promoted().len(), 1);
assert_eq!(result.promoted()[0].state(), RunState::Ready);
assert!(result.still_waiting().is_empty());
}
#[test]
fn fixed_backoff_zero_delay_promotes_immediately() {
let task_id = TaskId::new();
let run = make_retry_wait_run_at(task_id, 1, 1000);
let strategy = FixedBackoff::new(Duration::from_secs(0));
let result = promote_retry_wait_to_ready(&[run], 1000, &strategy)
.expect("promotion should succeed for valid RetryWait runs");
assert_eq!(result.promoted().len(), 1);
assert_eq!(result.promoted()[0].state(), RunState::Ready);
assert!(result.still_waiting().is_empty());
}
#[test]
fn exponential_backoff_produces_increasing_delays() {
let task_id = TaskId::new();
let run1 = make_retry_wait_run_at(task_id, 1, 990); let run2 = make_retry_wait_run_at(task_id, 2, 990);
let strategy =
ExponentialBackoff::new(Duration::from_secs(10), Duration::from_secs(3600)).unwrap();
let result = promote_retry_wait_to_ready(&[run1.clone(), run2.clone()], 999, &strategy)
.expect("promotion should succeed for valid RetryWait runs");
assert!(result.promoted().is_empty());
assert_eq!(result.still_waiting().len(), 2);
let result = promote_retry_wait_to_ready(&[run1.clone(), run2.clone()], 1000, &strategy)
.expect("promotion should succeed for valid RetryWait runs");
assert_eq!(result.promoted().len(), 1);
assert_eq!(result.still_waiting().len(), 1);
let result = promote_retry_wait_to_ready(&[run1, run2], 1010, &strategy)
.expect("promotion should succeed for valid RetryWait runs");
assert_eq!(result.promoted().len(), 2);
assert!(result.still_waiting().is_empty());
}
#[test]
fn empty_input_returns_empty_result() {
let strategy = FixedBackoff::new(Duration::from_secs(5));
let result = promote_retry_wait_to_ready(&[], 1000, &strategy)
.expect("promotion should succeed for empty input");
assert!(result.promoted().is_empty());
assert!(result.still_waiting().is_empty());
}
#[test]
fn non_retry_wait_input_returns_error() {
let task_id = TaskId::new();
let run = RunInstance::new_scheduled(task_id, 0, 0).expect("valid run");
assert_eq!(run.state(), RunState::Scheduled);
let strategy = FixedBackoff::new(Duration::from_secs(5));
let result = promote_retry_wait_to_ready(&[run], 1000, &strategy);
assert!(
matches!(
&result,
Err(RunInstanceError::InvalidTransition {
from: RunState::Scheduled,
to: RunState::Ready,
..
})
),
"expected InvalidTransition for non-RetryWait input, got: {result:?}"
);
}
#[test]
fn extreme_attempt_count_does_not_panic() {
let task_id = TaskId::new();
let run = make_retry_wait_run_at(task_id, 10, 0);
assert_eq!(run.attempt_count(), 10);
assert_eq!(run.state(), RunState::RetryWait);
let strategy =
ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(3600)).unwrap();
let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), u64::MAX, &strategy)
.expect("extreme attempt count must not panic");
assert_eq!(result.promoted().len(), 1);
assert_eq!(result.promoted()[0].state(), RunState::Ready);
assert!(result.still_waiting().is_empty());
}
}