nexus_async_rt/io.rs
1//! IO driver backed by mio.
2//!
3//! The [`IoDriver`] owns a `mio::Poll` instance and a token→waker mapping.
4//! When mio reports readiness on a token, the associated waker is fired.
5//!
6//! Tasks interact with IO through [`IoHandle`], a [`Copy`] handle that
7//! provides source registration and deregistration.
8//!
9//! # Token lifecycle
10//!
11//! 1. Task calls `io.register(&mut source, interest, waker)` → gets a `mio::Token`
12//! 2. Runtime calls `mio::Poll::poll` → readiness events arrive
13//! 3. For each event, the driver calls `waker.wake_by_ref()`
//! 4. Task calls `io.deregister(&mut source, token)` when done
15//!
16//! Tokens are reused via a freelist. Stale wakeups (token reused after
17//! deregister) produce spurious wakeups — futures must tolerate this
18//! per the async contract.
19
20use std::io;
21use std::task::Waker;
22use std::time::Duration;
23
24use std::sync::Arc;
25
26// =============================================================================
27// Readiness state
28// =============================================================================
29
/// Per-token readiness flags.
///
/// `IoDriver::poll_io` sets a flag when a mio event reports the matching
/// readiness; `clear_readable` / `clear_writable` reset it. Querying the
/// flags is a plain memory read — no syscall is involved.
///
/// The default value has both flags cleared.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Readiness {
    /// Socket is readable (data available or EOF).
    pub readable: bool,
    /// Socket is writable (send buffer has space).
    pub writable: bool,
}
39
40use mio::event::Source;
41use mio::{Events, Interest, Poll, Token};
42
43// =============================================================================
44// IoDriver — owned by Runtime
45// =============================================================================
46
/// Reserved token for the `mio::Waker` (used to break out of `epoll_wait`
/// when the root future or spawned tasks need attention).
///
/// `poll_io` skips events carrying this token: they are not tied to any
/// registered source. The value is `usize::MAX`, the same number used by
/// the `NO_FREE` freelist sentinel.
const WAKER_TOKEN: Token = Token(usize::MAX);
50
/// Mio-backed IO driver. Owns the `Poll` instance and the token→waker map.
///
/// `wakers`, `readiness` and `next_free` are parallel vectors indexed by
/// `Token.0`; `new` and `claim_token` always grow them together.
pub(crate) struct IoDriver {
    /// Mio poll instance. Wraps epoll/kqueue.
    poll: Poll,

    /// Pre-allocated events buffer, refilled by every `poll_io` call.
    events: Events,

    /// Mio waker for breaking out of `Poll::poll` from outside the
    /// poll loop (e.g., root future's waker, or spawned task waker
    /// firing from a callback). Registered under `WAKER_TOKEN`.
    mio_waker: Arc<mio::Waker>,

    /// Token → waker. Indexed by `Token.0`.
    /// `None` = vacant slot (in freelist).
    /// `Some(waker)` = waker to fire on readiness.
    wakers: Vec<Option<Waker>>,

    /// Per-token readiness state. Updated by `poll_io`, read through
    /// `readiness()`. Flags are cleared by `clear_readable` /
    /// `clear_writable` once the task has consumed the readiness.
    readiness: Vec<Readiness>,

    /// Intrusive freelist: `next_free[i]` is the index of the next
    /// free slot after `i`. Only valid when `wakers[i]` is `None`.
    next_free: Vec<usize>,

