Skip to main content

lustre_executor/core/
executor.rs

1//! The core executor runtime for managing async tasks.
2
3use crate::Reactor;
4use crate::Task;
5use crate::TaskId;
6use crate::TimerData;
7use futures::task::noop_waker;
8use std::collections::{BTreeMap, VecDeque};
9use std::io;
10use std::sync::{Arc, Mutex};
11use std::task::{Context, Poll as TaskPoll, Waker};
12use std::time::Instant;
13
/// The main executor that runs async tasks and handles I/O events.
///
/// Owns every spawned [`Task`], the queue of task IDs waiting to be polled,
/// the reactor that is polled once per event-loop iteration, and the shared
/// timer state.
pub struct Executor {
    /// Live tasks keyed by ID; an entry is inserted by `spawn` and removed
    /// once the task's poll returns `Ready`.
    tasks: BTreeMap<TaskId<u64>, Task>,
    /// IDs of tasks waiting to be polled. Held behind `Arc<Mutex<..>>` because
    /// a clone of the handle is given to each task's waker.
    task_queue: Arc<Mutex<VecDeque<TaskId<u64>>>>,
    /// One waker per task, created the first time the task is polled and
    /// dropped when the task completes.
    waker_cache: BTreeMap<TaskId<u64>, Waker>,
    /// Reactor polled by `run` after the ready queue has been drained.
    reactor: Reactor,
    /// Timer state, shared with callers through `timer_data()`.
    timer_data: Arc<Mutex<TimerData>>,
}
22
23impl Executor {
24    /// Creates a new executor with default ID generator.
25    pub fn new() -> io::Result<Self> {
26        Ok(Self {
27            tasks: BTreeMap::new(),
28            task_queue: Arc::new(Mutex::new(VecDeque::new())),
29            waker_cache: BTreeMap::new(),
30            reactor: Reactor::new()?,
31            timer_data: Arc::new(Mutex::new(TimerData::new())),
32        })
33    }
34
35    /// Spawns a new task with an async future.
36    pub fn spawn(&mut self, task: Task) {
37        let task_id = task.id();
38        if self.tasks.insert(task.id(), task).is_some() {
39            panic!("Task ID collision");
40        }
41        self.task_queue.lock().unwrap().push_back(task_id);
42    }
43
44    /// Registers a timer to wake at the given deadline.
45    pub fn register_timer(&mut self, deadline: Instant, waker: Waker) {
46        self.timer_data
47            .lock()
48            .unwrap()
49            .register_timer(deadline, waker);
50    }
51
52    /// Gets the timer data for creating timers.
53    pub fn timer_data(&self) -> Arc<Mutex<TimerData>> {
54        Arc::clone(&self.timer_data)
55    }
56
57    /// Runs the executor event loop until all tasks complete.
58    pub fn run(&mut self) {
59        loop {
60            let mut had_work = false;
61            while let Some(task_id) = self.task_queue.lock().unwrap().pop_front() {
62                had_work = true;
63                if let Some(task) = self.tasks.get_mut(&task_id) {
64                    let waker = self.waker_cache.entry(task_id).or_insert_with(|| {
65                        crate::core::task::TaskWaker::new(task_id, Arc::clone(&self.task_queue))
66                    });
67                    let mut cx = Context::from_waker(waker);
68                    match task.poll(&mut cx) {
69                        TaskPoll::Ready(()) => {
70                            self.tasks.remove(&task_id);
71                            self.waker_cache.remove(&task_id);
72                        }
73                        TaskPoll::Pending => {}
74                    }
75                }
76            }
77
78            // Calculate timeout
79            let timeout = self.timer_data.lock().unwrap().next_timeout();
80
81            let poll_result = self
82                .reactor
83                .poll(&mut Context::from_waker(&noop_waker()), timeout);
84
85            // Check and wake expired timers
86            self.timer_data.lock().unwrap().check_expired();
87            if !had_work
88                && matches!(poll_result, TaskPoll::Pending)
89                && self.task_queue.lock().unwrap().is_empty()
90                && self.timer_data.lock().unwrap().is_empty()
91            {
92                break;
93            }
94        }
95    }
96
97    /// Returns a mutable reference to the reactor for I/O operations.
98    pub fn reactor(&mut self) -> &mut Reactor {
99        &mut self.reactor
100    }
101}