forge_orchestration/scheduler/
queue.rs1use std::collections::BinaryHeap;
9use std::cmp::Ordering;
10use parking_lot::Mutex;
11use super::Workload;
12
13#[derive(Debug)]
15pub struct QueuedWorkload {
16 pub workload: Workload,
18 pub sequence: u64,
20 pub attempts: u32,
22 pub backoff_until: Option<chrono::DateTime<chrono::Utc>>,
24}
25
26impl QueuedWorkload {
27 pub fn new(workload: Workload, sequence: u64) -> Self {
29 Self {
30 workload,
31 sequence,
32 attempts: 0,
33 backoff_until: None,
34 }
35 }
36
37 pub fn record_failure(&mut self) {
39 self.attempts += 1;
40 let backoff_secs = (2_i64.pow(self.attempts.min(8)) as i64).min(300);
42 self.backoff_until = Some(chrono::Utc::now() + chrono::Duration::seconds(backoff_secs));
43 }
44
45 pub fn is_ready(&self) -> bool {
47 self.backoff_until
48 .map(|t| chrono::Utc::now() >= t)
49 .unwrap_or(true)
50 }
51}
52
53impl PartialEq for QueuedWorkload {
54 fn eq(&self, other: &Self) -> bool {
55 self.workload.id == other.workload.id
56 }
57}
58
59impl Eq for QueuedWorkload {}
60
61impl PartialOrd for QueuedWorkload {
62 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
63 Some(self.cmp(other))
64 }
65}
66
67impl Ord for QueuedWorkload {
68 fn cmp(&self, other: &Self) -> Ordering {
69 match self.workload.priority.cmp(&other.workload.priority) {
71 Ordering::Equal => {
72 other.sequence.cmp(&self.sequence)
74 }
75 other => other,
76 }
77 }
78}
79
80pub struct SchedulingQueue {
82 queue: Mutex<BinaryHeap<QueuedWorkload>>,
84 sequence: Mutex<u64>,
86}
87
88impl SchedulingQueue {
89 pub fn new() -> Self {
91 Self {
92 queue: Mutex::new(BinaryHeap::new()),
93 sequence: Mutex::new(0),
94 }
95 }
96
97 pub fn enqueue(&self, workload: Workload) {
99 let mut seq = self.sequence.lock();
100 let sequence = *seq;
101 *seq += 1;
102 drop(seq);
103
104 let queued = QueuedWorkload::new(workload, sequence);
105 self.queue.lock().push(queued);
106 }
107
108 pub fn dequeue(&self) -> Option<Workload> {
110 let mut queue = self.queue.lock();
111
112 let mut not_ready = Vec::new();
114
115 while let Some(queued) = queue.pop() {
116 if queued.is_ready() {
117 for q in not_ready {
119 queue.push(q);
120 }
121 return Some(queued.workload);
122 } else {
123 not_ready.push(queued);
124 }
125 }
126
127 for q in not_ready {
129 queue.push(q);
130 }
131
132 None
133 }
134
135 pub fn peek(&self) -> Option<String> {
137 self.queue.lock().peek().map(|q| q.workload.id.clone())
138 }
139
140 pub fn len(&self) -> usize {
142 self.queue.lock().len()
143 }
144
145 pub fn is_empty(&self) -> bool {
147 self.queue.lock().is_empty()
148 }
149
150 pub fn remove(&self, workload_id: &str) -> bool {
152 let mut queue = self.queue.lock();
153 let items: Vec<_> = std::mem::take(&mut *queue).into_vec();
154 let mut found = false;
155
156 for item in items {
157 if item.workload.id == workload_id {
158 found = true;
159 } else {
160 queue.push(item);
161 }
162 }
163
164 found
165 }
166
167 pub fn requeue_with_backoff(&self, mut queued: QueuedWorkload) {
169 queued.record_failure();
170 self.queue.lock().push(queued);
171 }
172}
173
174impl Default for SchedulingQueue {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183
184 #[test]
185 fn test_priority_ordering() {
186 let queue = SchedulingQueue::new();
187
188 queue.enqueue(Workload::new("low", "low").with_priority(10));
189 queue.enqueue(Workload::new("high", "high").with_priority(100));
190 queue.enqueue(Workload::new("medium", "medium").with_priority(50));
191
192 assert_eq!(queue.dequeue().unwrap().id, "high");
193 assert_eq!(queue.dequeue().unwrap().id, "medium");
194 assert_eq!(queue.dequeue().unwrap().id, "low");
195 }
196
197 #[test]
198 fn test_fifo_within_priority() {
199 let queue = SchedulingQueue::new();
200
201 queue.enqueue(Workload::new("first", "first").with_priority(50));
202 queue.enqueue(Workload::new("second", "second").with_priority(50));
203 queue.enqueue(Workload::new("third", "third").with_priority(50));
204
205 assert_eq!(queue.dequeue().unwrap().id, "first");
206 assert_eq!(queue.dequeue().unwrap().id, "second");
207 assert_eq!(queue.dequeue().unwrap().id, "third");
208 }
209
210 #[test]
211 fn test_remove() {
212 let queue = SchedulingQueue::new();
213
214 queue.enqueue(Workload::new("w1", "w1"));
215 queue.enqueue(Workload::new("w2", "w2"));
216 queue.enqueue(Workload::new("w3", "w3"));
217
218 assert!(queue.remove("w2"));
219 assert_eq!(queue.len(), 2);
220 assert!(!queue.remove("w2")); }
222}