Skip to main content

actionqueue_engine/index/
scheduled.rs

1//! Scheduled index - runs waiting to become ready.
2//!
3//! The scheduled index holds run instances in the Scheduled state. These runs
4//! are newly derived and waiting for their scheduled_at time to pass before
5//! they can transition to Ready.
6
7use actionqueue_core::run::run_instance::RunInstance;
8use actionqueue_core::run::state::RunState;
9
10/// A view of all runs in the Scheduled state.
11///
12/// This structure provides filtering and traversal over runs that are
13/// waiting to become ready. Runs transition from Scheduled to Ready when
14/// the scheduled_at timestamp has passed according to the scheduler clock.
15#[derive(Debug, Clone, Default, PartialEq, Eq)]
16pub struct ScheduledIndex {
17    /// The runs in Scheduled state
18    runs: Vec<RunInstance>,
19}
20
21impl ScheduledIndex {
22    /// Creates a new empty scheduled index.
23    pub fn new() -> Self {
24        Self { runs: Vec::new() }
25    }
26
27    /// Creates a scheduled index from a vector of runs.
28    ///
29    /// All runs must be in the `Scheduled` state.
30    pub fn from_runs(runs: Vec<RunInstance>) -> Self {
31        debug_assert!(
32            runs.iter().all(|r| r.state() == RunState::Scheduled),
33            "ScheduledIndex::from_runs called with non-Scheduled run"
34        );
35        Self { runs }
36    }
37
38    /// Returns all runs in the scheduled index.
39    pub fn runs(&self) -> &[RunInstance] {
40        &self.runs
41    }
42
43    /// Returns the number of runs in the scheduled index.
44    pub fn len(&self) -> usize {
45        self.runs.len()
46    }
47
48    /// Returns true if the index contains no runs.
49    pub fn is_empty(&self) -> bool {
50        self.runs.is_empty()
51    }
52
53    /// Filters runs that are ready to transition to the Ready state.
54    ///
55    /// A run is ready for promotion if its scheduled_at time has passed.
56    pub fn ready_for_promotion(&self, current_time: u64) -> Vec<&RunInstance> {
57        self.runs.iter().filter(|run| run.scheduled_at() <= current_time).collect()
58    }
59
60    /// Filters runs that are still waiting to become ready.
61    ///
62    /// A run is still waiting if its scheduled_at time is in the future.
63    pub fn waiting(&self, current_time: u64) -> Vec<&RunInstance> {
64        self.runs.iter().filter(|run| run.scheduled_at() > current_time).collect()
65    }
66}
67
68impl From<&[RunInstance]> for ScheduledIndex {
69    fn from(runs: &[RunInstance]) -> Self {
70        let scheduled_runs: Vec<RunInstance> =
71            runs.iter().filter(|run| run.state() == RunState::Scheduled).cloned().collect();
72        Self::from_runs(scheduled_runs)
73    }
74}
75
76#[cfg(test)]
77mod tests {
78    use actionqueue_core::ids::TaskId;
79    use actionqueue_core::run::run_instance::RunInstance;
80
81    use super::*;
82    use crate::index::test_util::build_run;
83
84    #[test]
85    fn scheduled_index_filters_correctly() {
86        let now = 1000;
87        let task_id = TaskId::new();
88
89        let runs = vec![
90            RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"), /* past, ready for promotion */
91            RunInstance::new_scheduled(task_id, 1000, now).expect("valid scheduled run"), /* current, ready for promotion */
92            RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run"), /* future, still waiting */
93        ];
94
95        let index = ScheduledIndex::from(runs.as_slice());
96
97        assert_eq!(index.len(), 3);
98
99        let ready = index.ready_for_promotion(1000);
100        assert_eq!(ready.len(), 2); // 900 and 1000 are <= 1000
101
102        let waiting = index.waiting(1000);
103        assert_eq!(waiting.len(), 1); // 1100 is > 1000
104        assert!(index.runs().iter().all(|run| run.state() == RunState::Scheduled));
105    }
106
107    #[test]
108    fn scheduled_index_is_empty() {
109        let index = ScheduledIndex::new();
110        assert!(index.is_empty());
111        assert_eq!(index.len(), 0);
112
113        let now = 1000;
114        let task_id = TaskId::new();
115        let run = RunInstance::new_scheduled(task_id, 1000, now).expect("valid scheduled run");
116        let index = ScheduledIndex::from(std::slice::from_ref(&run));
117
118        assert!(!index.is_empty());
119        assert_eq!(index.len(), 1);
120    }
121
122    #[test]
123    fn scheduled_index_preserves_order_and_state_purity() {
124        let now = 1000;
125        let task_id = TaskId::new();
126
127        let ready = build_run(task_id, RunState::Ready, 700, now, 0, None);
128        let scheduled_first =
129            RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run");
130        let running = build_run(
131            task_id,
132            RunState::Running,
133            800,
134            now,
135            0,
136            Some(actionqueue_core::ids::AttemptId::new()),
137        );
138        let scheduled_second =
139            RunInstance::new_scheduled(task_id, 1200, now).expect("valid scheduled run");
140        let terminal = build_run(task_id, RunState::Completed, 900, now, 0, None);
141
142        let expected_order =
143            vec![scheduled_first.id().to_string(), scheduled_second.id().to_string()];
144        let runs = vec![ready, scheduled_first, running, scheduled_second, terminal];
145        let index = ScheduledIndex::from(runs.as_slice());
146
147        let actual_order: Vec<String> =
148            index.runs().iter().map(|run| run.id().to_string()).collect();
149        assert_eq!(actual_order, expected_order);
150        assert!(index.runs().iter().all(|run| run.state() == RunState::Scheduled));
151    }
152}