use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use crate::executor::{LocalExecutor, TaskId};
use crate::frame::{FrameInfo, FrameScheduler};
use crate::microtask::MicrotaskQueue;
use crate::queue::TaskQueueManager;
use crate::task::{Task, TaskPriority};
use crate::waker::{WakeReceiver, WakeSender};
#[derive(Debug, Clone, Copy)]
#[must_use]
pub struct TickResult {
pub tasks_run: usize,
pub microtasks_run: usize,
pub cross_thread_tasks: usize,
pub futures_polled: usize,
pub frame_produced: bool,
pub park_timeout: Option<Duration>,
}
pub struct Scheduler {
task_queue: TaskQueueManager,
microtask_queue: MicrotaskQueue,
executor: LocalExecutor,
frame_scheduler: FrameScheduler,
receiver: WakeReceiver,
}
impl Scheduler {
#[must_use]
pub fn new() -> (Self, WakeSender) {
let (sender, receiver) = crate::waker::cross_thread_channel();
let scheduler = Self {
task_queue: TaskQueueManager::new(),
microtask_queue: MicrotaskQueue::new(),
executor: LocalExecutor::new(),
frame_scheduler: FrameScheduler::new(),
receiver,
};
(scheduler, sender)
}
pub fn post_task(&mut self, priority: TaskPriority, callback: impl FnOnce() + 'static) {
self.task_queue.push(Task::new(priority, callback));
}
pub fn post_delayed_task(
&mut self,
priority: TaskPriority,
delay: Duration,
callback: impl FnOnce() + 'static,
) {
self.task_queue
.push(Task::delayed(priority, delay, callback));
}
pub fn post_raw_task(&mut self, task: Task) {
self.task_queue.push(task);
}
pub fn queue_microtask(&mut self, callback: impl FnOnce() + 'static) {
self.microtask_queue.queue_microtask(callback);
}
pub fn spawn(&mut self, future: impl Future<Output = ()> + 'static) -> TaskId {
self.executor.spawn(future)
}
pub fn set_needs_frame(&mut self) {
self.frame_scheduler.set_needs_frame();
}
pub fn request_frame(&mut self, callback: impl FnMut(FrameInfo) -> bool + 'static) {
self.frame_scheduler.request_frame(callback);
}
pub fn set_executor_notify(&mut self, notify: Arc<dyn Fn() + Send + Sync>) {
self.executor.set_notify(notify);
}
pub fn set_queue_enabled(&mut self, priority: TaskPriority, enabled: bool) {
self.task_queue.queue_mut(priority).set_enabled(enabled);
}
pub fn tick(&mut self, render: &mut dyn FnMut(FrameInfo)) -> TickResult {
let mut result = TickResult {
tasks_run: 0,
microtasks_run: 0,
cross_thread_tasks: 0,
futures_polled: 0,
frame_produced: false,
park_timeout: None,
};
result.cross_thread_tasks = self.receiver.drain_into(|ct| {
let priority = ct.priority();
self.task_queue.push(Task::new(priority, move || ct.run()));
});
crate::timer::fire_expired();
self.task_queue.promote_delayed();
result.futures_polled = self.executor.poll_all();
if let Some(task) = self.task_queue.pick() {
task.run();
result.tasks_run = 1;
}
result.microtasks_run = self.microtask_queue.drain();
if self.frame_scheduler.should_produce_frame() {
let info = self.frame_scheduler.begin_frame();
self.frame_scheduler.run_callbacks(info);
render(info);
self.frame_scheduler.end_frame();
result.frame_produced = true;
}
result.park_timeout = self.calculate_park_timeout();
result
}
pub fn run_until_idle(&mut self, render: &mut dyn FnMut(FrameInfo)) {
loop {
let result = self.tick(render);
if result.tasks_run == 0
&& result.microtasks_run == 0
&& result.cross_thread_tasks == 0
&& result.futures_polled == 0
&& !result.frame_produced
&& self.executor.is_idle()
{
break;
}
}
}
#[must_use]
pub fn has_work(&self) -> bool {
self.task_queue.has_ready()
|| self.task_queue.has_delayed()
|| !self.microtask_queue.is_empty()
|| self.executor.has_woken()
|| self.frame_scheduler.should_produce_frame()
}
#[must_use]
pub fn frame_scheduler(&self) -> &FrameScheduler {
&self.frame_scheduler
}
pub fn frame_scheduler_mut(&mut self) -> &mut FrameScheduler {
&mut self.frame_scheduler
}
#[must_use]
pub fn task_queue(&self) -> &TaskQueueManager {
&self.task_queue
}
#[must_use]
pub fn executor(&self) -> &LocalExecutor {
&self.executor
}
fn calculate_park_timeout(&self) -> Option<Duration> {
if self.task_queue.has_ready()
|| !self.microtask_queue.is_empty()
|| self.executor.has_woken()
{
return Some(Duration::ZERO);
}
let mut min_timeout: Option<Duration> = None;
if let Some(d) = self.task_queue.next_delayed_ready_in() {
min_timeout = Some(d);
}
if let Some(f) = self.frame_scheduler.time_until_next_frame() {
min_timeout = Some(match min_timeout {
None => f,
Some(m) => m.min(f),
});
}
if let Some(t) = crate::timer::next_deadline() {
min_timeout = Some(match min_timeout {
None => t,
Some(m) => m.min(t),
});
}
min_timeout
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;
#[test]
fn empty_tick_does_nothing() {
let (mut sched, _sender) = Scheduler::new();
let result = sched.tick(&mut |_| {});
assert_eq!(result.tasks_run, 0);
assert_eq!(result.microtasks_run, 0);
assert_eq!(result.cross_thread_tasks, 0);
assert_eq!(result.futures_polled, 0);
assert!(!result.frame_produced);
}
#[test]
fn post_and_run_task() {
let (mut sched, _sender) = Scheduler::new();
let called = Rc::new(Cell::new(false));
let c = called.clone();
sched.post_task(TaskPriority::Normal, move || c.set(true));
let result = sched.tick(&mut |_| {});
assert_eq!(result.tasks_run, 1);
assert!(called.get());
}
#[test]
fn microtask_drains_after_macrotask() {
let (mut sched, _sender) = Scheduler::new();
let log = Rc::new(std::cell::RefCell::new(Vec::new()));
let l = log.clone();
sched.post_task(TaskPriority::Normal, move || {
l.borrow_mut().push("macro");
});
let l = log.clone();
sched.queue_microtask(move || {
l.borrow_mut().push("micro");
});
let _ = sched.tick(&mut |_| {});
assert_eq!(*log.borrow(), vec!["macro", "micro"]);
}
#[test]
fn priority_ordering() {
let (mut sched, _sender) = Scheduler::new();
let log = Rc::new(std::cell::RefCell::new(Vec::new()));
let l = log.clone();
sched.post_task(TaskPriority::Idle, move || l.borrow_mut().push("idle"));
let l = log.clone();
sched.post_task(TaskPriority::Input, move || l.borrow_mut().push("input"));
let _ = sched.tick(&mut |_| {});
assert_eq!(*log.borrow(), vec!["input"]);
let _ = sched.tick(&mut |_| {});
assert_eq!(*log.borrow(), vec!["input", "idle"]);
}
#[test]
fn spawn_async_task() {
let (mut sched, _sender) = Scheduler::new();
let done = Rc::new(Cell::new(false));
let d = done.clone();
sched.spawn(async move {
d.set(true);
});
let _ = sched.tick(&mut |_| {});
assert!(done.get());
}
#[test]
fn cross_thread_task() {
let (mut sched, sender) = Scheduler::new();
let called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let c = called.clone();
sender
.post(move || c.store(true, std::sync::atomic::Ordering::SeqCst))
.unwrap();
let result = sched.tick(&mut |_| {});
assert_eq!(result.cross_thread_tasks, 1);
assert_eq!(result.tasks_run, 1);
assert!(called.load(std::sync::atomic::Ordering::SeqCst));
}
#[test]
fn frame_production() {
let (mut sched, _sender) = Scheduler::new();
let rendered = Rc::new(Cell::new(false));
sched.set_needs_frame();
let r = rendered.clone();
let result = sched.tick(&mut move |_info| {
r.set(true);
});
assert!(result.frame_produced);
assert!(rendered.get());
}
#[test]
fn request_frame_callback() {
let (mut sched, _sender) = Scheduler::new();
let called = Rc::new(Cell::new(false));
let c = called.clone();
sched.request_frame(move |_| {
c.set(true);
false
});
let _ = sched.tick(&mut |_| {});
assert!(called.get());
}
#[test]
fn run_until_idle_drains_all() {
let (mut sched, _sender) = Scheduler::new();
let counter = Rc::new(Cell::new(0u32));
for _ in 0..5 {
let c = counter.clone();
sched.post_task(TaskPriority::Normal, move || c.set(c.get() + 1));
}
sched.run_until_idle(&mut |_| {});
assert_eq!(counter.get(), 5);
}
#[test]
fn has_work_reflects_state() {
let (mut sched, _sender) = Scheduler::new();
assert!(!sched.has_work());
sched.post_task(TaskPriority::Normal, || {});
assert!(sched.has_work());
let _ = sched.tick(&mut |_| {});
assert!(!sched.has_work());
}
#[test]
fn disabled_queue_skipped() {
let (mut sched, _sender) = Scheduler::new();
let called = Rc::new(Cell::new(false));
let c = called.clone();
sched.post_task(TaskPriority::Timer, move || c.set(true));
sched.set_queue_enabled(TaskPriority::Timer, false);
let _ = sched.tick(&mut |_| {});
assert!(!called.get());
sched.set_queue_enabled(TaskPriority::Timer, true);
let _ = sched.tick(&mut |_| {});
assert!(called.get());
}
#[test]
fn park_timeout_zero_when_work() {
let (mut sched, _sender) = Scheduler::new();
sched.post_task(TaskPriority::Normal, || {});
let result = sched.tick(&mut |_| {});
assert_eq!(result.tasks_run, 1);
}
#[test]
fn park_timeout_none_when_idle() {
let (mut sched, _sender) = Scheduler::new();
let result = sched.tick(&mut |_| {});
assert!(result.park_timeout.is_none());
}
#[test]
fn async_with_waker() {
use std::cell::RefCell;
let (mut sched, _sender) = Scheduler::new();
let counter = Rc::new(Cell::new(0u32));
let waker_holder: Rc<RefCell<Option<std::task::Waker>>> = Rc::new(RefCell::new(None));
let c = counter.clone();
let wh = waker_holder.clone();
sched.spawn(async move {
std::future::poll_fn(|cx| {
let count = c.get();
if count == 0 {
*wh.borrow_mut() = Some(cx.waker().clone());
c.set(1);
Poll::Pending
} else {
c.set(2);
Poll::Ready(())
}
})
.await;
});
let _ = sched.tick(&mut |_| {});
assert_eq!(counter.get(), 1);
waker_holder.borrow().as_ref().unwrap().wake_by_ref();
let _ = sched.tick(&mut |_| {});
assert_eq!(counter.get(), 2);
}
}