crossio_kqueue/
kqueue.rs

1use std::{
2    io,
3    mem,
4    os::fd::RawFd,
5    time::Duration,
6};
7
8use crossio_core::{
9    Backend,
10    BackendConfig,
11    CrossioError,
12    Event,
13    Events,
14    Interest,
15    Token,
16};
17
18use libc::{
19    c_void,
20    kevent,
21    kqueue,
22    timespec,
23    EV_ADD,
24    EV_CLEAR,
25    EV_DELETE,
26    EV_ENABLE,
27    EV_EOF,
28    EV_ERROR,
29    EVFILT_READ,
30    EVFILT_WRITE,
31};
32
33/// BSD/macOS kqueue-backed implementation of the `Backend` trait.
34pub struct KqueueBackend {
35    kq: RawFd,
36    config: BackendConfig,
37}
38
39impl KqueueBackend {
40    fn new_kqueue() -> Result<RawFd, CrossioError> {
41        // SAFETY: kqueue takes no parameters and returns a file descriptor on
42        // success or -1 on failure.
43        let fd = unsafe { kqueue() };
44        if fd == -1 {
45            return Err(CrossioError::Io(io::Error::last_os_error()));
46        }
47
48        Ok(fd)
49    }
50
51    fn to_timespec(timeout: Option<Duration>) -> Option<timespec> {
52        timeout.map(|d| timespec {
53            tv_sec: d.as_secs() as libc::time_t,
54            tv_nsec: (d.subsec_nanos()) as libc::c_long,
55        })
56    }
57
58    fn apply_changes(&self, changes: &mut [kevent]) -> Result<(), CrossioError> {
59        if changes.is_empty() {
60            return Ok(());
61        }
62
63        // SAFETY: `changes.as_ptr()` points to `changes.len()` valid kevent
64        // structures. The timeout is NULL, so this call returns immediately
65        // after applying the changelist.
66        let res = unsafe {
67            libc::kevent(
68                self.kq,
69                changes.as_ptr(),
70                changes.len() as i32,
71                std::ptr::null_mut(),
72                0,
73                std::ptr::null(),
74            )
75        };
76
77        if res == -1 {
78            return Err(CrossioError::Io(io::Error::last_os_error()));
79        }
80
81        Ok(())
82    }
83
84    fn interest_to_changes(
85        fd: RawFd,
86        token: Token,
87        interest: Interest,
88        flags: i16,
89        out: &mut Vec<kevent>,
90    ) {
91        let udata = token.as_usize() as *mut c_void;
92
93        unsafe {
94            if interest.contains(Interest::READABLE) {
95                let mut ev: kevent = mem::zeroed();
96                libc::EV_SET(
97                    &mut ev,
98                    fd as libc::uintptr_t,
99                    EVFILT_READ,
100                    flags as u16,
101                    0,
102                    0,
103                    udata,
104                );
105                out.push(ev);
106            }
107
108            if interest.contains(Interest::WRITABLE) {
109                let mut ev: kevent = mem::zeroed();
110                libc::EV_SET(
111                    &mut ev,
112                    fd as libc::uintptr_t,
113                    EVFILT_WRITE,
114                    flags as u16,
115                    0,
116                    0,
117                    udata,
118                );
119                out.push(ev);
120            }
121        }
122    }
123
124    fn kevent_to_event(ev: &kevent) -> Option<Event> {
125        let token = Token::from_usize(ev.udata as usize);
126        let mut interest = Interest::empty();
127
128        if ev.filter == EVFILT_READ {
129            interest |= Interest::READABLE;
130        }
131        if ev.filter == EVFILT_WRITE {
132            interest |= Interest::WRITABLE;
133        }
134
135        if (ev.flags & EV_EOF) != 0 || (ev.flags & EV_ERROR) != 0 {
136            interest |= Interest::CLOSED;
137        }
138
139        if interest.is_empty() {
140            None
141        } else {
142            Some(Event::new(token, interest))
143        }
144    }
145}
146
147impl Drop for KqueueBackend {
148    fn drop(&mut self) {
149        unsafe {
150            let _ = libc::close(self.kq);
151        }
152    }
153}
154
155impl Backend for KqueueBackend {
156    type RawSource = RawFd;
157
158    fn new(config: BackendConfig) -> Result<Self, CrossioError> {
159        let kq = Self::new_kqueue()?;
160        Ok(Self { kq, config })
161    }
162
163    fn register(
164        &self,
165        source: Self::RawSource,
166        token: Token,
167        interest: Interest,
168    ) -> Result<(), CrossioError> {
169        let mut changes = Vec::new();
170        let flags = (EV_ADD | EV_ENABLE | EV_CLEAR) as i16;
171        Self::interest_to_changes(source, token, interest, flags, &mut changes);
172        self.apply_changes(&mut changes)
173    }
174
175    fn reregister(
176        &self,
177        source: Self::RawSource,
178        token: Token,
179        interest: Interest,
180    ) -> Result<(), CrossioError> {
181        let mut changes = Vec::new();
182        let flags = (EV_ADD | EV_ENABLE | EV_CLEAR) as i16;
183        Self::interest_to_changes(source, token, interest, flags, &mut changes);
184        self.apply_changes(&mut changes)
185    }
186
187    fn deregister(&self, source: Self::RawSource) -> Result<(), CrossioError> {
188        let mut changes = Vec::new();
189        let token = Token::from_usize(0);
190        let interest = Interest::readable_writable();
191        let flags = (EV_DELETE) as i16;
192        Self::interest_to_changes(source, token, interest, flags, &mut changes);
193        self.apply_changes(&mut changes)
194    }
195
196    fn poll(
197        &self,
198        events: &mut Events,
199        timeout: Option<Duration>,
200    ) -> Result<usize, CrossioError> {
201        events.clear();
202
203        let capacity = self.config.event_batch_size().max(1);
204        let mut evlist: Vec<kevent> = Vec::with_capacity(capacity);
205
206        let ts_opt = Self::to_timespec(timeout.or(self.config.default_timeout()));
207        let timeout_ptr = ts_opt
208            .as_ref()
209            .map(|ts| ts as *const timespec)
210            .unwrap_or(std::ptr::null());
211
212        // SAFETY: `evlist` has sufficient capacity for `capacity` events. On
213        // success, kevent initializes the first `n` entries.
214        let n = unsafe {
215            libc::kevent(
216                self.kq,
217                std::ptr::null(),
218                0,
219                evlist.as_mut_ptr(),
220                capacity as i32,
221                timeout_ptr,
222            )
223        };
224
225        if n == -1 {
226            return Err(CrossioError::Io(io::Error::last_os_error()));
227        }
228
229        unsafe {
230            evlist.set_len(n as usize);
231        }
232
233        for ev in &evlist {
234            if let Some(e) = Self::kevent_to_event(ev) {
235                events.push(e);
236            }
237        }
238
239        Ok(events.len())
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::KqueueBackend;
    use crossio_core::{
        registration::{Registration, Source},
        BackendConfig,
        Interest,
        Reactor,
        Token,
    };
    use std::{
        fs::File,
        io::Write,
        os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd},
        time::Duration,
    };

    /// Minimal `Source` that exposes a raw descriptor without owning it.
    struct FdSource(RawFd);

    impl Source<KqueueBackend> for FdSource {
        fn raw_source(&self) -> RawFd {
            self.0
        }
    }

    /// Creates an anonymous pipe and returns `(read_end, write_end)`.
    ///
    /// Both ends are wrapped in `OwnedFd` so they are closed even when the
    /// calling test panics.
    fn pipe_pair() -> (OwnedFd, OwnedFd) {
        let mut fds = [0; 2];
        // SAFETY: `fds` is a writable array of two C ints, as `pipe` expects.
        let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
        assert_eq!(res, 0, "pipe creation failed");

        // SAFETY: `pipe` succeeded, so both descriptors are open and nothing
        // else owns them yet.
        unsafe { (OwnedFd::from_raw_fd(fds[0]), OwnedFd::from_raw_fd(fds[1])) }
    }

    /// Writing one byte into a pipe must surface a readable event carrying
    /// the token the read end was registered with.
    #[test]
    fn kqueue_reports_readable_pipe() {
        let (read_end, write_end) = pipe_pair();

        let config = BackendConfig::default().with_event_batch_size(16);
        let mut reactor = Reactor::<KqueueBackend>::with_config(config)
            .expect("failed to create kqueue reactor");

        let source = FdSource(read_end.as_raw_fd());
        let token = Token::from_usize(1);
        let registration = Registration::new(token, Interest::READABLE);

        reactor
            .register(&source, registration)
            .expect("failed to register pipe with kqueue");

        let mut writer = File::from(write_end);
        writer.write_all(&[42u8]).expect("write to pipe failed");

        let events = reactor
            .poll(Some(Duration::from_millis(100)))
            .expect("kqueue poll failed");

        let has_readable = events
            .iter()
            .any(|ev| ev.token() == token && ev.readiness().contains(Interest::READABLE));

        assert!(has_readable, "kqueue did not report readable pipe");
    }
}