Skip to main content

moduvex_runtime/reactor/
mod.rs

//! Reactor — I/O readiness event loop.
//!
//! The reactor multiplexes OS I/O events (epoll / kqueue / IOCP) onto async
//! tasks. Each thread owns exactly one reactor, accessed via the thread-local
//! `REACTOR`. The `with_reactor` helper provides safe, borrow-scoped access.
//!
//! Platform dispatch is done entirely at compile time via `cfg` attributes —
//! zero runtime overhead, no vtable.
9
10use std::cell::RefCell;
11use std::io;
12
13use crate::platform::sys::{Events, Interest, RawSource};
14
15// ── Platform-specific backends ────────────────────────────────────────────────
16
17#[cfg(target_os = "linux")]
18mod epoll;
19#[cfg(target_os = "linux")]
20use epoll::EpollReactor as PlatformReactor;
21
22#[cfg(any(target_os = "macos", target_os = "freebsd"))]
23mod kqueue;
24#[cfg(any(target_os = "macos", target_os = "freebsd"))]
25use kqueue::KqueueReactor as PlatformReactor;
26
27#[cfg(target_os = "windows")]
28mod iocp;
29#[cfg(target_os = "windows")]
30use iocp::IocpReactor as PlatformReactor;
31
32pub mod source;
33pub mod waker_registry;
34
35pub use source::IoSource;
36pub(crate) use waker_registry::WakerRegistry;
37
38// ── ReactorBackend trait ──────────────────────────────────────────────────────
39
/// Abstraction over platform-specific I/O polling mechanisms.
///
/// Implementors: `EpollReactor` (Linux), `KqueueReactor` (macOS/FreeBSD),
/// `IocpReactor` (Windows stub). Exactly one of them is aliased to
/// `PlatformReactor` by the `cfg`-gated `use` items in this module, so the
/// backend is always a concrete type; the `Sized` supertrait is consistent
/// with that (the trait is not meant to be used as a trait object).
///
/// All methods take `&self` (shared ref) so the reactor can be held behind a
/// `RefCell` and lent immutably to concurrent borrows on the same thread.
pub(crate) trait ReactorBackend: Sized {
    /// Construct a new backend instance, opening the underlying OS resource.
    ///
    /// # Errors
    /// Returns the `io::Error` reported while opening the OS resource.
    fn new() -> io::Result<Self>;

    /// Register `source` with the reactor under `token`, monitoring `interest`.
    ///
    /// `token` is an opaque caller-chosen `usize` returned verbatim in events.
    /// Callers must ensure `token` is unique within a single reactor instance.
    ///
    /// # Errors
    /// Returns an `io::Error` if the backend rejects the registration.
    /// NOTE(review): whether registering the same source twice is an error is
    /// backend-specific — confirm against each implementor.
    fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()>;

