forge_orchestration/scheduler/
queue.rs

//! Scheduling queue with priority ordering
//!
//! Implements a priority queue for pending workloads with:
//! - Priority-based ordering (higher priority is dequeued first)
//! - FIFO ordering among workloads of the same priority
//! - Exponential backoff for workloads that repeatedly fail to schedule
7
8use std::collections::BinaryHeap;
9use std::cmp::Ordering;
10use parking_lot::Mutex;
11use super::Workload;
12
13/// Queued workload wrapper for priority ordering
14#[derive(Debug)]
15pub struct QueuedWorkload {
16    /// The workload
17    pub workload: Workload,
18    /// Queue position (for FIFO within priority)
19    pub sequence: u64,
20    /// Number of scheduling attempts
21    pub attempts: u32,
22    /// Backoff until this time
23    pub backoff_until: Option<chrono::DateTime<chrono::Utc>>,
24}
25
26impl QueuedWorkload {
27    /// Create new queued workload
28    pub fn new(workload: Workload, sequence: u64) -> Self {
29        Self {
30            workload,
31            sequence,
32            attempts: 0,
33            backoff_until: None,
34        }
35    }
36
37    /// Increment attempts and set backoff
38    pub fn record_failure(&mut self) {
39        self.attempts += 1;
40        // Exponential backoff: 1s, 2s, 4s, 8s, ... up to 5 minutes
41        let backoff_secs = (2_i64.pow(self.attempts.min(8)) as i64).min(300);
42        self.backoff_until = Some(chrono::Utc::now() + chrono::Duration::seconds(backoff_secs));
43    }
44
45    /// Check if workload is ready to be scheduled
46    pub fn is_ready(&self) -> bool {
47        self.backoff_until
48            .map(|t| chrono::Utc::now() >= t)
49            .unwrap_or(true)
50    }
51}
52
53impl PartialEq for QueuedWorkload {
54    fn eq(&self, other: &Self) -> bool {
55        self.workload.id == other.workload.id
56    }
57}
58
59impl Eq for QueuedWorkload {}
60
61impl PartialOrd for QueuedWorkload {
62    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
63        Some(self.cmp(other))
64    }
65}
66
67impl Ord for QueuedWorkload {
68    fn cmp(&self, other: &Self) -> Ordering {
69        // Higher priority first
70        match self.workload.priority.cmp(&other.workload.priority) {
71            Ordering::Equal => {
72                // Lower sequence (earlier) first for FIFO
73                other.sequence.cmp(&self.sequence)
74            }
75            other => other,
76        }
77    }
78}
79
80/// Scheduling queue
81pub struct SchedulingQueue {
82    /// Priority queue of pending workloads
83    queue: Mutex<BinaryHeap<QueuedWorkload>>,
84    /// Sequence counter for FIFO ordering
85    sequence: Mutex<u64>,
86}
87
88impl SchedulingQueue {
89    /// Create new scheduling queue
90    pub fn new() -> Self {
91        Self {
92            queue: Mutex::new(BinaryHeap::new()),
93            sequence: Mutex::new(0),
94        }
95    }
96
97    /// Add workload to queue
98    pub fn enqueue(&self, workload: Workload) {
99        let mut seq = self.sequence.lock();
100        let sequence = *seq;
101        *seq += 1;
102        drop(seq);
103
104        let queued = QueuedWorkload::new(workload, sequence);
105        self.queue.lock().push(queued);
106    }
107
108    /// Get next workload to schedule
109    pub fn dequeue(&self) -> Option<Workload> {
110        let mut queue = self.queue.lock();
111        
112        // Find first ready workload
113        let mut not_ready = Vec::new();
114        
115        while let Some(queued) = queue.pop() {
116            if queued.is_ready() {
117                // Put back the not-ready ones
118                for q in not_ready {
119                    queue.push(q);
120                }
121                return Some(queued.workload);
122            } else {
123                not_ready.push(queued);
124            }
125        }
126
127        // Put back all not-ready workloads
128        for q in not_ready {
129            queue.push(q);
130        }
131
132        None
133    }
134
135    /// Peek at next workload without removing
136    pub fn peek(&self) -> Option<String> {
137        self.queue.lock().peek().map(|q| q.workload.id.clone())
138    }
139
140    /// Get queue length
141    pub fn len(&self) -> usize {
142        self.queue.lock().len()
143    }
144
145    /// Check if queue is empty
146    pub fn is_empty(&self) -> bool {
147        self.queue.lock().is_empty()
148    }
149
150    /// Remove workload from queue
151    pub fn remove(&self, workload_id: &str) -> bool {
152        let mut queue = self.queue.lock();
153        let items: Vec<_> = std::mem::take(&mut *queue).into_vec();
154        let mut found = false;
155        
156        for item in items {
157            if item.workload.id == workload_id {
158                found = true;
159            } else {
160                queue.push(item);
161            }
162        }
163
164        found
165    }
166
167    /// Re-queue a workload with backoff
168    pub fn requeue_with_backoff(&self, mut queued: QueuedWorkload) {
169        queued.record_failure();
170        self.queue.lock().push(queued);
171    }
172}
173
174impl Default for SchedulingQueue {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Dequeue exactly `count` workloads and collect their ids in order.
    ///
    /// Panics if the queue yields fewer than `count` workloads.
    fn dequeue_ids(queue: &SchedulingQueue, count: usize) -> Vec<String> {
        (0..count).map(|_| queue.dequeue().unwrap().id).collect()
    }

    /// Workloads come out highest priority first, regardless of enqueue order.
    #[test]
    fn test_priority_ordering() {
        let queue = SchedulingQueue::new();

        for (name, priority) in [("low", 10), ("high", 100), ("medium", 50)] {
            queue.enqueue(Workload::new(name, name).with_priority(priority));
        }

        assert_eq!(dequeue_ids(&queue, 3), ["high", "medium", "low"]);
    }

    /// Workloads of equal priority come out in the order they were enqueued.
    #[test]
    fn test_fifo_within_priority() {
        let queue = SchedulingQueue::new();

        for name in ["first", "second", "third"] {
            queue.enqueue(Workload::new(name, name).with_priority(50));
        }

        assert_eq!(dequeue_ids(&queue, 3), ["first", "second", "third"]);
    }

    /// Removing an id succeeds once and shrinks the queue; a second removal
    /// of the same id reports that nothing was found.
    #[test]
    fn test_remove() {
        let queue = SchedulingQueue::new();

        for name in ["w1", "w2", "w3"] {
            queue.enqueue(Workload::new(name, name));
        }

        assert!(queue.remove("w2"));
        assert_eq!(queue.len(), 2);
        assert!(!queue.remove("w2")); // Already removed
    }
}
222}