Skip to main content

nexus_rt/
mio.rs

1//! Mio IO driver for nexus-rt.
2//!
3//! Integrates [`mio`] as a driver following the
4//! [`Installer`]/[`Plugin`](crate::Plugin) pattern. Handlers receive
5//! [`mio::event::Event`] directly — no wrapper types.
6//!
7//! # Architecture
8//!
9//! - [`MioInstaller`] is the installer — consumed at setup, registers the
10//!   [`MioDriver`] into [`WorldBuilder`] and returns a [`MioPoller`].
11//! - [`MioPoller`] is the poll-time handle. `poll(world, timeout)` polls
12//!   mio for readiness events and fires handlers.
13//! - [`MioDriver`] is the World resource wrapping `mio::Poll` +
14//!   `slab::Slab<S>`. Users register mio sources via `registry()` and
15//!   manage handlers via `insert()`/`remove()`.
16//!
17//! # Handler lifecycle (move-out-fire)
18//!
19//! 1. User calls `driver.insert(handler)` → `mio::Token`
20//! 2. User calls `driver.registry().register(&mut source, token, interest)`
21//! 3. On readiness: poller removes handler from slab, fires it
22//! 4. Handler re-registers itself if it wants more events
23//!
24//! Handlers that don't re-insert themselves are dropped after firing.
25//! Stale tokens (already removed) are silently skipped.
26//!
27//! # Source lifecycle
28//!
29//! Mio sources (sockets, pipes, etc.) are registered with `mio::Registry`
30//! **separately** from handlers. The driver does not track or own sources.
31//!
32//! When a handler is removed — either by [`MioDriver::remove`] or by the
33//! move-out-fire pattern during [`MioPoller::poll`] — the mio source
34//! remains registered with the kernel. The user is responsible for
35//! calling `registry().deregister(&mut source)` if no replacement handler
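//! will be inserted. Forgetting to deregister is memory-safe (events
//! whose slot is empty are skipped) but wastes kernel resources, and —
//! because the slab reuses freed keys — a source left registered under an
//! old token will fire whichever handler later occupies that slot.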
38
39use std::io;
40use std::marker::PhantomData;
41use std::ops::DerefMut;
42use std::time::Duration;
43
44use crate::driver::Installer;
45use crate::handler::Handler;
46use crate::world::{ResourceId, World, WorldBuilder};
47
/// Size of the `mio::Events` buffer used when the installer is not given
/// an explicit value via [`MioInstaller::event_capacity`].
const DEFAULT_EVENT_CAPACITY: usize = 1024;

/// Number of handler slots the slab reserves up front when the installer
/// is not given an explicit value via [`MioInstaller::handler_capacity`].
const DEFAULT_HANDLER_CAPACITY: usize = 64;
53
54/// Configuration trait for generic IO driver code.
55///
56/// ZST annotation type that bundles the handler storage type with a
57/// wrapping function. Library code parameterized over `C: MioConfig`
58/// can insert and wrap handlers without knowing the concrete storage
59/// strategy.
60pub trait MioConfig: Send + 'static {
61    /// The handler storage type (e.g. `Box<dyn Handler<mio::event::Event>>`).
62    type Storage: DerefMut<Target = dyn Handler<::mio::event::Event>> + Send + 'static;
63
64    /// Wrap a concrete handler into the storage type.
65    fn wrap(handler: impl Handler<::mio::event::Event> + 'static) -> Self::Storage;
66}
67
68/// Boxed mio configuration — heap-allocates each handler.
69pub struct BoxedMio;
70
71impl MioConfig for BoxedMio {
72    type Storage = Box<dyn Handler<::mio::event::Event>>;
73
74    fn wrap(handler: impl Handler<::mio::event::Event> + 'static) -> Self::Storage {
75        Box::new(handler)
76    }
77}
78
/// [`MioConfig`] that keeps handlers inline in a fixed-size buffer
/// (`nexus_smartptr::B256`) instead of on the heap.
///
/// Panics if a handler exceeds the buffer size (256 bytes).
#[cfg(feature = "smartptr")]
pub struct InlineMio;

#[cfg(feature = "smartptr")]
impl MioConfig for InlineMio {
    type Storage = crate::FlatVirtual<::mio::event::Event, nexus_smartptr::B256>;

    fn wrap(handler: impl Handler<::mio::event::Event> + 'static) -> Self::Storage {
        // Coerce a reference to the concrete handler into a trait-object
        // pointer so its vtable metadata can be handed to `new_raw`.
        let vtable_source: *const dyn Handler<::mio::event::Event> = &handler;
        // SAFETY: `vtable_source` was derived from `handler` itself, so its
        // metadata (vtable) corresponds to handler's concrete type.
        unsafe { nexus_smartptr::Flat::new_raw(handler, vtable_source) }
    }
}
95
/// [`MioConfig`] that stores handlers inline when they fit and on the
/// heap otherwise.
///
/// A handler of up to 256 bytes is kept inline; anything larger is
/// heap-allocated. No panics.
#[cfg(feature = "smartptr")]
pub struct FlexMio;

#[cfg(feature = "smartptr")]
impl MioConfig for FlexMio {
    type Storage = crate::FlexVirtual<::mio::event::Event, nexus_smartptr::B256>;

    fn wrap(handler: impl Handler<::mio::event::Event> + 'static) -> Self::Storage {
        // Coerce a reference to the concrete handler into a trait-object
        // pointer so its vtable metadata can be handed to `new_raw`.
        let vtable_source: *const dyn Handler<::mio::event::Event> = &handler;
        // SAFETY: `vtable_source` was derived from `handler` itself, so its
        // metadata (vtable) corresponds to handler's concrete type.
        unsafe { nexus_smartptr::Flex::new_raw(handler, vtable_source) }
    }
}
113
114/// World resource wrapping `mio::Poll` and a handler slab.
115///
116/// `S` is the handler storage type. Defaults to
117/// `Box<dyn Handler<mio::event::Event>>`.
118///
119/// Users interact with this through `Res<MioDriver<S>>` (for
120/// `registry()`) or `ResMut<MioDriver<S>>` (for `insert`/`remove`).
121pub struct MioDriver<S = Box<dyn Handler<::mio::event::Event>>> {
122    poll: ::mio::Poll,
123    handlers: ::slab::Slab<S>,
124}
125
126impl<S: Send + 'static> crate::world::Resource for MioDriver<S> {}
127
128impl<S> MioDriver<S> {
129    /// Access the mio registry for registering/reregistering sources.
130    ///
131    /// `Poll::registry()` takes `&self`, so this works through
132    /// `Res<MioDriver<S>>` (shared access).
133    pub fn registry(&self) -> &::mio::Registry {
134        self.poll.registry()
135    }
136
137    /// Insert a handler and return its token.
138    ///
139    /// The token maps to a `mio::Token` for use with
140    /// `Registry::register`. Requires `ResMut<MioDriver<S>>`.
141    pub fn insert(&mut self, handler: S) -> ::mio::Token {
142        ::mio::Token(self.handlers.insert(handler))
143    }
144
145    /// Remove a handler by token.
146    ///
147    /// The caller is responsible for deregistering the mio source via
148    /// `registry().deregister(&mut source)` if no replacement handler
149    /// will be inserted. Failing to deregister is safe (stale events
150    /// are skipped) but wastes kernel resources.
151    ///
152    /// # Panics
153    ///
154    /// Panics if the token is not present in the slab.
155    pub fn remove(&mut self, token: ::mio::Token) -> S {
156        self.handlers.remove(token.0)
157    }
158
159    /// Returns `true` if the token has a handler in the slab.
160    pub fn contains(&self, token: ::mio::Token) -> bool {
161        self.handlers.contains(token.0)
162    }
163
164    /// Number of active handlers.
165    pub fn len(&self) -> usize {
166        self.handlers.len()
167    }
168
169    /// Whether the handler slab is empty.
170    pub fn is_empty(&self) -> bool {
171        self.handlers.is_empty()
172    }
173}
174
175/// Mio driver installer — generic over handler storage.
176///
177/// `S` is the handler storage type. Defaults to
178/// `Box<dyn Handler<mio::event::Event>>`.
179///
180/// Consumed by [`WorldBuilder::install_driver`]. Registers a
181/// [`MioDriver<S>`] resource and returns a [`MioPoller`] for poll-time use.
182///
183/// # Examples
184///
185/// ```ignore
186/// // Defaults: 1024 events, 64 handlers
187/// let poller = builder.install_driver(MioInstaller::new());
188///
189/// // Custom capacities
190/// let poller = builder.install_driver(
191///     MioInstaller::new()
192///         .event_capacity(256)
193///         .handler_capacity(32),
194/// );
195/// ```
196pub struct MioInstaller<S = Box<dyn Handler<::mio::event::Event>>> {
197    event_capacity: usize,
198    handler_capacity: usize,
199    _marker: PhantomData<S>,
200}
201
202impl<S> MioInstaller<S> {
203    /// Creates a new mio installer with default capacities.
204    ///
205    /// Defaults:
206    /// - `event_capacity`: 1024 (mio events buffer)
207    /// - `handler_capacity`: 64 (slab pre-allocation)
208    pub fn new() -> Self {
209        MioInstaller {
210            event_capacity: DEFAULT_EVENT_CAPACITY,
211            handler_capacity: DEFAULT_HANDLER_CAPACITY,
212            _marker: PhantomData,
213        }
214    }
215
216    /// Set the mio events buffer capacity (default: 1024).
217    ///
218    /// This is the maximum number of readiness events returned per
219    /// [`poll()`](MioPoller::poll) call. Events beyond this limit are
220    /// deferred to the next poll.
221    pub fn event_capacity(mut self, cap: usize) -> Self {
222        self.event_capacity = cap;
223        self
224    }
225
226    /// Set the initial handler slab pre-allocation (default: 64).
227    ///
228    /// The slab grows automatically if more handlers are inserted.
229    /// Pre-allocating avoids reallocation during early setup.
230    pub fn handler_capacity(mut self, cap: usize) -> Self {
231        self.handler_capacity = cap;
232        self
233    }
234}
235
236impl<S> Default for MioInstaller<S> {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
242impl<S: Send + 'static> Installer for MioInstaller<S> {
243    type Poller = MioPoller<S>;
244
245    fn install(self, world: &mut WorldBuilder) -> MioPoller<S> {
246        let poll = ::mio::Poll::new().expect("failed to create mio Poll");
247        let handlers = ::slab::Slab::<S>::with_capacity(self.handler_capacity);
248        let driver_id = world.register(MioDriver { poll, handlers });
249        MioPoller {
250            driver_id,
251            events: ::mio::Events::with_capacity(self.event_capacity),
252            buf: Vec::with_capacity(self.event_capacity),
253        }
254    }
255}
256
257/// Mio driver poller — poll-time handle.
258///
259/// Owns the `mio::Events` buffer and a drain buffer for the two-phase
260/// poll pattern (drain handlers out, then fire).
261pub struct MioPoller<S = Box<dyn Handler<::mio::event::Event>>> {
262    driver_id: ResourceId,
263    events: ::mio::Events,
264    buf: Vec<(::mio::event::Event, S)>,
265}
266
267impl<S> std::fmt::Debug for MioPoller<S> {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        f.debug_struct("MioPoller")
270            .field("driver_id", &self.driver_id)
271            .field("buf_len", &self.buf.len())
272            .finish()
273    }
274}
275
276impl<S: DerefMut + Send + 'static> MioPoller<S>
277where
278    S::Target: Handler<::mio::event::Event>,
279{
280    /// Poll mio for readiness events and fire handlers.
281    ///
282    /// Three-phase:
283    /// 1. Poll mio with the given timeout
284    /// 2. Drain handlers from the slab into an internal buffer
285    /// 3. Fire each handler with its event
286    ///
287    /// Returns [`Ok`] with the number of handlers fired on success.
288    /// Returns [`Err`] if [`mio::Poll::poll`] fails; the error is
289    /// propagated unchanged from the underlying mio call.
290    ///
291    /// Stale tokens (handler already removed) are silently skipped.
292    ///
293    /// # Re-registration
294    ///
295    /// Handlers are **moved out** of the slab before firing. A handler
296    /// that wants to continue receiving events must re-insert a new
297    /// handler and call `reregister()` on the source with the new token:
298    ///
299    /// ```ignore
300    /// fn on_readable(mut driver: ResMut<MioDriver>, event: mio::event::Event) {
301    ///     let stream: &mut TcpStream = /* ... */;
302    ///     // ... read from stream ...
303    ///
304    ///     // Re-register for next event
305    ///     let handler = on_readable.into_handler(/* registry */);
306    ///     let new_token = driver.insert(Box::new(handler));
307    ///     driver.registry()
308    ///         .reregister(stream, new_token, mio::Interest::READABLE)
309    ///         .unwrap();
310    /// }
311    /// ```
312    pub fn poll(&mut self, world: &mut World, timeout: Option<Duration>) -> io::Result<usize> {
313        // 1. Poll mio
314        // SAFETY: driver_id was produced by install() on the same builder.
315        // Type matches (MioDriver<S>). We have &mut World.
316        let driver = unsafe { world.get_mut::<MioDriver<S>>(self.driver_id) };
317        driver.poll.poll(&mut self.events, timeout)?;
318
319        // 2. Drain handlers — move out of slab for each ready token
320        for event in &self.events {
321            let key = event.token().0;
322            if driver.handlers.contains(key) {
323                let handler = driver.handlers.remove(key);
324                self.buf.push((event.clone(), handler));
325            }
326        }
327
328        // 3. Fire handlers
329        let fired = self.buf.len();
330        for (event, mut handler) in self.buf.drain(..) {
331            world.next_sequence();
332            handler.deref_mut().run(world, event);
333        }
334
335        Ok(fired)
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{IntoHandler, ResMut, WorldBuilder};

    /// Installing the driver makes `MioDriver` available in the built world.
    #[test]
    fn install_registers_driver() {
        let mut builder = WorldBuilder::new();
        let _poller: MioPoller = builder.install_driver(MioInstaller::new());
        let world = builder.build();
        assert!(world.contains::<MioDriver>());
    }

    /// Polling with no registered sources fires nothing.
    #[test]
    fn poll_empty_returns_zero() {
        let mut builder = WorldBuilder::new();
        let mut poller: MioPoller = builder.install_driver(MioInstaller::new());
        let mut world = builder.build();
        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(0)))
            .unwrap();
        assert_eq!(fired, 0);
    }

    /// A waker registered under a handler's token fires that handler.
    #[test]
    fn waker_fires_handler() {
        let mut builder = WorldBuilder::new();
        builder.register::<bool>(false);
        let mut poller: MioPoller = builder.install_driver(MioInstaller::new());
        let mut world = builder.build();

        fn on_wake(mut flag: ResMut<bool>, _event: ::mio::event::Event) {
            *flag = true;
        }

        let handler = on_wake.into_handler(world.registry());
        let driver = world.resource_mut::<MioDriver>();
        let token = driver.insert(Box::new(handler));
        let waker = ::mio::Waker::new(driver.registry(), token).unwrap();

        waker.wake().unwrap();

        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(fired, 1);
        assert!(*world.resource::<bool>());
    }

    /// Re-inserting a handler after it fired lets the same waker fire again.
    #[test]
    fn handler_fires_twice_with_waker() {
        let mut builder = WorldBuilder::new();
        builder.register::<u64>(0);
        let mut poller: MioPoller = builder.install_driver(MioInstaller::new());
        let mut world = builder.build();

        fn on_wake(mut counter: ResMut<u64>, _event: ::mio::event::Event) {
            *counter += 1;
        }

        // Create waker — only one allowed per Poll instance.
        let handler = on_wake.into_handler(world.registry());
        let driver = world.resource_mut::<MioDriver>();
        let token = driver.insert(Box::new(handler));
        let waker = ::mio::Waker::new(driver.registry(), token).unwrap();

        waker.wake().unwrap();
        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(fired, 1);
        assert_eq!(*world.resource::<u64>(), 1);

        // Re-insert handler. Slab reuses the freed slot, so the token
        // matches the waker's registration.
        let handler2 = on_wake.into_handler(world.registry());
        let driver = world.resource_mut::<MioDriver>();
        let token2 = driver.insert(Box::new(handler2));
        assert_eq!(token, token2, "slab must reuse freed slot");

        waker.wake().unwrap();
        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(fired, 1);
        assert_eq!(*world.resource::<u64>(), 2);
    }

    /// A handler removed via `MioDriver::remove` before the wake-up never runs.
    #[test]
    fn cancel_before_fire() {
        let mut builder = WorldBuilder::new();
        builder.register::<bool>(false);
        let mut poller: MioPoller = builder.install_driver(MioInstaller::new());
        let mut world = builder.build();

        fn on_wake(mut flag: ResMut<bool>, _event: ::mio::event::Event) {
            *flag = true;
        }

        let handler = on_wake.into_handler(world.registry());
        let driver = world.resource_mut::<MioDriver>();
        let token = driver.insert(Box::new(handler));
        let waker = ::mio::Waker::new(driver.registry(), token).unwrap();

        // Remove handler before waking
        let driver = world.resource_mut::<MioDriver>();
        let _removed = driver.remove(token);

        waker.wake().unwrap();

        // Poll — stale token, handler should NOT fire
        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(fired, 0);
        assert!(!*world.resource::<bool>());
    }

    /// Each fired handler advances the world sequence by one.
    #[test]
    fn poll_advances_sequence() {
        let mut builder = WorldBuilder::new();
        builder.register::<u64>(0);
        let mut poller: MioPoller = builder.install_driver(MioInstaller::new());
        let mut world = builder.build();

        fn on_wake(mut counter: ResMut<u64>, _event: ::mio::event::Event) {
            *counter += 1;
        }

        let handler = on_wake.into_handler(world.registry());
        let driver = world.resource_mut::<MioDriver>();
        let token = driver.insert(Box::new(handler));
        let waker = ::mio::Waker::new(driver.registry(), token).unwrap();

        waker.wake().unwrap();

        let seq_before = world.current_sequence();
        poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(world.current_sequence().0, seq_before.0 + 1);
    }

    /// A handler that fired and was not re-inserted leaves a stale token:
    /// the source is still registered, but later events for it are skipped.
    #[test]
    fn stale_token_skipped() {
        let mut builder = WorldBuilder::new();
        builder.register::<u64>(0);
        let mut poller: MioPoller = builder.install_driver(MioInstaller::new());
        let mut world = builder.build();

        fn on_wake(mut counter: ResMut<u64>, _event: ::mio::event::Event) {
            *counter += 1;
        }

        let handler = on_wake.into_handler(world.registry());
        let driver = world.resource_mut::<MioDriver>();
        let token = driver.insert(Box::new(handler));
        let waker = ::mio::Waker::new(driver.registry(), token).unwrap();

        // First wake: the handler is moved out of the slab, fired, and
        // dropped because it does not re-insert itself.
        waker.wake().unwrap();
        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(fired, 1);
        assert!(!world.resource::<MioDriver>().contains(token));

        // Second wake: the waker is still registered under `token`, but
        // the slot is empty, so the event must be skipped.
        waker.wake().unwrap();
        let fired = poller
            .poll(&mut world, Some(Duration::from_millis(100)))
            .unwrap();
        assert_eq!(fired, 0);
        assert_eq!(*world.resource::<u64>(), 1);
    }

    /// Custom capacities are accepted and the driver is still registered.
    #[test]
    fn custom_capacities() {
        let mut builder = WorldBuilder::new();
        let _poller: MioPoller =
            builder.install_driver(MioInstaller::new().event_capacity(256).handler_capacity(32));
        let world = builder.build();
        assert!(world.contains::<MioDriver>());
    }
}