    /// Update the interest mask of an already-registered source.
    ///
    /// # Errors
    /// Returns an `io::Error` if the backend rejects the update.
    fn reregister(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()>;

    /// Remove `source` from the reactor. Must be called before closing the fd.
    ///
    /// # Errors
    /// Returns an `io::Error` if the backend fails to remove the source.
    fn deregister(&self, source: RawSource) -> io::Result<()>;

    /// Block until at least one event is ready or `timeout_ms` elapses.
    ///
    /// Ready events are appended to `events` (cleared first). Returns the
    /// number of events collected.
    ///
    /// `timeout_ms = None` blocks indefinitely.
    /// `timeout_ms = Some(0)` returns immediately (non-blocking check).
    ///
    /// # Errors
    /// Returns the `io::Error` reported by the underlying OS wait call.
    fn poll(&self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize>;
}
72
73// ── Reactor wrapper ───────────────────────────────────────────────────────────
74
/// Thread-owned reactor that wraps the platform-specific backend.
///
/// Stored in a `RefCell` inside the thread-local so that mutable access can be
/// checked at runtime (panics on re-entrant borrow, which must not happen in
/// correct code).
///
/// Registration methods only need `&self` because they delegate straight to
/// the backend; the methods that touch `wakers` (`poll`,
/// `deregister_with_token`) take `&mut self`.
pub struct Reactor {
    /// Platform backend chosen at compile time (`PlatformReactor` alias).
    inner: PlatformReactor,
    /// Maps reactor tokens to task wakers. Fired when I/O becomes ready.
    pub(crate) wakers: WakerRegistry,
}
85
86impl Reactor {
87    /// Create a new `Reactor`, opening the underlying OS polling resource.
88    pub fn new() -> io::Result<Self> {
89        Ok(Self {
90            inner: PlatformReactor::new()?,
91            wakers: WakerRegistry::new(),
92        })
93    }
94
95    /// Register a raw source. Delegates to the platform backend.
96    #[inline]
97    pub fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
98        self.inner.register(source, token, interest)
99    }
100
101    /// Re-register (update interest) a raw source.
102    #[inline]
103    pub fn reregister(
104        &self,
105        source: RawSource,
106        token: usize,
107        interest: Interest,
108    ) -> io::Result<()> {
109        self.inner.reregister(source, token, interest)
110    }
111
112    /// Deregister a raw source and remove its wakers from the registry.
113    #[inline]
114    pub fn deregister(&self, source: RawSource) -> io::Result<()> {
115        self.inner.deregister(source)
116    }
117
118    /// Deregister a raw source and also clear its waker slots by token.
119    ///
120    /// Use this variant when you know the token (IoSource drop path).
121    pub(crate) fn deregister_with_token(
122        &mut self,
123        source: RawSource,
124        token: usize,
125    ) -> io::Result<()> {
126        self.wakers.remove_token(token);
127        self.inner.deregister(source)
128    }
129
130    /// Poll for ready events and wake registered task wakers.
131    ///
132    /// Fills `events` from the platform backend, then fires any wakers whose
133    /// tokens appear in the event list. Returns the number of events collected.
134    pub fn poll(&mut self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize> {
135        let n = self.inner.poll(events, timeout_ms)?;
136        // Wake tasks registered for these events.
137        for ev in events.iter() {
138            self.wakers.wake_token(ev.token, ev.readable, ev.writable);
139        }
140        Ok(n)
141    }
142
143    /// Poll without waking (for executor's self-pipe drain path).
144    ///
145    /// The executor uses this when it wants raw events and handles waking itself.
146    pub(crate) fn poll_raw(
147        &self,
148        events: &mut Events,
149        timeout_ms: Option<u64>,
150    ) -> io::Result<usize> {
151        self.inner.poll(events, timeout_ms)
152    }
153}
154
155// ── Thread-local reactor ──────────────────────────────────────────────────────
156
157thread_local! {
158    /// Per-thread reactor instance.
159    ///
160    /// Lazily initialised on first access. Panics if the platform backend
161    /// fails to initialise (e.g. `kqueue()` / `epoll_create1()` returns an
162    /// error), which would indicate a severe OS-level resource exhaustion.
163    static REACTOR: RefCell<Reactor> = RefCell::new(
164        Reactor::new().expect("failed to initialise platform reactor")
165    );
166}
167
168/// Borrow the thread-local reactor for the duration of `f`.
169///
170/// # Panics
171/// Panics if called re-entrantly on the same thread (i.e. from within another
172/// `with_reactor` call on the same thread). This mirrors the contract of
173/// `RefCell::borrow` and should never happen in correct executor code.
174pub fn with_reactor<F, R>(f: F) -> R
175where
176    F: FnOnce(&Reactor) -> R,
177{
178    REACTOR.with(|cell| f(&cell.borrow()))
179}
180
181/// Mutably borrow the thread-local reactor for the duration of `f`.
182///
183/// Only the executor's poll loop should use this; all other callers use the
184/// shared `with_reactor`.
185pub(crate) fn with_reactor_mut<F, R>(f: F) -> R
186where
187    F: FnOnce(&mut Reactor) -> R,
188{
189    REACTOR.with(|cell| f(&mut cell.borrow_mut()))
190}
191
192// ── Tests ─────────────────────────────────────────────────────────────────────
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::{create_pipe, events_with_capacity};

    /// The first access to the thread-local must lazily build the reactor
    /// without panicking.
    #[test]
    fn reactor_initialises_via_thread_local() {
        with_reactor(|_reactor| ());
    }

    /// A pipe read end can be registered and then deregistered again.
    #[cfg(unix)]
    #[test]
    fn reactor_register_deregister_roundtrip() {
        let (read_fd, write_fd) = create_pipe().unwrap();
        with_reactor(|reactor| {
            let registered = reactor.register(read_fd, 10, Interest::READABLE);
            registered.expect("register");
            let deregistered = reactor.deregister(read_fd);
            deregistered.expect("deregister");
        });
        // SAFETY: both fds are valid and owned by this test, and the read end
        // was deregistered from the reactor above.
        unsafe { libc::close(read_fd) };
        unsafe { libc::close(write_fd) };
    }

    /// With nothing registered, a zero timeout must come back immediately
    /// reporting zero events.
    #[test]
    fn reactor_poll_timeout_zero_returns_immediately() {
        let mut events = events_with_capacity(16);
        let ready = with_reactor_mut(|reactor| reactor.poll(&mut events, Some(0)).expect("poll"));
        assert_eq!(ready, 0);
    }
}