moduvex_runtime/reactor/mod.rs
1//! Reactor — I/O readiness event loop.
2//!
3//! The reactor multiplexes OS I/O events (epoll / kqueue / IOCP) onto async
4//! tasks. Each thread owns exactly one reactor, accessed via the thread-local
5//! `REACTOR`. The `with_reactor` helper provides safe, borrow-scoped access.
6//!
7//! Platform dispatch is done entirely at compile time via `cfg` attributes —
8//! zero runtime overhead, no vtable.
9
10use std::cell::RefCell;
11use std::io;
12
13use crate::platform::sys::{Events, Interest, RawSource};
14
15// ── Platform-specific backends ────────────────────────────────────────────────
16
17#[cfg(target_os = "linux")]
18mod epoll;
19#[cfg(target_os = "linux")]
20use epoll::EpollReactor as PlatformReactor;
21
22#[cfg(any(target_os = "macos", target_os = "freebsd"))]
23mod kqueue;
24#[cfg(any(target_os = "macos", target_os = "freebsd"))]
25use kqueue::KqueueReactor as PlatformReactor;
26
27#[cfg(target_os = "windows")]
28mod iocp;
29#[cfg(target_os = "windows")]
30use iocp::IocpReactor as PlatformReactor;
31
32pub mod source;
33pub mod waker_registry;
34
35pub use source::IoSource;
36pub(crate) use waker_registry::WakerRegistry;
37
38// ── ReactorBackend trait ──────────────────────────────────────────────────────
39
40/// Abstraction over platform-specific I/O polling mechanisms.
41///
42/// Implementors: `EpollReactor` (Linux), `KqueueReactor` (macOS/BSD),
43/// `IocpReactor` (Windows stub).
44///
45/// All methods take `&self` (shared ref) so the reactor can be held behind a
46/// `RefCell` and lent immutably to concurrent borrows on the same thread.
47pub(crate) trait ReactorBackend: Sized {
48 /// Construct a new backend instance, opening the underlying OS resource.
49 fn new() -> io::Result<Self>;
50
51 /// Register `source` with the reactor under `token`, monitoring `interest`.
52 ///
53 /// `token` is an opaque caller-chosen `usize` returned verbatim in events.
54 /// Callers must ensure `token` is unique within a single reactor instance.
55 fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()>;
56
57 /// Update the interest mask of an already-registered source.
58 fn reregister(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()>;
59
60 /// Remove `source` from the reactor. Must be called before closing the fd.
61 fn deregister(&self, source: RawSource) -> io::Result<()>;
62
63 /// Block until at least one event is ready or `timeout_ms` elapses.
64 ///
65 /// Ready events are appended to `events` (cleared first). Returns the
66 /// number of events collected.
67 ///
68 /// `timeout_ms = None` blocks indefinitely.
69 /// `timeout_ms = Some(0)` returns immediately (non-blocking check).
70 fn poll(&self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize>;
71}
72
73// ── Reactor wrapper ───────────────────────────────────────────────────────────
74
75/// Thread-owned reactor that wraps the platform-specific backend.
76///
77/// Stored in a `RefCell` inside the thread-local so that mutable access can be
78/// checked at runtime (panics on re-entrant borrow, which must not happen in
79/// correct code).
80pub struct Reactor {
81 inner: PlatformReactor,
82 /// Maps reactor tokens to task wakers. Fired when I/O becomes ready.
83 pub(crate) wakers: WakerRegistry,
84}
85
86impl Reactor {
87 /// Create a new `Reactor`, opening the underlying OS polling resource.
88 pub fn new() -> io::Result<Self> {
89 Ok(Self {
90 inner: PlatformReactor::new()?,
91 wakers: WakerRegistry::new(),
92 })
93 }
94
95 /// Register a raw source. Delegates to the platform backend.
96 #[inline]
97 pub fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
98 self.inner.register(source, token, interest)
99 }
100
101 /// Re-register (update interest) a raw source.
102 #[inline]
103 pub fn reregister(
104 &self,
105 source: RawSource,
106 token: usize,
107 interest: Interest,
108 ) -> io::Result<()> {
109 self.inner.reregister(source, token, interest)
110 }
111
112 /// Deregister a raw source and remove its wakers from the registry.
113 #[inline]
114 pub fn deregister(&self, source: RawSource) -> io::Result<()> {
115 self.inner.deregister(source)
116 }
117
118 /// Deregister a raw source and also clear its waker slots by token.
119 ///
120 /// Use this variant when you know the token (IoSource drop path).
121 pub(crate) fn deregister_with_token(
122 &mut self,
123 source: RawSource,
124 token: usize,
125 ) -> io::Result<()> {
126 self.wakers.remove_token(token);
127 self.inner.deregister(source)
128 }
129
130 /// Poll for ready events and wake registered task wakers.
131 ///
132 /// Fills `events` from the platform backend, then fires any wakers whose
133 /// tokens appear in the event list. Returns the number of events collected.
134 pub fn poll(&mut self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize> {
135 let n = self.inner.poll(events, timeout_ms)?;
136 // Wake tasks registered for these events.
137 for ev in events.iter() {
138 self.wakers.wake_token(ev.token, ev.readable, ev.writable);
139 }
140 Ok(n)
141 }
142
143 /// Poll without waking (for executor's self-pipe drain path).
144 ///
145 /// The executor uses this when it wants raw events and handles waking itself.
146 pub(crate) fn poll_raw(
147 &self,
148 events: &mut Events,
149 timeout_ms: Option<u64>,
150 ) -> io::Result<usize> {
151 self.inner.poll(events, timeout_ms)
152 }
153}
154
155// ── Thread-local reactor ──────────────────────────────────────────────────────
156
157thread_local! {
158 /// Per-thread reactor instance.
159 ///
160 /// Lazily initialised on first access. Panics if the platform backend
161 /// fails to initialise (e.g. `kqueue()` / `epoll_create1()` returns an
162 /// error), which would indicate a severe OS-level resource exhaustion.
163 static REACTOR: RefCell<Reactor> = RefCell::new(
164 Reactor::new().expect("failed to initialise platform reactor")
165 );
166}
167
168/// Borrow the thread-local reactor for the duration of `f`.
169///
170/// # Panics
171/// Panics if called re-entrantly on the same thread (i.e. from within another
172/// `with_reactor` call on the same thread). This mirrors the contract of
173/// `RefCell::borrow` and should never happen in correct executor code.
174pub fn with_reactor<F, R>(f: F) -> R
175where
176 F: FnOnce(&Reactor) -> R,
177{
178 REACTOR.with(|cell| f(&cell.borrow()))
179}
180
181/// Mutably borrow the thread-local reactor for the duration of `f`.
182///
183/// Only the executor's poll loop should use this; all other callers use the
184/// shared `with_reactor`.
185pub(crate) fn with_reactor_mut<F, R>(f: F) -> R
186where
187 F: FnOnce(&mut Reactor) -> R,
188{
189 REACTOR.with(|cell| f(&mut cell.borrow_mut()))
190}
191
192// ── Tests ─────────────────────────────────────────────────────────────────────
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::events_with_capacity;

    /// Touching the thread-local must lazily build the reactor without panicking.
    #[test]
    fn reactor_initialises_via_thread_local() {
        // `with_reactor` must not panic — reactor initialised lazily here.
        with_reactor(|_r| {});
    }

    /// A pipe read end can be registered and then deregistered again.
    #[cfg(unix)]
    #[test]
    fn reactor_register_deregister_roundtrip() {
        // Imported here rather than at module level: this is the only user and
        // it is compiled on unix targets only.
        use crate::platform::sys::create_pipe;

        let (r, w) = create_pipe().unwrap();
        with_reactor(|reactor| {
            reactor
                .register(r, 10, Interest::READABLE)
                .expect("register");
            reactor.deregister(r).expect("deregister");
        });
        // SAFETY: `r` is a valid fd owned by this test; deregistered above.
        unsafe { libc::close(r) };
        // SAFETY: `w` is a valid fd owned by this test and was never registered.
        unsafe { libc::close(w) };
    }

    /// Polling with a zero timeout and no registered sources yields no events.
    #[test]
    fn reactor_poll_timeout_zero_returns_immediately() {
        let mut events = events_with_capacity(16);
        with_reactor_mut(|reactor| {
            // No sources registered — timeout=0 must return immediately with 0.
            let n = reactor.poll(&mut events, Some(0)).expect("poll");
            assert_eq!(n, 0);
        });
    }
}
229}