Skip to main content

actionqueue_engine/index/
mod.rs

1//! Indexing utilities for run instances by state.
2//!
3//! This module provides index types for grouping and filtering run instances
4//! by their lifecycle state.
5//!
6//! The indexes support the scheduling workflow:
7//! - Scheduled: runs waiting to become ready
8//! - Ready: runs eligible to be leased
9//! - Running: runs currently being executed
10//! - Terminal: completed, failed, or canceled runs
11
12pub mod ready;
13pub mod running;
14pub mod scheduled;
15pub mod terminal;
16
#[cfg(test)]
pub(crate) mod test_util {
    //! Fixtures shared by the unit tests of the index submodules.

    use actionqueue_core::ids::TaskId;
    use actionqueue_core::run::run_instance::RunInstance;
    use actionqueue_core::run::state::RunState;

    /// Creates a scheduled `RunInstance` and walks it forward to `state`.
    ///
    /// The run is always constructed via `RunInstance::new_scheduled`; the
    /// transitions applied afterwards depend on the requested `state`:
    ///
    /// - `Ready`: promoted with `effective_priority`.
    /// - `Leased`: promoted, then moved to `Leased`.
    /// - `Running`: promoted, then moved through `Leased` to `Running`.
    /// - `Completed` / `Failed`: as `Running`, then an attempt is started and
    ///   finished (using `active_attempt`, or the default attempt id when it
    ///   is `None`) before the terminal transition.
    /// - `Canceled`: transitioned directly from the freshly scheduled run.
    /// - Any other state: the run is returned exactly as constructed.
    ///
    /// When `active_attempt` is `Some`, a run that ended up `Leased` is briefly
    /// moved to `Running` to start that attempt and then moved back to
    /// `Leased`; a run that ended up `Running` without a current attempt has
    /// the attempt started in place.
    ///
    /// # Panics
    ///
    /// Panics if construction or any of the transitions above is rejected.
    #[allow(clippy::too_many_arguments)] // Test helper with naturally many parameters
    pub fn build_run(
        task_id: TaskId,
        state: RunState,
        scheduled_at: u64,
        created_at: u64,
        effective_priority: i32,
        active_attempt: Option<actionqueue_core::ids::AttemptId>,
    ) -> RunInstance {
        let mut run = RunInstance::new_scheduled(task_id, scheduled_at, created_at)
            .expect("test helper run construction should be valid");

        // Each stage implies all earlier ones, so the walk is expressed as a
        // set of cumulative flags instead of one branch per target state.
        let ends_after_attempt = matches!(state, RunState::Completed | RunState::Failed);
        let passes_running = ends_after_attempt || state == RunState::Running;
        let passes_leased = passes_running || state == RunState::Leased;
        let passes_ready = passes_leased || state == RunState::Ready;

        if passes_ready {
            run.promote_to_ready_with_priority(effective_priority).expect("promote to ready");
        }
        if passes_leased {
            run.transition_to(RunState::Leased).expect("ready -> leased");
        }
        if passes_running {
            run.transition_to(RunState::Running).expect("leased -> running");
        }

        // Final hop for the states that do not stop at ready/leased/running.
        match state {
            RunState::Completed | RunState::Failed => {
                let attempt = active_attempt.unwrap_or_default();
                run.start_attempt(attempt).expect("start attempt");
                run.finish_attempt(attempt).expect("finish attempt");
                if state == RunState::Completed {
                    run.transition_to(RunState::Completed).expect("running -> completed");
                } else {
                    run.transition_to(RunState::Failed).expect("running -> failed");
                }
            }
            RunState::Canceled => {
                run.transition_to(RunState::Canceled).expect("cancel");
            }
            _ => {}
        }

        // Attach the caller-supplied attempt to runs that are still in flight.
        match active_attempt {
            Some(attempt) if run.state() == RunState::Leased => {
                run.transition_to(RunState::Running).expect("leased -> running for attempt");
                run.start_attempt(attempt).expect("start attempt");
                run.transition_to(RunState::Leased).expect("running -> leased reset");
            }
            Some(attempt)
                if run.state() == RunState::Running && run.current_attempt_id().is_none() =>
            {
                run.start_attempt(attempt).expect("start active attempt");
            }
            _ => {}
        }

        run
    }
}