1use std::{
2 io,
3 mem,
4 os::fd::RawFd,
5 time::Duration,
6};
7
8use crossio_core::{
9 Backend,
10 BackendConfig,
11 CrossioError,
12 Event,
13 Events,
14 Interest,
15 Token,
16};
17
18use libc::{
19 c_void,
20 kevent,
21 kqueue,
22 timespec,
23 EV_ADD,
24 EV_CLEAR,
25 EV_DELETE,
26 EV_ENABLE,
27 EV_EOF,
28 EV_ERROR,
29 EVFILT_READ,
30 EVFILT_WRITE,
31};
32
33pub struct KqueueBackend {
35 kq: RawFd,
36 config: BackendConfig,
37}
38
39impl KqueueBackend {
40 fn new_kqueue() -> Result<RawFd, CrossioError> {
41 let fd = unsafe { kqueue() };
44 if fd == -1 {
45 return Err(CrossioError::Io(io::Error::last_os_error()));
46 }
47
48 Ok(fd)
49 }
50
51 fn to_timespec(timeout: Option<Duration>) -> Option<timespec> {
52 timeout.map(|d| timespec {
53 tv_sec: d.as_secs() as libc::time_t,
54 tv_nsec: (d.subsec_nanos()) as libc::c_long,
55 })
56 }
57
58 fn apply_changes(&self, changes: &mut [kevent]) -> Result<(), CrossioError> {
59 if changes.is_empty() {
60 return Ok(());
61 }
62
63 let res = unsafe {
67 libc::kevent(
68 self.kq,
69 changes.as_ptr(),
70 changes.len() as i32,
71 std::ptr::null_mut(),
72 0,
73 std::ptr::null(),
74 )
75 };
76
77 if res == -1 {
78 return Err(CrossioError::Io(io::Error::last_os_error()));
79 }
80
81 Ok(())
82 }
83
84 fn interest_to_changes(
85 fd: RawFd,
86 token: Token,
87 interest: Interest,
88 flags: i16,
89 out: &mut Vec<kevent>,
90 ) {
91 let udata = token.as_usize() as *mut c_void;
92
93 unsafe {
94 if interest.contains(Interest::READABLE) {
95 let mut ev: kevent = mem::zeroed();
96 libc::EV_SET(
97 &mut ev,
98 fd as libc::uintptr_t,
99 EVFILT_READ,
100 flags as u16,
101 0,
102 0,
103 udata,
104 );
105 out.push(ev);
106 }
107
108 if interest.contains(Interest::WRITABLE) {
109 let mut ev: kevent = mem::zeroed();
110 libc::EV_SET(
111 &mut ev,
112 fd as libc::uintptr_t,
113 EVFILT_WRITE,
114 flags as u16,
115 0,
116 0,
117 udata,
118 );
119 out.push(ev);
120 }
121 }
122 }
123
124 fn kevent_to_event(ev: &kevent) -> Option<Event> {
125 let token = Token::from_usize(ev.udata as usize);
126 let mut interest = Interest::empty();
127
128 if ev.filter == EVFILT_READ {
129 interest |= Interest::READABLE;
130 }
131 if ev.filter == EVFILT_WRITE {
132 interest |= Interest::WRITABLE;
133 }
134
135 if (ev.flags & EV_EOF) != 0 || (ev.flags & EV_ERROR) != 0 {
136 interest |= Interest::CLOSED;
137 }
138
139 if interest.is_empty() {
140 None
141 } else {
142 Some(Event::new(token, interest))
143 }
144 }
145}
146
147impl Drop for KqueueBackend {
148 fn drop(&mut self) {
149 unsafe {
150 let _ = libc::close(self.kq);
151 }
152 }
153}
154
155impl Backend for KqueueBackend {
156 type RawSource = RawFd;
157
158 fn new(config: BackendConfig) -> Result<Self, CrossioError> {
159 let kq = Self::new_kqueue()?;
160 Ok(Self { kq, config })
161 }
162
163 fn register(
164 &self,
165 source: Self::RawSource,
166 token: Token,
167 interest: Interest,
168 ) -> Result<(), CrossioError> {
169 let mut changes = Vec::new();
170 let flags = (EV_ADD | EV_ENABLE | EV_CLEAR) as i16;
171 Self::interest_to_changes(source, token, interest, flags, &mut changes);
172 self.apply_changes(&mut changes)
173 }
174
175 fn reregister(
176 &self,
177 source: Self::RawSource,
178 token: Token,
179 interest: Interest,
180 ) -> Result<(), CrossioError> {
181 let mut changes = Vec::new();
182 let flags = (EV_ADD | EV_ENABLE | EV_CLEAR) as i16;
183 Self::interest_to_changes(source, token, interest, flags, &mut changes);
184 self.apply_changes(&mut changes)
185 }
186
187 fn deregister(&self, source: Self::RawSource) -> Result<(), CrossioError> {
188 let mut changes = Vec::new();
189 let token = Token::from_usize(0);
190 let interest = Interest::readable_writable();
191 let flags = (EV_DELETE) as i16;
192 Self::interest_to_changes(source, token, interest, flags, &mut changes);
193 self.apply_changes(&mut changes)
194 }
195
196 fn poll(
197 &self,
198 events: &mut Events,
199 timeout: Option<Duration>,
200 ) -> Result<usize, CrossioError> {
201 events.clear();
202
203 let capacity = self.config.event_batch_size().max(1);
204 let mut evlist: Vec<kevent> = Vec::with_capacity(capacity);
205
206 let ts_opt = Self::to_timespec(timeout.or(self.config.default_timeout()));
207 let timeout_ptr = ts_opt
208 .as_ref()
209 .map(|ts| ts as *const timespec)
210 .unwrap_or(std::ptr::null());
211
212 let n = unsafe {
215 libc::kevent(
216 self.kq,
217 std::ptr::null(),
218 0,
219 evlist.as_mut_ptr(),
220 capacity as i32,
221 timeout_ptr,
222 )
223 };
224
225 if n == -1 {
226 return Err(CrossioError::Io(io::Error::last_os_error()));
227 }
228
229 unsafe {
230 evlist.set_len(n as usize);
231 }
232
233 for ev in &evlist {
234 if let Some(e) = Self::kevent_to_event(ev) {
235 events.push(e);
236 }
237 }
238
239 Ok(events.len())
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::KqueueBackend;
    use crossio_core::{
        registration::{Registration, Source},
        BackendConfig, Interest, Reactor, Token,
    };
    use std::{os::fd::RawFd, time::Duration};

    /// Test-only source that hands out a bare descriptor.
    struct FdSource(RawFd);

    impl Source<KqueueBackend> for FdSource {
        fn raw_source(&self) -> RawFd {
            self.0
        }
    }

    /// Writing one byte into a pipe must surface a readable event for the
    /// token registered on the pipe's read end.
    #[test]
    fn kqueue_reports_readable_pipe() {
        let mut pipe_fds = [0; 2];
        let rc = unsafe { libc::pipe(pipe_fds.as_mut_ptr()) };
        assert_eq!(rc, 0, "pipe creation failed");
        let (read_fd, write_fd) = (pipe_fds[0], pipe_fds[1]);

        let config = BackendConfig::default().with_event_batch_size(16);
        let mut reactor = Reactor::<KqueueBackend>::with_config(config)
            .expect("failed to create kqueue reactor");

        let source = FdSource(read_fd);
        let token = Token::from_usize(1);
        reactor
            .register(&source, Registration::new(token, Interest::READABLE))
            .expect("failed to register pipe with kqueue");

        // Make the read end readable.
        let payload = [42u8];
        let written =
            unsafe { libc::write(write_fd, payload.as_ptr() as *const _, payload.len()) };
        assert_eq!(written, payload.len() as isize, "write to pipe failed");

        let events = reactor
            .poll(Some(Duration::from_millis(100)))
            .expect("kqueue poll failed");

        let mut saw_readable = false;
        for ev in events.iter() {
            if ev.token() == token && ev.readiness().contains(Interest::READABLE) {
                saw_readable = true;
                break;
            }
        }
        assert!(saw_readable, "kqueue did not report readable pipe");

        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }
}
308}