use actionqueue_core::ids::RunId;
use actionqueue_core::run::RunState;
use actionqueue_core::task::constraints::ConcurrencyKeyHoldPolicy;
use crate::concurrency::key_gate::{AcquireResult, KeyGate, ReleaseResult};
/// Outcome of a concurrency-key lifecycle operation.
///
/// Returned by [`acquire_key`], [`release_key`] and
/// [`evaluate_state_transition`]. Marked `#[must_use]` so callers cannot
/// silently drop the outcome.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub enum LifecycleResult {
/// The key gate reported the key as acquired for `run_id`.
Acquired {
/// Name of the acquired key.
key: String,
/// Run the key was acquired for.
run_id: RunId,
},
/// The key gate reported the key as occupied, so nothing was acquired.
KeyOccupied {
/// Name of the contested key.
key: String,
/// Run reported by the gate as the current holder.
holder_run_id: RunId,
},
/// The key gate reported the key as released.
Released {
/// Name of the released key.
key: String,
},
/// Nothing was acquired or released.
NoAction {
/// `Some(key)` when a release was attempted but the gate reported the
/// key as not held; `None` when no key was involved or the transition
/// does not touch the key.
key: Option<String>,
},
}
/// Attempts to acquire the concurrency key `key` for `run_id` through `key_gate`.
///
/// # Returns
///
/// * [`LifecycleResult::NoAction`] with `key: None` when `key` is `None`;
///   the gate is not consulted.
/// * [`LifecycleResult::Acquired`] when the gate reports the key as acquired.
/// * [`LifecycleResult::KeyOccupied`] carrying the holder reported by the gate
///   when the key is occupied.
pub fn acquire_key(key: Option<String>, run_id: RunId, key_gate: &mut KeyGate) -> LifecycleResult {
    use crate::concurrency::key_gate::ConcurrencyKey;

    // Without a key there is nothing to ask the gate for.
    let requested = match key {
        Some(name) => ConcurrencyKey::new(name),
        None => return LifecycleResult::NoAction { key: None },
    };

    match key_gate.acquire(requested, run_id) {
        AcquireResult::Acquired { key: granted, run_id: owner } => {
            LifecycleResult::Acquired { key: granted.as_str().to_owned(), run_id: owner }
        }
        AcquireResult::Occupied { key: contested, holder_run_id } => {
            LifecycleResult::KeyOccupied { key: contested.as_str().to_owned(), holder_run_id }
        }
    }
}
/// Attempts to release the concurrency key `key` on behalf of `run_id`.
///
/// # Returns
///
/// * [`LifecycleResult::Released`] when the gate reports the key as released.
/// * [`LifecycleResult::NoAction`] with `key: Some(..)` when the gate reports
///   the key as not held.
/// * [`LifecycleResult::NoAction`] with `key: None` when `key` is `None`;
///   the gate is not consulted.
pub fn release_key(key: Option<String>, run_id: RunId, key_gate: &mut KeyGate) -> LifecycleResult {
    use crate::concurrency::key_gate::ConcurrencyKey;

    if let Some(name) = key {
        let target = ConcurrencyKey::new(name);
        return match key_gate.release(target, run_id) {
            ReleaseResult::Released { key: freed } => {
                LifecycleResult::Released { key: freed.as_str().to_owned() }
            }
            ReleaseResult::NotHeld { key: untouched, .. } => {
                LifecycleResult::NoAction { key: Some(untouched.as_str().to_owned()) }
            }
        };
    }

    // No key configured: nothing to release.
    LifecycleResult::NoAction { key: None }
}
/// Inputs consumed by [`evaluate_state_transition`] besides the state pair.
///
/// Borrows the key gate mutably for `'a`.
pub struct KeyLifecycleContext<'a> {
/// Concurrency key of the run, or `None` when it has no key.
concurrency_key: Option<String>,
/// Run whose state transition is being evaluated.
run_id: RunId,
/// Gate through which the key is acquired and released.
key_gate: &'a mut KeyGate,
/// Decides whether the key is kept or released when the run moves from
/// `Running` to `RetryWait` or `Suspended`.
hold_policy: ConcurrencyKeyHoldPolicy,
}
impl<'a> KeyLifecycleContext<'a> {
pub fn new(
concurrency_key: Option<String>,
run_id: RunId,
key_gate: &'a mut KeyGate,
hold_policy: ConcurrencyKeyHoldPolicy,
) -> Self {
Self { concurrency_key, run_id, key_gate, hold_policy }
}
pub fn concurrency_key(&self) -> Option<&str> {
self.concurrency_key.as_deref()
}
pub fn run_id(&self) -> RunId {
self.run_id
}
pub fn hold_policy(&self) -> ConcurrencyKeyHoldPolicy {
self.hold_policy
}
}
/// Applies the concurrency-key side effect of a run moving from `from` to `to`.
///
/// Rules, by transition:
///
/// * non-`Running` → `Running`: acquire the key via [`acquire_key`].
/// * `Running` → `RetryWait` or `Suspended`: governed by the hold policy.
///   `HoldDuringRetry` keeps the key and yields `NoAction { key: None }`;
///   `ReleaseOnRetry` releases it via [`release_key`].
/// * `Running` → any other non-`Running` state: release the key.
/// * `Suspended` → a terminal state: release the key.
/// * anything else (including `Running` → `Running`): `NoAction { key: None }`.
///
/// A debug-level `tracing` event is emitted for every call.
///
/// NOTE(review): a run leaving `RetryWait` for anything other than `Running`
/// triggers no release here. If the key was kept under `HoldDuringRetry`,
/// confirm it is released elsewhere.
pub fn evaluate_state_transition(
    from: RunState,
    to: RunState,
    ctx: KeyLifecycleContext<'_>,
) -> LifecycleResult {
    let KeyLifecycleContext { concurrency_key, run_id, key_gate, hold_policy } = ctx;
    tracing::debug!(%run_id, ?from, ?to, "concurrency key lifecycle evaluated");

    let was_running = from == RunState::Running;
    let will_run = to == RunState::Running;

    match (was_running, will_run) {
        // Entering Running from any other state.
        (false, true) => acquire_key(concurrency_key, run_id, key_gate),

        // Running -> Running leaves the key untouched.
        (true, true) => LifecycleResult::NoAction { key: None },

        // Leaving Running.
        (true, false) => {
            let policy_governed = to == RunState::RetryWait || to == RunState::Suspended;
            if policy_governed {
                match hold_policy {
                    ConcurrencyKeyHoldPolicy::HoldDuringRetry => {
                        LifecycleResult::NoAction { key: None }
                    }
                    ConcurrencyKeyHoldPolicy::ReleaseOnRetry => {
                        release_key(concurrency_key, run_id, key_gate)
                    }
                }
            } else {
                release_key(concurrency_key, run_id, key_gate)
            }
        }

        // Neither side is Running: only Suspended -> terminal touches the key,
        // since a suspended run may still hold it under HoldDuringRetry.
        (false, false) => {
            if from == RunState::Suspended && to.is_terminal() {
                release_key(concurrency_key, run_id, key_gate)
            } else {
                LifecycleResult::NoAction { key: None }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::RunId;

    use super::*;

    /// Key name shared by every test in this module.
    const KEY: &str = "my-key";

    /// Returns the shared key in the `Option<String>` form the API expects.
    fn my_key() -> Option<String> {
        Some(KEY.to_string())
    }

    /// Evaluates a transition for the shared key against `gate`.
    fn transition(
        from: RunState,
        to: RunState,
        run_id: RunId,
        gate: &mut KeyGate,
        policy: ConcurrencyKeyHoldPolicy,
    ) -> LifecycleResult {
        evaluate_state_transition(from, to, KeyLifecycleContext::new(my_key(), run_id, gate, policy))
    }

    #[test]
    fn acquire_key_succeeds_when_key_is_free() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();

        let outcome = acquire_key(my_key(), run_id, &mut gate);

        if let LifecycleResult::Acquired { key, run_id: owner } = outcome {
            assert_eq!(key, "my-key");
            assert_eq!(owner, run_id);
        } else {
            panic!("Expected acquire to succeed");
        }
    }

    #[test]
    fn acquire_key_returns_no_action_when_no_key_defined() {
        let mut gate = KeyGate::new();

        let outcome = acquire_key(None, RunId::new(), &mut gate);

        assert_eq!(outcome, LifecycleResult::NoAction { key: None });
    }

    #[test]
    fn acquire_key_fails_when_key_is_occupied() {
        let mut gate = KeyGate::new();
        let holder = RunId::new();
        let requester = RunId::new();

        // The first run takes the key, so the second must be rejected.
        let _ = acquire_key(my_key(), holder, &mut gate);
        let outcome = acquire_key(my_key(), requester, &mut gate);

        if let LifecycleResult::KeyOccupied { key, holder_run_id } = outcome {
            assert_eq!(key, "my-key");
            assert_eq!(holder_run_id, holder);
        } else {
            panic!("Expected key to be occupied");
        }
    }

    #[test]
    fn release_key_succeeds_when_key_is_held() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let _ = acquire_key(my_key(), run_id, &mut gate);

        let outcome = release_key(my_key(), run_id, &mut gate);

        if let LifecycleResult::Released { key } = outcome {
            assert_eq!(key, "my-key");
        } else {
            panic!("Expected release to succeed");
        }
    }

    #[test]
    fn release_key_returns_no_action_when_no_key_defined() {
        let mut gate = KeyGate::new();

        let outcome = release_key(None, RunId::new(), &mut gate);

        assert_eq!(outcome, LifecycleResult::NoAction { key: None });
    }

    #[test]
    fn release_key_returns_no_action_when_key_not_held() {
        let mut gate = KeyGate::new();

        let outcome = release_key(my_key(), RunId::new(), &mut gate);

        assert_eq!(outcome, LifecycleResult::NoAction { key: my_key() });
    }

    #[test]
    fn evaluate_transition_acquires_key_when_entering_running() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();

        let outcome = transition(
            RunState::Leased,
            RunState::Running,
            run_id,
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        );

        if let LifecycleResult::Acquired { key, run_id: owner } = outcome {
            assert_eq!(key, "my-key");
            assert_eq!(owner, run_id);
        } else {
            panic!("Expected key to be acquired");
        }
    }

    #[test]
    fn evaluate_transition_releases_key_when_entering_terminal_state() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let _ = acquire_key(my_key(), run_id, &mut gate);

        let outcome = transition(
            RunState::Running,
            RunState::Completed,
            run_id,
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        );

        if let LifecycleResult::Released { key } = outcome {
            assert_eq!(key, "my-key");
        } else {
            panic!("Expected key to be released");
        }
    }

    #[test]
    fn evaluate_transition_no_action_for_non_key_transitions() {
        let mut gate = KeyGate::new();

        let outcome = transition(
            RunState::Scheduled,
            RunState::Ready,
            RunId::new(),
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        );

        assert_eq!(outcome, LifecycleResult::NoAction { key: None });
    }

    #[test]
    fn evaluate_transition_holds_key_during_retry_with_default_policy() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let policy = ConcurrencyKeyHoldPolicy::HoldDuringRetry;

        let on_start = transition(RunState::Leased, RunState::Running, run_id, &mut gate, policy);
        let on_retry = transition(RunState::Running, RunState::RetryWait, run_id, &mut gate, policy);

        assert!(matches!(on_start, LifecycleResult::Acquired { .. }));
        assert!(matches!(on_retry, LifecycleResult::NoAction { .. }));
    }

    #[test]
    fn evaluate_transition_releases_key_on_retry_with_release_policy() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let policy = ConcurrencyKeyHoldPolicy::ReleaseOnRetry;

        let on_start = transition(RunState::Leased, RunState::Running, run_id, &mut gate, policy);
        let on_retry = transition(RunState::Running, RunState::RetryWait, run_id, &mut gate, policy);

        assert!(matches!(on_start, LifecycleResult::Acquired { .. }));
        assert!(matches!(on_retry, LifecycleResult::Released { .. }));
    }

    #[test]
    fn evaluate_transition_releases_key_on_suspended_to_canceled_with_hold_policy() {
        let mut gate = KeyGate::new();
        let run_id = RunId::new();
        let policy = ConcurrencyKeyHoldPolicy::HoldDuringRetry;
        let _ = acquire_key(my_key(), run_id, &mut gate);

        let on_suspend =
            transition(RunState::Running, RunState::Suspended, run_id, &mut gate, policy);
        assert!(
            matches!(on_suspend, LifecycleResult::NoAction { .. }),
            "HoldDuringRetry must not release key on suspend"
        );

        let on_cancel =
            transition(RunState::Suspended, RunState::Canceled, run_id, &mut gate, policy);
        assert!(
            matches!(on_cancel, LifecycleResult::Released { .. }),
            "Suspended→Canceled must release concurrency key"
        );
    }
}