Skip to main content

lustre_executor/
executor.rs

1use crate::id::TaskId;
2use crate::reactor::Reactor;
3use crate::task::Task;
4use futures::task::noop_waker;
5use std::collections::{BTreeMap, VecDeque};
6use std::io;
7use std::sync::{Arc, Mutex};
8use std::task::{Context, Poll as TaskPoll, Waker};
9
10pub struct Executor {
11    tasks: BTreeMap<TaskId<u64>, Task>,
12    task_queue: Arc<Mutex<VecDeque<TaskId<u64>>>>,
13    waker_cache: BTreeMap<TaskId<u64>, Waker>,
14    reactor: Reactor,
15}
16
17impl Executor {
18    pub fn new() -> io::Result<Self> {
19        Ok(Self {
20            tasks: BTreeMap::new(),
21            task_queue: Arc::new(Mutex::new(VecDeque::new())),
22            waker_cache: BTreeMap::new(),
23            reactor: Reactor::new()?,
24        })
25    }
26
27    pub fn spawn(&mut self, task: Task) {
28        let task_id = task.id();
29        if self.tasks.insert(task.id(), task).is_some() {
30            panic!("Task ID collision");
31        }
32        self.task_queue.lock().unwrap().push_back(task_id);
33    }
34
35    pub fn run(&mut self) {
36        loop {
37            let mut had_work = false;
38            while let Some(task_id) = self.task_queue.lock().unwrap().pop_front() {
39                had_work = true;
40                if let Some(task) = self.tasks.get_mut(&task_id) {
41                    let waker = self.waker_cache.entry(task_id).or_insert_with(|| {
42                        crate::task::TaskWaker::new(task_id, Arc::clone(&self.task_queue))
43                    });
44                    let mut cx = Context::from_waker(waker);
45                    match task.poll(&mut cx) {
46                        TaskPoll::Ready(()) => {
47                            self.tasks.remove(&task_id);
48                            self.waker_cache.remove(&task_id);
49                        }
50                        TaskPoll::Pending => {}
51                    }
52                }
53            }
54            let poll_result = self.reactor.poll(&mut Context::from_waker(&noop_waker()));
55            if !had_work && matches!(poll_result, TaskPoll::Pending) {
56                break;
57            }
58        }
59    }
60
61    pub fn reactor(&mut self) -> &mut Reactor {
62        &mut self.reactor
63    }
64}