use actionqueue_core::ids::RunId;
use actionqueue_core::run::RunInstance;
use actionqueue_core::run::RunState;
use actionqueue_core::task::constraints::ConcurrencyKeyHoldPolicy;
use actionqueue_engine::concurrency::key_gate::{
AcquireResult, ConcurrencyKey, KeyGate, ReleaseResult,
};
use actionqueue_engine::concurrency::lifecycle::{
evaluate_state_transition, KeyLifecycleContext, LifecycleResult,
};
/// A key on a fresh gate is granted to the first run that asks for it, and the
/// grant reports both the key and the requesting run.
#[test]
fn key_can_be_acquired_when_free() {
    let mut gate = KeyGate::new();
    let wanted_key = ConcurrencyKey::new("my-key");
    let requester = RunId::new();

    // Pull the payload out of the success variant; anything else fails the test.
    let (granted_key, granted_to) = match gate.acquire(wanted_key.clone(), requester) {
        AcquireResult::Acquired { key: granted_key, run_id: granted_to } => {
            (granted_key, granted_to)
        }
        AcquireResult::Occupied { .. } => panic!("Expected key to be acquired"),
    };

    assert_eq!(granted_key, wanted_key);
    assert_eq!(granted_to, requester);
}
/// While one run holds a key, a different run requesting the same key is told
/// the key is occupied and which run currently holds it.
#[test]
fn second_run_cannot_acquire_same_key_while_first_holds_it() {
    let mut gate = KeyGate::new();
    let shared_key = ConcurrencyKey::new("my-key");
    let holder = RunId::new();
    let contender = RunId::new();

    // First request: must succeed and be attributed to `holder`.
    let granted_to = match gate.acquire(shared_key.clone(), holder) {
        AcquireResult::Acquired { run_id, .. } => run_id,
        AcquireResult::Occupied { .. } => panic!("Expected first acquisition to succeed"),
    };
    assert_eq!(granted_to, holder);

    // Second request from another run: must be refused, naming the holder.
    let (blocked_key, blocked_by) = match gate.acquire(shared_key, contender) {
        AcquireResult::Occupied { key: blocked_key, holder_run_id } => {
            (blocked_key, holder_run_id)
        }
        AcquireResult::Acquired { .. } => panic!("Expected key to be occupied"),
    };
    assert_eq!(blocked_key.as_str(), "my-key");
    assert_eq!(blocked_by, holder);
}
/// A run that already holds a key may request it again and is granted it
/// again rather than being reported as blocked.
#[test]
fn same_run_can_acquire_same_key_multiple_times() {
    let mut gate = KeyGate::new();
    let held_key = ConcurrencyKey::new("my-key");
    let owner = RunId::new();

    let first_grant = match gate.acquire(held_key.clone(), owner) {
        AcquireResult::Acquired { run_id, .. } => run_id,
        AcquireResult::Occupied { .. } => panic!("Expected first acquisition to succeed"),
    };
    assert_eq!(first_grant, owner);

    // Same run, same key, second time around.
    let repeat_grant = match gate.acquire(held_key, owner) {
        AcquireResult::Acquired { run_id, .. } => run_id,
        AcquireResult::Occupied { .. } => panic!("Expected repeated acquisition to succeed"),
    };
    assert_eq!(repeat_grant, owner);
}
/// After the holding run releases a key, a different run can acquire it.
#[test]
fn release_allows_other_runs_to_acquire_key() {
    let mut gate = KeyGate::new();
    let contested_key = ConcurrencyKey::new("my-key");
    let earlier_run = RunId::new();
    let later_run = RunId::new();

    // Step 1: the earlier run takes the key.
    let first_holder = match gate.acquire(contested_key.clone(), earlier_run) {
        AcquireResult::Acquired { run_id, .. } => run_id,
        AcquireResult::Occupied { .. } => panic!("Expected first acquisition to succeed"),
    };
    assert_eq!(first_holder, earlier_run);

    // Step 2: the earlier run gives it back; the release must name the same key.
    let freed_key = match gate.release(contested_key.clone(), earlier_run) {
        ReleaseResult::Released { key: freed_key } => freed_key,
        ReleaseResult::NotHeld { .. } => panic!("Expected release to succeed"),
    };
    assert_eq!(freed_key, contested_key);

    // Step 3: the later run can now take it.
    let next_holder = match gate.acquire(contested_key, later_run) {
        AcquireResult::Acquired { run_id, .. } => run_id,
        AcquireResult::Occupied { .. } => panic!("Expected key to be free after release"),
    };
    assert_eq!(next_holder, later_run);
}
/// A release attempted by a run that does not hold the key is rejected, and
/// the original holder can still re-acquire the key afterwards.
#[test]
fn release_from_different_run_does_not_clear_key() {
    let mut gate = KeyGate::new();
    let guarded_key = ConcurrencyKey::new("my-key");
    let owner = RunId::new();
    let intruder = RunId::new();

    let granted_to = match gate.acquire(guarded_key.clone(), owner) {
        AcquireResult::Acquired { run_id, .. } => run_id,
        AcquireResult::Occupied { .. } => panic!("Expected first acquisition to succeed"),
    };
    assert_eq!(granted_to, owner);

    // The non-holder's release must come back as `NotHeld`, naming the non-holder.
    let rejected_run = match gate.release(guarded_key, intruder) {
        ReleaseResult::NotHeld { attempting_run_id, .. } => attempting_run_id,
        ReleaseResult::Released { .. } => panic!("Expected release from different run to fail"),
    };
    assert_eq!(rejected_run, intruder);

    // The owner asking again must not be reported as blocked.
    let reacquired = gate.acquire(ConcurrencyKey::new("my-key"), owner);
    if let AcquireResult::Occupied { .. } = reacquired {
        panic!("Expected same run to still hold key");
    }
}
/// Contract: of two runs sharing a concurrency key, the first to go
/// `Leased -> Running` acquires the key; the second making the same transition
/// observes the key as occupied, and the first run remains the holder.
#[test]
fn same_key_exclusion_contract_test() {
    let mut gate = KeyGate::new();
    let task = actionqueue_core::ids::TaskId::new();
    let first = RunInstance::new_scheduled(task, 1000, 500).expect("valid run");
    let second = RunInstance::new_scheduled(task, 1000, 501).expect("valid run");
    let key_name = Some("shared-key".to_string());
    let gate_key = ConcurrencyKey::new("shared-key");

    // First run enters Running and should take the key.
    let first_ctx = KeyLifecycleContext::new(
        key_name.clone(),
        first.id(),
        &mut gate,
        ConcurrencyKeyHoldPolicy::default(),
    );
    let first_outcome = evaluate_state_transition(RunState::Leased, RunState::Running, first_ctx);
    assert!(
        matches!(first_outcome, LifecycleResult::Acquired { .. }),
        "First run should acquire key when entering Running"
    );
    assert!(gate.is_key_occupied(&gate_key));

    // Second run attempts the same transition while the key is held.
    let second_ctx = KeyLifecycleContext::new(
        key_name.clone(),
        second.id(),
        &mut gate,
        ConcurrencyKeyHoldPolicy::default(),
    );
    let second_outcome =
        evaluate_state_transition(RunState::Leased, RunState::Running, second_ctx);
    assert!(
        matches!(second_outcome, LifecycleResult::KeyOccupied { .. }),
        "Second run should observe key occupied when entering Running"
    );

    // Ownership must not have moved.
    assert_eq!(gate.key_holder(&gate_key), Some(first.id()));
}
/// Contract: once the run holding a concurrency key goes `Running -> Completed`,
/// the key is free again and a different run entering `Running` can acquire it.
///
/// Each transition outcome is asserted (not discarded) so that a failure is
/// reported at the phase that produced it rather than at a later gate check.
#[test]
fn key_release_enables_reacquisition_contract_test() {
    let mut key_gate = KeyGate::new();
    let task_id = actionqueue_core::ids::TaskId::new();
    let run1 = RunInstance::new_scheduled(task_id, 1000, 500).expect("valid run");
    let run2 = RunInstance::new_scheduled(task_id, 1000, 501).expect("valid run");
    let concurrency_key = Some("reacquisition-key".to_string());
    let gate_key = ConcurrencyKey::new("reacquisition-key");

    // Phase 1: first run enters Running and takes the key.
    let acquired = evaluate_state_transition(
        RunState::Leased,
        RunState::Running,
        KeyLifecycleContext::new(
            concurrency_key.clone(),
            run1.id(),
            &mut key_gate,
            ConcurrencyKeyHoldPolicy::default(),
        ),
    );
    assert!(
        matches!(acquired, LifecycleResult::Acquired { .. }),
        "First run should acquire key when entering Running"
    );
    assert!(key_gate.is_key_occupied(&gate_key));

    // Phase 2: first run completes and the key is handed back.
    let released = evaluate_state_transition(
        RunState::Running,
        RunState::Completed,
        KeyLifecycleContext::new(
            concurrency_key.clone(),
            run1.id(),
            &mut key_gate,
            ConcurrencyKeyHoldPolicy::default(),
        ),
    );
    assert!(
        matches!(released, LifecycleResult::Released { .. }),
        "First run should release key when entering Completed"
    );
    assert!(!key_gate.is_key_occupied(&gate_key));

    // Phase 3: second run enters Running and picks up the freed key.
    let result = evaluate_state_transition(
        RunState::Leased,
        RunState::Running,
        KeyLifecycleContext::new(
            concurrency_key,
            run2.id(),
            &mut key_gate,
            ConcurrencyKeyHoldPolicy::default(),
        ),
    );
    assert!(
        matches!(result, LifecycleResult::Acquired { .. }),
        "Second run should acquire key after first run releases it"
    );
    assert!(key_gate.is_key_occupied(&gate_key));
    assert_eq!(key_gate.key_holder(&gate_key), Some(run2.id()));
}
/// Runs with no concurrency key (`None`) both go `Leased -> Running` with a
/// `NoAction` outcome, and the gate reports an unrelated key as unoccupied.
#[test]
fn no_key_runs_can_execute_concurrently() {
    let mut gate = KeyGate::new();
    let task = actionqueue_core::ids::TaskId::new();
    let first = RunInstance::new_scheduled(task, 1000, 500).expect("valid run");
    let second = RunInstance::new_scheduled(task, 1000, 501).expect("valid run");

    // Both runs make the identical keyless transition, one after the other.
    for run_id in [first.id(), second.id()] {
        let outcome = evaluate_state_transition(
            RunState::Leased,
            RunState::Running,
            KeyLifecycleContext::new(
                None,
                run_id,
                &mut gate,
                ConcurrencyKeyHoldPolicy::default(),
            ),
        );
        assert!(
            matches!(outcome, LifecycleResult::NoAction { .. }),
            "No action when no key defined"
        );
    }

    let probe = ConcurrencyKey::new("nonexistent");
    assert!(!gate.is_key_occupied(&probe));
}
/// Walks three runs sharing one concurrency key through the gate in sequence:
/// each run acquires the key on `Leased -> Running`, the first two release it
/// on `Running -> Completed`, and the third ends the test as the holder.
#[test]
fn same_key_scheduling_cycle() {
    use actionqueue_core::run::RunInstance;

    let mut gate = KeyGate::new();
    let task = actionqueue_core::ids::TaskId::new();
    let first = RunInstance::new_scheduled(task, 1000, 500).expect("valid run");
    let second = RunInstance::new_scheduled(task, 1000, 501).expect("valid run");
    let third = RunInstance::new_scheduled(task, 1000, 502).expect("valid run");
    let key_name = "cycle-key";
    let gate_key = ConcurrencyKey::new(key_name);

    // Full acquire-then-release rounds for the first two runs.
    for run_id in [first.id(), second.id()] {
        let entered = evaluate_state_transition(
            RunState::Leased,
            RunState::Running,
            KeyLifecycleContext::new(
                Some(key_name.to_string()),
                run_id,
                &mut gate,
                ConcurrencyKeyHoldPolicy::default(),
            ),
        );
        assert!(matches!(entered, LifecycleResult::Acquired { .. }));
        assert!(gate.is_key_occupied(&gate_key));
        assert_eq!(gate.key_holder(&gate_key), Some(run_id));

        let finished = evaluate_state_transition(
            RunState::Running,
            RunState::Completed,
            KeyLifecycleContext::new(
                Some(key_name.to_string()),
                run_id,
                &mut gate,
                ConcurrencyKeyHoldPolicy::default(),
            ),
        );
        assert!(matches!(finished, LifecycleResult::Released { .. }));
        assert!(!gate.is_key_occupied(&gate_key));
    }

    // The third run only acquires; it is still holding the key at the end.
    let last_entered = evaluate_state_transition(
        RunState::Leased,
        RunState::Running,
        KeyLifecycleContext::new(
            Some(key_name.to_string()),
            third.id(),
            &mut gate,
            ConcurrencyKeyHoldPolicy::default(),
        ),
    );
    assert!(matches!(last_entered, LifecycleResult::Acquired { .. }));
    assert!(gate.is_key_occupied(&gate_key));
    assert_eq!(gate.key_holder(&gate_key), Some(third.id()));
}