actionqueue_engine/scheduler/
retry_promotion.rs1use actionqueue_core::run::run_instance::{RunInstance, RunInstanceError};
7use actionqueue_core::run::state::RunState;
8use actionqueue_executor_local::backoff::BackoffStrategy;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12#[must_use]
13pub struct RetryPromotionResult {
14 promoted: Vec<RunInstance>,
16 still_waiting: Vec<RunInstance>,
18}
19
20impl RetryPromotionResult {
21 pub fn promoted(&self) -> &[RunInstance] {
23 &self.promoted
24 }
25
26 pub fn still_waiting(&self) -> &[RunInstance] {
28 &self.still_waiting
29 }
30}
31
32pub fn promote_retry_wait_to_ready(
44 retry_waiting: &[RunInstance],
45 current_time: u64,
46 strategy: &dyn BackoffStrategy,
47) -> Result<RetryPromotionResult, RunInstanceError> {
48 let mut promoted = Vec::new();
49 let mut still_waiting = Vec::new();
50
51 for run in retry_waiting {
52 if run.state() != RunState::RetryWait {
53 return Err(RunInstanceError::InvalidTransition {
54 run_id: run.id(),
55 from: run.state(),
56 to: RunState::Ready,
57 });
58 }
59
60 let attempt_count = run.attempt_count();
61 let retry_wait_entered_at = run.last_state_change_at();
62 let ready_at = actionqueue_executor_local::backoff::retry_ready_at(
63 retry_wait_entered_at,
64 attempt_count,
65 strategy,
66 );
67
68 if ready_at <= current_time {
69 let mut ready_run = run.clone();
71 ready_run.transition_to(RunState::Ready)?;
72 promoted.push(ready_run);
73 } else {
74 still_waiting.push(run.clone());
75 }
76 }
77
78 Ok(RetryPromotionResult { promoted, still_waiting })
79}
80
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actionqueue_core::ids::{AttemptId, TaskId};
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::run::state::RunState;
    use actionqueue_executor_local::backoff::{ExponentialBackoff, FixedBackoff};

    use super::*;

    /// Drives `run` through one full attempt cycle
    /// (`Ready` -> `Leased` -> `Running` -> attempt started and finished),
    /// ending in `RetryWait`.
    fn run_one_attempt_cycle(run: &mut RunInstance) {
        for next_state in [RunState::Ready, RunState::Leased, RunState::Running] {
            run.transition_to(next_state).expect("valid transition");
        }
        run.start_attempt(AttemptId::new()).expect("start attempt");
        let attempt_id = run.current_attempt_id().unwrap();
        run.finish_attempt(attempt_id).expect("finish attempt");
        run.transition_to(RunState::RetryWait).expect("valid transition");
    }

    /// Builds a run sitting in `RetryWait` after `attempt_count` attempt cycles,
    /// with its last state-change timestamp set to `retry_wait_entered_at`.
    ///
    /// At least one cycle is always performed, so `attempt_count == 0` behaves
    /// like `attempt_count == 1`.
    fn make_retry_wait_run_at(
        task_id: TaskId,
        attempt_count: u32,
        retry_wait_entered_at: u64,
    ) -> RunInstance {
        let mut run = RunInstance::new_scheduled(task_id, 0, 0).expect("valid run");
        for _ in 0..attempt_count.max(1) {
            run_one_attempt_cycle(&mut run);
        }
        run.record_state_change_at(retry_wait_entered_at);
        run
    }

    /// Asserts how many runs landed in each partition of `result`.
    fn assert_partition(result: &RetryPromotionResult, promoted: usize, waiting: usize) {
        assert_eq!(result.promoted().len(), promoted);
        assert_eq!(result.still_waiting().len(), waiting);
    }

    #[test]
    fn fixed_backoff_not_promoted_before_delay_elapses() {
        // Entered RetryWait at 970 with a 30s fixed delay; at 999 it must still wait.
        let run = make_retry_wait_run_at(TaskId::new(), 1, 970);
        let strategy = FixedBackoff::new(Duration::from_secs(30));

        let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), 999, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");

        assert_partition(&result, 0, 1);
    }

    #[test]
    fn fixed_backoff_promoted_after_delay_elapses() {
        // Same setup as above, but evaluated at 1000 (970 + 30).
        let run = make_retry_wait_run_at(TaskId::new(), 1, 970);
        let strategy = FixedBackoff::new(Duration::from_secs(30));

        let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), 1000, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");

        assert_partition(&result, 1, 0);
        assert_eq!(result.promoted()[0].state(), RunState::Ready);
    }

    #[test]
    fn fixed_backoff_zero_delay_promotes_immediately() {
        let run = make_retry_wait_run_at(TaskId::new(), 1, 1000);
        let strategy = FixedBackoff::new(Duration::from_secs(0));

        let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), 1000, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");

        assert_partition(&result, 1, 0);
        assert_eq!(result.promoted()[0].state(), RunState::Ready);
    }

    #[test]
    fn exponential_backoff_produces_increasing_delays() {
        let task_id = TaskId::new();
        // Both runs entered RetryWait at 990; they differ only in attempt count.
        let runs =
            [make_retry_wait_run_at(task_id, 1, 990), make_retry_wait_run_at(task_id, 2, 990)];
        let strategy =
            ExponentialBackoff::new(Duration::from_secs(10), Duration::from_secs(3600)).unwrap();

        // Before any delay has elapsed, nothing is promoted.
        let result = promote_retry_wait_to_ready(&runs, 999, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");
        assert_partition(&result, 0, 2);

        // At 1000 exactly one of the two runs becomes ready.
        let result = promote_retry_wait_to_ready(&runs, 1000, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");
        assert_partition(&result, 1, 1);

        // At 1010 both runs are ready.
        let result = promote_retry_wait_to_ready(&runs, 1010, &strategy)
            .expect("promotion should succeed for valid RetryWait runs");
        assert_partition(&result, 2, 0);
    }

    #[test]
    fn empty_input_returns_empty_result() {
        let strategy = FixedBackoff::new(Duration::from_secs(5));

        let result = promote_retry_wait_to_ready(&[], 1000, &strategy)
            .expect("promotion should succeed for empty input");

        assert_partition(&result, 0, 0);
    }

    #[test]
    fn non_retry_wait_input_returns_error() {
        // A freshly scheduled run has never entered RetryWait.
        let run = RunInstance::new_scheduled(TaskId::new(), 0, 0).expect("valid run");
        assert_eq!(run.state(), RunState::Scheduled);

        let strategy = FixedBackoff::new(Duration::from_secs(5));
        let result = promote_retry_wait_to_ready(&[run], 1000, &strategy);

        let is_expected_error = matches!(
            &result,
            Err(RunInstanceError::InvalidTransition {
                from: RunState::Scheduled,
                to: RunState::Ready,
                ..
            })
        );
        assert!(
            is_expected_error,
            "expected InvalidTransition for non-RetryWait input, got: {result:?}"
        );
    }

    #[test]
    fn extreme_attempt_count_does_not_panic() {
        let run = make_retry_wait_run_at(TaskId::new(), 10, 0);
        assert_eq!(run.attempt_count(), 10);
        assert_eq!(run.state(), RunState::RetryWait);

        let strategy =
            ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(3600)).unwrap();

        // Evaluate at the largest representable time so the run must be ready.
        let result = promote_retry_wait_to_ready(std::slice::from_ref(&run), u64::MAX, &strategy)
            .expect("extreme attempt count must not panic");

        assert_partition(&result, 1, 0);
        assert_eq!(result.promoted()[0].state(), RunState::Ready);
    }
}