Skip to main content

nexus_async_rt/
io.rs

//! IO driver backed by mio.
//!
//! The [`IoDriver`] owns a `mio::Poll` instance and a token→waker mapping.
//! When mio reports readiness on a token, the associated waker is fired.
//!
//! Tasks interact with IO through [`IoHandle`], a [`Copy`] handle that
//! provides source registration and deregistration.
//!
//! # Token lifecycle
//!
//! 1. Task calls `io.register(&mut source, interest, waker)` → gets a `mio::Token`
//! 2. Runtime calls `mio::Poll::poll` → readiness events arrive
//! 3. For each event, the driver calls `waker.wake_by_ref()`
//! 4. Task calls `io.deregister(&mut source, token)` when done
//!
//! Tokens are reused via a freelist. An event that arrives for a token
//! after it was deregistered and handed to a new source wakes the new
//! owner spuriously — futures must tolerate this per the async contract.
19
20use std::io;
21use std::task::Waker;
22use std::time::Duration;
23
24use std::sync::Arc;
25
26// =============================================================================
27// Readiness state
28// =============================================================================
29
/// Per-token readiness flags, updated by `poll_io` from epoll events.
/// Read by net types to check if IO is ready without a syscall.
///
/// Both flags default to `false`. The type is plain data, so it derives
/// `Debug`, `PartialEq`, and `Eq` for logging and for comparisons in tests.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Readiness {
    /// Socket is readable (data available or EOF).
    pub readable: bool,
    /// Socket is writable (send buffer has space).
    pub writable: bool,
}
39
40use mio::event::Source;
41use mio::{Events, Interest, Poll, Token};
42
43// =============================================================================
44// IoDriver — owned by Runtime
45// =============================================================================
46
/// Reserved token for the `mio::Waker` (used to break out of epoll_wait
/// when the root future or spawned tasks need attention).
///
/// Events carrying this token are not socket events: `poll_io` skips them
/// without touching any waker or readiness slot.
const WAKER_TOKEN: Token = Token(usize::MAX);
50
51/// Mio-backed IO driver. Owns the `Poll` instance and token→waker map.
52pub(crate) struct IoDriver {
53    /// Mio poll instance. Wraps epoll/kqueue.
54    poll: Poll,
55
56    /// Pre-allocated events buffer.
57    events: Events,
58
59    /// Mio waker for breaking out of `Poll::poll` from outside the
60    /// poll loop (e.g., root future's waker, or spawned task waker
61    /// firing from a callback).
62    mio_waker: Arc<mio::Waker>,
63
64    /// Token → waker. Indexed by `Token.0`.
65    /// `None` = vacant slot (in freelist).
66    /// `Some(waker)` = waker to fire on readiness.
67    wakers: Vec<Option<Waker>>,
68
69    /// Per-token readiness state. Updated by `poll_io`, read by net types.
70    /// Cleared when the task consumes the readiness (attempts IO).
71    readiness: Vec<Readiness>,
72
73    /// Intrusive freelist: `next_free[i]` is the index of the next
74    /// free slot after `i`. Only valid when `wakers[i]` is `None`.
75    next_free: Vec<usize>,
76
77    /// Head of the token freelist. `usize::MAX` = empty.
78    free_head: usize,
79}
80
/// Sentinel for empty freelist.
///
/// Stored in `free_head` when no slot is vacant, and in `next_free[i]`
/// for the last slot of the freelist chain.
const NO_FREE: usize = usize::MAX;
83
84impl IoDriver {
85    /// Create a new IO driver.
86    ///
87    /// `event_capacity`: size of the mio events buffer (how many events
88    /// per `poll` call). 1024 is typical.
89    ///
90    /// `token_capacity`: initial token slot count. Grows as needed.
91    pub(crate) fn new(event_capacity: usize, token_capacity: usize) -> io::Result<Self> {
92        let poll = Poll::new()?;
93        let mio_waker = Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN)?);
94        let events = Events::with_capacity(event_capacity);
95
96        let mut wakers = Vec::with_capacity(token_capacity);
97        let mut readiness = Vec::with_capacity(token_capacity);
98        let mut next_free = Vec::with_capacity(token_capacity);
99        wakers.resize_with(token_capacity, || None);
100        readiness.resize(token_capacity, Readiness::default());
101        for i in 0..token_capacity {
102            next_free.push(if i + 1 < token_capacity {
103                i + 1
104            } else {
105                NO_FREE
106            });
107        }
108
109        Ok(Self {
110            poll,
111            events,
112            mio_waker,
113            wakers,
114            readiness,
115            next_free,
116            free_head: if token_capacity > 0 { 0 } else { NO_FREE },
117        })
118    }
119
120    /// Returns a clone of the mio waker for breaking out of epoll_wait.
121    /// Used by the root future's waker and by task wakers that fire
122    /// outside the poll cycle.
123    pub(crate) fn mio_waker(&self) -> Arc<mio::Waker> {
124        Arc::clone(&self.mio_waker)
125    }
126
127    /// Returns a reference to the mio registry for source registration.
128    pub(crate) fn registry(&self) -> &mio::Registry {
129        self.poll.registry()
130    }
131
132    /// Claim a token slot, associating it with a waker. O(1).
133    ///
134    /// Returns the `mio::Token` to use when registering a source.
135    /// Grows the Vecs if no free slots are available.
136    pub(crate) fn claim_token(&mut self, waker: Waker) -> Token {
137        let idx = if self.free_head == NO_FREE {
138            // Grow: append a new slot.
139            let idx = self.wakers.len();
140            self.wakers.push(None);
141            self.readiness.push(Readiness::default());
142            self.next_free.push(NO_FREE);
143            idx
144        } else {
145            // Pop from freelist head. O(1).
146            let idx = self.free_head;
147            self.free_head = self.next_free[idx];
148            idx
149        };
150
151        self.wakers[idx] = Some(waker);
152        Token(idx)
153    }
154
155    /// Release a token slot back to the freelist. O(1).
156    pub(crate) fn release_token(&mut self, token: Token) {
157        let idx = token.0;
158        if idx < self.wakers.len() {
159            self.wakers[idx] = None;
160            // Push to freelist head.
161            self.next_free[idx] = self.free_head;
162            self.free_head = idx;
163        }
164    }
165
166    /// Update the waker associated with a token.
167    ///
168    /// Used when a different task takes over an IO source
169    /// (e.g., after `into_split`).
170    pub(crate) fn set_waker(&mut self, token: Token, waker: Waker) {
171        if let Some(slot) = self.wakers.get_mut(token.0) {
172            *slot = Some(waker);
173        }
174    }
175
176    /// Get the readiness state for a token.
177    pub(crate) fn readiness(&self, token: Token) -> Readiness {
178        self.readiness.get(token.0).copied().unwrap_or_default()
179    }
180
181    /// Clear the readable flag for a token. Called after a successful read
182    /// or a WouldBlock — the next `poll_io` will re-set it when epoll fires.
183    pub(crate) fn clear_readable(&mut self, token: Token) {
184        if let Some(r) = self.readiness.get_mut(token.0) {
185            r.readable = false;
186        }
187    }
188
189    /// Clear the writable flag for a token.
190    pub(crate) fn clear_writable(&mut self, token: Token) {
191        if let Some(r) = self.readiness.get_mut(token.0) {
192            r.writable = false;
193        }
194    }
195
196    /// Poll mio for IO events and wake associated tasks.
197    ///
198    /// `timeout`: `None` blocks indefinitely, `Some(Duration::ZERO)` is
199    /// non-blocking. Returns the number of wakers fired.
200    pub(crate) fn poll_io(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
201        self.poll.poll(&mut self.events, timeout)?;
202
203        let mut woken = 0;
204        for event in &self.events {
205            let token = event.token();
206            if token == WAKER_TOKEN {
207                // Mio waker fired — root future or external wake. Not a socket.
208                continue;
209            }
210            let idx = token.0;
211
212            // Record readiness state from this event.
213            if let Some(r) = self.readiness.get_mut(idx) {
214                if event.is_readable() {
215                    r.readable = true;
216                }
217                if event.is_writable() {
218                    r.writable = true;
219                }
220            }
221
222            if let Some(Some(waker)) = self.wakers.get(idx) {
223                waker.wake_by_ref();
224                woken += 1;
225            }
226            // Stale tokens (None) are silently skipped — spurious wakeup.
227        }
228
229        Ok(woken)
230    }
231}
232
233// =============================================================================
234// IoHandle — Copy handle for tasks
235// =============================================================================
236
237/// [`Copy`] handle for IO operations from async tasks.
238///
239/// Provides source registration with the mio reactor. Obtained from
240/// [`IoHandle::current`].
241///
242/// # Safety
243///
244/// The raw pointers are valid for the lifetime of the [`crate::Runtime`].
245/// Single-threaded — no concurrent access.
246#[derive(Clone, Copy)]
247pub struct IoHandle {
248    /// Pointer to the mio registry (borrowed from Poll, stable).
249    registry: *const mio::Registry,
250    /// Pointer to the IoDriver for token management.
251    driver: *mut IoDriver,
252}
253
254impl IoHandle {
255    /// Create a handle from driver references.
256    pub(crate) fn new(driver: &mut IoDriver) -> Self {
257        Self {
258            registry: std::ptr::from_ref(driver.registry()),
259            driver: std::ptr::from_mut(driver),
260        }
261    }
262
263    /// Returns the [`IoHandle`] for the currently running runtime.
264    ///
265    /// Use when you need to register a mio source from inside an async task —
266    /// e.g., when constructing a [`TcpStream`](crate::TcpStream) or
267    /// [`UdpSocket`](crate::UdpSocket). Mirrors `tokio::runtime::Handle::current()`.
268    ///
269    /// # Panics
270    ///
271    /// Panics if called outside a [`Runtime::block_on`](crate::Runtime::block_on)
272    /// context.
273    #[must_use]
274    pub fn current() -> IoHandle {
275        let ptr = crate::context::current_io_ptr();
276        assert!(
277            !ptr.is_null(),
278            "IoHandle::current() called outside Runtime::block_on"
279        );
280        // SAFETY: ptr installed by Runtime::block_on, valid for Runtime lifetime.
281        // Single-threaded executor — no concurrent access.
282        IoHandle::new(unsafe { &mut *ptr })
283    }
284
285    /// Register a mio source with the given interest and waker.
286    ///
287    /// Returns the assigned token for use with `deregister`.
288    pub fn register(
289        &self,
290        source: &mut impl Source,
291        interest: Interest,
292        waker: Waker,
293    ) -> io::Result<Token> {
294        // SAFETY: driver pointer is valid (Runtime lifetime).
295        let driver = unsafe { &mut *self.driver };
296        let token = driver.claim_token(waker);
297        // SAFETY: registry pointer is valid (borrowed from Poll).
298        let registry = unsafe { &*self.registry };
299        if let Err(e) = registry.register(source, token, interest) {
300            // Roll back: release the token so it's not leaked.
301            driver.release_token(token);
302            return Err(e);
303        }
304        Ok(token)
305    }
306
307    /// Update the waker for a token. Called when a stream is polled
308    /// from a different task than the one that registered it
309    /// (e.g., after `into_split`).
310    pub fn set_waker(&self, token: Token, waker: Waker) {
311        // SAFETY: driver pointer valid (Runtime lifetime).
312        let driver = unsafe { &mut *self.driver };
313        driver.set_waker(token, waker);
314    }
315
316    /// Query the readiness state for a token.
317    ///
318    /// Returns the last-known readiness from epoll events. Cleared
319    /// after the task consumes the readiness (calls clear_readable/clear_writable).
320    pub fn readiness(&self, token: Token) -> Readiness {
321        // SAFETY: driver pointer valid (Runtime lifetime).
322        let driver = unsafe { &*self.driver };
323        driver.readiness(token)
324    }
325
326    /// Clear the readable flag for a token. Call after a successful
327    /// read or WouldBlock to wait for the next epoll notification.
328    pub fn clear_readable(&self, token: Token) {
329        // SAFETY: driver pointer valid (Runtime lifetime).
330        let driver = unsafe { &mut *self.driver };
331        driver.clear_readable(token);
332    }
333
334    /// Clear the writable flag for a token.
335    pub fn clear_writable(&self, token: Token) {
336        let driver = unsafe { &mut *self.driver };
337        driver.clear_writable(token);
338    }
339
340    /// Deregister a source and release its token.
341    ///
342    /// # Safety
343    ///
344    /// The driver and registry pointers must be valid (Runtime lifetime).
345    pub unsafe fn deregister(&self, source: &mut impl Source, token: Token) -> io::Result<()> {
346        // SAFETY: caller guarantees pointers are valid.
347        let driver = unsafe { &mut *self.driver };
348        let registry = unsafe { &*self.registry };
349        registry.deregister(source)?;
350        driver.release_token(token);
351        Ok(())
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Direct test of the documented panic contract of `IoHandle::current`.
    ///
    /// Other tests reach `current()` only indirectly (via
    /// `TcpStream::connect`, etc.) from inside `block_on`; here it is
    /// called with no runtime installed.
    #[test]
    #[should_panic(expected = "called outside Runtime::block_on")]
    fn current_panics_outside_runtime() {
        let _handle = IoHandle::current();
    }
}