use crate::Reactor;
use crate::Task;
use crate::TaskId;
use crate::TimerData;
use futures::task::noop_waker;
use std::collections::{BTreeMap, VecDeque};
use std::io;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll as TaskPoll, Waker};
use std::time::Instant;
pub struct Executor {
tasks: BTreeMap<TaskId<u64>, Task>,
task_queue: Arc<Mutex<VecDeque<TaskId<u64>>>>,
waker_cache: BTreeMap<TaskId<u64>, Waker>,
reactor: Reactor,
timer_data: Arc<Mutex<TimerData>>,
}
impl Executor {
pub fn new() -> io::Result<Self> {
Ok(Self {
tasks: BTreeMap::new(),
task_queue: Arc::new(Mutex::new(VecDeque::new())),
waker_cache: BTreeMap::new(),
reactor: Reactor::new()?,
timer_data: Arc::new(Mutex::new(TimerData::new())),
})
}
pub fn spawn(&mut self, task: Task) {
let task_id = task.id();
if self.tasks.insert(task.id(), task).is_some() {
panic!("Task ID collision");
}
self.task_queue.lock().unwrap().push_back(task_id);
}
pub fn register_timer(&mut self, deadline: Instant, waker: Waker) {
self.timer_data
.lock()
.unwrap()
.register_timer(deadline, waker);
}
pub fn timer_data(&self) -> Arc<Mutex<TimerData>> {
Arc::clone(&self.timer_data)
}
pub fn run(&mut self) {
loop {
let mut had_work = false;
while let Some(task_id) = self.task_queue.lock().unwrap().pop_front() {
had_work = true;
if let Some(task) = self.tasks.get_mut(&task_id) {
let waker = self.waker_cache.entry(task_id).or_insert_with(|| {
crate::core::task::TaskWaker::new(task_id, Arc::clone(&self.task_queue))
});
let mut cx = Context::from_waker(waker);
match task.poll(&mut cx) {
TaskPoll::Ready(()) => {
self.tasks.remove(&task_id);
self.waker_cache.remove(&task_id);
}
TaskPoll::Pending => {}
}
}
}
let timeout = self.timer_data.lock().unwrap().next_timeout();
let poll_result = self
.reactor
.poll(&mut Context::from_waker(&noop_waker()), timeout);
self.timer_data.lock().unwrap().check_expired();
if !had_work
&& matches!(poll_result, TaskPoll::Pending)
&& self.task_queue.lock().unwrap().is_empty()
&& self.timer_data.lock().unwrap().is_empty()
{
break;
}
}
}
pub fn reactor(&mut self) -> &mut Reactor {
&mut self.reactor
}
}