dag_executor/dag/
scheduler.rs1use crate::state::{TaskRecord, TaskState};
4use std::cmp::Ordering;
5use std::collections::{BinaryHeap, HashMap};
6
/// One queued task id, tagged with the data the ready heap orders by.
///
/// Equality and ordering look only at `priority` and `seq`; `id` is carried
/// along as payload and never compared.
struct Prioritized {
    /// Scheduling priority; a larger value compares as greater.
    priority: u8,
    /// Enqueue sequence number, used to break ties between equal priorities.
    seq: u64,
    /// Identifier of the queued task.
    id: String,
}

impl Ord for Prioritized {
    /// Orders by `priority` first. On a tie the `seq` comparison is reversed,
    /// so the entry with the smaller `seq` compares as greater and a max-heap
    /// pops equal-priority entries in insertion order.
    fn cmp(&self, other: &Self) -> Ordering {
        match self.priority.cmp(&other.priority) {
            Ordering::Equal => other.seq.cmp(&self.seq),
            decided => decided,
        }
    }
}

impl PartialOrd for Prioritized {
    /// Total order: always `Some`, delegating to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for Prioritized {
    /// Two entries are equal when both `priority` and `seq` match, which is
    /// exactly when [`Ord::cmp`] returns `Ordering::Equal`.
    fn eq(&self, other: &Self) -> bool {
        (self.priority, self.seq) == (other.priority, other.seq)
    }
}

impl Eq for Prioritized {}
36
37#[derive(Default)]
42pub struct Scheduler {
43 records: HashMap<String, TaskRecord>,
44 ready: BinaryHeap<Prioritized>,
45 seq: u64,
46}
47
48impl Scheduler {
49 pub fn new() -> Self {
51 Self::default()
52 }
53
54 pub fn with_records(records: HashMap<String, TaskRecord>) -> Self {
56 Scheduler {
57 records,
58 ready: BinaryHeap::new(),
59 seq: 0,
60 }
61 }
62
63 pub fn ensure_record(&mut self, id: &str) -> &mut TaskRecord {
65 self.records
66 .entry(id.to_string())
67 .or_insert_with(|| TaskRecord::new(id))
68 }
69
70 pub fn record(&self, id: &str) -> Option<&TaskRecord> {
72 self.records.get(id)
73 }
74
75 pub fn records(&self) -> &HashMap<String, TaskRecord> {
77 &self.records
78 }
79
80 pub fn records_mut(&mut self) -> &mut HashMap<String, TaskRecord> {
82 &mut self.records
83 }
84
85 pub fn state(&self, id: &str) -> Option<TaskState> {
87 self.records.get(id).map(|r| r.state)
88 }
89
90 pub fn transition(&mut self, id: &str, state: TaskState) -> bool {
92 match self.records.get_mut(id) {
93 Some(r) => r.transition(state),
94 None => false,
95 }
96 }
97
98 pub fn mark_ready(&mut self, id: &str, priority: u8) {
100 let applied = {
101 let record = self.ensure_record(id);
102 record.transition(TaskState::Ready)
103 };
104 if applied {
105 let seq = self.seq;
106 self.seq += 1;
107 self.ready.push(Prioritized {
108 priority,
109 seq,
110 id: id.to_string(),
111 });
112 }
113 }
114
115 pub fn next_ready(&mut self) -> Option<String> {
119 while let Some(entry) = self.ready.pop() {
120 if self.state(&entry.id) == Some(TaskState::Ready) {
121 return Some(entry.id);
122 }
123 }
124 None
125 }
126
127 pub fn has_ready(&self) -> bool {
129 !self.ready.is_empty()
131 }
132
133 pub fn all_terminal(&self) -> bool {
135 self.records.values().all(|r| r.state.is_terminal())
136 }
137
138 pub fn count_in(&self, state: TaskState) -> usize {
140 self.records.values().filter(|r| r.state == state).count()
141 }
142}