// lustre_executor/executor.rs
use crate::id::TaskId;
use crate::reactor::Reactor;
use crate::task::Task;
use futures::task::noop_waker;
use std::collections::{BTreeMap, VecDeque};
use std::io;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll as TaskPoll, Waker};

10pub struct Executor {
11 tasks: BTreeMap<TaskId<u64>, Task>,
12 task_queue: Arc<Mutex<VecDeque<TaskId<u64>>>>,
13 waker_cache: BTreeMap<TaskId<u64>, Waker>,
14 reactor: Reactor,
15}
16
17impl Executor {
18 pub fn new() -> io::Result<Self> {
19 Ok(Self {
20 tasks: BTreeMap::new(),
21 task_queue: Arc::new(Mutex::new(VecDeque::new())),
22 waker_cache: BTreeMap::new(),
23 reactor: Reactor::new()?,
24 })
25 }
26
27 pub fn spawn(&mut self, task: Task) {
28 let task_id = task.id();
29 if self.tasks.insert(task.id(), task).is_some() {
30 panic!("Task ID collision");
31 }
32 self.task_queue.lock().unwrap().push_back(task_id);
33 }
34
35 pub fn run(&mut self) {
36 loop {
37 let mut had_work = false;
38 while let Some(task_id) = self.task_queue.lock().unwrap().pop_front() {
39 had_work = true;
40 if let Some(task) = self.tasks.get_mut(&task_id) {
41 let waker = self.waker_cache.entry(task_id).or_insert_with(|| {
42 crate::task::TaskWaker::new(task_id, Arc::clone(&self.task_queue))
43 });
44 let mut cx = Context::from_waker(waker);
45 match task.poll(&mut cx) {
46 TaskPoll::Ready(()) => {
47 self.tasks.remove(&task_id);
48 self.waker_cache.remove(&task_id);
49 }
50 TaskPoll::Pending => {}
51 }
52 }
53 }
54 let poll_result = self.reactor.poll(&mut Context::from_waker(&noop_waker()));
55 if !had_work && matches!(poll_result, TaskPoll::Pending) {
56 break;
57 }
58 }
59 }
60
61 pub fn reactor(&mut self) -> &mut Reactor {
62 &mut self.reactor
63 }
64}