Skip to main content

moduvex_runtime/reactor/
source.rs

1//! `IoSource` — RAII wrapper that registers an OS handle with the reactor.
2//!
3//! Constructing an `IoSource` registers the handle; dropping it deregisters.
4//! The `readable()` and `writable()` methods return futures that resolve when
5//! the underlying OS handle becomes ready.
6//!
7//! Waker integration: on each poll the future stores the current waker in the
8//! reactor's `WakerRegistry`. When the reactor fires an event for this token
9//! the stored waker is called, re-scheduling the waiting task.
10
11use std::future::Future;
12use std::io;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
15use std::task::{Context, Poll};
16
17use super::{with_reactor, with_reactor_mut};
18use crate::platform::sys::{Interest, RawSource};
19
/// Process-wide counter from which `IoSource` tokens are drawn.
///
/// Starts at 1 so that the reserved token 0 is never the first value issued.
static TOKEN_COUNTER: AtomicUsize = AtomicUsize::new(1);

/// Allocate the next token, never returning a reserved value.
///
/// Token 0 is reserved and `usize::MAX` is the executor's self-pipe sentinel.
/// `fetch_add` wraps on overflow, so a long-running process could otherwise
/// be handed either value once the counter reaches the end of the `usize`
/// range; the loop below discards them and draws again.
///
/// Tokens are unique until the counter wraps. After a wrap, a token may
/// coincide with one still held by a long-lived source.
pub(crate) fn next_token() -> usize {
    loop {
        let token = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
        // Skip both reserved values instead of relying on the counter never
        // reaching them.
        if token != 0 && token != usize::MAX {
            return token;
        }
    }
}
29
30/// RAII I/O source registered with the thread-local reactor.
31///
32/// The `token` field is used as the unique identifier when submitting kevent /
33/// epoll_ctl calls. Tokens must be unique within a single reactor instance —
34/// this type uses an atomic counter to guarantee uniqueness without caller
35/// coordination.
36pub struct IoSource {
37    /// The raw OS handle (fd on Unix, HANDLE on Windows).
38    raw: RawSource,
39    /// Opaque identifier passed to the reactor for event demultiplexing.
40    token: usize,
41    /// Whether the source is currently registered with the reactor.
42    /// Tracked atomically so Drop can skip deregistration if it never happened.
43    registered: AtomicBool,
44}
45
46impl IoSource {
47    /// Register `raw` with the thread-local reactor under `token`, monitoring
48    /// the given `interest` set.
49    ///
50    /// # Errors
51    /// Propagates any OS error from the underlying `register` syscall.
52    pub fn new(raw: RawSource, token: usize, interest: Interest) -> io::Result<Self> {
53        let source = Self {
54            raw,
55            token,
56            registered: AtomicBool::new(false),
57        };
58        with_reactor(|r| r.register(raw, token, interest))?;
59        source.registered.store(true, Ordering::Release);
60        Ok(source)
61    }
62
63    /// Update the interest mask for an already-registered source.
64    ///
65    /// # Errors
66    /// Returns `io::ErrorKind::NotConnected` if the source was never registered
67    /// (e.g. after a failed `new`), or propagates OS errors from `reregister`.
68    pub fn reregister(&self, interest: Interest) -> io::Result<()> {
69        if !self.registered.load(Ordering::Acquire) {
70            return Err(io::Error::new(
71                io::ErrorKind::NotConnected,
72                "IoSource: reregister called on unregistered source",
73            ));
74        }
75        with_reactor(|r| r.reregister(self.raw, self.token, interest))
76    }
77
78    /// The raw OS handle.
79    #[inline]
80    pub fn raw(&self) -> RawSource {
81        self.raw
82    }
83
84    /// The token that identifies this source in reactor events.
85    #[inline]
86    pub fn token(&self) -> usize {
87        self.token
88    }
89
90    /// Returns a future that resolves once the source is readable.
91    ///
92    /// On each poll the current waker is stored in the reactor's waker registry.
93    /// When the reactor fires a READABLE event for this token, the waker fires
94    /// and the next poll returns `Ready(Ok(()))`.
95    pub fn readable(&self) -> ReadableFuture<'_> {
96        ReadableFuture {
97            source: self,
98            armed: false,
99        }
100    }
101
102    /// Returns a future that resolves once the source is writable.
103    ///
104    /// Same waker integration as `readable()` but for WRITABLE events.
105    pub fn writable(&self) -> WritableFuture<'_> {
106        WritableFuture {
107            source: self,
108            armed: false,
109        }
110    }
111}
112
113impl Drop for IoSource {
114    fn drop(&mut self) {
115        // Only attempt deregistration if we successfully registered.
116        if self.registered.swap(false, Ordering::AcqRel) {
117            // Remove wakers first, then deregister from the platform backend.
118            // Ignore errors — the fd may already be closed by the caller.
119            // Use catch_unwind to avoid panicking if the reactor RefCell is
120            // already borrowed (e.g., IoSource dropped inside a reactor callback).
121            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
122                let _ = with_reactor_mut(|r| r.deregister_with_token(self.raw, self.token));
123            }));
124        }
125    }
126}
127
128// ── Readiness futures ─────────────────────────────────────────────────────────
129
130/// Future returned by [`IoSource::readable`].
131///
132/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
133/// `READABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
134pub struct ReadableFuture<'a> {
135    source: &'a IoSource,
136    /// Whether we have already armed READABLE interest (avoid redundant syscalls).
137    armed: bool,
138}
139
140impl<'a> Future for ReadableFuture<'a> {
141    type Output = io::Result<()>;
142
143    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
144        // If already armed, this is a re-poll after the reactor woke us — ready.
145        if self.armed {
146            return Poll::Ready(Ok(()));
147        }
148
149        // Store the waker so the reactor can wake us when the fd is readable.
150        with_reactor_mut(|r| {
151            r.wakers
152                .set_read_waker(self.source.token, cx.waker().clone());
153        });
154
155        // Arm READABLE interest on first poll and return Pending.
156        self.armed = true;
157        if let Err(e) = self.source.reregister(Interest::READABLE) {
158            return Poll::Ready(Err(e));
159        }
160
161        Poll::Pending
162    }
163}
164
165/// Future returned by [`IoSource::writable`].
166///
167/// Stores the caller's waker in the reactor's `WakerRegistry` and arms
168/// `WRITABLE` interest. Resolves to `Ok(())` after the reactor fires the event.
169pub struct WritableFuture<'a> {
170    source: &'a IoSource,
171    /// Whether we have already armed WRITABLE interest.
172    armed: bool,
173}
174
175impl<'a> Future for WritableFuture<'a> {
176    type Output = io::Result<()>;
177
178    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
179        // If already armed, this is a re-poll after the reactor woke us — ready.
180        if self.armed {
181            return Poll::Ready(Ok(()));
182        }
183
184        // Store the waker so the reactor can wake us when the fd is writable.
185        with_reactor_mut(|r| {
186            r.wakers
187                .set_write_waker(self.source.token, cx.waker().clone());
188        });
189
190        // Arm WRITABLE interest on first poll and return Pending.
191        self.armed = true;
192        if let Err(e) = self.source.reregister(Interest::WRITABLE) {
193            return Poll::Ready(Err(e));
194        }
195
196        Poll::Pending
197    }
198}
199
200// ── Tests ─────────────────────────────────────────────────────────────────────
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::create_pipe;

    /// Closes both ends of a pipe created by a test.
    #[cfg(unix)]
    fn close_pipe(read_fd: libc::c_int, write_fd: libc::c_int) {
        // SAFETY: both fds come from `create_pipe` in the calling test, are
        // owned by that test, and every `IoSource` wrapping them has already
        // been dropped by the time this is called.
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }

    /// A freshly constructed source reports itself as registered.
    #[cfg(unix)]
    #[test]
    fn io_source_registers_on_construction() {
        let (read_fd, write_fd) = create_pipe().unwrap();

        let src = IoSource::new(read_fd, next_token(), Interest::READABLE)
            .expect("IoSource::new failed");
        assert!(src.registered.load(Ordering::Acquire));

        // Dropping deregisters; only then are the fds closed.
        drop(src);
        close_pipe(read_fd, write_fd);
    }

    /// Dropping a source frees the fd for a second registration.
    #[cfg(unix)]
    #[test]
    fn io_source_deregisters_on_drop() {
        let (read_fd, write_fd) = create_pipe().unwrap();

        {
            let src = IoSource::new(read_fd, next_token(), Interest::READABLE).unwrap();
            assert!(src.registered.load(Ordering::Acquire));
            // `src` goes out of scope here, which runs its Drop impl.
        }

        // Registering the same fd again must succeed, which shows the first
        // registration was removed when `src` was dropped.
        let src2 = IoSource::new(read_fd, next_token(), Interest::READABLE)
            .expect("re-register after drop failed");
        drop(src2);

        close_pipe(read_fd, write_fd);
    }

    /// Consecutive tokens are strictly increasing.
    #[test]
    fn next_token_is_unique() {
        let [t1, t2, t3] = [next_token(), next_token(), next_token()];
        assert!(t1 < t2 && t2 < t3);
    }
}