Skip to main content

actionqueue_engine/scheduler/
retry_promotion.rs

1//! RetryWait-to-Ready promotion logic using backoff strategies.
2//!
3//! This module provides functionality for promoting runs from the RetryWait
4//! state to the Ready state when their computed backoff delay has elapsed.
5
6use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
7use actionqueue_core::run::state::RunState;
8use actionqueue_executor_local::backoff::BackoffStrategy;
9
10/// Result of promoting RetryWait runs to Ready.
11#[derive(Debug, Clone, PartialEq, Eq)]
12#[must_use]
13pub struct RetryPromotionResult {
14    /// Runs that were promoted from RetryWait to Ready.
15    promoted: Vec<RunInstance>,
16    /// Runs that remain in RetryWait state (backoff delay not yet elapsed).
17    still_waiting: Vec<RunInstance>,
18}
19
20impl RetryPromotionResult {
21    /// Returns the runs that were promoted from RetryWait to Ready.
22    pub fn promoted(&self) -> &[RunInstance] {
23        &self.promoted
24    }
25
26    /// Returns the runs that remain in RetryWait state.
27    pub fn still_waiting(&self) -> &[RunInstance] {
28        &self.still_waiting
29    }
30}
31
32/// Promotes RetryWait runs to Ready based on backoff delay computation.
33///
34/// For each run in `retry_waiting`, the function computes the `retry_ready_at`
35/// timestamp from the run's `attempt_count` and the provided backoff strategy.
36/// Runs whose computed ready time is `<= current_time` are promoted.
37///
38/// # Arguments
39///
40/// * `retry_waiting` - Runs currently in RetryWait state
41/// * `current_time` - The current time according to the scheduler clock
42/// * `strategy` - The backoff strategy used to compute retry delays
43pub fn promote_retry_wait_to_ready(
44    retry_waiting: &[RunInstance],
45    current_time: u64,
46    strategy: &dyn BackoffStrategy,
47) -> Result<RetryPromotionResult, RunInstanceError> {
48    let mut promoted = Vec::new();
49    let mut still_waiting = Vec::new();
50
51    for run in retry_waiting {
52        if run.state() != RunState::RetryWait {
53            return Err(RunInstanceError::InvalidTransition {
54                run_id: run.id(),
55                from: run.state(),
56                to: RunState::Ready,
57            });
58        }
59
60        let attempt_count = run.attempt_count();
61        let retry_wait_entered_at = run.last_state_change_at();
62        let ready_at = actionqueue_executor_local::backoff::retry_ready_at(
63            retry_wait_entered_at,
64            attempt_count,
65            strategy,
66        );
67
68        if ready_at <= current_time {
69            // Zero delay or immediate — promote now
70            let mut ready_run = run.clone();
71            ready_run.transition_to(RunState::Ready)?;
72            promoted.push(ready_run);
73        } else {
74            still_waiting.push(run.clone());
75        }
76    }
77
78    Ok(RetryPromotionResult { promoted, still_waiting })
79}
80
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actionqueue_core::ids::TaskId;
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::run::state::RunState;
    use actionqueue_executor_local::backoff::{ExponentialBackoff, FixedBackoff};

    use super::*;

    /// Builds a run that sits in RetryWait after `attempt_count` finished
    /// attempts, with its last state change recorded at `retry_wait_entered_at`.
    ///
    /// At least one full Ready → Leased → Running → RetryWait cycle is always
    /// driven, so an `attempt_count` of 0 behaves like 1.
    fn make_retry_wait_run_at(
        task_id: TaskId,
        attempt_count: u32,
        retry_wait_entered_at: u64,
    ) -> RunInstance {
        use actionqueue_core::ids::AttemptId;

        let mut run = RunInstance::new_scheduled(task_id, 0, 0).expect("valid run");

        // Each cycle walks the run through one complete attempt and parks it
        // back in RetryWait.
        let cycles = attempt_count.max(1);
        for _ in 0..cycles {
            for next_state in [RunState::Ready, RunState::Leased, RunState::Running] {
                run.transition_to(next_state).expect("valid transition");
            }
            run.start_attempt(AttemptId::new()).expect("start attempt");
            let attempt_id = run.current_attempt_id().unwrap();
            run.finish_attempt(attempt_id).expect("finish attempt");
            run.transition_to(RunState::RetryWait).expect("valid transition");
        }

        // Stamp the RetryWait entry time (simulates reducer behavior).
        run.record_state_change_at(retry_wait_entered_at);
        run
    }

    #[test]
    fn fixed_backoff_not_promoted_before_delay_elapses() {
        // Entered RetryWait at t=970 with a fixed delay of 30 → ready_at=1000.
        let waiting_run = make_retry_wait_run_at(TaskId::new(), 1, 970);
        let backoff = FixedBackoff::new(Duration::from_secs(30));

        // t=999 is one tick short of ready_at, so nothing may be promoted.
        let outcome =
            promote_retry_wait_to_ready(std::slice::from_ref(&waiting_run), 999, &backoff)
                .expect("promotion should succeed for valid RetryWait runs");

        assert_eq!(outcome.promoted().len(), 0);
        assert_eq!(outcome.still_waiting().len(), 1);
    }

    #[test]
    fn fixed_backoff_promoted_after_delay_elapses() {
        // Entered RetryWait at t=970 with a fixed delay of 30 → ready_at=1000.
        let waiting_run = make_retry_wait_run_at(TaskId::new(), 1, 970);
        let backoff = FixedBackoff::new(Duration::from_secs(30));

        // t=1000 hits ready_at exactly; the comparison is inclusive.
        let outcome =
            promote_retry_wait_to_ready(std::slice::from_ref(&waiting_run), 1000, &backoff)
                .expect("promotion should succeed for valid RetryWait runs");

        assert_eq!(outcome.still_waiting().len(), 0);
        assert_eq!(outcome.promoted().len(), 1);
        assert_eq!(outcome.promoted()[0].state(), RunState::Ready);
    }

    #[test]
    fn fixed_backoff_zero_delay_promotes_immediately() {
        let waiting_run = make_retry_wait_run_at(TaskId::new(), 1, 1000);
        let backoff = FixedBackoff::new(Duration::from_secs(0));

        // With no delay, ready_at equals the entry time (1000), which is <= 1000.
        let outcome = promote_retry_wait_to_ready(&[waiting_run], 1000, &backoff)
            .expect("promotion should succeed for valid RetryWait runs");

        assert_eq!(outcome.still_waiting().len(), 0);
        assert_eq!(outcome.promoted().len(), 1);
        assert_eq!(outcome.promoted()[0].state(), RunState::Ready);
    }

    #[test]
    fn exponential_backoff_produces_increasing_delays() {
        let task_id = TaskId::new();
        // Both runs entered RetryWait at t=990; only the attempt number differs.
        let runs = [
            make_retry_wait_run_at(task_id, 1, 990), // attempt 1 → ready_at = 990 + 10 = 1000
            make_retry_wait_run_at(task_id, 2, 990), // attempt 2 → ready_at = 990 + 20 = 1010
        ];

        let strategy =
            ExponentialBackoff::new(Duration::from_secs(10), Duration::from_secs(3600)).unwrap();

        // t=999: both ready times are still in the future.
        let outcome = promote_retry_wait_to_ready(&runs, 999, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");
        assert_eq!(outcome.promoted().len(), 0);
        assert_eq!(outcome.still_waiting().len(), 2);

        // t=1000: attempt 1 is due (1000 <= 1000), attempt 2 is not (1010 > 1000).
        let outcome = promote_retry_wait_to_ready(&runs, 1000, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");
        assert_eq!(outcome.promoted().len(), 1);
        assert_eq!(outcome.still_waiting().len(), 1);

        // t=1010: both are due.
        let outcome = promote_retry_wait_to_ready(&runs, 1010, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");
        assert_eq!(outcome.promoted().len(), 2);
        assert_eq!(outcome.still_waiting().len(), 0);
    }

    #[test]
    fn empty_input_returns_empty_result() {
        let backoff = FixedBackoff::new(Duration::from_secs(5));

        let outcome = promote_retry_wait_to_ready(&[], 1000, &backoff)
            .expect("promotion should succeed for empty input");

        assert_eq!(outcome.promoted().len(), 0);
        assert_eq!(outcome.still_waiting().len(), 0);
    }

    #[test]
    fn non_retry_wait_input_returns_error() {
        // A freshly scheduled run has never entered RetryWait.
        let scheduled_run = RunInstance::new_scheduled(TaskId::new(), 0, 0).expect("valid run");
        assert_eq!(scheduled_run.state(), RunState::Scheduled);

        let backoff = FixedBackoff::new(Duration::from_secs(5));
        let result = promote_retry_wait_to_ready(&[scheduled_run], 1000, &backoff);

        let is_expected_error = matches!(
            &result,
            Err(RunInstanceError::InvalidTransition {
                from: RunState::Scheduled,
                to: RunState::Ready,
                ..
            })
        );
        assert!(
            is_expected_error,
            "expected InvalidTransition for non-RetryWait input, got: {result:?}"
        );
    }

    #[test]
    fn extreme_attempt_count_does_not_panic() {
        // Drive the run through 10 attempt cycles so exponential backoff is
        // exercised with a larger attempt number. The backoff computation
        // must saturate instead of panicking.
        let waiting_run = make_retry_wait_run_at(TaskId::new(), 10, 0);
        assert_eq!(waiting_run.attempt_count(), 10);
        assert_eq!(waiting_run.state(), RunState::RetryWait);

        let strategy =
            ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(3600)).unwrap();

        // Any ready time, even a saturated one, is <= u64::MAX, so using
        // u64::MAX as the current time must promote the run.
        let outcome =
            promote_retry_wait_to_ready(std::slice::from_ref(&waiting_run), u64::MAX, &strategy)
                .expect("extreme attempt count must not panic");

        assert_eq!(outcome.still_waiting().len(), 0);
        assert_eq!(outcome.promoted().len(), 1);
        assert_eq!(outcome.promoted()[0].state(), RunState::Ready);
    }
}
249}