    /// Head of the token freelist. `usize::MAX` (`NO_FREE`) = empty.
    free_head: usize,
}
80
/// Sentinel for an empty freelist: stored in `free_head` when no slot is
/// free, and in `next_free[i]` when slot `i` is the last link of the chain.
const NO_FREE: usize = usize::MAX;
83
84impl IoDriver {
85 /// Create a new IO driver.
86 ///
87 /// `event_capacity`: size of the mio events buffer (how many events
88 /// per `poll` call). 1024 is typical.
89 ///
90 /// `token_capacity`: initial token slot count. Grows as needed.
91 pub(crate) fn new(event_capacity: usize, token_capacity: usize) -> io::Result<Self> {
92 let poll = Poll::new()?;
93 let mio_waker = Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN)?);
94 let events = Events::with_capacity(event_capacity);
95
96 let mut wakers = Vec::with_capacity(token_capacity);
97 let mut readiness = Vec::with_capacity(token_capacity);
98 let mut next_free = Vec::with_capacity(token_capacity);
99 wakers.resize_with(token_capacity, || None);
100 readiness.resize(token_capacity, Readiness::default());
101 for i in 0..token_capacity {
102 next_free.push(if i + 1 < token_capacity {
103 i + 1
104 } else {
105 NO_FREE
106 });
107 }
108
109 Ok(Self {
110 poll,
111 events,
112 mio_waker,
113 wakers,
114 readiness,
115 next_free,
116 free_head: if token_capacity > 0 { 0 } else { NO_FREE },
117 })
118 }
119
120 /// Returns a clone of the mio waker for breaking out of epoll_wait.
121 /// Used by the root future's waker and by task wakers that fire
122 /// outside the poll cycle.
123 pub(crate) fn mio_waker(&self) -> Arc<mio::Waker> {
124 Arc::clone(&self.mio_waker)
125 }
126
127 /// Returns a reference to the mio registry for source registration.
128 pub(crate) fn registry(&self) -> &mio::Registry {
129 self.poll.registry()
130 }
131
132 /// Claim a token slot, associating it with a waker. O(1).
133 ///
134 /// Returns the `mio::Token` to use when registering a source.
135 /// Grows the Vecs if no free slots are available.
136 pub(crate) fn claim_token(&mut self, waker: Waker) -> Token {
137 let idx = if self.free_head == NO_FREE {
138 // Grow: append a new slot.
139 let idx = self.wakers.len();
140 self.wakers.push(None);
141 self.readiness.push(Readiness::default());
142 self.next_free.push(NO_FREE);
143 idx
144 } else {
145 // Pop from freelist head. O(1).
146 let idx = self.free_head;
147 self.free_head = self.next_free[idx];
148 idx
149 };
150
151 self.wakers[idx] = Some(waker);
152 Token(idx)
153 }
154
155 /// Release a token slot back to the freelist. O(1).
156 pub(crate) fn release_token(&mut self, token: Token) {
157 let idx = token.0;
158 if idx < self.wakers.len() {
159 self.wakers[idx] = None;
160 // Push to freelist head.
161 self.next_free[idx] = self.free_head;
162 self.free_head = idx;
163 }
164 }
165
166 /// Update the waker associated with a token.
167 ///
168 /// Used when a different task takes over an IO source
169 /// (e.g., after `into_split`).
170 pub(crate) fn set_waker(&mut self, token: Token, waker: Waker) {
171 if let Some(slot) = self.wakers.get_mut(token.0) {
172 *slot = Some(waker);
173 }
174 }
175
176 /// Get the readiness state for a token.
177 pub(crate) fn readiness(&self, token: Token) -> Readiness {
178 self.readiness.get(token.0).copied().unwrap_or_default()
179 }
180
181 /// Clear the readable flag for a token. Called after a successful read
182 /// or a WouldBlock — the next `poll_io` will re-set it when epoll fires.
183 pub(crate) fn clear_readable(&mut self, token: Token) {
184 if let Some(r) = self.readiness.get_mut(token.0) {
185 r.readable = false;
186 }
187 }
188
189 /// Clear the writable flag for a token.
190 pub(crate) fn clear_writable(&mut self, token: Token) {
191 if let Some(r) = self.readiness.get_mut(token.0) {
192 r.writable = false;
193 }
194 }
195
196 /// Poll mio for IO events and wake associated tasks.
197 ///
198 /// `timeout`: `None` blocks indefinitely, `Some(Duration::ZERO)` is
199 /// non-blocking. Returns the number of wakers fired.
200 pub(crate) fn poll_io(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
201 self.poll.poll(&mut self.events, timeout)?;
202
203 let mut woken = 0;
204 for event in &self.events {
205 let token = event.token();
206 if token == WAKER_TOKEN {
207 // Mio waker fired — root future or external wake. Not a socket.
208 continue;
209 }
210 let idx = token.0;
211
212 // Record readiness state from this event.
213 if let Some(r) = self.readiness.get_mut(idx) {
214 if event.is_readable() {
215 r.readable = true;
216 }
217 if event.is_writable() {
218 r.writable = true;
219 }
220 }
221
222 if let Some(Some(waker)) = self.wakers.get(idx) {
223 waker.wake_by_ref();
224 woken += 1;
225 }
226 // Stale tokens (None) are silently skipped — spurious wakeup.
227 }
228
229 Ok(woken)
230 }
231}
232
233// =============================================================================
234// IoHandle — Copy handle for tasks
235// =============================================================================
236
/// [`Copy`] handle for IO operations from async tasks.
///
/// Provides source registration with the mio reactor. Obtained from
/// [`IoHandle::current`].
///
/// Both fields are raw pointers captured in `IoHandle::new`; copying the
/// handle copies the pointers, not the driver.
///
/// # Safety
///
/// The raw pointers are valid for the lifetime of the [`crate::Runtime`].
/// Single-threaded — no concurrent access.
#[derive(Clone, Copy)]
pub struct IoHandle {
    /// Pointer to the mio registry (borrowed from Poll, stable).
    /// Points inside the driver referenced by `driver`.
    registry: *const mio::Registry,
    /// Pointer to the IoDriver for token management.
    driver: *mut IoDriver,
}
253
254impl IoHandle {
255 /// Create a handle from driver references.
256 pub(crate) fn new(driver: &mut IoDriver) -> Self {
257 Self {
258 registry: std::ptr::from_ref(driver.registry()),
259 driver: std::ptr::from_mut(driver),
260 }
261 }
262
263 /// Returns the [`IoHandle`] for the currently running runtime.
264 ///
265 /// Use when you need to register a mio source from inside an async task —
266 /// e.g., when constructing a [`TcpStream`](crate::TcpStream) or
267 /// [`UdpSocket`](crate::UdpSocket). Mirrors `tokio::runtime::Handle::current()`.
268 ///
269 /// # Panics
270 ///
271 /// Panics if called outside a [`Runtime::block_on`](crate::Runtime::block_on)
272 /// context.
273 #[must_use]
274 pub fn current() -> IoHandle {
275 let ptr = crate::context::current_io_ptr();
276 assert!(
277 !ptr.is_null(),
278 "IoHandle::current() called outside Runtime::block_on"
279 );
280 // SAFETY: ptr installed by Runtime::block_on, valid for Runtime lifetime.
281 // Single-threaded executor — no concurrent access.
282 IoHandle::new(unsafe { &mut *ptr })
283 }
284
285 /// Register a mio source with the given interest and waker.
286 ///
287 /// Returns the assigned token for use with `deregister`.
288 pub fn register(
289 &self,
290 source: &mut impl Source,
291 interest: Interest,
292 waker: Waker,
293 ) -> io::Result<Token> {
294 // SAFETY: driver pointer is valid (Runtime lifetime).
295 let driver = unsafe { &mut *self.driver };
296 let token = driver.claim_token(waker);
297 // SAFETY: registry pointer is valid (borrowed from Poll).
298 let registry = unsafe { &*self.registry };
299 if let Err(e) = registry.register(source, token, interest) {
300 // Roll back: release the token so it's not leaked.
301 driver.release_token(token);
302 return Err(e);
303 }
304 Ok(token)
305 }
306
307 /// Update the waker for a token. Called when a stream is polled
308 /// from a different task than the one that registered it
309 /// (e.g., after `into_split`).
310 pub fn set_waker(&self, token: Token, waker: Waker) {
311 // SAFETY: driver pointer valid (Runtime lifetime).
312 let driver = unsafe { &mut *self.driver };
313 driver.set_waker(token, waker);
314 }
315
316 /// Query the readiness state for a token.
317 ///
318 /// Returns the last-known readiness from epoll events. Cleared
319 /// after the task consumes the readiness (calls clear_readable/clear_writable).
320 pub fn readiness(&self, token: Token) -> Readiness {
321 // SAFETY: driver pointer valid (Runtime lifetime).
322 let driver = unsafe { &*self.driver };
323 driver.readiness(token)
324 }
325
326 /// Clear the readable flag for a token. Call after a successful
327 /// read or WouldBlock to wait for the next epoll notification.
328 pub fn clear_readable(&self, token: Token) {
329 // SAFETY: driver pointer valid (Runtime lifetime).
330 let driver = unsafe { &mut *self.driver };
331 driver.clear_readable(token);
332 }
333
334 /// Clear the writable flag for a token.
335 pub fn clear_writable(&self, token: Token) {
336 let driver = unsafe { &mut *self.driver };
337 driver.clear_writable(token);
338 }
339
340 /// Deregister a source and release its token.
341 ///
342 /// # Safety
343 ///
344 /// The driver and registry pointers must be valid (Runtime lifetime).
345 pub unsafe fn deregister(&self, source: &mut impl Source, token: Token) -> io::Result<()> {
346 // SAFETY: caller guarantees pointers are valid.
347 let driver = unsafe { &mut *self.driver };
348 let registry = unsafe { &*self.registry };
349 registry.deregister(source)?;
350 driver.release_token(token);
351 Ok(())
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Direct check of the panic documented on `IoHandle::current`: with no
    /// runtime installed the call must panic with the documented message.
    /// Other tests only reach `current()` indirectly, from inside `block_on`.
    #[test]
    #[should_panic(expected = "called outside Runtime::block_on")]
    fn current_panics_outside_runtime() {
        // The handle is never used; binding it satisfies `#[must_use]`.
        let _handle = IoHandle::current();
    }
}