Skip to main content

actionqueue_engine/index/
running.rs

//! Running index - active runs that are leased or executing.
//!
//! The running index holds run instances that are actively in-flight:
//! - `Leased`: reserved for an executor but not yet started
//! - `Running`: currently executing
6
7use actionqueue_core::run::run_instance::RunInstance;
8use actionqueue_core::run::state::RunState;
9
10/// A view of all active runs (`Leased` and `Running`).
11///
12/// This structure provides filtering and traversal over runs that are
13/// currently in-flight. Active runs transition from Running to either:
14/// - RetryWait (on failure, if retries remain)
15/// - Completed (on success)
16/// - Failed (on failure, no retries remaining)
17/// - Canceled
18#[derive(Debug, Clone, Default, PartialEq, Eq)]
19pub struct RunningIndex {
20    /// The runs in Running state
21    runs: Vec<RunInstance>,
22}
23
24impl RunningIndex {
25    /// Creates a new empty running index.
26    pub fn new() -> Self {
27        Self { runs: Vec::new() }
28    }
29
30    /// Creates a running index from a vector of runs.
31    ///
32    /// All runs must be in the `Leased` or `Running` state.
33    pub fn from_runs(runs: Vec<RunInstance>) -> Self {
34        debug_assert!(
35            runs.iter().all(|r| matches!(r.state(), RunState::Leased | RunState::Running)),
36            "RunningIndex::from_runs called with non-Leased/Running run"
37        );
38        Self { runs }
39    }
40
41    /// Returns all runs in the running index.
42    pub fn runs(&self) -> &[RunInstance] {
43        &self.runs
44    }
45
46    /// Returns the number of runs in the running index.
47    pub fn len(&self) -> usize {
48        self.runs.len()
49    }
50
51    /// Returns true if the index contains no runs.
52    pub fn is_empty(&self) -> bool {
53        self.runs.is_empty()
54    }
55
56    /// Returns only leased runs.
57    pub fn leased(&self) -> Vec<&RunInstance> {
58        self.runs.iter().filter(|run| run.state() == RunState::Leased).collect()
59    }
60
61    /// Returns only currently executing runs.
62    pub fn executing(&self) -> Vec<&RunInstance> {
63        self.runs.iter().filter(|run| run.state() == RunState::Running).collect()
64    }
65}
66
67impl From<&[RunInstance]> for RunningIndex {
68    fn from(runs: &[RunInstance]) -> Self {
69        let running_runs: Vec<RunInstance> = runs
70            .iter()
71            .filter(|run| matches!(run.state(), RunState::Leased | RunState::Running))
72            .cloned()
73            .collect();
74        Self::from_runs(running_runs)
75    }
76}
77
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::TaskId;
    use actionqueue_core::run::state::RunState;

    use super::*;
    use crate::index::test_util::build_run;

    /// Only `Leased` and `Running` runs survive conversion from a mixed slice.
    #[test]
    fn running_index_filters_correctly() {
        let now = 1000;
        let task_id = TaskId::new();

        // One run per state of interest: two inactive, two in-flight.
        let mixed = [
            build_run(task_id, RunState::Ready, 900, now, 0, None),
            build_run(task_id, RunState::Leased, 900, now, 0, None),
            build_run(
                task_id,
                RunState::Running,
                950,
                now,
                0,
                Some(actionqueue_core::ids::AttemptId::new()),
            ),
            build_run(task_id, RunState::Completed, 800, now, 0, None),
        ];

        let index = RunningIndex::from(&mixed[..]);

        assert_eq!(index.len(), 2);
        assert_eq!(index.runs().len(), 2);
        assert_eq!(index.leased().len(), 1);
        assert_eq!(index.executing().len(), 1);

        let only_in_flight = index
            .runs()
            .iter()
            .all(|run| matches!(run.state(), RunState::Leased | RunState::Running));
        assert!(only_in_flight);
    }

    /// `is_empty` and `len` agree for both an empty and a one-run index.
    #[test]
    fn running_index_is_empty() {
        let empty = RunningIndex::new();
        assert!(empty.is_empty());
        assert_eq!(empty.len(), 0);

        let now = 1000;
        let task_id = TaskId::new();
        let run = build_run(task_id, RunState::Leased, 900, now, 0, None);
        let populated = RunningIndex::from(std::slice::from_ref(&run));

        assert!(!populated.is_empty());
        assert_eq!(populated.len(), 1);
    }

    /// In-flight runs keep their relative order from the input slice.
    #[test]
    fn running_index_preserves_active_run_order() {
        let now = 1000;
        let task_id = TaskId::new();

        let ready = build_run(task_id, RunState::Ready, 800, now, 0, None);
        let leased = build_run(task_id, RunState::Leased, 810, now, 0, None);
        let completed = build_run(task_id, RunState::Completed, 820, now, 0, None);
        let running = build_run(
            task_id,
            RunState::Running,
            830,
            now,
            0,
            Some(actionqueue_core::ids::AttemptId::new()),
        );

        // Capture the ids before the runs are moved into the input array.
        let expected_order = [leased.id().to_string(), running.id().to_string()];
        let input = [ready, leased, completed, running];
        let index = RunningIndex::from(&input[..]);

        let actual_order: Vec<String> =
            index.runs().iter().map(|run| run.id().to_string()).collect();
        assert_eq!(actual_order, expected_order);
    }
}