lustre-executor 0.2.0

A blazingly fast, minimal async executor with pluggable ID generation and I/O support.
Documentation
//! The core executor runtime for managing async tasks.

use crate::Reactor;
use crate::Task;
use crate::TaskId;
use crate::TimerData;
use futures::task::noop_waker;
use std::collections::{BTreeMap, VecDeque};
use std::io;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll as TaskPoll, Waker};
use std::time::Instant;

/// The main executor that runs async tasks and handles I/O events.
pub struct Executor {
    tasks: BTreeMap<TaskId<u64>, Task>,
    task_queue: Arc<Mutex<VecDeque<TaskId<u64>>>>,
    waker_cache: BTreeMap<TaskId<u64>, Waker>,
    reactor: Reactor,
    timer_data: Arc<Mutex<TimerData>>,
}

impl Executor {
    /// Creates a new executor with default ID generator.
    pub fn new() -> io::Result<Self> {
        Ok(Self {
            tasks: BTreeMap::new(),
            task_queue: Arc::new(Mutex::new(VecDeque::new())),
            waker_cache: BTreeMap::new(),
            reactor: Reactor::new()?,
            timer_data: Arc::new(Mutex::new(TimerData::new())),
        })
    }

    /// Spawns a new task with an async future.
    pub fn spawn(&mut self, task: Task) {
        let task_id = task.id();
        if self.tasks.insert(task.id(), task).is_some() {
            panic!("Task ID collision");
        }
        self.task_queue.lock().unwrap().push_back(task_id);
    }

    /// Registers a timer to wake at the given deadline.
    pub fn register_timer(&mut self, deadline: Instant, waker: Waker) {
        self.timer_data
            .lock()
            .unwrap()
            .register_timer(deadline, waker);
    }

    /// Gets the timer data for creating timers.
    pub fn timer_data(&self) -> Arc<Mutex<TimerData>> {
        Arc::clone(&self.timer_data)
    }

    /// Runs the executor event loop until all tasks complete.
    pub fn run(&mut self) {
        loop {
            let mut had_work = false;
            while let Some(task_id) = self.task_queue.lock().unwrap().pop_front() {
                had_work = true;
                if let Some(task) = self.tasks.get_mut(&task_id) {
                    let waker = self.waker_cache.entry(task_id).or_insert_with(|| {
                        crate::core::task::TaskWaker::new(task_id, Arc::clone(&self.task_queue))
                    });
                    let mut cx = Context::from_waker(waker);
                    match task.poll(&mut cx) {
                        TaskPoll::Ready(()) => {
                            self.tasks.remove(&task_id);
                            self.waker_cache.remove(&task_id);
                        }
                        TaskPoll::Pending => {}
                    }
                }
            }

            // Calculate timeout
            let timeout = self.timer_data.lock().unwrap().next_timeout();

            let poll_result = self
                .reactor
                .poll(&mut Context::from_waker(&noop_waker()), timeout);

            // Check and wake expired timers
            self.timer_data.lock().unwrap().check_expired();
            if !had_work
                && matches!(poll_result, TaskPoll::Pending)
                && self.task_queue.lock().unwrap().is_empty()
                && self.timer_data.lock().unwrap().is_empty()
            {
                break;
            }
        }
    }

    /// Returns a mutable reference to the reactor for I/O operations.
    pub fn reactor(&mut self) -> &mut Reactor {
        &mut self.reactor
    }
}