1use once_cell::sync::Lazy;
10use std::{
11 collections::HashMap,
12 fs::File,
13 io::{self, Read, Write},
14 os::unix::io::{AsRawFd, FromRawFd, RawFd},
15 sync::{
16 atomic::{AtomicBool, Ordering},
17 Arc, Mutex,
18 },
19 task::Waker,
20};
21
/// Shared registry of the file descriptors watched by the reactor.
///
/// Each raw descriptor maps to a tuple of:
/// 0. the [`Interest`] (poll events) requested for it,
/// 1. a flag the reactor thread sets to `true` once a matching event is seen,
/// 2. the [`Waker`] the reactor thread invokes after setting the flag.
type ReactorFds = Arc<Mutex<HashMap<RawFd, (Interest, Arc<AtomicBool>, Waker)>>>;
23
bitflags::bitflags! {
    /// Set of `poll(2)` events a registration wants to be notified about.
    ///
    /// The bit values are the raw `libc` poll constants, so `bits()` can be
    /// stored directly in `pollfd.events`.
    pub struct Interest: i16 {
        /// Notify when the descriptor reports `POLLIN`.
        const READ = libc::POLLIN;

        /// Notify when the descriptor reports `POLLOUT`.
        const WRITE = libc::POLLOUT;

        /// Notify on both `POLLIN` and `POLLOUT`.
        const BOTH = libc::POLLIN | libc::POLLOUT;
    }
}
37
/// Handle used to add and remove descriptors from the background reactor.
pub struct Handle {
    /// Registry shared with the reactor thread.
    fds: ReactorFds,

    /// Write end of the pipe whose read end the reactor thread polls; writing
    /// a byte makes the blocked `poll` call return so the poll set is rebuilt.
    interrupt: File,
}
46
47impl Handle {
48 pub fn register(
50 &self,
51 fd: RawFd,
52 interest: Interest,
53 completed: Arc<AtomicBool>,
54 waker: Waker,
55 ) {
56 let mut lock = self.fds.lock().unwrap();
57 lock.insert(fd, (interest, completed, waker));
58 let _ = self.interrupt.try_clone().unwrap().write_all(b"0");
59 }
60
61 pub fn unregister(&self, fd: RawFd) {
63 let mut lock = self.fds.lock().unwrap();
64 lock.remove(&fd);
65 let _ = self.interrupt.try_clone().unwrap().write_all(b"0");
66 }
67}
68
69pub static REACTOR: Lazy<Handle> = Lazy::new(|| {
71 let (mut reader, writer) = create_pipe();
73
74 let fds: ReactorFds = Arc::default();
75 let fds_ = fds.clone();
76
77 std::thread::spawn(move || {
78 let fds = fds_;
79 let mut pollers = Vec::new();
80 let mut buffer = [0u8; 1];
81
82 pollers.push(libc::pollfd {
83 fd: reader.as_raw_fd(),
84 events: libc::POLLIN,
85 revents: 0,
86 });
87
88 loop {
89 let returned = unsafe {
90 let pollers: &mut [libc::pollfd] = &mut pollers;
91 libc::poll(
92 pollers as *mut _ as *mut libc::pollfd,
93 pollers.len() as u64,
94 -1,
95 )
96 };
97
98 if returned == -1 {
99 panic!(
100 "fatal error in process reactor: {}",
101 io::Error::last_os_error()
102 );
103 } else if returned < 1 {
104 continue;
105 }
106
107 let lock = fds.lock().unwrap();
108 if pollers[0].revents == libc::POLLIN {
109 let _ = reader.read(&mut buffer);
110 } else {
111 pollers[1..]
112 .iter()
113 .filter(|event| event.revents != 0)
114 .for_each(|event| {
115 if let Some(value) = lock.get(&event.fd) {
116 if value
117 .0
118 .contains(Interest::from_bits_truncate(event.revents))
119 {
120 value.1.store(true, Ordering::SeqCst);
121 value.2.wake_by_ref();
122 }
123 }
124 })
125 }
126
127 pollers.clear();
128
129 pollers.push(libc::pollfd {
130 fd: reader.as_raw_fd(),
131 events: libc::POLLIN,
132 revents: 0,
133 });
134
135 for (&fd, &(interest, _, _)) in lock.iter() {
136 pollers.push(libc::pollfd {
137 fd,
138 events: interest.bits(),
139 revents: 0,
140 });
141 }
142 }
143 });
144
145 Handle {
146 fds,
147 interrupt: writer,
148 }
149});
150
151fn create_pipe() -> (File, File) {
152 let mut fds = [0; 2];
153 unsafe { libc::pipe(&mut fds as *mut _ as *mut libc::c_int) };
154 let reader = unsafe { File::from_raw_fd(fds[0]) };
155 let writer = unsafe { File::from_raw_fd(fds[1]) };
156 (reader, writer)
157}