// moduvex_runtime/reactor/source.rs
1//! `IoSource` — RAII wrapper that registers an OS handle with the reactor.
2//!
3//! Constructing an `IoSource` registers the handle; dropping it deregisters.
4//! The `readable()` and `writable()` methods return futures that resolve when
5//! the underlying OS handle becomes ready.
6//!
7//! Waker integration: on each poll the future stores the current waker in the
8//! reactor's `WakerRegistry`. When the reactor fires an event for this token
9//! the stored waker is called, re-scheduling the waiting task.
10
11use std::future::Future;
12use std::io;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::task::{Context, Poll};
16
17use super::{with_reactor, with_reactor_mut};
18use crate::platform::sys::{Interest, RawSource};
19
/// Global counter for assigning unique tokens to `IoSource` instances.
static TOKEN_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Sentinel tokens reserved by the executor/signal subsystem.
/// These must never be assigned to user `IoSource` instances.
pub(crate) const WAKE_TOKEN: usize = usize::MAX;
pub(crate) const SIGNAL_TOKEN_GUARD: usize = usize::MAX - 1;

/// Allocate the next unique token.
///
/// Skips sentinel values (`usize::MAX`, `usize::MAX - 1`) used by the executor
/// and signal subsystem. In practice the counter can never reach these values
/// in a single process lifetime (~18 quintillion allocations), but we guard
/// defensively to prevent undefined behaviour on extremely long-lived processes.
///
/// # Panics
/// Panics if the counter has wrapped into the sentinel range (effectively
/// unreachable in practice).
pub(crate) fn next_token() -> usize {
    let t = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
    // Guard against wrapping into the sentinel range. This must be a plain
    // `assert!`, not `debug_assert!`: the debug-only form compiles to nothing
    // in release builds, which is exactly where a long-lived process would
    // hit the wrap and silently hand out WAKE_TOKEN / SIGNAL_TOKEN_GUARD,
    // corrupting event demultiplexing. The bound `t < SIGNAL_TOKEN_GUARD`
    // excludes precisely the two sentinels (the previous `WAKE_TOKEN - 2`
    // bound also rejected the perfectly valid token `usize::MAX - 2`).
    assert!(
        t < SIGNAL_TOKEN_GUARD,
        "token counter approaching sentinel values — process has allocated too many IoSources"
    );
    t
}
43
44/// RAII I/O source registered with the thread-local reactor.
45///
46/// The `token` field is used as the unique identifier when submitting kevent /
47/// epoll_ctl calls. Tokens must be unique within a single reactor instance —
48/// this type uses an atomic counter to guarantee uniqueness without caller
49/// coordination.
50pub struct IoSource {
51    /// The raw OS handle (fd on Unix, HANDLE on Windows).
52    raw: RawSource,
53    /// Opaque identifier passed to the reactor for event demultiplexing.
54    token: usize,
55    /// Whether the source is currently registered with the reactor.
56    /// Tracked atomically so Drop can skip deregistration if it never happened.
57    registered: AtomicBool,
58}
59
60impl IoSource {
61    /// Register `raw` with the thread-local reactor under `token`, monitoring
62    /// the given `interest` set.
63    ///
64    /// # Errors
65    /// Propagates any OS error from the underlying `register` syscall.
66    pub fn new(raw: RawSource, token: usize, interest: Interest) -> io::Result<Self> {
67        let source = Self {
68            raw,
69            token,
70            registered: AtomicBool::new(false),
71        };
72        with_reactor(|r| r.register(raw, token, interest))?;
73        source.registered.store(true, Ordering::Release);
74        Ok(source)
75    }
76
77    /// Update the interest mask for an already-registered source.
78    ///
79    /// # Errors
80    /// Returns `io::ErrorKind::NotConnected` if the source was never registered
81    /// (e.g. after a failed `new`), or propagates OS errors from `reregister`.
82    pub fn reregister(&self, interest: Interest) -> io::Result<()> {
83        if !self.registered.load(Ordering::Acquire) {
84            return Err(io::Error::new(
85                io::ErrorKind::NotConnected,
86                "IoSource: reregister called on unregistered source",
87            ));
88        }
89        with_reactor(|r| r.reregister(self.raw, self.token, interest))
90    }
91
92    /// The raw OS handle.
93    #[inline]
94    pub fn raw(&self) -> RawSource {
95        self.raw
96    }
97
98    /// The token that identifies this source in reactor events.
99    #[inline]
100    pub fn token(&self) -> usize {
101        self.token
102    }
103
104    /// Returns a future that resolves once the source is readable.
105    ///
106    /// On each poll the current waker is stored in the reactor's waker registry.
107    /// When the reactor fires a READABLE event for this token, the waker fires
108    /// and the next poll returns `Ready(Ok(()))`.
109    pub fn readable(&self) -> ReadableFuture<'_> {
110        ReadableFuture {
111            source: self,
112            armed: false,
113        }
114    }
115
116    /// Returns a future that resolves once the source is writable.
117    ///
118    /// Same waker integration as `readable()` but for WRITABLE events.
119    pub fn writable(&self) -> WritableFuture<'_> {
120        WritableFuture {
121            source: self,
122            armed: false,
123        }
124    }
125}
126
127impl Drop for IoSource {
128    fn drop(&mut self) {
129        // Only attempt deregistration if we successfully registered.
130        if self.registered.swap(false, Ordering::AcqRel) {
131            // Remove wakers first, then deregister from the platform backend.
132            // Ignore errors — the fd may already be closed by the caller.
133            // Use catch_unwind to avoid panicking if the reactor RefCell is
134            // already borrowed (e.g., IoSource dropped inside a reactor callback).
135            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
136                let _ = with_reactor_mut(|r| r.deregister_with_token(self.raw, self.token));
137            }));
138        }
139    }
140}
141
142// ── Readiness futures ─────────────────────────────────────────────────────────
143
144/// Future returned by [`IoSource::readable`].
145///
146/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
147/// `READABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
148pub struct ReadableFuture<'a> {
149    source: &'a IoSource,
150    /// Whether we have already armed READABLE interest (avoid redundant syscalls).
151    armed: bool,
152}
153
154impl<'a> Future for ReadableFuture<'a> {
155    type Output = io::Result<()>;
156
157    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
158        // If already armed, this is a re-poll after the reactor woke us — ready.
159        if self.armed {
160            return Poll::Ready(Ok(()));
161        }
162
163        // Store the waker so the reactor can wake us when the fd is readable.
164        with_reactor_mut(|r| {
165            r.wakers
166                .set_read_waker(self.source.token, cx.waker().clone());
167        });
168
169        // Arm READABLE interest on first poll and return Pending.
170        self.armed = true;
171        if let Err(e) = self.source.reregister(Interest::READABLE) {
172            return Poll::Ready(Err(e));
173        }
174
175        Poll::Pending
176    }
177}
178
179/// Future returned by [`IoSource::writable`].
180///
181/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
182/// `WRITABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
183pub struct WritableFuture<'a> {
184    source: &'a IoSource,
185    /// Whether we have already armed WRITABLE interest.
186    armed: bool,
187}
188
189impl<'a> Future for WritableFuture<'a> {
190    type Output = io::Result<()>;
191
192    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193        // If already armed, this is a re-poll after the reactor woke us — ready.
194        if self.armed {
195            return Poll::Ready(Ok(()));
196        }
197
198        // Store the waker so the reactor can wake us when the fd is writable.
199        with_reactor_mut(|r| {
200            r.wakers
201                .set_write_waker(self.source.token, cx.waker().clone());
202        });
203
204        // Arm WRITABLE interest on first poll and return Pending.
205        self.armed = true;
206        if let Err(e) = self.source.reregister(Interest::WRITABLE) {
207            return Poll::Ready(Err(e));
208        }
209
210        Poll::Pending
211    }
212}
213
214// ── Tests ─────────────────────────────────────────────────────────────────────
215
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::create_pipe;

    #[cfg(unix)]
    #[test]
    fn io_source_registers_on_construction() {
        let (read_fd, write_fd) = create_pipe().unwrap();
        let src = IoSource::new(read_fd, next_token(), Interest::READABLE)
            .expect("IoSource::new failed");
        assert!(src.registered.load(Ordering::Acquire));
        // Dropping triggers deregistration; close the fds manually afterwards.
        drop(src);
        // SAFETY: both fds are valid and owned by this test; `src` is gone.
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }

    #[cfg(unix)]
    #[test]
    fn io_source_deregisters_on_drop() {
        let (read_fd, write_fd) = create_pipe().unwrap();
        {
            let src = IoSource::new(read_fd, next_token(), Interest::READABLE).unwrap();
            assert!(src.registered.load(Ordering::Acquire));
            // `src` drops at end of scope → deregister runs automatically.
        }
        // Registering the same fd again must succeed, which proves the
        // earlier deregistration actually went through.
        let again = IoSource::new(read_fd, next_token(), Interest::READABLE)
            .expect("re-register after drop failed");
        drop(again);
        // SAFETY: fds are valid and owned by this test; `again` was dropped.
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }

    #[test]
    fn next_token_is_unique() {
        // fetch_add is monotone, so tokens taken by one thread are strictly
        // increasing even if other tests allocate tokens concurrently.
        let tokens = [next_token(), next_token(), next_token()];
        assert!(tokens.windows(2).all(|pair| pair[0] < pair[1]));
    }
}