Skip to main content

actionqueue_engine/concurrency/
lifecycle.rs

1//! Concurrency key lifecycle policy for run execution boundaries.
2//!
3//! This module implements the concurrency key acquire and release policy:
4//! - Keys are acquired when a run transitions to Running state (execution starts).
5//! - Keys are released when a run transitions to a terminal state.
6//!
7//! The lifecycle logic uses the key gate contracts from `key_gate` module
8//! and does not add unrelated policy behavior.
9
10use actionqueue_core::ids::RunId;
11use actionqueue_core::run::RunState;
12use actionqueue_core::task::constraints::ConcurrencyKeyHoldPolicy;
13
14use crate::concurrency::key_gate::{AcquireResult, KeyGate, ReleaseResult};
15
16/// Result of a key lifecycle operation.
17#[derive(Debug, Clone, PartialEq, Eq)]
18#[must_use]
19pub enum LifecycleResult {
20    /// Key was acquired successfully.
21    Acquired {
22        /// The concurrency key that was acquired.
23        key: String,
24        /// The run that now holds the key.
25        run_id: RunId,
26    },
27    /// Key acquisition failed because the key is occupied by another run.
28    KeyOccupied {
29        /// The concurrency key that is occupied.
30        key: String,
31        /// The run that currently holds the key.
32        holder_run_id: RunId,
33    },
34    /// Key was released successfully.
35    Released {
36        /// The concurrency key that was released.
37        key: String,
38    },
39    /// No action was taken (either no key or key already free).
40    NoAction {
41        /// The key that was attempted to release (if any).
42        key: Option<String>,
43    },
44}
45
46/// Attempts to acquire the concurrency key for a run.
47///
48/// Returns `LifecycleResult::Acquired` if the key is free or already held by
49/// the same run. Returns `LifecycleResult::KeyOccupied` if another run holds it.
50///
51/// The key is obtained from the task constraints. Returns `NoAction` if no
52/// concurrency key is defined.
53pub fn acquire_key(key: Option<String>, run_id: RunId, key_gate: &mut KeyGate) -> LifecycleResult {
54    match key {
55        Some(key_str) => {
56            let concurrency_key = crate::concurrency::key_gate::ConcurrencyKey::new(key_str);
57            match key_gate.acquire(concurrency_key, run_id) {
58                AcquireResult::Acquired { key, run_id } => {
59                    LifecycleResult::Acquired { key: key.as_str().to_string(), run_id }
60                }
61                AcquireResult::Occupied { key, holder_run_id } => {
62                    LifecycleResult::KeyOccupied { key: key.as_str().to_string(), holder_run_id }
63                }
64            }
65        }
66        None => LifecycleResult::NoAction { key: None },
67    }
68}
69
70/// Attempts to release the concurrency key for a run.
71///
72/// Returns `LifecycleResult::Released` if the key was held by the run.
73/// Returns `LifecycleResult::NoAction` if no key was defined or the key
74/// was not held by the run.
75pub fn release_key(key: Option<String>, run_id: RunId, key_gate: &mut KeyGate) -> LifecycleResult {
76    match key {
77        Some(key_str) => {
78            let concurrency_key = crate::concurrency::key_gate::ConcurrencyKey::new(key_str);
79            match key_gate.release(concurrency_key, run_id) {
80                ReleaseResult::Released { key } => {
81                    LifecycleResult::Released { key: key.as_str().to_string() }
82                }
83                ReleaseResult::NotHeld { key, .. } => {
84                    LifecycleResult::NoAction { key: Some(key.as_str().to_string()) }
85                }
86            }
87        }
88        None => LifecycleResult::NoAction { key: None },
89    }
90}
91
92/// Context for evaluating concurrency key lifecycle during a state transition.
93pub struct KeyLifecycleContext<'a> {
94    /// Optional concurrency key associated with the run.
95    concurrency_key: Option<String>,
96    /// The run undergoing the state transition.
97    run_id: RunId,
98    /// Mutable reference to the key gate managing concurrency slots.
99    key_gate: &'a mut KeyGate,
100    /// Policy controlling key behavior during retry transitions.
101    hold_policy: ConcurrencyKeyHoldPolicy,
102}
103
104impl<'a> KeyLifecycleContext<'a> {
105    /// Creates a new key lifecycle context.
106    pub fn new(
107        concurrency_key: Option<String>,
108        run_id: RunId,
109        key_gate: &'a mut KeyGate,
110        hold_policy: ConcurrencyKeyHoldPolicy,
111    ) -> Self {
112        Self { concurrency_key, run_id, key_gate, hold_policy }
113    }
114
115    /// Returns the optional concurrency key.
116    pub fn concurrency_key(&self) -> Option<&str> {
117        self.concurrency_key.as_deref()
118    }
119
120    /// Returns the run identifier.
121    pub fn run_id(&self) -> RunId {
122        self.run_id
123    }
124
125    /// Returns the hold policy.
126    pub fn hold_policy(&self) -> ConcurrencyKeyHoldPolicy {
127        self.hold_policy
128    }
129}
130
131/// Determines the appropriate key lifecycle action based on a state transition.
132///
133/// When a run transitions to Running, the concurrency key should be acquired.
134/// When a run transitions to a terminal state (Completed, Failed, Canceled),
135/// the concurrency key should be released.
136///
137/// The `hold_policy` parameter controls behavior when transitioning from
138/// Running to RetryWait:
139/// - [`ConcurrencyKeyHoldPolicy::HoldDuringRetry`]: the key is retained
140///   (no release) so no other run with the same key can start during retry.
141/// - [`ConcurrencyKeyHoldPolicy::ReleaseOnRetry`]: the key is released,
142///   allowing other runs to acquire it while this run waits for retry.
143pub fn evaluate_state_transition(
144    from: RunState,
145    to: RunState,
146    ctx: KeyLifecycleContext<'_>,
147) -> LifecycleResult {
148    let KeyLifecycleContext { concurrency_key, run_id, key_gate, hold_policy } = ctx;
149    tracing::debug!(%run_id, ?from, ?to, "concurrency key lifecycle evaluated");
150
151    // Key is acquired when entering Running state
152    if from != RunState::Running && to == RunState::Running {
153        return acquire_key(concurrency_key, run_id, key_gate);
154    }
155
156    // When leaving Running for RetryWait, consult the hold policy
157    if from == RunState::Running && to == RunState::RetryWait {
158        return match hold_policy {
159            ConcurrencyKeyHoldPolicy::HoldDuringRetry => LifecycleResult::NoAction { key: None },
160            ConcurrencyKeyHoldPolicy::ReleaseOnRetry => {
161                release_key(concurrency_key, run_id, key_gate)
162            }
163        };
164    }
165
166    // When leaving Running for Suspended, follow the same hold policy as RetryWait.
167    // The run is paused and may resume, so the key behaviour mirrors retry semantics.
168    if from == RunState::Running && to == RunState::Suspended {
169        return match hold_policy {
170            ConcurrencyKeyHoldPolicy::HoldDuringRetry => LifecycleResult::NoAction { key: None },
171            ConcurrencyKeyHoldPolicy::ReleaseOnRetry => {
172                release_key(concurrency_key, run_id, key_gate)
173            }
174        };
175    }
176
177    // Key is released when leaving Running state for any other non-Running state
178    if from == RunState::Running && to != RunState::Running {
179        return release_key(concurrency_key, run_id, key_gate);
180    }
181
182    // When a Suspended run reaches a terminal state (e.g. cascade cancellation),
183    // release the key unconditionally. The key may have been held during suspend
184    // under HoldDuringRetry policy and must be freed to avoid permanent key leak.
185    if from == RunState::Suspended && to.is_terminal() {
186        return release_key(concurrency_key, run_id, key_gate);
187    }
188
189    // No key lifecycle action needed for other transitions
190    LifecycleResult::NoAction { key: None }
191}
192
193#[cfg(test)]
194mod tests {
195    use actionqueue_core::ids::RunId;
196
197    use super::*;
198
199    #[test]
200    fn acquire_key_succeeds_when_key_is_free() {
201        let mut key_gate = KeyGate::new();
202        let run_id = RunId::new();
203        let key = Some("my-key".to_string());
204
205        let result = acquire_key(key, run_id, &mut key_gate);
206
207        match result {
208            LifecycleResult::Acquired { key: acquired_key, run_id: acquired_run_id } => {
209                assert_eq!(acquired_key, "my-key");
210                assert_eq!(acquired_run_id, run_id);
211            }
212            _ => panic!("Expected acquire to succeed"),
213        }
214    }
215
216    #[test]
217    fn acquire_key_returns_no_action_when_no_key_defined() {
218        let mut key_gate = KeyGate::new();
219        let run_id = RunId::new();
220
221        let result = acquire_key(None, run_id, &mut key_gate);
222
223        assert_eq!(result, LifecycleResult::NoAction { key: None });
224    }
225
226    #[test]
227    fn acquire_key_fails_when_key_is_occupied() {
228        let mut key_gate = KeyGate::new();
229        let holder_run_id = RunId::new();
230        let requesting_run_id = RunId::new();
231        let key = Some("my-key".to_string());
232
233        // First run acquires the key
234        let _ = acquire_key(key.clone(), holder_run_id, &mut key_gate);
235
236        // Second run tries to acquire the same key
237        let result = acquire_key(key, requesting_run_id, &mut key_gate);
238
239        match result {
240            LifecycleResult::KeyOccupied { key: occupied_key, holder_run_id: occupied_holder } => {
241                assert_eq!(occupied_key, "my-key");
242                assert_eq!(occupied_holder, holder_run_id);
243            }
244            _ => panic!("Expected key to be occupied"),
245        }
246    }
247
248    #[test]
249    fn release_key_succeeds_when_key_is_held() {
250        let mut key_gate = KeyGate::new();
251        let run_id = RunId::new();
252        let key = Some("my-key".to_string());
253
254        // Acquire the key first
255        let _ = acquire_key(key.clone(), run_id, &mut key_gate);
256
257        // Release the key
258        let result = release_key(key, run_id, &mut key_gate);
259
260        match result {
261            LifecycleResult::Released { key: released_key } => {
262                assert_eq!(released_key, "my-key");
263            }
264            _ => panic!("Expected release to succeed"),
265        }
266    }
267
268    #[test]
269    fn release_key_returns_no_action_when_no_key_defined() {
270        let mut key_gate = KeyGate::new();
271        let run_id = RunId::new();
272
273        let result = release_key(None, run_id, &mut key_gate);
274
275        assert_eq!(result, LifecycleResult::NoAction { key: None });
276    }
277
278    #[test]
279    fn release_key_returns_no_action_when_key_not_held() {
280        let mut key_gate = KeyGate::new();
281        let run_id = RunId::new();
282        let key = Some("my-key".to_string());
283
284        let result = release_key(key, run_id, &mut key_gate);
285
286        assert_eq!(result, LifecycleResult::NoAction { key: Some("my-key".to_string()) });
287    }
288
289    #[test]
290    fn evaluate_transition_acquires_key_when_entering_running() {
291        let mut key_gate = KeyGate::new();
292        let run_id = RunId::new();
293
294        let result = evaluate_state_transition(
295            RunState::Leased,
296            RunState::Running,
297            KeyLifecycleContext::new(
298                Some("my-key".to_string()),
299                run_id,
300                &mut key_gate,
301                ConcurrencyKeyHoldPolicy::default(),
302            ),
303        );
304
305        match result {
306            LifecycleResult::Acquired { key, run_id: acquired_run_id } => {
307                assert_eq!(key, "my-key");
308                assert_eq!(acquired_run_id, run_id);
309            }
310            _ => panic!("Expected key to be acquired"),
311        }
312    }
313
314    #[test]
315    fn evaluate_transition_releases_key_when_entering_terminal_state() {
316        let mut key_gate = KeyGate::new();
317        let run_id = RunId::new();
318
319        // First acquire the key
320        let _ = acquire_key(Some("my-key".to_string()), run_id, &mut key_gate);
321
322        // Then evaluate transition to terminal state
323        let result = evaluate_state_transition(
324            RunState::Running,
325            RunState::Completed,
326            KeyLifecycleContext::new(
327                Some("my-key".to_string()),
328                run_id,
329                &mut key_gate,
330                ConcurrencyKeyHoldPolicy::default(),
331            ),
332        );
333
334        match result {
335            LifecycleResult::Released { key } => {
336                assert_eq!(key, "my-key");
337            }
338            _ => panic!("Expected key to be released"),
339        }
340    }
341
342    #[test]
343    fn evaluate_transition_no_action_for_non_key_transitions() {
344        let mut key_gate = KeyGate::new();
345        let run_id = RunId::new();
346
347        // Evaluate transition that doesn't trigger key lifecycle
348        let result = evaluate_state_transition(
349            RunState::Scheduled,
350            RunState::Ready,
351            KeyLifecycleContext::new(
352                Some("my-key".to_string()),
353                run_id,
354                &mut key_gate,
355                ConcurrencyKeyHoldPolicy::default(),
356            ),
357        );
358
359        assert_eq!(result, LifecycleResult::NoAction { key: None });
360    }
361
362    #[test]
363    fn evaluate_transition_holds_key_during_retry_with_default_policy() {
364        let mut key_gate = KeyGate::new();
365        let run_id = RunId::new();
366
367        // First transition to Running should acquire
368        let first = evaluate_state_transition(
369            RunState::Leased,
370            RunState::Running,
371            KeyLifecycleContext::new(
372                Some("my-key".to_string()),
373                run_id,
374                &mut key_gate,
375                ConcurrencyKeyHoldPolicy::HoldDuringRetry,
376            ),
377        );
378
379        // Transition from Running to RetryWait with HoldDuringRetry should NOT release
380        let second = evaluate_state_transition(
381            RunState::Running,
382            RunState::RetryWait,
383            KeyLifecycleContext::new(
384                Some("my-key".to_string()),
385                run_id,
386                &mut key_gate,
387                ConcurrencyKeyHoldPolicy::HoldDuringRetry,
388            ),
389        );
390
391        assert!(matches!(first, LifecycleResult::Acquired { .. }));
392        assert!(matches!(second, LifecycleResult::NoAction { .. }));
393    }
394
395    #[test]
396    fn evaluate_transition_releases_key_on_retry_with_release_policy() {
397        let mut key_gate = KeyGate::new();
398        let run_id = RunId::new();
399
400        // First transition to Running should acquire
401        let first = evaluate_state_transition(
402            RunState::Leased,
403            RunState::Running,
404            KeyLifecycleContext::new(
405                Some("my-key".to_string()),
406                run_id,
407                &mut key_gate,
408                ConcurrencyKeyHoldPolicy::ReleaseOnRetry,
409            ),
410        );
411
412        // Transition from Running to RetryWait with ReleaseOnRetry should release
413        let second = evaluate_state_transition(
414            RunState::Running,
415            RunState::RetryWait,
416            KeyLifecycleContext::new(
417                Some("my-key".to_string()),
418                run_id,
419                &mut key_gate,
420                ConcurrencyKeyHoldPolicy::ReleaseOnRetry,
421            ),
422        );
423
424        assert!(matches!(first, LifecycleResult::Acquired { .. }));
425        assert!(matches!(second, LifecycleResult::Released { .. }));
426    }
427
428    #[test]
429    fn evaluate_transition_releases_key_on_suspended_to_canceled_with_hold_policy() {
430        let mut key_gate = KeyGate::new();
431        let run_id = RunId::new();
432
433        // Acquire key (simulating Running state entry)
434        let _ = acquire_key(Some("my-key".to_string()), run_id, &mut key_gate);
435
436        // Running→Suspended with HoldDuringRetry should NOT release
437        let suspend_result = evaluate_state_transition(
438            RunState::Running,
439            RunState::Suspended,
440            KeyLifecycleContext::new(
441                Some("my-key".to_string()),
442                run_id,
443                &mut key_gate,
444                ConcurrencyKeyHoldPolicy::HoldDuringRetry,
445            ),
446        );
447        assert!(
448            matches!(suspend_result, LifecycleResult::NoAction { .. }),
449            "HoldDuringRetry must not release key on suspend"
450        );
451
452        // Suspended→Canceled must release regardless of hold policy
453        let cancel_result = evaluate_state_transition(
454            RunState::Suspended,
455            RunState::Canceled,
456            KeyLifecycleContext::new(
457                Some("my-key".to_string()),
458                run_id,
459                &mut key_gate,
460                ConcurrencyKeyHoldPolicy::HoldDuringRetry,
461            ),
462        );
463        assert!(
464            matches!(cancel_result, LifecycleResult::Released { .. }),
465            "Suspended→Canceled must release concurrency key"
466        );
467    }
468}