moduvex_runtime/reactor/source.rs
1//! `IoSource` — RAII wrapper that registers an OS handle with the reactor.
2//!
3//! Constructing an `IoSource` registers the handle; dropping it deregisters.
4//! The `readable()` and `writable()` methods return futures that resolve when
5//! the underlying OS handle becomes ready.
6//!
7//! Waker integration: on each poll the future stores the current waker in the
8//! reactor's `WakerRegistry`. When the reactor fires an event for this token
9//! the stored waker is called, re-scheduling the waiting task.
10
11use std::future::Future;
12use std::io;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::task::{Context, Poll};
16
17use super::{with_reactor, with_reactor_mut};
18use crate::platform::sys::{Interest, RawSource};
19
/// Global counter for assigning unique tokens to `IoSource` instances.
///
/// Starts at 1 so that the reserved token 0 is never produced before the
/// counter wraps.
static TOKEN_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Allocate the next token for an `IoSource`.
///
/// Two values are reserved and are never returned:
/// * `0` — reserved token;
/// * `usize::MAX` — the executor's self-pipe sentinel.
///
/// `fetch_add` wraps around on overflow, so the reserved values are skipped
/// explicitly instead of relying on the counter never reaching them.
///
/// Tokens are strictly increasing until the counter wraps. After a wrap the
/// sequence restarts at 1, so a new token may equal one that is still held by
/// a long-lived source; only the two reserved values are guarded against.
pub(crate) fn next_token() -> usize {
    loop {
        let candidate = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
        // Retry on either sentinel; this can only happen around a wrap, so
        // the loop runs at most three iterations.
        if candidate != 0 && candidate != usize::MAX {
            return candidate;
        }
    }
}
29
30/// RAII I/O source registered with the thread-local reactor.
31///
32/// The `token` field is used as the unique identifier when submitting kevent /
33/// epoll_ctl calls. Tokens must be unique within a single reactor instance —
34/// this type uses an atomic counter to guarantee uniqueness without caller
35/// coordination.
36pub struct IoSource {
37 /// The raw OS handle (fd on Unix, HANDLE on Windows).
38 raw: RawSource,
39 /// Opaque identifier passed to the reactor for event demultiplexing.
40 token: usize,
41 /// Whether the source is currently registered with the reactor.
42 /// Tracked atomically so Drop can skip deregistration if it never happened.
43 registered: AtomicBool,
44}
45
46impl IoSource {
47 /// Register `raw` with the thread-local reactor under `token`, monitoring
48 /// the given `interest` set.
49 ///
50 /// # Errors
51 /// Propagates any OS error from the underlying `register` syscall.
52 pub fn new(raw: RawSource, token: usize, interest: Interest) -> io::Result<Self> {
53 let source = Self {
54 raw,
55 token,
56 registered: AtomicBool::new(false),
57 };
58 with_reactor(|r| r.register(raw, token, interest))?;
59 source.registered.store(true, Ordering::Release);
60 Ok(source)
61 }
62
63 /// Update the interest mask for an already-registered source.
64 ///
65 /// # Errors
66 /// Returns `io::ErrorKind::NotConnected` if the source was never registered
67 /// (e.g. after a failed `new`), or propagates OS errors from `reregister`.
68 pub fn reregister(&self, interest: Interest) -> io::Result<()> {
69 if !self.registered.load(Ordering::Acquire) {
70 return Err(io::Error::new(
71 io::ErrorKind::NotConnected,
72 "IoSource: reregister called on unregistered source",
73 ));
74 }
75 with_reactor(|r| r.reregister(self.raw, self.token, interest))
76 }
77
78 /// The raw OS handle.
79 #[inline]
80 pub fn raw(&self) -> RawSource {
81 self.raw
82 }
83
84 /// The token that identifies this source in reactor events.
85 #[inline]
86 pub fn token(&self) -> usize {
87 self.token
88 }
89
90 /// Returns a future that resolves once the source is readable.
91 ///
92 /// On each poll the current waker is stored in the reactor's waker registry.
93 /// When the reactor fires a READABLE event for this token, the waker fires
94 /// and the next poll returns `Ready(Ok(()))`.
95 pub fn readable(&self) -> ReadableFuture<'_> {
96 ReadableFuture {
97 source: self,
98 armed: false,
99 }
100 }
101
102 /// Returns a future that resolves once the source is writable.
103 ///
104 /// Same waker integration as `readable()` but for WRITABLE events.
105 pub fn writable(&self) -> WritableFuture<'_> {
106 WritableFuture {
107 source: self,
108 armed: false,
109 }
110 }
111}
112
113impl Drop for IoSource {
114 fn drop(&mut self) {
115 // Only attempt deregistration if we successfully registered.
116 if self.registered.swap(false, Ordering::AcqRel) {
117 // Remove wakers first, then deregister from the platform backend.
118 // Ignore errors — the fd may already be closed by the caller.
119 // Use catch_unwind to avoid panicking if the reactor RefCell is
120 // already borrowed (e.g., IoSource dropped inside a reactor callback).
121 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
122 let _ = with_reactor_mut(|r| r.deregister_with_token(self.raw, self.token));
123 }));
124 }
125 }
126}
127
128// ── Readiness futures ─────────────────────────────────────────────────────────
129
130/// Future returned by [`IoSource::readable`].
131///
132/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
133/// `READABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
134pub struct ReadableFuture<'a> {
135 source: &'a IoSource,
136 /// Whether we have already armed READABLE interest (avoid redundant syscalls).
137 armed: bool,
138}
139
140impl<'a> Future for ReadableFuture<'a> {
141 type Output = io::Result<()>;
142
143 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
144 // If already armed, this is a re-poll after the reactor woke us — ready.
145 if self.armed {
146 return Poll::Ready(Ok(()));
147 }
148
149 // Store the waker so the reactor can wake us when the fd is readable.
150 with_reactor_mut(|r| {
151 r.wakers
152 .set_read_waker(self.source.token, cx.waker().clone());
153 });
154
155 // Arm READABLE interest on first poll and return Pending.
156 self.armed = true;
157 if let Err(e) = self.source.reregister(Interest::READABLE) {
158 return Poll::Ready(Err(e));
159 }
160
161 Poll::Pending
162 }
163}
164
165/// Future returned by [`IoSource::writable`].
166///
167/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
168/// `WRITABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
169pub struct WritableFuture<'a> {
170 source: &'a IoSource,
171 /// Whether we have already armed WRITABLE interest.
172 armed: bool,
173}
174
175impl<'a> Future for WritableFuture<'a> {
176 type Output = io::Result<()>;
177
178 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
179 // If already armed, this is a re-poll after the reactor woke us — ready.
180 if self.armed {
181 return Poll::Ready(Ok(()));
182 }
183
184 // Store the waker so the reactor can wake us when the fd is writable.
185 with_reactor_mut(|r| {
186 r.wakers
187 .set_write_waker(self.source.token, cx.waker().clone());
188 });
189
190 // Arm WRITABLE interest on first poll and return Pending.
191 self.armed = true;
192 if let Err(e) = self.source.reregister(Interest::WRITABLE) {
193 return Poll::Ready(Err(e));
194 }
195
196 Poll::Pending
197 }
198}
199
200// ── Tests ─────────────────────────────────────────────────────────────────────
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::create_pipe;

    /// A freshly constructed `IoSource` is flagged as registered.
    #[cfg(unix)]
    #[test]
    fn io_source_registers_on_construction() {
        let (read_fd, write_fd) = create_pipe().unwrap();
        let source =
            IoSource::new(read_fd, next_token(), Interest::READABLE).expect("IoSource::new failed");
        assert!(source.registered.load(Ordering::Acquire));

        // Dropping deregisters; only afterwards are the descriptors closed.
        drop(source);

        // SAFETY: both fds come from `create_pipe`, are owned by this test,
        // and the `IoSource` that referred to `read_fd` is already dropped.
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }

    /// Dropping an `IoSource` deregisters its fd, so the same fd can be
    /// registered again afterwards.
    #[cfg(unix)]
    #[test]
    fn io_source_deregisters_on_drop() {
        let (read_fd, write_fd) = create_pipe().unwrap();

        let first = IoSource::new(read_fd, next_token(), Interest::READABLE).unwrap();
        assert!(first.registered.load(Ordering::Acquire));
        // Explicit drop → deregistration happens here.
        drop(first);

        // Registering the same fd again must succeed, which shows that the
        // earlier registration was removed.
        let second = IoSource::new(read_fd, next_token(), Interest::READABLE)
            .expect("re-register after drop failed");
        drop(second);

        // SAFETY: both fds come from `create_pipe`, are owned by this test,
        // and every `IoSource` that referred to `read_fd` is already dropped.
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }

    /// Consecutive calls to `next_token` yield strictly increasing values.
    #[test]
    fn next_token_is_unique() {
        let tokens = [next_token(), next_token(), next_token()];
        assert!(tokens.windows(2).all(|pair| pair[0] < pair[1]));
    }
}
246}