lustre_executor/core/
executor.rs1use crate::Reactor;
4use crate::Task;
5use crate::TaskId;
6use crate::TimerData;
7use futures::task::noop_waker;
8use std::collections::{BTreeMap, VecDeque};
9use std::io;
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll as TaskPoll, Waker};
12use std::time::Instant;
13
14pub struct Executor {
16 tasks: BTreeMap<TaskId<u64>, Task>,
17 task_queue: Arc<Mutex<VecDeque<TaskId<u64>>>>,
18 waker_cache: BTreeMap<TaskId<u64>, Waker>,
19 reactor: Reactor,
20 timer_data: Arc<Mutex<TimerData>>,
21}
22
23impl Executor {
24 pub fn new() -> io::Result<Self> {
26 Ok(Self {
27 tasks: BTreeMap::new(),
28 task_queue: Arc::new(Mutex::new(VecDeque::new())),
29 waker_cache: BTreeMap::new(),
30 reactor: Reactor::new()?,
31 timer_data: Arc::new(Mutex::new(TimerData::new())),
32 })
33 }
34
35 pub fn spawn(&mut self, task: Task) {
37 let task_id = task.id();
38 if self.tasks.insert(task.id(), task).is_some() {
39 panic!("Task ID collision");
40 }
41 self.task_queue.lock().unwrap().push_back(task_id);
42 }
43
44 pub fn register_timer(&mut self, deadline: Instant, waker: Waker) {
46 self.timer_data
47 .lock()
48 .unwrap()
49 .register_timer(deadline, waker);
50 }
51
52 pub fn timer_data(&self) -> Arc<Mutex<TimerData>> {
54 Arc::clone(&self.timer_data)
55 }
56
57 pub fn run(&mut self) {
59 loop {
60 let mut had_work = false;
61 while let Some(task_id) = self.task_queue.lock().unwrap().pop_front() {
62 had_work = true;
63 if let Some(task) = self.tasks.get_mut(&task_id) {
64 let waker = self.waker_cache.entry(task_id).or_insert_with(|| {
65 crate::core::task::TaskWaker::new(task_id, Arc::clone(&self.task_queue))
66 });
67 let mut cx = Context::from_waker(waker);
68 match task.poll(&mut cx) {
69 TaskPoll::Ready(()) => {
70 self.tasks.remove(&task_id);
71 self.waker_cache.remove(&task_id);
72 }
73 TaskPoll::Pending => {}
74 }
75 }
76 }
77
78 let timeout = self.timer_data.lock().unwrap().next_timeout();
80
81 let poll_result = self
82 .reactor
83 .poll(&mut Context::from_waker(&noop_waker()), timeout);
84
85 self.timer_data.lock().unwrap().check_expired();
87 if !had_work
88 && matches!(poll_result, TaskPoll::Pending)
89 && self.task_queue.lock().unwrap().is_empty()
90 && self.timer_data.lock().unwrap().is_empty()
91 {
92 break;
93 }
94 }
95 }
96
97 pub fn reactor(&mut self) -> &mut Reactor {
99 &mut self.reactor
100 }
101}