Skip to main content

actionqueue_engine/index/
ready.rs

1//! Ready index - runs eligible to be leased.
2//!
3//! The ready index holds run instances in the Ready state. These runs have
4//! passed their scheduled_at time and are eligible to be selected by the
5//! scheduler for leasing to an executor.
6
7use actionqueue_core::run::run_instance::RunInstance;
8use actionqueue_core::run::state::RunState;
9
10/// A view of all runs in the Ready state.
11///
12/// This structure provides filtering and traversal over runs that are
13/// eligible to be leased. Runs transition from Ready to Leased (or Canceled)
14/// when selected by the scheduler.
15#[derive(Debug, Clone, Default, PartialEq, Eq)]
16pub struct ReadyIndex {
17    /// The runs in Ready state
18    runs: Vec<RunInstance>,
19}
20
21impl ReadyIndex {
22    /// Creates a new empty ready index.
23    pub fn new() -> Self {
24        Self { runs: Vec::new() }
25    }
26
27    /// Creates a ready index from a vector of runs.
28    ///
29    /// All runs must be in the `Ready` state.
30    pub fn from_runs(runs: Vec<RunInstance>) -> Self {
31        debug_assert!(
32            runs.iter().all(|r| r.state() == RunState::Ready),
33            "ReadyIndex::from_runs called with non-Ready run"
34        );
35        Self { runs }
36    }
37
38    /// Returns all runs in the ready index.
39    pub fn runs(&self) -> &[RunInstance] {
40        &self.runs
41    }
42
43    /// Returns the number of runs in the ready index.
44    pub fn len(&self) -> usize {
45        self.runs.len()
46    }
47
48    /// Returns true if the index contains no runs.
49    pub fn is_empty(&self) -> bool {
50        self.runs.is_empty()
51    }
52}
53
54impl From<&[RunInstance]> for ReadyIndex {
55    fn from(runs: &[RunInstance]) -> Self {
56        let ready_runs: Vec<RunInstance> =
57            runs.iter().filter(|run| run.state() == RunState::Ready).cloned().collect();
58        Self::from_runs(ready_runs)
59    }
60}
61
62#[cfg(test)]
63mod tests {
64    use actionqueue_core::ids::TaskId;
65    use actionqueue_core::run::state::RunState;
66
67    use super::*;
68    use crate::index::test_util::build_run;
69
70    #[test]
71    fn ready_index_filters_correctly() {
72        let now = 1000;
73        let task_id = TaskId::new();
74
75        // Create runs in different states
76        let runs = vec![
77            build_run(task_id, RunState::Scheduled, 900, now, 0, None),
78            build_run(task_id, RunState::Ready, 900, now, 0, None),
79            build_run(task_id, RunState::Ready, 950, now, 0, None),
80            build_run(task_id, RunState::Completed, 800, now, 0, None),
81        ];
82
83        let index = ReadyIndex::from(runs.as_slice());
84
85        assert_eq!(index.len(), 2);
86        assert_eq!(index.runs().len(), 2);
87        assert!(index.runs().iter().all(|run| run.state() == RunState::Ready));
88    }
89
90    #[test]
91    fn ready_index_is_empty() {
92        let index = ReadyIndex::new();
93        assert!(index.is_empty());
94        assert_eq!(index.len(), 0);
95
96        let now = 1000;
97        let task_id = TaskId::new();
98        let run = build_run(task_id, RunState::Ready, 900, now, 0, None);
99        let index = ReadyIndex::from(std::slice::from_ref(&run));
100
101        assert!(!index.is_empty());
102        assert_eq!(index.len(), 1);
103    }
104
105    #[test]
106    fn ready_index_preserves_ready_order_and_state_purity() {
107        let now = 1000;
108        let task_id = TaskId::new();
109
110        let scheduled = build_run(task_id, RunState::Scheduled, 800, now, 0, None);
111        let ready_first = build_run(task_id, RunState::Ready, 810, now, 0, None);
112        let running = build_run(
113            task_id,
114            RunState::Running,
115            820,
116            now,
117            0,
118            Some(actionqueue_core::ids::AttemptId::new()),
119        );
120        let ready_second = build_run(task_id, RunState::Ready, 830, now, 0, None);
121        let canceled = build_run(task_id, RunState::Canceled, 840, now, 0, None);
122
123        let expected_order = vec![ready_first.id().to_string(), ready_second.id().to_string()];
124        let runs = vec![scheduled, ready_first, running, ready_second, canceled];
125        let index = ReadyIndex::from(runs.as_slice());
126
127        let actual_order: Vec<String> =
128            index.runs().iter().map(|run| run.id().to_string()).collect();
129        assert_eq!(actual_order, expected_order);
130        assert!(index.runs().iter().all(|run| run.state() == RunState::Ready));
131    }
132}