actionqueue_engine/index/
scheduled.rs1use actionqueue_core::run::run_instance::RunInstance;
8use actionqueue_core::run::state::RunState;
9
10#[derive(Debug, Clone, Default, PartialEq, Eq)]
16pub struct ScheduledIndex {
17 runs: Vec<RunInstance>,
19}
20
21impl ScheduledIndex {
22 pub fn new() -> Self {
24 Self { runs: Vec::new() }
25 }
26
27 pub fn from_runs(runs: Vec<RunInstance>) -> Self {
31 debug_assert!(
32 runs.iter().all(|r| r.state() == RunState::Scheduled),
33 "ScheduledIndex::from_runs called with non-Scheduled run"
34 );
35 Self { runs }
36 }
37
38 pub fn runs(&self) -> &[RunInstance] {
40 &self.runs
41 }
42
43 pub fn len(&self) -> usize {
45 self.runs.len()
46 }
47
48 pub fn is_empty(&self) -> bool {
50 self.runs.is_empty()
51 }
52
53 pub fn ready_for_promotion(&self, current_time: u64) -> Vec<&RunInstance> {
57 self.runs.iter().filter(|run| run.scheduled_at() <= current_time).collect()
58 }
59
60 pub fn waiting(&self, current_time: u64) -> Vec<&RunInstance> {
64 self.runs.iter().filter(|run| run.scheduled_at() > current_time).collect()
65 }
66}
67
68impl From<&[RunInstance]> for ScheduledIndex {
69 fn from(runs: &[RunInstance]) -> Self {
70 let scheduled_runs: Vec<RunInstance> =
71 runs.iter().filter(|run| run.state() == RunState::Scheduled).cloned().collect();
72 Self::from_runs(scheduled_runs)
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use actionqueue_core::ids::TaskId;
79 use actionqueue_core::run::run_instance::RunInstance;
80
81 use super::*;
82 use crate::index::test_util::build_run;
83
84 #[test]
85 fn scheduled_index_filters_correctly() {
86 let now = 1000;
87 let task_id = TaskId::new();
88
89 let runs = vec![
90 RunInstance::new_scheduled(task_id, 900, now).expect("valid scheduled run"), RunInstance::new_scheduled(task_id, 1000, now).expect("valid scheduled run"), RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run"), ];
94
95 let index = ScheduledIndex::from(runs.as_slice());
96
97 assert_eq!(index.len(), 3);
98
99 let ready = index.ready_for_promotion(1000);
100 assert_eq!(ready.len(), 2); let waiting = index.waiting(1000);
103 assert_eq!(waiting.len(), 1); assert!(index.runs().iter().all(|run| run.state() == RunState::Scheduled));
105 }
106
107 #[test]
108 fn scheduled_index_is_empty() {
109 let index = ScheduledIndex::new();
110 assert!(index.is_empty());
111 assert_eq!(index.len(), 0);
112
113 let now = 1000;
114 let task_id = TaskId::new();
115 let run = RunInstance::new_scheduled(task_id, 1000, now).expect("valid scheduled run");
116 let index = ScheduledIndex::from(std::slice::from_ref(&run));
117
118 assert!(!index.is_empty());
119 assert_eq!(index.len(), 1);
120 }
121
122 #[test]
123 fn scheduled_index_preserves_order_and_state_purity() {
124 let now = 1000;
125 let task_id = TaskId::new();
126
127 let ready = build_run(task_id, RunState::Ready, 700, now, 0, None);
128 let scheduled_first =
129 RunInstance::new_scheduled(task_id, 1100, now).expect("valid scheduled run");
130 let running = build_run(
131 task_id,
132 RunState::Running,
133 800,
134 now,
135 0,
136 Some(actionqueue_core::ids::AttemptId::new()),
137 );
138 let scheduled_second =
139 RunInstance::new_scheduled(task_id, 1200, now).expect("valid scheduled run");
140 let terminal = build_run(task_id, RunState::Completed, 900, now, 0, None);
141
142 let expected_order =
143 vec![scheduled_first.id().to_string(), scheduled_second.id().to_string()];
144 let runs = vec![ready, scheduled_first, running, scheduled_second, terminal];
145 let index = ScheduledIndex::from(runs.as_slice());
146
147 let actual_order: Vec<String> =
148 index.runs().iter().map(|run| run.id().to_string()).collect();
149 assert_eq!(actual_order, expected_order);
150 assert!(index.runs().iter().all(|run| run.state() == RunState::Scheduled));
151 }
152}