Skip to main content

kozan_scheduler/
queue.rs

1//! Task queues — Chrome's `base::sequence_manager::TaskQueue` + `SequenceManager`.
2//!
3//! Two separate types, each with a single responsibility:
4//!
5//! - [`TaskQueue`] — A single FIFO queue. Can be enabled/disabled, throttled.
6//!   Like Chrome's `TaskQueue` class. One per priority level.
7//!
8//! - [`TaskQueueManager`] — Picks from multiple `TaskQueue`s by priority.
9//!   Like Chrome's `SequenceManager`. Applies anti-starvation.
10//!
11//! # Why not just `VecDeque<Task>`?
12//!
13//! A raw `VecDeque` has no place to add per-queue behavior. By wrapping it
14//! in `TaskQueue`, we can add enable/disable, throttling, fencing, and
15//! queue-level metrics without changing any caller code. This is Chrome's
16//! "one class per concept" principle — the key to extensibility.
17//!
18//! # Delayed tasks
19//!
20//! Tasks with a future `run_at` are stored in a `BinaryHeap` (min-heap)
21//! sorted by deadline. [`promote_delayed()`](TaskQueueManager::promote_delayed)
22//! pops only the tasks that are ready — O(k log n) where k = ready count.
23//! This is critical for performance with many timers (setTimeout-equivalent).
24
25use std::cmp::Ordering;
26use std::collections::{BinaryHeap, VecDeque};
27use std::time::Instant;
28
29use crate::task::{Task, TaskPriority};
30
31// ---- TaskQueue (single FIFO) ----
32
33/// A single FIFO task queue with enable/disable support.
34///
35/// Like Chrome's `base::sequence_manager::TaskQueue`.
36/// Each priority level in the scheduler owns one `TaskQueue`.
37///
38/// # Chrome features mapped
39///
40/// | Chrome                         | Kozan                  |
41/// |--------------------------------|------------------------|
42/// | `TaskQueue::SetQueueEnabled()` | `set_enabled()`        |
43/// | `TaskQueue::InsertFence()`     | (future: `set_fence()` |
44/// | `TaskQueue::GetNumberOfPending`| `len()`                |
45/// | `PushBack` / `TakeTask`        | `push()` / `pop()`     |
46pub struct TaskQueue {
47    /// The underlying FIFO buffer.
48    tasks: VecDeque<Task>,
49
50    /// Whether this queue is enabled. Disabled queues are skipped by the picker.
51    /// Chrome uses this for throttling background tabs — disable timer queue.
52    enabled: bool,
53
54    /// The priority this queue serves (for diagnostics and debugging).
55    priority: TaskPriority,
56}
57
58impl TaskQueue {
59    /// Create a new empty queue for the given priority.
60    #[inline]
61    #[must_use]
62    pub fn new(priority: TaskPriority) -> Self {
63        Self {
64            tasks: VecDeque::new(),
65            enabled: true,
66            priority,
67        }
68    }
69
70    /// Push a task to the back of the queue.
71    #[inline]
72    pub fn push(&mut self, task: Task) {
73        self.tasks.push_back(task);
74    }
75
76    /// Pop the front task. Returns `None` if empty or disabled.
77    #[inline]
78    pub fn pop(&mut self) -> Option<Task> {
79        if !self.enabled {
80            return None;
81        }
82        self.tasks.pop_front()
83    }
84
85    /// Peek at the front task without removing it.
86    #[inline]
87    #[must_use]
88    pub fn front(&self) -> Option<&Task> {
89        if !self.enabled {
90            return None;
91        }
92        self.tasks.front()
93    }
94
95    /// Number of tasks in this queue (regardless of enabled state).
96    #[inline]
97    #[must_use]
98    pub fn len(&self) -> usize {
99        self.tasks.len()
100    }
101
102    /// Whether this queue has no tasks.
103    #[inline]
104    #[must_use]
105    pub fn is_empty(&self) -> bool {
106        self.tasks.is_empty()
107    }
108
109    /// Whether this queue has tasks AND is enabled (actually pickable).
110    #[inline]
111    #[must_use]
112    pub fn has_ready(&self) -> bool {
113        self.enabled && !self.tasks.is_empty()
114    }
115
116    /// Whether this queue is enabled.
117    #[inline]
118    #[must_use]
119    pub fn is_enabled(&self) -> bool {
120        self.enabled
121    }
122
123    /// Enable or disable this queue.
124    ///
125    /// Disabled queues are skipped by [`TaskQueueManager::pick()`].
126    /// Chrome uses this to throttle background tab timers.
127    #[inline]
128    pub fn set_enabled(&mut self, enabled: bool) {
129        self.enabled = enabled;
130    }
131
132    /// The priority level this queue serves.
133    #[inline]
134    #[must_use]
135    pub fn priority(&self) -> TaskPriority {
136        self.priority
137    }
138}
139
140// ---- DelayedEntry (for BinaryHeap min-heap) ----
141
142/// Wrapper for delayed tasks in the min-heap.
143/// Ordered by `run_at` (earliest deadline first).
144struct DelayedEntry {
145    task: Task,
146    /// Cached `run_at` for heap ordering. Avoids calling `task.run_at()`
147    /// repeatedly during heap operations.
148    deadline: Instant,
149}
150
151impl PartialEq for DelayedEntry {
152    fn eq(&self, other: &Self) -> bool {
153        self.deadline == other.deadline
154    }
155}
156
157impl Eq for DelayedEntry {}
158
159impl PartialOrd for DelayedEntry {
160    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
161        Some(self.cmp(other))
162    }
163}
164
165impl Ord for DelayedEntry {
166    fn cmp(&self, other: &Self) -> Ordering {
167        // Reverse: BinaryHeap is max-heap, we want min-heap (earliest first).
168        other.deadline.cmp(&self.deadline)
169    }
170}
171
172// ---- TaskQueueManager (priority picker) ----
173
/// How many high-priority picks in a row are tolerated before the picker
/// forces a pick from the other end of the priority range.
///
/// Guards against lower-priority work starving while high-priority tasks
/// keep arriving: once this many consecutive tasks have been taken from the
/// Input/UserBlocking queues, the next pick is served from the lowest
/// non-empty queue instead.
const STARVATION_THRESHOLD: u32 = 64;
181
182/// Manages multiple [`TaskQueue`]s and picks by priority.
183///
184/// Like Chrome's `base::sequence_manager::SequenceManager` — maintains one
185/// `TaskQueue` per priority level and picks from the highest non-empty
186/// enabled queue each iteration.
187///
188/// # Delayed tasks
189///
190/// Tasks with a future `run_at` go into a `BinaryHeap` (min-heap sorted
191/// by deadline). [`promote_delayed()`](Self::promote_delayed) pops only
192/// ready tasks — O(k log n) where k = newly ready tasks. With 1000 timers
193/// and 0 ready, promotion is O(1) (peek at heap top).
194///
195/// # Anti-starvation
196///
197/// After `STARVATION_THRESHOLD` consecutive picks from high-priority
198/// queues (Input, `UserBlocking`), forces one pick from the lowest non-empty
199/// queue. Counter resets whether the forced pick succeeds or not.
200pub struct TaskQueueManager {
201    /// One queue per priority level.
202    /// Index 0 = Input (highest), Index 5 = Idle (lowest).
203    queues: [TaskQueue; TaskPriority::COUNT],
204
205    /// Delayed tasks in a min-heap sorted by deadline (earliest first).
206    /// O(log n) insert, O(1) peek, O(k log n) promote k ready tasks.
207    delayed: BinaryHeap<DelayedEntry>,
208
209    /// Counter for anti-starvation.
210    consecutive_high: u32,
211}
212
213impl TaskQueueManager {
214    /// Create a new set with one empty queue per priority level.
215    #[must_use]
216    pub fn new() -> Self {
217        Self {
218            queues: [
219                TaskQueue::new(TaskPriority::Input),
220                TaskQueue::new(TaskPriority::UserBlocking),
221                TaskQueue::new(TaskPriority::Normal),
222                TaskQueue::new(TaskPriority::Timer),
223                TaskQueue::new(TaskPriority::BestEffort),
224                TaskQueue::new(TaskPriority::Idle),
225            ],
226            delayed: BinaryHeap::new(),
227            consecutive_high: 0,
228        }
229    }
230
231    /// Post a task to the appropriate priority queue.
232    ///
233    /// If the task has a future `run_at`, it goes to the delayed min-heap.
234    /// Otherwise it goes directly into the priority queue.
235    pub fn push(&mut self, task: Task) {
236        if let Some(deadline) = task.run_at() {
237            if task.is_ready() {
238                // Deadline already passed — go straight to queue.
239                self.queues[task.priority().as_index()].push(task);
240            } else {
241                self.delayed.push(DelayedEntry { task, deadline });
242            }
243        } else {
244            self.queues[task.priority().as_index()].push(task);
245        }
246    }
247
248    /// Pick the highest-priority ready task.
249    ///
250    /// Returns `None` if all queues are empty or disabled.
251    /// Applies anti-starvation after consecutive high-priority picks.
252    pub fn pick(&mut self) -> Option<Task> {
253        // Anti-starvation: force a lower-priority pick if threshold exceeded.
254        if self.consecutive_high >= STARVATION_THRESHOLD {
255            // Reset counter regardless of whether forced pick succeeds.
256            // This prevents infinite re-checking when no low-priority tasks exist.
257            self.consecutive_high = 0;
258
259            if let Some(task) = self.pick_lowest_nonempty() {
260                return Some(task);
261            }
262        }
263
264        // Normal path: highest-priority non-empty enabled queue.
265        for idx in 0..TaskPriority::COUNT {
266            if let Some(task) = self.queues[idx].pop() {
267                if idx <= TaskPriority::UserBlocking.as_index() {
268                    self.consecutive_high += 1;
269                } else {
270                    self.consecutive_high = 0;
271                }
272                return Some(task);
273            }
274        }
275
276        None
277    }
278
279    /// Move delayed tasks that are now ready into their priority queues.
280    ///
281    /// Uses a min-heap: peeks at the earliest deadline, pops if ready.
282    /// O(k log n) where k = tasks becoming ready. When no tasks are ready
283    /// (the common case), this is O(1) — just one peek.
284    pub fn promote_delayed(&mut self) {
285        let now = Instant::now();
286        while let Some(entry) = self.delayed.peek() {
287            if entry.deadline > now {
288                break; // Earliest deadline is in the future — nothing more to promote.
289            }
290            let entry = self.delayed.pop().unwrap();
291            self.queues[entry.task.priority().as_index()].push(entry.task);
292        }
293    }
294
295    /// Time until the next delayed task is ready.
296    ///
297    /// Returns `None` if there are no delayed tasks.
298    /// O(1) — just peeks at the heap top.
299    #[must_use]
300    pub fn next_delayed_ready_in(&self) -> Option<std::time::Duration> {
301        self.delayed
302            .peek()
303            .map(|entry| entry.deadline.saturating_duration_since(Instant::now()))
304    }
305
306    /// Get a reference to a specific priority queue.
307    #[inline]
308    #[must_use]
309    pub fn queue(&self, priority: TaskPriority) -> &TaskQueue {
310        &self.queues[priority.as_index()]
311    }
312
313    /// Get a mutable reference to a specific priority queue.
314    ///
315    /// Use for per-queue operations like enable/disable:
316    /// ```ignore
317    /// manager.queue_mut(TaskPriority::Timer).set_enabled(false);
318    /// ```
319    #[inline]
320    pub fn queue_mut(&mut self, priority: TaskPriority) -> &mut TaskQueue {
321        &mut self.queues[priority.as_index()]
322    }
323
324    /// Total tasks across all queues that are actually pickable
325    /// (in enabled queues).
326    #[must_use]
327    pub fn ready_count(&self) -> usize {
328        self.queues
329            .iter()
330            .filter(|q| q.is_enabled())
331            .map(|q| q.len())
332            .sum()
333    }
334
335    /// Number of delayed tasks waiting for their deadline.
336    #[inline]
337    #[must_use]
338    pub fn delayed_count(&self) -> usize {
339        self.delayed.len()
340    }
341
342    /// Whether there are any pickable tasks in enabled queues.
343    #[must_use]
344    pub fn has_ready(&self) -> bool {
345        self.queues.iter().any(|q| q.has_ready())
346    }
347
348    /// Whether there are no tasks at all (ready or delayed).
349    #[must_use]
350    pub fn is_empty(&self) -> bool {
351        !self.has_ready() && self.delayed.is_empty()
352    }
353
354    /// Whether there are delayed tasks pending (for park timeout calculation).
355    #[inline]
356    #[must_use]
357    pub fn has_delayed(&self) -> bool {
358        !self.delayed.is_empty()
359    }
360
361    /// Pick from the lowest non-empty enabled queue (for anti-starvation).
362    fn pick_lowest_nonempty(&mut self) -> Option<Task> {
363        for idx in (0..TaskPriority::COUNT).rev() {
364            if let Some(task) = self.queues[idx].pop() {
365                return Some(task);
366            }
367        }
368        None
369    }
370}
371
372impl Default for TaskQueueManager {
373    fn default() -> Self {
374        Self::new()
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::thread;
    use std::time::Duration;

    // ---- TaskQueue tests ----

    /// Tasks come out of a single queue in the order they went in.
    #[test]
    fn queue_fifo_order() {
        let log = Rc::new(RefCell::new(Vec::new()));
        let mut q = TaskQueue::new(TaskPriority::Normal);

        for i in 0..3 {
            let l = log.clone();
            q.push(Task::new(TaskPriority::Normal, move || {
                l.borrow_mut().push(i)
            }));
        }

        assert_eq!(q.len(), 3);
        while let Some(task) = q.pop() {
            task.run();
        }
        assert_eq!(*log.borrow(), vec![0, 1, 2]);
    }

    /// A disabled queue hands out nothing but keeps its tasks.
    #[test]
    fn queue_disabled_returns_none() {
        let mut q = TaskQueue::new(TaskPriority::Normal);
        q.push(Task::new(TaskPriority::Normal, || {}));

        q.set_enabled(false);
        assert!(!q.is_enabled());
        assert!(q.pop().is_none());
        assert!(!q.has_ready());
        assert_eq!(q.len(), 1); // still has the task

        q.set_enabled(true);
        assert!(q.pop().is_some());
    }

    /// `front()` peeks without consuming.
    #[test]
    fn queue_front_peek() {
        let mut q = TaskQueue::new(TaskPriority::Input);
        assert!(q.front().is_none());

        q.push(Task::new(TaskPriority::Input, || {}));
        assert!(q.front().is_some());
        assert_eq!(q.len(), 1); // peek didn't consume
    }

    /// `front()` respects the enabled flag just like `pop()`.
    #[test]
    fn queue_front_none_when_disabled() {
        let mut q = TaskQueue::new(TaskPriority::Normal);
        q.push(Task::new(TaskPriority::Normal, || {}));
        q.set_enabled(false);
        assert!(q.front().is_none());
    }

    // ---- TaskQueueManager tests ----

    #[test]
    fn set_empty_returns_none() {
        let mut mgr = TaskQueueManager::new();
        assert!(mgr.pick().is_none());
        assert!(mgr.is_empty());
    }

    /// Picks follow priority order, not posting order.
    #[test]
    fn set_higher_priority_first() {
        let log = Rc::new(RefCell::new(Vec::new()));
        let mut mgr = TaskQueueManager::new();

        let l = log.clone();
        mgr.push(Task::new(TaskPriority::Idle, move || {
            l.borrow_mut().push("idle")
        }));
        let l = log.clone();
        mgr.push(Task::new(TaskPriority::Input, move || {
            l.borrow_mut().push("input")
        }));
        let l = log.clone();
        mgr.push(Task::new(TaskPriority::Normal, move || {
            l.borrow_mut().push("normal")
        }));

        while let Some(task) = mgr.pick() {
            task.run();
        }
        assert_eq!(*log.borrow(), vec!["input", "normal", "idle"]);
    }

    /// Within one priority level the manager preserves FIFO order.
    #[test]
    fn set_fifo_within_priority() {
        let log = Rc::new(RefCell::new(Vec::new()));
        let mut mgr = TaskQueueManager::new();

        for i in 0..3 {
            let l = log.clone();
            mgr.push(Task::new(TaskPriority::Normal, move || {
                l.borrow_mut().push(i)
            }));
        }

        while let Some(task) = mgr.pick() {
            task.run();
        }
        assert_eq!(*log.borrow(), vec![0, 1, 2]);
    }

    #[test]
    fn set_disabled_queue_skipped() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(Task::new(TaskPriority::Normal, || {}));

        mgr.queue_mut(TaskPriority::Normal).set_enabled(false);
        assert!(mgr.pick().is_none());

        mgr.queue_mut(TaskPriority::Normal).set_enabled(true);
        assert!(mgr.pick().is_some());
    }

    /// After `STARVATION_THRESHOLD` Input picks in a row, the next pick is
    /// the Idle task even though Input work is still pending.
    #[test]
    fn set_anti_starvation() {
        let log = Rc::new(RefCell::new(Vec::<&str>::new()));
        let mut mgr = TaskQueueManager::new();

        for _ in 0..STARVATION_THRESHOLD + 1 {
            let l = log.clone();
            mgr.push(Task::new(TaskPriority::Input, move || {
                l.borrow_mut().push("input")
            }));
        }

        let l = log.clone();
        mgr.push(Task::new(TaskPriority::Idle, move || {
            l.borrow_mut().push("idle")
        }));

        for _ in 0..STARVATION_THRESHOLD + 1 {
            mgr.pick().unwrap().run();
        }

        let entries = log.borrow();
        assert_eq!(entries[STARVATION_THRESHOLD as usize], "idle");
    }

    /// Crossing the threshold with only high-priority work queued must not
    /// lose or block any task.
    #[test]
    fn anti_starvation_resets_when_no_low_priority() {
        let mut mgr = TaskQueueManager::new();

        // Only Input tasks: at the threshold the forced pick scans every
        // queue from Idle upward and ends up taking an Input task itself.
        for _ in 0..STARVATION_THRESHOLD * 3 {
            mgr.push(Task::new(TaskPriority::Input, || {}));
        }

        // The counter is reset each time the threshold is hit, so every pick
        // keeps yielding a task until the queue is drained.
        for _ in 0..STARVATION_THRESHOLD * 3 {
            assert!(mgr.pick().is_some());
        }
        assert!(mgr.pick().is_none());
    }

    #[test]
    fn delayed_task_not_immediately_ready() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_secs(60),
            || {},
        ));

        assert_eq!(mgr.ready_count(), 0);
        assert_eq!(mgr.delayed_count(), 1);
        assert!(mgr.pick().is_none());
        assert!(mgr.has_delayed());
    }

    #[test]
    fn delayed_task_with_zero_delay_goes_to_queue() {
        let counter = Rc::new(Cell::new(0u32));
        let mut mgr = TaskQueueManager::new();

        let c = counter.clone();
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::ZERO,
            move || c.set(1),
        ));

        // Zero delay → should go directly to queue (is_ready() = true).
        assert_eq!(mgr.ready_count(), 1);
        mgr.pick().unwrap().run();
        assert_eq!(counter.get(), 1);
    }

    /// With nothing due, promotion is a no-op and the reported wait is that
    /// of the earliest deadline.
    #[test]
    fn promote_delayed_uses_heap() {
        let mut mgr = TaskQueueManager::new();

        // Add tasks with different delays.
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_secs(60),
            || {},
        ));
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_secs(30),
            || {},
        ));
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_secs(90),
            || {},
        ));

        assert_eq!(mgr.delayed_count(), 3);

        // Nothing ready yet.
        mgr.promote_delayed();
        assert_eq!(mgr.ready_count(), 0);

        // next_delayed_ready_in should be ~30s (the earliest).
        let next = mgr.next_delayed_ready_in().unwrap();
        assert!(next <= Duration::from_secs(31));
        assert!(next >= Duration::from_secs(28));
    }

    /// Once their deadlines pass, delayed tasks are promoted into their
    /// priority queue ordered by deadline rather than by posting order.
    #[test]
    fn promote_delayed_moves_due_tasks_in_deadline_order() {
        let log = Rc::new(RefCell::new(Vec::<&str>::new()));
        let mut mgr = TaskQueueManager::new();

        // Post the later deadline first so posting order alone would be wrong.
        let l = log.clone();
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_millis(50),
            move || l.borrow_mut().push("late"),
        ));
        let l = log.clone();
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_millis(25),
            move || l.borrow_mut().push("early"),
        ));
        assert_eq!(mgr.delayed_count(), 2);

        // Wait until both deadlines are safely in the past.
        thread::sleep(Duration::from_millis(60));
        mgr.promote_delayed();

        assert_eq!(mgr.delayed_count(), 0);
        assert!(!mgr.has_delayed());
        assert_eq!(mgr.ready_count(), 2);

        while let Some(task) = mgr.pick() {
            task.run();
        }
        assert_eq!(*log.borrow(), vec!["early", "late"]);
    }

    #[test]
    fn next_delayed_ready_in_none_when_empty() {
        let mgr = TaskQueueManager::new();
        assert!(mgr.next_delayed_ready_in().is_none());
    }

    /// `queue(priority)` returns the queue labelled with that priority.
    #[test]
    fn set_queue_access() {
        let mgr = TaskQueueManager::new();
        assert_eq!(
            mgr.queue(TaskPriority::Input).priority(),
            TaskPriority::Input
        );
        assert_eq!(mgr.queue(TaskPriority::Idle).priority(), TaskPriority::Idle);
    }

    #[test]
    fn ready_count_excludes_disabled() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(Task::new(TaskPriority::Input, || {}));
        mgr.push(Task::new(TaskPriority::Normal, || {}));

        assert_eq!(mgr.ready_count(), 2);

        mgr.queue_mut(TaskPriority::Input).set_enabled(false);
        assert_eq!(mgr.ready_count(), 1); // Input disabled, only Normal counts
    }

    #[test]
    fn ready_count_excludes_delayed() {
        let mut mgr = TaskQueueManager::new();
        mgr.push(Task::new(TaskPriority::Input, || {}));
        mgr.push(Task::delayed(
            TaskPriority::Timer,
            Duration::from_secs(60),
            || {},
        ));

        assert_eq!(mgr.ready_count(), 1);
        assert!(mgr.has_ready());
    }
}