openentropy_core/sources/ipc/
kqueue_events.rs1#[cfg(any(
4 target_os = "macos",
5 target_os = "freebsd",
6 target_os = "netbsd",
7 target_os = "openbsd"
8))]
9use std::sync::Arc;
10#[cfg(any(
11 target_os = "macos",
12 target_os = "freebsd",
13 target_os = "netbsd",
14 target_os = "openbsd"
15))]
16use std::sync::atomic::{AtomicBool, Ordering};
17#[cfg(any(
18 target_os = "macos",
19 target_os = "freebsd",
20 target_os = "netbsd",
21 target_os = "openbsd"
22))]
23use std::thread;
24
25use crate::source::{EntropySource, Platform, SourceCategory, SourceInfo};
26#[cfg(any(
27 target_os = "macos",
28 target_os = "freebsd",
29 target_os = "netbsd",
30 target_os = "openbsd"
31))]
32use crate::sources::helpers::extract_timing_entropy;
33#[cfg(any(
34 target_os = "macos",
35 target_os = "freebsd",
36 target_os = "netbsd",
37 target_os = "openbsd"
38))]
39use crate::sources::helpers::mach_time;
40
41#[derive(Debug, Clone)]
54pub struct KqueueEventsConfig {
55 pub num_file_watchers: usize,
63
64 pub num_timers: usize,
72
73 pub num_sockets: usize,
81
82 pub timeout_ms: u32,
90}
91
92impl Default for KqueueEventsConfig {
93 fn default() -> Self {
94 Self {
95 num_file_watchers: 4,
96 num_timers: 8,
97 num_sockets: 4,
98 timeout_ms: 1,
99 }
100 }
101}
102
103#[derive(Default)]
135pub struct KqueueEventsSource {
136 pub config: KqueueEventsConfig,
138}
139
140static KQUEUE_EVENTS_INFO: SourceInfo = SourceInfo {
141 name: "kqueue_events",
142 description: "Kqueue event multiplexing timing from timers, files, and sockets",
143 physics: "Registers diverse kqueue event types (timers, file watchers, socket monitors) \
144 and measures kevent() notification timing. Timer events capture kernel timer \
145 coalescing and interrupt jitter. File watchers exercise VFS/APFS notification \
146 paths. Socket events capture mbuf allocator timing. Multiple simultaneous watchers \
147 create knote lock contention and dispatch queue interference. The combination of \
148 independent event sources produces high min-entropy.",
149 category: SourceCategory::IPC,
150 platform: Platform::Any, requirements: &[],
152 entropy_rate_estimate: 2.5,
153 composite: false,
154 is_fast: true,
155};
156
157impl EntropySource for KqueueEventsSource {
158 fn info(&self) -> &SourceInfo {
159 &KQUEUE_EVENTS_INFO
160 }
161
162 fn is_available(&self) -> bool {
163 cfg!(any(
164 target_os = "macos",
165 target_os = "freebsd",
166 target_os = "netbsd",
167 target_os = "openbsd"
168 ))
169 }
170
171 #[cfg(any(
172 target_os = "macos",
173 target_os = "freebsd",
174 target_os = "netbsd",
175 target_os = "openbsd"
176 ))]
177 fn collect(&self, n_samples: usize) -> Vec<u8> {
178 let raw_count = n_samples + 64;
183 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
184
185 let kq = unsafe { libc::kqueue() };
187 if kq < 0 {
188 return Vec::new();
189 }
190
191 let mut changes: Vec<libc::kevent> = Vec::new();
192 let mut cleanup_fds: Vec<i32> = Vec::new();
193
194 for i in 0..self.config.num_timers {
196 let interval_ms = 1 + (i % 10);
197 let mut ev: libc::kevent = unsafe { std::mem::zeroed() };
198 ev.ident = i;
199 ev.filter = libc::EVFILT_TIMER;
200 ev.flags = libc::EV_ADD | libc::EV_ENABLE;
201 ev.fflags = 0;
202 ev.data = interval_ms as isize;
203 ev.udata = std::ptr::null_mut();
204 changes.push(ev);
205 }
206
207 for _i in 0..self.config.num_sockets {
209 let mut sv: [i32; 2] = [0; 2];
210 let ret =
211 unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, sv.as_mut_ptr()) };
212 if ret == 0 {
213 cleanup_fds.push(sv[0]);
214 cleanup_fds.push(sv[1]);
215
216 let mut ev: libc::kevent = unsafe { std::mem::zeroed() };
217 ev.ident = sv[0] as usize;
218 ev.filter = libc::EVFILT_READ;
219 ev.flags = libc::EV_ADD | libc::EV_ENABLE;
220 ev.udata = std::ptr::null_mut();
221 changes.push(ev);
222
223 let byte = [0xAAu8];
224 unsafe {
225 libc::write(sv[1], byte.as_ptr() as *const _, 1);
226 }
227
228 let mut ev2: libc::kevent = unsafe { std::mem::zeroed() };
229 ev2.ident = sv[1] as usize;
230 ev2.filter = libc::EVFILT_WRITE;
231 ev2.flags = libc::EV_ADD | libc::EV_ENABLE;
232 ev2.udata = std::ptr::null_mut();
233 changes.push(ev2);
234 }
235 }
236
237 let mut temp_files: Vec<(i32, std::path::PathBuf)> = Vec::new();
239 for i in 0..self.config.num_file_watchers {
240 let path = std::env::temp_dir().join(format!("oe_kq_{i}_{}", std::process::id()));
241 if std::fs::write(&path, b"entropy").is_ok() {
242 let path_str = path.to_str().unwrap_or("");
243 let c_path = match std::ffi::CString::new(path_str) {
244 Ok(c) => c,
245 Err(_) => continue, };
247 let fd = unsafe { libc::open(c_path.as_ptr(), libc::O_RDONLY) };
248 if fd >= 0 {
249 let mut ev: libc::kevent = unsafe { std::mem::zeroed() };
250 ev.ident = fd as usize;
251 ev.filter = libc::EVFILT_VNODE;
252 ev.flags = libc::EV_ADD | libc::EV_ENABLE | libc::EV_CLEAR;
253 ev.fflags = libc::NOTE_WRITE | libc::NOTE_ATTRIB;
254 ev.udata = std::ptr::null_mut();
255 changes.push(ev);
256 temp_files.push((fd, path));
257 }
258 }
259 }
260
261 if !changes.is_empty() {
263 unsafe {
264 libc::kevent(
265 kq,
266 changes.as_ptr(),
267 changes.len() as i32,
268 std::ptr::null_mut(),
269 0,
270 std::ptr::null(),
271 );
272 }
273 }
274
275 let stop = Arc::new(AtomicBool::new(false));
277 let stop2 = stop.clone();
278 let socket_write_fds: Vec<i32> = cleanup_fds.iter().skip(1).step_by(2).copied().collect();
279 let file_paths: Vec<std::path::PathBuf> =
280 temp_files.iter().map(|(_, p)| p.clone()).collect();
281
282 let poker = thread::spawn(move || {
283 let byte = [0xBBu8];
284 while !stop2.load(Ordering::Relaxed) {
285 for &fd in &socket_write_fds {
286 unsafe {
287 libc::write(fd, byte.as_ptr() as *const _, 1);
288 }
289 }
290 for path in &file_paths {
291 let _ = std::fs::write(path, b"poke");
292 }
293 std::thread::sleep(std::time::Duration::from_micros(500));
294 }
295 });
296
297 let timeout = libc::timespec {
299 tv_sec: 0,
300 tv_nsec: self.config.timeout_ms as i64 * 1_000_000,
301 };
302 let mut events: Vec<libc::kevent> =
303 vec![unsafe { std::mem::zeroed() }; changes.len().max(16)];
304
305 let socket_read_fds: Vec<i32> = cleanup_fds.iter().step_by(2).copied().collect();
306 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(4);
307
308 for iter in 0..raw_count {
309 if iter % 64 == 0 && std::time::Instant::now() >= deadline {
310 break;
311 }
312 let t0 = mach_time();
313
314 let n = unsafe {
315 libc::kevent(
316 kq,
317 std::ptr::null(),
318 0,
319 events.as_mut_ptr(),
320 events.len() as i32,
321 &timeout,
322 )
323 };
324
325 let t1 = mach_time();
326
327 if n > 0 {
329 let mut drain = [0u8; 64];
330 for &fd in &socket_read_fds {
331 unsafe {
332 libc::read(fd, drain.as_mut_ptr() as *mut _, drain.len());
333 }
334 }
335 }
336
337 timings.push(t1.wrapping_sub(t0));
338 }
339
340 stop.store(true, Ordering::Relaxed);
342 let _ = poker.join();
343
344 for (fd, path) in &temp_files {
346 unsafe {
347 libc::close(*fd);
348 }
349 let _ = std::fs::remove_file(path);
350 }
351 for &fd in &cleanup_fds {
352 unsafe {
353 libc::close(fd);
354 }
355 }
356 unsafe {
357 libc::close(kq);
358 }
359
360 extract_timing_entropy(&timings, n_samples)
361 }
362
363 #[cfg(not(any(
364 target_os = "macos",
365 target_os = "freebsd",
366 target_os = "netbsd",
367 target_os = "openbsd"
368 )))]
369 fn collect(&self, _n_samples: usize) -> Vec<u8> {
370 Vec::new()
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 #[test]
379 fn info() {
380 let src = KqueueEventsSource::default();
381 assert_eq!(src.name(), "kqueue_events");
382 assert_eq!(src.info().category, SourceCategory::IPC);
383 assert!(!src.info().composite);
384 }
385
386 #[test]
387 fn default_config() {
388 let config = KqueueEventsConfig::default();
389 assert_eq!(config.num_file_watchers, 4);
390 assert_eq!(config.num_timers, 8);
391 assert_eq!(config.num_sockets, 4);
392 assert_eq!(config.timeout_ms, 1);
393 }
394
395 #[test]
396 fn custom_config() {
397 let src = KqueueEventsSource {
398 config: KqueueEventsConfig {
399 num_file_watchers: 2,
400 num_timers: 4,
401 num_sockets: 2,
402 timeout_ms: 5,
403 },
404 };
405 assert_eq!(src.config.num_timers, 4);
406 }
407
408 #[test]
409 #[ignore] fn collects_bytes() {
411 let src = KqueueEventsSource::default();
412 if src.is_available() {
413 let data = src.collect(64);
414 assert!(!data.is_empty());
415 assert!(data.len() <= 64);
416 }
417 }
418}