moduvex_runtime/reactor/source.rs
1//! `IoSource` — RAII wrapper that registers an OS handle with the reactor.
2//!
3//! Constructing an `IoSource` registers the handle; dropping it deregisters.
4//! The `readable()` and `writable()` methods return futures that resolve when
5//! the underlying OS handle becomes ready.
6//!
//! Waker integration: on its first poll the future stores the current waker in
//! the reactor's `WakerRegistry`. When the reactor fires an event for this
//! token, the stored waker is called, re-scheduling the waiting task; the next
//! poll of the future then completes.
10
11use std::future::Future;
12use std::io;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::task::{Context, Poll};
16
17use super::{with_reactor, with_reactor_mut};
18use crate::platform::sys::{Interest, RawSource};
19
/// Global counter for assigning unique tokens to `IoSource` instances.
///
/// Starts at 1 and is advanced by one on every call to [`next_token`].
static TOKEN_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Sentinel tokens reserved by the executor/signal subsystem.
/// These must never be assigned to user `IoSource` instances.
pub(crate) const WAKE_TOKEN: usize = usize::MAX;
pub(crate) const SIGNAL_TOKEN_GUARD: usize = usize::MAX - 1;

/// Allocate the next unique token.
///
/// Tokens are handed out in increasing order. The sentinel values
/// [`WAKE_TOKEN`] and [`SIGNAL_TOKEN_GUARD`] are never returned: if the counter
/// lands on one of them it is simply advanced again.
///
/// Reaching the sentinel range takes roughly `usize::MAX` allocations
/// (~18 quintillion on 64-bit targets, but only ~4.3 billion on 32-bit
/// targets), so debug builds treat it as a bug and panic. Release builds skip
/// the sentinels and let the counter wrap around; tokens issued after the wrap
/// are no longer guaranteed to differ from tokens still held by live sources.
///
/// # Panics
/// In builds with debug assertions enabled, panics once the counter comes
/// within two values of [`WAKE_TOKEN`].
pub(crate) fn next_token() -> usize {
    loop {
        // `Relaxed` suffices: the atomic read-modify-write alone guarantees that
        // no two callers observe the same value, and the counter guards no
        // other data.
        let t = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
        // Guard against wrapping into the sentinel range; loud in debug builds.
        debug_assert!(
            t < WAKE_TOKEN - 2,
            "token counter approaching sentinel values — process has allocated too many IoSources"
        );
        // Release builds have no assertion, so the sentinels must be filtered
        // here or a user source could receive an executor/signal token.
        if t != WAKE_TOKEN && t != SIGNAL_TOKEN_GUARD {
            return t;
        }
    }
}
43
44/// RAII I/O source registered with the thread-local reactor.
45///
46/// The `token` field is used as the unique identifier when submitting kevent /
47/// epoll_ctl calls. Tokens must be unique within a single reactor instance —
48/// this type uses an atomic counter to guarantee uniqueness without caller
49/// coordination.
50pub struct IoSource {
51 /// The raw OS handle (fd on Unix, HANDLE on Windows).
52 raw: RawSource,
53 /// Opaque identifier passed to the reactor for event demultiplexing.
54 token: usize,
55 /// Whether the source is currently registered with the reactor.
56 /// Tracked atomically so Drop can skip deregistration if it never happened.
57 registered: AtomicBool,
58}
59
60impl IoSource {
61 /// Register `raw` with the thread-local reactor under `token`, monitoring
62 /// the given `interest` set.
63 ///
64 /// # Errors
65 /// Propagates any OS error from the underlying `register` syscall.
66 pub fn new(raw: RawSource, token: usize, interest: Interest) -> io::Result<Self> {
67 let source = Self {
68 raw,
69 token,
70 registered: AtomicBool::new(false),
71 };
72 with_reactor(|r| r.register(raw, token, interest))?;
73 source.registered.store(true, Ordering::Release);
74 Ok(source)
75 }
76
77 /// Update the interest mask for an already-registered source.
78 ///
79 /// # Errors
80 /// Returns `io::ErrorKind::NotConnected` if the source was never registered
81 /// (e.g. after a failed `new`), or propagates OS errors from `reregister`.
82 pub fn reregister(&self, interest: Interest) -> io::Result<()> {
83 if !self.registered.load(Ordering::Acquire) {
84 return Err(io::Error::new(
85 io::ErrorKind::NotConnected,
86 "IoSource: reregister called on unregistered source",
87 ));
88 }
89 with_reactor(|r| r.reregister(self.raw, self.token, interest))
90 }
91
92 /// The raw OS handle.
93 #[inline]
94 pub fn raw(&self) -> RawSource {
95 self.raw
96 }
97
98 /// The token that identifies this source in reactor events.
99 #[inline]
100 pub fn token(&self) -> usize {
101 self.token
102 }
103
104 /// Returns a future that resolves once the source is readable.
105 ///
106 /// On each poll the current waker is stored in the reactor's waker registry.
107 /// When the reactor fires a READABLE event for this token, the waker fires
108 /// and the next poll returns `Ready(Ok(()))`.
109 pub fn readable(&self) -> ReadableFuture<'_> {
110 ReadableFuture {
111 source: self,
112 armed: false,
113 }
114 }
115
116 /// Returns a future that resolves once the source is writable.
117 ///
118 /// Same waker integration as `readable()` but for WRITABLE events.
119 pub fn writable(&self) -> WritableFuture<'_> {
120 WritableFuture {
121 source: self,
122 armed: false,
123 }
124 }
125}
126
127impl Drop for IoSource {
128 fn drop(&mut self) {
129 // Only attempt deregistration if we successfully registered.
130 if self.registered.swap(false, Ordering::AcqRel) {
131 // Remove wakers first, then deregister from the platform backend.
132 // Ignore errors — the fd may already be closed by the caller.
133 // Use catch_unwind to avoid panicking if the reactor RefCell is
134 // already borrowed (e.g., IoSource dropped inside a reactor callback).
135 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
136 let _ = with_reactor_mut(|r| r.deregister_with_token(self.raw, self.token));
137 }));
138 }
139 }
140}
141
142// ── Readiness futures ─────────────────────────────────────────────────────────
143
144/// Future returned by [`IoSource::readable`].
145///
146/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
147/// `READABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
148pub struct ReadableFuture<'a> {
149 source: &'a IoSource,
150 /// Whether we have already armed READABLE interest (avoid redundant syscalls).
151 armed: bool,
152}
153
154impl<'a> Future for ReadableFuture<'a> {
155 type Output = io::Result<()>;
156
157 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
158 // If already armed, this is a re-poll after the reactor woke us — ready.
159 if self.armed {
160 return Poll::Ready(Ok(()));
161 }
162
163 // Store the waker so the reactor can wake us when the fd is readable.
164 with_reactor_mut(|r| {
165 r.wakers
166 .set_read_waker(self.source.token, cx.waker().clone());
167 });
168
169 // Arm READABLE interest on first poll and return Pending.
170 self.armed = true;
171 if let Err(e) = self.source.reregister(Interest::READABLE) {
172 return Poll::Ready(Err(e));
173 }
174
175 Poll::Pending
176 }
177}
178
179/// Future returned by [`IoSource::writable`].
180///
181/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
182/// `WRITABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
183pub struct WritableFuture<'a> {
184 source: &'a IoSource,
185 /// Whether we have already armed WRITABLE interest.
186 armed: bool,
187}
188
189impl<'a> Future for WritableFuture<'a> {
190 type Output = io::Result<()>;
191
192 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 // If already armed, this is a re-poll after the reactor woke us — ready.
194 if self.armed {
195 return Poll::Ready(Ok(()));
196 }
197
198 // Store the waker so the reactor can wake us when the fd is writable.
199 with_reactor_mut(|r| {
200 r.wakers
201 .set_write_waker(self.source.token, cx.waker().clone());
202 });
203
204 // Arm WRITABLE interest on first poll and return Pending.
205 self.armed = true;
206 if let Err(e) = self.source.reregister(Interest::WRITABLE) {
207 return Poll::Ready(Err(e));
208 }
209
210 Poll::Pending
211 }
212}
213
214// ── Tests ─────────────────────────────────────────────────────────────────────
215
#[cfg(test)]
mod tests {
    use super::*;
    // Only the unix-gated tests use the pipe helper; gating the import as well
    // keeps non-unix test builds free of an unused (or unresolved) import.
    #[cfg(unix)]
    use crate::platform::sys::create_pipe;

