audio_device/runtime/
poll.rs

1use crate::libc as c;
2use crate::loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use crate::loom::sync::{Arc, Mutex};
4use crate::loom::thread;
5use crate::runtime::atomic_waker::AtomicWaker;
6use crate::unix::errno::Errno;
7use crate::Result;
8use std::collections::HashMap;
9use std::mem;
10
11/// The token associated with the current waiter.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
13#[repr(transparent)]
14pub struct Token(c::c_int);
15
16/// A guard for the returned events of the poll handler.
17///
18/// Dropping this handle will allow the background thread to poll the handler
19/// again.
20///
21/// Constructed by waiting on [AsyncPoll::returned_events].
22pub struct PollEventsGuard<'a> {
23    events: c::c_short,
24    shared: &'a Shared,
25    token: Token,
26}
27
28impl PollEventsGuard<'_> {
29    /// Access the returned events.
30    pub fn events(&self) -> c::c_short {
31        self.events
32    }
33}
34
35impl Drop for PollEventsGuard<'_> {
36    fn drop(&mut self) {
37        self.shared.holders.lock().released.push(self.token);
38
39        if let Err(e) = self.shared.parker.send(1) {
40            log::error!("failed to unpark background thread: {}", e);
41        }
42    }
43}
44
45/// An unsafe asynchronous poller around a `pollfd`.
46///
47/// See [AsyncPoll::new].
48pub struct AsyncPoll {
49    shared: Arc<Shared>,
50    waker: Arc<Waker>,
51}
52
53impl AsyncPoll {
54    /// Construct a new poll handle around the given `pollfd`, registering it
55    /// for interest in receiving events asynchronously.
56    ///
57    /// Dropping the returned handle will unregister interest.
58    ///
59    /// # Panics
60    ///
61    /// Panics unless an audio runtime is available.
62    ///
63    /// See [Runtime][crate::runtime::Runtime].
64    ///
65    /// # Safety
66    ///
67    /// This is unsafe, because the caller must ensure that the provided
68    /// `pollfd` is not closed before this handle is dropped.
69    pub unsafe fn new(descriptor: c::pollfd) -> Result<AsyncPoll, Errno> {
70        crate::runtime::with_poll(|poll| {
71            let waker = Arc::new(Waker {
72                waker: AtomicWaker::new(),
73                descriptor,
74                returned_events: AtomicUsize::new(0),
75            });
76
77            poll.shared.holders.lock().added.push(waker.clone());
78            poll.shared.parker.send(1)?;
79
80            Ok(AsyncPoll {
81                shared: poll.shared.clone(),
82                waker,
83            })
84        })
85    }
86
87    /// Wait for events to be triggered on the background driver and return a
88    /// guard to the events.
89    ///
90    /// Once this guard is dropped the driver will be released to register more
91    /// interest.
92    pub async fn returned_events(&self) -> PollEventsGuard<'_> {
93        use std::future::Future;
94        use std::pin::Pin;
95        use std::task::{Context, Poll};
96
97        return ReturnedEvents(self).await;
98
99        struct ReturnedEvents<'a>(&'a AsyncPoll);
100
101        impl<'a> Future for ReturnedEvents<'a> {
102            type Output = PollEventsGuard<'a>;
103
104            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105                self.0.waker.waker.register_by_ref(cx.waker());
106                let returned_events = self.0.waker.returned_events.swap(0, Ordering::Acquire);
107
108                if returned_events != 0 {
109                    Poll::Ready(PollEventsGuard {
110                        events: returned_events as c::c_short,
111                        shared: &*self.0.shared,
112                        token: self.0.waker.token(),
113                    })
114                } else {
115                    Poll::Pending
116                }
117            }
118        }
119    }
120}
121
122impl Drop for AsyncPoll {
123    fn drop(&mut self) {
124        self.shared.holders.lock().removed.push(self.waker.token());
125
126        if let Err(e) = self.shared.parker.send(1) {
127            log::error!("failed to unpark background thread: {}", e);
128        }
129    }
130}
131
132/// Data on the waker for a handle.
133pub(crate) struct Waker {
134    /// The waker to call when waking up the task waiting for events.
135    pub(crate) waker: AtomicWaker,
136    /// The descriptors associated with this waker.
137    descriptor: c::pollfd,
138    /// The last revents decoded. `None` if no events are ready.
139    returned_events: AtomicUsize,
140}
141
142impl Waker {
143    /// Get the token associated with this waker.
144    ///
145    /// Note: always the first file descriptor.
146    fn token(&self) -> Token {
147        Token(self.descriptor.fd)
148    }
149}
150
151pub(crate) struct Shared {
152    pub(crate) running: AtomicBool,
153    pub(crate) holders: Mutex<Events>,
154    pub(crate) parker: EventFd,
155}
156
157#[derive(Default)]
158pub(crate) struct Events {
159    added: Vec<Arc<Waker>>,
160    released: Vec<Token>,
161    removed: Vec<Token>,
162}
163
164impl Events {
165    // Process all queued elements in the driver.
166    fn process(&mut self, driver: &mut Driver, wakers: &mut Vec<Arc<Waker>>) -> Result<()> {
167        let mut added = mem::replace(&mut self.added, Vec::new());
168
169        for waker in added.drain(..) {
170            let loc = Loc {
171                descriptor: driver.descriptors.len(),
172                waker: wakers.len(),
173            };
174
175            driver.locations.insert(waker.token(), loc);
176            driver.descriptors.push(waker.descriptor);
177            wakers.push(waker);
178        }
179
180        let mut removed = mem::replace(&mut self.removed, Vec::new());
181
182        for token in removed.drain(..) {
183            if let Some(loc) = driver.locations.remove(&token) {
184                driver.descriptors.swap_remove(loc.descriptor);
185                wakers.swap_remove(loc.waker);
186
187                // re-organize unless we're removing the last waker.
188                if wakers.len() != loc.waker {
189                    // NB: redirect swap removed.
190                    let token = wakers[loc.waker].token();
191                    driver.locations.insert(token, loc);
192                }
193            }
194        }
195
196        let mut released = mem::replace(&mut self.released, Vec::new());
197
198        for r in released.drain(..) {
199            if let Some(Loc { descriptor, waker }) = driver.locations.get(&r) {
200                driver.descriptors[*descriptor].fd = wakers[*waker].descriptor.fd;
201            }
202        }
203
204        self.added = added;
205        self.removed = removed;
206        self.released = released;
207        Ok(())
208    }
209}
210
211/// An executor to drive things which are woken up by polling.
212pub struct PollDriver {
213    thread: Option<thread::JoinHandle<()>>,
214    shared: Arc<Shared>,
215}
216
217impl PollDriver {
218    /// Construct a new events windows event object driver and return its
219    /// handle.
220    pub fn new() -> Result<Self> {
221        let shared = Arc::new(Shared {
222            running: AtomicBool::new(true),
223            holders: Mutex::new(Events::default()),
224            parker: EventFd::new()?,
225        });
226
227        let thread = thread::spawn({
228            let shared = shared.clone();
229            || Driver::start(shared)
230        });
231
232        let handle = Self {
233            thread: Some(thread),
234            shared,
235        };
236
237        Ok(handle)
238    }
239
240    /// Join the current handle.
241    ///
242    /// # Panics
243    ///
244    /// This panics if the background thread panicked. But this should only ever
245    /// happen if there's a bug.
246    pub fn join(mut self) {
247        self.inner_join();
248    }
249
250    fn inner_join(&mut self) {
251        if let Some(thread) = self.thread.take() {
252            self.shared.running.store(false, Ordering::Release);
253
254            if let Err(errno) = self.shared.parker.send(0) {
255                panic!("failed to set event: {}", errno);
256            }
257
258            if thread.join().is_err() {
259                panic!("event handler thread panicked");
260            }
261        }
262    }
263}
264
265impl Drop for PollDriver {
266    fn drop(&mut self) {
267        let _ = self.inner_join();
268    }
269}
270
271#[derive(Debug, Clone, Copy)]
272struct Loc {
273    descriptor: usize,
274    waker: usize,
275}
276
277struct Driver {
278    /// Location of a given token.
279    locations: HashMap<Token, Loc>,
280    /// The descriptors being driven.
281    descriptors: Vec<libc::pollfd>,
282}
283
284impl Driver {
285    fn run(mut self, guard: &mut PanicGuard) -> Result<()> {
286        while guard.shared.running.load(Ordering::Acquire) {
287            let mut result = unsafe {
288                errno!(libc::poll(
289                    self.descriptors.as_mut_ptr(),
290                    self.descriptors.len() as libc::c_ulong,
291                    -1,
292                ))?
293            };
294
295            let mut notified = false;
296
297            for (n, e) in self.descriptors.iter_mut().enumerate() {
298                if e.revents == 0 {
299                    continue;
300                }
301
302                if result == 0 {
303                    break;
304                }
305
306                result -= 1;
307
308                if n == 0 {
309                    let _ = guard.shared.parker.recv()?;
310                    notified = true;
311                    continue;
312                }
313
314                // Disable file descriptor and wakeup the task to receive the
315                // returned events.
316                e.fd = -1;
317                let waker = &guard.wakers[n - 1];
318                waker
319                    .returned_events
320                    .store(std::mem::take(&mut e.revents) as usize, Ordering::Release);
321                waker.waker.wake();
322            }
323
324            if notified {
325                let mut holders = guard.shared.holders.lock();
326                holders.process(&mut self, &mut guard.wakers)?;
327            }
328        }
329
330        return Ok(());
331    }
332
333    fn start(shared: Arc<Shared>) {
334        let state = Driver {
335            locations: HashMap::new(),
336            descriptors: vec![libc::pollfd {
337                fd: shared.parker.fd,
338                events: libc::POLLIN,
339                revents: 0,
340            }],
341        };
342
343        let mut guard = PanicGuard {
344            shared,
345            wakers: vec![],
346        };
347
348        if let Err(e) = state.run(&mut guard) {
349            panic!("poll thread errored: {}", e)
350        }
351
352        mem::forget(guard);
353    }
354}
355
356/// Wrap a panic guard around self which will release any resources it
357/// has allocated when dropped and mark itself as panicked.
358struct PanicGuard {
359    shared: Arc<Shared>,
360    wakers: Vec<Arc<Waker>>,
361}
362
363impl Drop for PanicGuard {
364    fn drop(&mut self) {
365        self.shared.running.store(false, Ordering::Release);
366
367        // Wake up every waker so that they can observe the panic.
368        for waker in self.wakers.iter() {
369            waker.waker.wake();
370        }
371    }
372}
373
374/// Helper wrapper around an eventfd.
375pub(crate) struct EventFd {
376    fd: c::c_int,
377}
378
379impl EventFd {
380    fn new() -> Result<Self> {
381        unsafe {
382            Ok(Self {
383                fd: errno!(c::eventfd(0, c::EFD_NONBLOCK))?,
384            })
385        }
386    }
387
388    /// Add the given number to the eventfd.
389    fn send(&self, v: u64) -> Result<(), Errno> {
390        unsafe {
391            let n = v.to_ne_bytes();
392            errno!(c::write(self.fd, n.as_ptr() as *const c::c_void, 8))?;
393            Ok(())
394        }
395    }
396
397    /// Read the next value from the eventfd.
398    fn recv(&self) -> Result<u64> {
399        unsafe {
400            let mut bytes = [0u8; 8];
401            let read = errno!(c::read(self.fd, bytes.as_mut_ptr() as *mut c::c_void, 8))?;
402
403            assert!(read == 8);
404            Ok(u64::from_ne_bytes(bytes))
405        }
406    }
407}
408
409impl Drop for EventFd {
410    fn drop(&mut self) {
411        unsafe {
412            let _ = libc::close(self.fd);
413        }
414    }
415}