1use crate::libc as c;
2use crate::loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use crate::loom::sync::{Arc, Mutex};
4use crate::loom::thread;
5use crate::runtime::atomic_waker::AtomicWaker;
6use crate::unix::errno::Errno;
7use crate::Result;
8use std::collections::HashMap;
9use std::mem;
10
/// Key under which a registered descriptor is tracked by the poll driver.
///
/// Wraps a C `int`; `Waker::token` builds it from the descriptor's `fd`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct Token(c::c_int);
15
/// Guard handed out once events have been reported for a descriptor.
///
/// Dropping the guard queues its token on the shared `released` list and
/// signals the driver thread through the parker.
pub struct PollEventsGuard<'a> {
    /// Event bits that were reported for the descriptor.
    events: c::c_short,
    /// Shared driver state used to notify the driver when the guard is dropped.
    shared: &'a Shared,
    /// Token of the descriptor this guard belongs to.
    token: Token,
}
27
28impl PollEventsGuard<'_> {
29 pub fn events(&self) -> c::c_short {
31 self.events
32 }
33}
34
35impl Drop for PollEventsGuard<'_> {
36 fn drop(&mut self) {
37 self.shared.holders.lock().released.push(self.token);
38
39 if let Err(e) = self.shared.parker.send(1) {
40 log::error!("failed to unpark background thread: {}", e);
41 }
42 }
43}
44
/// Handle to a descriptor registered with the poll driver.
///
/// Dropping it queues the descriptor's token for removal and signals the
/// driver thread.
pub struct AsyncPoll {
    /// State shared with the driver thread.
    shared: Arc<Shared>,
    /// Per-descriptor waker state; the driver receives a clone on registration.
    waker: Arc<Waker>,
}
52
53impl AsyncPoll {
54 pub unsafe fn new(descriptor: c::pollfd) -> Result<AsyncPoll, Errno> {
70 crate::runtime::with_poll(|poll| {
71 let waker = Arc::new(Waker {
72 waker: AtomicWaker::new(),
73 descriptor,
74 returned_events: AtomicUsize::new(0),
75 });
76
77 poll.shared.holders.lock().added.push(waker.clone());
78 poll.shared.parker.send(1)?;
79
80 Ok(AsyncPoll {
81 shared: poll.shared.clone(),
82 waker,
83 })
84 })
85 }
86
87 pub async fn returned_events(&self) -> PollEventsGuard<'_> {
93 use std::future::Future;
94 use std::pin::Pin;
95 use std::task::{Context, Poll};
96
97 return ReturnedEvents(self).await;
98
99 struct ReturnedEvents<'a>(&'a AsyncPoll);
100
101 impl<'a> Future for ReturnedEvents<'a> {
102 type Output = PollEventsGuard<'a>;
103
104 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105 self.0.waker.waker.register_by_ref(cx.waker());
106 let returned_events = self.0.waker.returned_events.swap(0, Ordering::Acquire);
107
108 if returned_events != 0 {
109 Poll::Ready(PollEventsGuard {
110 events: returned_events as c::c_short,
111 shared: &*self.0.shared,
112 token: self.0.waker.token(),
113 })
114 } else {
115 Poll::Pending
116 }
117 }
118 }
119 }
120}
121
122impl Drop for AsyncPoll {
123 fn drop(&mut self) {
124 self.shared.holders.lock().removed.push(self.waker.token());
125
126 if let Err(e) = self.shared.parker.send(1) {
127 log::error!("failed to unpark background thread: {}", e);
128 }
129 }
130}
131
/// Per-descriptor state shared between an `AsyncPoll` and the driver thread.
pub(crate) struct Waker {
    /// Task waker that the driver wakes after storing events.
    pub(crate) waker: AtomicWaker,
    /// The `pollfd` registered for this waker.
    descriptor: c::pollfd,
    /// Event bits last stored by the driver; `0` is treated as "nothing
    /// pending" by the consuming future.
    returned_events: AtomicUsize,
}
141
142impl Waker {
143 fn token(&self) -> Token {
147 Token(self.descriptor.fd)
148 }
149}
150
/// State shared between the driver thread and all handles.
pub(crate) struct Shared {
    /// The driver loop keeps running while this is `true`.
    pub(crate) running: AtomicBool,
    /// Queued registration changes, applied by the driver thread.
    pub(crate) holders: Mutex<Events>,
    /// Eventfd written to in order to wake the driver thread out of `poll`.
    pub(crate) parker: EventFd,
}
156
/// Queues of pending changes that the driver thread applies in
/// `Events::process`.
#[derive(Default)]
pub(crate) struct Events {
    /// Wakers to start polling.
    added: Vec<Arc<Waker>>,
    /// Tokens whose descriptor should be polled again after being reported.
    released: Vec<Token>,
    /// Tokens to stop polling.
    removed: Vec<Token>,
}
163
164impl Events {
165 fn process(&mut self, driver: &mut Driver, wakers: &mut Vec<Arc<Waker>>) -> Result<()> {
167 let mut added = mem::replace(&mut self.added, Vec::new());
168
169 for waker in added.drain(..) {
170 let loc = Loc {
171 descriptor: driver.descriptors.len(),
172 waker: wakers.len(),
173 };
174
175 driver.locations.insert(waker.token(), loc);
176 driver.descriptors.push(waker.descriptor);
177 wakers.push(waker);
178 }
179
180 let mut removed = mem::replace(&mut self.removed, Vec::new());
181
182 for token in removed.drain(..) {
183 if let Some(loc) = driver.locations.remove(&token) {
184 driver.descriptors.swap_remove(loc.descriptor);
185 wakers.swap_remove(loc.waker);
186
187 if wakers.len() != loc.waker {
189 let token = wakers[loc.waker].token();
191 driver.locations.insert(token, loc);
192 }
193 }
194 }
195
196 let mut released = mem::replace(&mut self.released, Vec::new());
197
198 for r in released.drain(..) {
199 if let Some(Loc { descriptor, waker }) = driver.locations.get(&r) {
200 driver.descriptors[*descriptor].fd = wakers[*waker].descriptor.fd;
201 }
202 }
203
204 self.added = added;
205 self.removed = removed;
206 self.released = released;
207 Ok(())
208 }
209}
210
/// Owner of the background thread that runs the poll loop.
pub struct PollDriver {
    /// Join handle of the driver thread; taken (set to `None`) once joined.
    thread: Option<thread::JoinHandle<()>>,
    /// State shared with the driver thread and all handles.
    shared: Arc<Shared>,
}
216
217impl PollDriver {
218 pub fn new() -> Result<Self> {
221 let shared = Arc::new(Shared {
222 running: AtomicBool::new(true),
223 holders: Mutex::new(Events::default()),
224 parker: EventFd::new()?,
225 });
226
227 let thread = thread::spawn({
228 let shared = shared.clone();
229 || Driver::start(shared)
230 });
231
232 let handle = Self {
233 thread: Some(thread),
234 shared,
235 };
236
237 Ok(handle)
238 }
239
240 pub fn join(mut self) {
247 self.inner_join();
248 }
249
250 fn inner_join(&mut self) {
251 if let Some(thread) = self.thread.take() {
252 self.shared.running.store(false, Ordering::Release);
253
254 if let Err(errno) = self.shared.parker.send(0) {
255 panic!("failed to set event: {}", errno);
256 }
257
258 if thread.join().is_err() {
259 panic!("event handler thread panicked");
260 }
261 }
262 }
263}
264
265impl Drop for PollDriver {
266 fn drop(&mut self) {
267 let _ = self.inner_join();
268 }
269}
270
/// Position of one registration in the driver's parallel vectors.
#[derive(Debug, Clone, Copy)]
struct Loc {
    /// Index into `Driver::descriptors`.
    descriptor: usize,
    /// Index into the waker list kept alongside the driver.
    waker: usize,
}
276
/// State owned by the driver thread.
struct Driver {
    /// Maps each token to where its descriptor and waker are stored.
    locations: HashMap<Token, Loc>,
    /// Descriptors handed to `poll`; index 0 is the parker eventfd.
    descriptors: Vec<libc::pollfd>,
}
283
284impl Driver {
285 fn run(mut self, guard: &mut PanicGuard) -> Result<()> {
286 while guard.shared.running.load(Ordering::Acquire) {
287 let mut result = unsafe {
288 errno!(libc::poll(
289 self.descriptors.as_mut_ptr(),
290 self.descriptors.len() as libc::c_ulong,
291 -1,
292 ))?
293 };
294
295 let mut notified = false;
296
297 for (n, e) in self.descriptors.iter_mut().enumerate() {
298 if e.revents == 0 {
299 continue;
300 }
301
302 if result == 0 {
303 break;
304 }
305
306 result -= 1;
307
308 if n == 0 {
309 let _ = guard.shared.parker.recv()?;
310 notified = true;
311 continue;
312 }
313
314 e.fd = -1;
317 let waker = &guard.wakers[n - 1];
318 waker
319 .returned_events
320 .store(std::mem::take(&mut e.revents) as usize, Ordering::Release);
321 waker.waker.wake();
322 }
323
324 if notified {
325 let mut holders = guard.shared.holders.lock();
326 holders.process(&mut self, &mut guard.wakers)?;
327 }
328 }
329
330 return Ok(());
331 }
332
333 fn start(shared: Arc<Shared>) {
334 let state = Driver {
335 locations: HashMap::new(),
336 descriptors: vec![libc::pollfd {
337 fd: shared.parker.fd,
338 events: libc::POLLIN,
339 revents: 0,
340 }],
341 };
342
343 let mut guard = PanicGuard {
344 shared,
345 wakers: vec![],
346 };
347
348 if let Err(e) = state.run(&mut guard) {
349 panic!("poll thread errored: {}", e)
350 }
351
352 mem::forget(guard);
353 }
354}
355
/// Holds the driver thread's shared state and waker list.
///
/// Its `Drop` marks the driver as no longer running and wakes every waker it
/// still holds.
struct PanicGuard {
    /// State shared with the handles.
    shared: Arc<Shared>,
    /// Wakers of all registered descriptors, in the order the driver indexes them.
    wakers: Vec<Arc<Waker>>,
}
362
363impl Drop for PanicGuard {
364 fn drop(&mut self) {
365 self.shared.running.store(false, Ordering::Release);
366
367 for waker in self.wakers.iter() {
369 waker.waker.wake();
370 }
371 }
372}
373
/// Wrapper around an eventfd descriptor, closed on drop.
pub(crate) struct EventFd {
    /// The raw eventfd file descriptor.
    fd: c::c_int,
}
378
379impl EventFd {
380 fn new() -> Result<Self> {
381 unsafe {
382 Ok(Self {
383 fd: errno!(c::eventfd(0, c::EFD_NONBLOCK))?,
384 })
385 }
386 }
387
388 fn send(&self, v: u64) -> Result<(), Errno> {
390 unsafe {
391 let n = v.to_ne_bytes();
392 errno!(c::write(self.fd, n.as_ptr() as *const c::c_void, 8))?;
393 Ok(())
394 }
395 }
396
397 fn recv(&self) -> Result<u64> {
399 unsafe {
400 let mut bytes = [0u8; 8];
401 let read = errno!(c::read(self.fd, bytes.as_mut_ptr() as *mut c::c_void, 8))?;
402
403 assert!(read == 8);
404 Ok(u64::from_ne_bytes(bytes))
405 }
406 }
407}
408
409impl Drop for EventFd {
410 fn drop(&mut self) {
411 unsafe {
412 let _ = libc::close(self.fd);
413 }
414 }
415}