Skip to main content

lustre_executor/io/
reactor.rs

1//! Mio-based reactor for I/O event polling.
2
3use mio::Poll as MioPoll;
4use mio::{Events, Interest, Token};
5use std::collections::HashMap;
6use std::io;
7use std::task::{Context, Poll as TaskPoll, Waker};
8use std::time::Duration;
9
10/// Reactor for handling I/O readiness events.
11pub struct Reactor {
12    poll: MioPoll,
13    events: Events,
14    wakers: HashMap<Token, Waker>,
15    next_token: usize,
16}
17
18impl Reactor {
19    /// Creates a new reactor.
20    pub fn new() -> io::Result<Self> {
21        Ok(Self {
22            poll: MioPoll::new()?,
23            events: Events::with_capacity(1024),
24            wakers: HashMap::new(),
25            next_token: 0,
26        })
27    }
28
29    /// Registers an I/O source with the reactor.
30    pub fn register(
31        &mut self,
32        source: &mut impl mio::event::Source,
33        interest: Interest,
34        waker: Waker,
35    ) -> Token {
36        let token = Token(self.next_token);
37        self.next_token += 1;
38        self.poll
39            .registry()
40            .register(source, token, interest)
41            .unwrap();
42        self.wakers.insert(token, waker);
43        token
44    }
45
46    /// Polls for I/O events and wakes associated tasks.
47    pub fn poll(&mut self, _cx: &mut Context, timeout: Option<Duration>) -> TaskPoll<()> {
48        self.poll.poll(&mut self.events, timeout).unwrap();
49        for event in &self.events {
50            if let Some(waker) = self.wakers.get(&event.token()) {
51                waker.wake_by_ref();
52            }
53        }
54        if self.events.is_empty() {
55            TaskPoll::Pending
56        } else {
57            TaskPoll::Ready(())
58        }
59    }
60}