1use std::cmp::Ordering;
26use std::collections::{BinaryHeap, VecDeque};
27use std::time::Instant;
28
29use crate::task::{Task, TaskPriority};
30
31pub struct TaskQueue {
47 tasks: VecDeque<Task>,
49
50 enabled: bool,
53
54 priority: TaskPriority,
56}
57
58impl TaskQueue {
59 #[inline]
61 #[must_use]
62 pub fn new(priority: TaskPriority) -> Self {
63 Self {
64 tasks: VecDeque::new(),
65 enabled: true,
66 priority,
67 }
68 }
69
70 #[inline]
72 pub fn push(&mut self, task: Task) {
73 self.tasks.push_back(task);
74 }
75
76 #[inline]
78 pub fn pop(&mut self) -> Option<Task> {
79 if !self.enabled {
80 return None;
81 }
82 self.tasks.pop_front()
83 }
84
85 #[inline]
87 #[must_use]
88 pub fn front(&self) -> Option<&Task> {
89 if !self.enabled {
90 return None;
91 }
92 self.tasks.front()
93 }
94
95 #[inline]
97 #[must_use]
98 pub fn len(&self) -> usize {
99 self.tasks.len()
100 }
101
102 #[inline]
104 #[must_use]
105 pub fn is_empty(&self) -> bool {
106 self.tasks.is_empty()
107 }
108
109 #[inline]
111 #[must_use]
112 pub fn has_ready(&self) -> bool {
113 self.enabled && !self.tasks.is_empty()
114 }
115
116 #[inline]
118 #[must_use]
119 pub fn is_enabled(&self) -> bool {
120 self.enabled
121 }
122
123 #[inline]
128 pub fn set_enabled(&mut self, enabled: bool) {
129 self.enabled = enabled;
130 }
131
132 #[inline]
134 #[must_use]
135 pub fn priority(&self) -> TaskPriority {
136 self.priority
137 }
138}
139
140struct DelayedEntry {
145 task: Task,
146 deadline: Instant,
149}
150
151impl PartialEq for DelayedEntry {
152 fn eq(&self, other: &Self) -> bool {
153 self.deadline == other.deadline
154 }
155}
156
157impl Eq for DelayedEntry {}
158
159impl PartialOrd for DelayedEntry {
160 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
161 Some(self.cmp(other))
162 }
163}
164
165impl Ord for DelayedEntry {
166 fn cmp(&self, other: &Self) -> Ordering {
167 other.deadline.cmp(&self.deadline)
169 }
170}
171
/// Length of the streak of picks served from queues at or before the
/// `UserBlocking` index after which `TaskQueueManager::pick` first tries the
/// queues from the last index downward.
const STARVATION_THRESHOLD: u32 = 64;
181
182pub struct TaskQueueManager {
201 queues: [TaskQueue; TaskPriority::COUNT],
204
205 delayed: BinaryHeap<DelayedEntry>,
208
209 consecutive_high: u32,
211}
212
213impl TaskQueueManager {
214 #[must_use]
216 pub fn new() -> Self {
217 Self {
218 queues: [
219 TaskQueue::new(TaskPriority::Input),
220 TaskQueue::new(TaskPriority::UserBlocking),
221 TaskQueue::new(TaskPriority::Normal),
222 TaskQueue::new(TaskPriority::Timer),
223 TaskQueue::new(TaskPriority::BestEffort),
224 TaskQueue::new(TaskPriority::Idle),
225 ],
226 delayed: BinaryHeap::new(),
227 consecutive_high: 0,
228 }
229 }
230
231 pub fn push(&mut self, task: Task) {
236 if let Some(deadline) = task.run_at() {
237 if task.is_ready() {
238 self.queues[task.priority().as_index()].push(task);
240 } else {
241 self.delayed.push(DelayedEntry { task, deadline });
242 }
243 } else {
244 self.queues[task.priority().as_index()].push(task);
245 }
246 }
247
248 pub fn pick(&mut self) -> Option<Task> {
253 if self.consecutive_high >= STARVATION_THRESHOLD {
255 self.consecutive_high = 0;
258
259 if let Some(task) = self.pick_lowest_nonempty() {
260 return Some(task);
261 }
262 }
263
264 for idx in 0..TaskPriority::COUNT {
266 if let Some(task) = self.queues[idx].pop() {
267 if idx <= TaskPriority::UserBlocking.as_index() {
268 self.consecutive_high += 1;
269 } else {
270 self.consecutive_high = 0;
271 }
272 return Some(task);
273 }
274 }
275
276 None
277 }
278
279 pub fn promote_delayed(&mut self) {
285 let now = Instant::now();
286 while let Some(entry) = self.delayed.peek() {
287 if entry.deadline > now {
288 break; }
290 let entry = self.delayed.pop().unwrap();
291 self.queues[entry.task.priority().as_index()].push(entry.task);
292 }
293 }
294
295 #[must_use]
300 pub fn next_delayed_ready_in(&self) -> Option<std::time::Duration> {
301 self.delayed
302 .peek()
303 .map(|entry| entry.deadline.saturating_duration_since(Instant::now()))
304 }
305
306 #[inline]
308 #[must_use]
309 pub fn queue(&self, priority: TaskPriority) -> &TaskQueue {
310 &self.queues[priority.as_index()]
311 }
312
313 #[inline]
320 pub fn queue_mut(&mut self, priority: TaskPriority) -> &mut TaskQueue {
321 &mut self.queues[priority.as_index()]
322 }
323
324 #[must_use]
327 pub fn ready_count(&self) -> usize {
328 self.queues
329 .iter()
330 .filter(|q| q.is_enabled())
331 .map(|q| q.len())
332 .sum()
333 }
334
335 #[inline]
337 #[must_use]
338 pub fn delayed_count(&self) -> usize {
339 self.delayed.len()
340 }
341
342 #[must_use]
344 pub fn has_ready(&self) -> bool {
345 self.queues.iter().any(|q| q.has_ready())
346 }
347
348 #[must_use]
350 pub fn is_empty(&self) -> bool {
351 !self.has_ready() && self.delayed.is_empty()
352 }
353
354 #[inline]
356 #[must_use]
357 pub fn has_delayed(&self) -> bool {
358 !self.delayed.is_empty()
359 }
360
361 fn pick_lowest_nonempty(&mut self) -> Option<Task> {
363 for idx in (0..TaskPriority::COUNT).rev() {
364 if let Some(task) = self.queues[idx].pop() {
365 return Some(task);
366 }
367 }
368 None
369 }
370}
371
372impl Default for TaskQueueManager {
373 fn default() -> Self {
374 Self::new()
375 }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::time::Duration;

    /// Shared record that logging tasks append to when they run.
    type Log<T> = Rc<RefCell<Vec<T>>>;

    /// Creates an empty log.
    fn new_log<T>() -> Log<T> {
        Rc::new(RefCell::new(Vec::new()))
    }

    /// Builds a `priority` task that appends `value` to `log` when run.
    fn logging_task<T: Copy + 'static>(priority: TaskPriority, log: &Log<T>, value: T) -> Task {
        let sink = Rc::clone(log);
        Task::new(priority, move || sink.borrow_mut().push(value))
    }

    /// Builds a `priority` task that does nothing.
    fn noop(priority: TaskPriority) -> Task {
        Task::new(priority, || {})
    }

    /// Builds a no-op `Timer` task delayed by `secs` seconds.
    fn timer_in(secs: u64) -> Task {
        Task::delayed(TaskPriority::Timer, Duration::from_secs(secs), || {})
    }

    /// Picks and runs tasks until the manager hands out nothing more.
    fn run_all(mgr: &mut TaskQueueManager) {
        while let Some(task) = mgr.pick() {
            task.run();
        }
    }

    // ---- TaskQueue ----

    #[test]
    fn queue_fifo_order() {
        let log = new_log();
        let mut queue = TaskQueue::new(TaskPriority::Normal);

        for n in 0..3 {
            queue.push(logging_task(TaskPriority::Normal, &log, n));
        }
        assert_eq!(queue.len(), 3);

        while let Some(task) = queue.pop() {
            task.run();
        }
        assert_eq!(*log.borrow(), vec![0, 1, 2]);
    }

    #[test]
    fn queue_disabled_returns_none() {
        let mut queue = TaskQueue::new(TaskPriority::Normal);
        queue.push(noop(TaskPriority::Normal));

        queue.set_enabled(false);
        assert!(!queue.is_enabled());
        assert!(queue.pop().is_none());
        assert!(!queue.has_ready());
        // The task is still stored while the queue is disabled.
        assert_eq!(queue.len(), 1);

        queue.set_enabled(true);
        assert!(queue.pop().is_some());
    }

    #[test]
    fn queue_front_peek() {
        let mut queue = TaskQueue::new(TaskPriority::Input);
        assert!(queue.front().is_none());

        queue.push(noop(TaskPriority::Input));
        assert!(queue.front().is_some());
        // Peeking must not remove the task.
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn queue_front_none_when_disabled() {
        let mut queue = TaskQueue::new(TaskPriority::Normal);
        queue.push(noop(TaskPriority::Normal));
        queue.set_enabled(false);
        assert!(queue.front().is_none());
    }

    // ---- TaskQueueManager ----

    #[test]
    fn set_empty_returns_none() {
        let mut mgr = TaskQueueManager::new();
        assert!(mgr.pick().is_none());
        assert!(mgr.is_empty());
    }

    #[test]
    fn set_higher_priority_first() {
        let log = new_log();
        let mut mgr = TaskQueueManager::new();

        // Pushed out of priority order on purpose.
        mgr.push(logging_task(TaskPriority::Idle, &log, "idle"));
        mgr.push(logging_task(TaskPriority::Input, &log, "input"));
        mgr.push(logging_task(TaskPriority::Normal, &log, "normal"));

        run_all(&mut mgr);
        assert_eq!(*log.borrow(), vec!["input", "normal", "idle"]);
    }

    #[test]
    fn set_fifo_within_priority() {
        let log = new_log();
        let mut mgr = TaskQueueManager::new();

        for n in 0..3 {
            mgr.push(logging_task(TaskPriority::Normal, &log, n));
        }

        run_all(&mut mgr);
        assert_eq!(*log.borrow(), vec![0, 1, 2]);
    }

    #[test]
    fn set_disabled_queue_skipped() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(noop(TaskPriority::Normal));

        mgr.queue_mut(TaskPriority::Normal).set_enabled(false);
        assert!(mgr.pick().is_none());

        mgr.queue_mut(TaskPriority::Normal).set_enabled(true);
        assert!(mgr.pick().is_some());
    }

    #[test]
    fn set_anti_starvation() {
        let log = new_log::<&str>();
        let mut mgr = TaskQueueManager::new();
        let rounds = STARVATION_THRESHOLD + 1;

        for _ in 0..rounds {
            mgr.push(logging_task(TaskPriority::Input, &log, "input"));
        }
        mgr.push(logging_task(TaskPriority::Idle, &log, "idle"));

        for _ in 0..rounds {
            mgr.pick().unwrap().run();
        }

        // The pick right after the threshold is reached goes to the idle task.
        let entries = log.borrow();
        assert_eq!(entries[STARVATION_THRESHOLD as usize], "idle");
    }

    #[test]
    fn anti_starvation_resets_when_no_low_priority() {
        let mut mgr = TaskQueueManager::new();
        let total = STARVATION_THRESHOLD * 3;

        for _ in 0..total {
            mgr.push(noop(TaskPriority::Input));
        }

        for _ in 0..total {
            assert!(mgr.pick().is_some());
        }
    }

    // ---- Delayed tasks ----

    #[test]
    fn delayed_task_not_immediately_ready() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(timer_in(60));

        assert_eq!(mgr.ready_count(), 0);
        assert_eq!(mgr.delayed_count(), 1);
        assert!(mgr.pick().is_none());
        assert!(mgr.has_delayed());
    }

    #[test]
    fn delayed_task_with_zero_delay_goes_to_queue() {
        let ran = Rc::new(Cell::new(0u32));
        let mut mgr = TaskQueueManager::new();

        let flag = Rc::clone(&ran);
        mgr.push(Task::delayed(TaskPriority::Timer, Duration::ZERO, move || {
            flag.set(1)
        }));

        assert_eq!(mgr.ready_count(), 1);
        mgr.pick().unwrap().run();
        assert_eq!(ran.get(), 1);
    }

    #[test]
    fn promote_delayed_uses_heap() {
        let mut mgr = TaskQueueManager::new();

        // Deliberately not in deadline order.
        for secs in [60, 30, 90] {
            mgr.push(timer_in(secs));
        }
        assert_eq!(mgr.delayed_count(), 3);

        // Nothing is due yet, so nothing may be promoted.
        mgr.promote_delayed();
        assert_eq!(mgr.ready_count(), 0);

        // The earliest deadline (30 s) must be the one reported.
        let next = mgr.next_delayed_ready_in().unwrap();
        assert!(next <= Duration::from_secs(31));
        assert!(next >= Duration::from_secs(28));
    }

    #[test]
    fn next_delayed_ready_in_none_when_empty() {
        let mgr = TaskQueueManager::new();
        assert!(mgr.next_delayed_ready_in().is_none());
    }

    // ---- Accessors and counters ----

    #[test]
    fn set_queue_access() {
        let mgr = TaskQueueManager::new();
        let input = mgr.queue(TaskPriority::Input);
        assert_eq!(input.priority(), TaskPriority::Input);
        let idle = mgr.queue(TaskPriority::Idle);
        assert_eq!(idle.priority(), TaskPriority::Idle);
    }

    #[test]
    fn ready_count_excludes_disabled() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(noop(TaskPriority::Input));
        mgr.push(noop(TaskPriority::Normal));
        assert_eq!(mgr.ready_count(), 2);

        mgr.queue_mut(TaskPriority::Input).set_enabled(false);
        // Only the task in the still-enabled Normal queue counts.
        assert_eq!(mgr.ready_count(), 1);
    }

    #[test]
    fn ready_count_excludes_delayed() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(noop(TaskPriority::Input));
        mgr.push(timer_in(60));

        assert_eq!(mgr.ready_count(), 1);
        assert!(mgr.has_ready());
    }
}