    /// A freshly constructed `IoSource` reports itself as registered.
    #[cfg(unix)]
    #[test]
    fn io_source_registers_on_construction() {
        let (r, w) = create_pipe().unwrap();
        let src = IoSource::new(r, next_token(), Interest::READABLE).expect("IoSource::new failed");
        assert!(src.registered.load(Ordering::Acquire));
        // Drop triggers deregister; close the fds manually after drop.
        drop(src);
        // SAFETY: `r` is a valid fd owned by this test; src has been dropped.
        unsafe { libc::close(r) };
        // SAFETY: `w` is a valid fd owned by this test and is not used again.
        unsafe { libc::close(w) };
    }

    /// Dropping an `IoSource` deregisters its fd, so the fd can be registered
    /// again by a new `IoSource`.
    #[cfg(unix)]
    #[test]
    fn io_source_deregisters_on_drop() {
        let (r, w) = create_pipe().unwrap();
        {
            let src = IoSource::new(r, next_token(), Interest::READABLE).unwrap();
            assert!(src.registered.load(Ordering::Acquire));
            // src drops here → deregister called automatically
        }
        // After drop, re-registering the same fd with a new IoSource must
        // succeed, proving the previous deregister went through.
        let src2 = IoSource::new(r, next_token(), Interest::READABLE)
            .expect("re-register after drop failed");
        drop(src2);
        // SAFETY: `r` is a valid fd owned by this test; src2 has been dropped.
        unsafe { libc::close(r) };
        // SAFETY: `w` is a valid fd owned by this test and is not used again.
        unsafe { libc::close(w) };
    }

    /// Consecutive calls to `next_token` on one thread yield strictly
    /// increasing, and therefore distinct, values.
    #[test]
    fn next_token_is_unique() {
        let t1 = next_token();
        let t2 = next_token();
        let t3 = next_token();
        assert!(t1 < t2 && t2 < t3);
    }
}