1use actionqueue_core::ids::RunId;
11use actionqueue_core::run::RunState;
12use actionqueue_core::task::constraints::ConcurrencyKeyHoldPolicy;
13
14use crate::concurrency::key_gate::{AcquireResult, KeyGate, ReleaseResult};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18#[must_use]
19pub enum LifecycleResult {
20 Acquired {
22 key: String,
24 run_id: RunId,
26 },
27 KeyOccupied {
29 key: String,
31 holder_run_id: RunId,
33 },
34 Released {
36 key: String,
38 },
39 NoAction {
41 key: Option<String>,
43 },
44}
45
46pub fn acquire_key(key: Option<String>, run_id: RunId, key_gate: &mut KeyGate) -> LifecycleResult {
54 match key {
55 Some(key_str) => {
56 let concurrency_key = crate::concurrency::key_gate::ConcurrencyKey::new(key_str);
57 match key_gate.acquire(concurrency_key, run_id) {
58 AcquireResult::Acquired { key, run_id } => {
59 LifecycleResult::Acquired { key: key.as_str().to_string(), run_id }
60 }
61 AcquireResult::Occupied { key, holder_run_id } => {
62 LifecycleResult::KeyOccupied { key: key.as_str().to_string(), holder_run_id }
63 }
64 }
65 }
66 None => LifecycleResult::NoAction { key: None },
67 }
68}
69
70pub fn release_key(key: Option<String>, run_id: RunId, key_gate: &mut KeyGate) -> LifecycleResult {
76 match key {
77 Some(key_str) => {
78 let concurrency_key = crate::concurrency::key_gate::ConcurrencyKey::new(key_str);
79 match key_gate.release(concurrency_key, run_id) {
80 ReleaseResult::Released { key } => {
81 LifecycleResult::Released { key: key.as_str().to_string() }
82 }
83 ReleaseResult::NotHeld { key, .. } => {
84 LifecycleResult::NoAction { key: Some(key.as_str().to_string()) }
85 }
86 }
87 }
88 None => LifecycleResult::NoAction { key: None },
89 }
90}
91
92pub struct KeyLifecycleContext<'a> {
94 concurrency_key: Option<String>,
96 run_id: RunId,
98 key_gate: &'a mut KeyGate,
100 hold_policy: ConcurrencyKeyHoldPolicy,
102}
103
104impl<'a> KeyLifecycleContext<'a> {
105 pub fn new(
107 concurrency_key: Option<String>,
108 run_id: RunId,
109 key_gate: &'a mut KeyGate,
110 hold_policy: ConcurrencyKeyHoldPolicy,
111 ) -> Self {
112 Self { concurrency_key, run_id, key_gate, hold_policy }
113 }
114
115 pub fn concurrency_key(&self) -> Option<&str> {
117 self.concurrency_key.as_deref()
118 }
119
120 pub fn run_id(&self) -> RunId {
122 self.run_id
123 }
124
125 pub fn hold_policy(&self) -> ConcurrencyKeyHoldPolicy {
127 self.hold_policy
128 }
129}
130
131pub fn evaluate_state_transition(
144 from: RunState,
145 to: RunState,
146 ctx: KeyLifecycleContext<'_>,
147) -> LifecycleResult {
148 let KeyLifecycleContext { concurrency_key, run_id, key_gate, hold_policy } = ctx;
149 tracing::debug!(%run_id, ?from, ?to, "concurrency key lifecycle evaluated");
150
151 if from != RunState::Running && to == RunState::Running {
153 return acquire_key(concurrency_key, run_id, key_gate);
154 }
155
156 if from == RunState::Running && to == RunState::RetryWait {
158 return match hold_policy {
159 ConcurrencyKeyHoldPolicy::HoldDuringRetry => LifecycleResult::NoAction { key: None },
160 ConcurrencyKeyHoldPolicy::ReleaseOnRetry => {
161 release_key(concurrency_key, run_id, key_gate)
162 }
163 };
164 }
165
166 if from == RunState::Running && to == RunState::Suspended {
169 return match hold_policy {
170 ConcurrencyKeyHoldPolicy::HoldDuringRetry => LifecycleResult::NoAction { key: None },
171 ConcurrencyKeyHoldPolicy::ReleaseOnRetry => {
172 release_key(concurrency_key, run_id, key_gate)
173 }
174 };
175 }
176
177 if from == RunState::Running && to != RunState::Running {
179 return release_key(concurrency_key, run_id, key_gate);
180 }
181
182 if from == RunState::Suspended && to.is_terminal() {
186 return release_key(concurrency_key, run_id, key_gate);
187 }
188
189 LifecycleResult::NoAction { key: None }
191}
192
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::RunId;

    use super::*;

    /// Key name shared by every test in this module.
    const KEY: &str = "my-key";

    /// Builds the owned, optional key argument the lifecycle functions expect.
    fn my_key() -> Option<String> {
        Some(KEY.to_string())
    }

    /// Evaluates a `from` → `to` transition for [`KEY`] using a freshly built context.
    fn transition(
        from: RunState,
        to: RunState,
        run_id: RunId,
        key_gate: &mut KeyGate,
        hold_policy: ConcurrencyKeyHoldPolicy,
    ) -> LifecycleResult {
        let ctx = KeyLifecycleContext::new(my_key(), run_id, key_gate, hold_policy);
        evaluate_state_transition(from, to, ctx)
    }

    #[test]
    fn acquire_key_succeeds_when_key_is_free() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();

        let outcome = acquire_key(my_key(), run_id, &mut gate);

        if let LifecycleResult::Acquired { key, run_id: owner } = outcome {
            assert_eq!(key, KEY);
            assert_eq!(owner, run_id);
        } else {
            panic!("Expected acquire to succeed");
        }
    }

    #[test]
    fn acquire_key_returns_no_action_when_no_key_defined() {
        let mut gate = KeyGate::new();

        let outcome = acquire_key(None, RunId::new(), &mut gate);

        assert_eq!(outcome, LifecycleResult::NoAction { key: None });
    }

    #[test]
    fn acquire_key_fails_when_key_is_occupied() {
        let mut gate = KeyGate::new();
        let holder = RunId::new();
        let requester = RunId::new();

        // The first run takes the key so the second request must be refused.
        let _ = acquire_key(my_key(), holder, &mut gate);

        let outcome = acquire_key(my_key(), requester, &mut gate);

        if let LifecycleResult::KeyOccupied { key, holder_run_id } = outcome {
            assert_eq!(key, KEY);
            assert_eq!(holder_run_id, holder);
        } else {
            panic!("Expected key to be occupied");
        }
    }

    #[test]
    fn release_key_succeeds_when_key_is_held() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();

        // Take the key first so there is something to release.
        let _ = acquire_key(my_key(), run_id, &mut gate);

        let outcome = release_key(my_key(), run_id, &mut gate);

        if let LifecycleResult::Released { key } = outcome {
            assert_eq!(key, KEY);
        } else {
            panic!("Expected release to succeed");
        }
    }

    #[test]
    fn release_key_returns_no_action_when_no_key_defined() {
        let mut gate = KeyGate::new();

        let outcome = release_key(None, RunId::new(), &mut gate);

        assert_eq!(outcome, LifecycleResult::NoAction { key: None });
    }

    #[test]
    fn release_key_returns_no_action_when_key_not_held() {
        let mut gate = KeyGate::new();

        let outcome = release_key(my_key(), RunId::new(), &mut gate);

        assert_eq!(outcome, LifecycleResult::NoAction { key: my_key() });
    }

    #[test]
    fn evaluate_transition_acquires_key_when_entering_running() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();

        let outcome = transition(
            RunState::Leased,
            RunState::Running,
            run_id,
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        );

        if let LifecycleResult::Acquired { key, run_id: owner } = outcome {
            assert_eq!(key, KEY);
            assert_eq!(owner, run_id);
        } else {
            panic!("Expected key to be acquired");
        }
    }

    #[test]
    fn evaluate_transition_releases_key_when_entering_terminal_state() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();

        // Simulate a run that is already executing with the key held.
        let _ = acquire_key(my_key(), run_id, &mut gate);

        let outcome = transition(
            RunState::Running,
            RunState::Completed,
            run_id,
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        );

        if let LifecycleResult::Released { key } = outcome {
            assert_eq!(key, KEY);
        } else {
            panic!("Expected key to be released");
        }
    }

    #[test]
    fn evaluate_transition_no_action_for_non_key_transitions() {
        let mut gate = KeyGate::new();

        let outcome = transition(
            RunState::Scheduled,
            RunState::Ready,
            RunId::new(),
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        );

        assert_eq!(outcome, LifecycleResult::NoAction { key: None });
    }

    #[test]
    fn evaluate_transition_holds_key_during_retry_with_default_policy() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let policy = ConcurrencyKeyHoldPolicy::HoldDuringRetry;

        let on_start = transition(RunState::Leased, RunState::Running, run_id, &mut gate, policy);
        let on_retry =
            transition(RunState::Running, RunState::RetryWait, run_id, &mut gate, policy);

        assert!(matches!(on_start, LifecycleResult::Acquired { .. }));
        assert!(matches!(on_retry, LifecycleResult::NoAction { .. }));
    }

    #[test]
    fn evaluate_transition_releases_key_on_retry_with_release_policy() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let policy = ConcurrencyKeyHoldPolicy::ReleaseOnRetry;

        let on_start = transition(RunState::Leased, RunState::Running, run_id, &mut gate, policy);
        let on_retry =
            transition(RunState::Running, RunState::RetryWait, run_id, &mut gate, policy);

        assert!(matches!(on_start, LifecycleResult::Acquired { .. }));
        assert!(matches!(on_retry, LifecycleResult::Released { .. }));
    }

    #[test]
    fn evaluate_transition_releases_key_on_suspended_to_canceled_with_hold_policy() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let policy = ConcurrencyKeyHoldPolicy::HoldDuringRetry;

        // Simulate a run that is already executing with the key held.
        let _ = acquire_key(my_key(), run_id, &mut gate);

        let suspend_result =
            transition(RunState::Running, RunState::Suspended, run_id, &mut gate, policy);
        assert!(
            matches!(suspend_result, LifecycleResult::NoAction { .. }),
            "HoldDuringRetry must not release key on suspend"
        );

        let cancel_result =
            transition(RunState::Suspended, RunState::Canceled, run_id, &mut gate, policy);
        assert!(
            matches!(cancel_result, LifecycleResult::Released { .. }),
            "Suspended→Canceled must release concurrency key"
        );
    }
}