audio_device/runtime/
events.rs

1use crate::loom::sync::atomic::{AtomicBool, Ordering};
2use crate::loom::sync::{Arc, Mutex};
3use crate::loom::thread;
4use crate::runtime::atomic_waker::AtomicWaker;
5use crate::windows::{Event, RawEvent};
6use crate::Result;
7use std::io;
8use std::mem;
9use windows_sys::Windows::Win32::SystemServices as ss;
10use windows_sys::Windows::Win32::WindowsProgramming as wp;
11
/// Data on the waker for a handle.
///
/// One instance is shared between an [AsyncEvent] and the driver thread.
struct Waker {
    /// Set to `true` by the driver thread once the wait call has reported
    /// `handle` as signalled.
    ready: AtomicBool,
    /// Task waker registered by the future behind `AsyncEvent::wait` and woken
    /// by the driver thread.
    waker: AtomicWaker,
    /// Raw handle of the event this waker is associated with.
    handle: ss::HANDLE,
}
18
/// State shared between the driver thread, its [EventsDriver] handle and every
/// [AsyncEvent].
struct Shared {
    /// `true` while the driver loop should keep running. Cleared when the
    /// driver is joined and when the driver thread unwinds from a panic.
    running: AtomicBool,
    /// Registrations and deregistrations queued for the driver thread.
    holders: Mutex<Holders>,
    /// Event which is set to wake the driver thread up, so that it re-checks
    /// `running` and processes `holders`.
    parker: Event,
}
24
/// Changes queued for the driver thread to apply to the set of events it is
/// waiting on.
#[derive(Default)]
struct Holders {
    /// Wakers of newly constructed events which the driver has yet to start
    /// waiting on.
    added: Vec<Arc<Waker>>,
    /// Events of dropped [AsyncEvent]s which the driver has yet to stop
    /// waiting on.
    removed: Vec<Event>,
}
30
/// An executor to drive things which are woken up by [windows event
/// objects].
///
/// This is necessary to use in combination with [AsyncEvent].
///
/// [windows event objects]:
/// https://docs.microsoft.com/en-us/windows/win32/sync/event-objects
pub struct EventsDriver {
    /// Join handle of the background driver thread. `None` once the thread has
    /// been joined.
    thread: Option<thread::JoinHandle<()>>,
    /// State shared with the background thread and every [AsyncEvent].
    shared: Arc<Shared>,
}
42
43impl EventsDriver {
44    /// Construct a new events windows event object driver and return its
45    /// handle.
46    pub fn new() -> Result<Self> {
47        let shared = Arc::new(Shared {
48            running: AtomicBool::new(true),
49            holders: Mutex::new(Holders::default()),
50            parker: Event::new(false, false)?,
51        });
52
53        let thread = thread::spawn({
54            let shared = shared.clone();
55            || Driver::start(shared)
56        });
57
58        let handle = Self {
59            thread: Some(thread),
60            shared,
61        };
62
63        Ok(handle)
64    }
65
66    /// Join the current handle.
67    ///
68    /// # Panics
69    ///
70    /// This panics if the background thread panicked. But this should only ever
71    /// happen if there's a bug.
72    pub fn join(mut self) {
73        self.inner_join();
74    }
75
76    fn inner_join(&mut self) {
77        if let Some(thread) = self.thread.take() {
78            self.shared.running.store(false, Ordering::Release);
79            self.shared.parker.set();
80
81            if thread.join().is_err() {
82                panic!("event handler thread panicked");
83            }
84        }
85    }
86}
87
88impl Drop for EventsDriver {
89    fn drop(&mut self) {
90        let _ = self.inner_join();
91    }
92}
93
/// State owned by the background thread.
struct Driver {
    /// Handles passed to the wait call. Index 0 is the parker of `shared`; the
    /// handle at index `n + 1` belongs to `wakers[n]`.
    events: Vec<ss::HANDLE>,
    /// Wakers of the registered events, in the same order as `events` but
    /// without the leading parker entry.
    wakers: Vec<Arc<Waker>>,
    /// State shared with the handles of this driver.
    shared: Arc<Shared>,
}
99
100impl Driver {
101    fn run(mut self) {
102        let guard = PanicGuard {
103            shared: &*self.shared,
104            wakers: &mut self.wakers,
105        };
106
107        while self.shared.running.load(Ordering::Acquire) {
108            let result = unsafe {
109                ss::WaitForMultipleObjects(
110                    self.events.len() as u32,
111                    self.events.as_ptr(),
112                    ss::FALSE,
113                    wp::INFINITE,
114                )
115            };
116
117            match result {
118                ss::WAIT_RETURN_CAUSE::WAIT_ABANDONED_0 => panic!("wait abandoned"),
119                ss::WAIT_RETURN_CAUSE::WAIT_TIMEOUT => panic!("timed out"),
120                ss::WAIT_RETURN_CAUSE::WAIT_FAILED => {
121                    panic!("wait failed: {}", io::Error::last_os_error())
122                }
123                other => {
124                    let base = ss::WAIT_RETURN_CAUSE::WAIT_OBJECT_0.0;
125                    let other = other.0;
126
127                    if other < base {
128                        panic!("other out of bounds; other = {}", other);
129                    }
130
131                    let index = (other - base) as usize;
132
133                    if !(index < self.events.len()) {
134                        panic!("wakeup out of bounds; index = {}", index);
135                    }
136
137                    // NB: index 0 is the wakeup to notify once things are
138                    // added, any other is a legit registered event.
139                    if index > 0 {
140                        if let Some(waker) = guard.wakers.get(index - 1) {
141                            waker.ready.store(true, Ordering::Release);
142                            waker.waker.wake();
143                        }
144
145                        continue;
146                    }
147                }
148            }
149
150            let mut holders = self.shared.holders.lock();
151            let mut added = mem::replace(&mut holders.added, Vec::new());
152
153            for waker in added.drain(..) {
154                self.events.push(waker.handle);
155                guard.wakers.push(waker);
156            }
157
158            holders.added = added;
159
160            let mut removed = mem::replace(&mut holders.removed, Vec::new());
161
162            for event in removed.drain(..) {
163                let removed = unsafe { event.raw_event().0 };
164
165                if let Some(index) = guard.wakers.iter().position(|w| w.handle.0 == removed) {
166                    guard.wakers.swap_remove(index);
167                    self.events.swap_remove(index + 1);
168                }
169            }
170
171            holders.removed = removed;
172        }
173
174        mem::forget(guard);
175
176        /// Wrap a panic guard around self which will release any resources it
177        /// has allocated when dropped and mark itself as panicked.
178        struct PanicGuard<'a> {
179            shared: &'a Shared,
180            wakers: &'a mut Vec<Arc<Waker>>,
181        }
182
183        impl Drop for PanicGuard<'_> {
184            fn drop(&mut self) {
185                self.shared.running.store(false, Ordering::Release);
186
187                // Wake up every waker so that they can observe the panic.
188                for waker in self.wakers.iter() {
189                    waker.waker.wake();
190                }
191            }
192        }
193    }
194
195    fn start(shared: Arc<Shared>) {
196        let state = Driver {
197            events: vec![unsafe { shared.parker.raw_event() }],
198            wakers: vec![],
199            shared,
200        };
201
202        state.run()
203    }
204}
205
/// An asynchronous variant of [Event].
///
/// See [AsyncEvent::new].
pub struct AsyncEvent {
    /// State shared with the driver this event was registered with.
    shared: Arc<Shared>,
    /// Waker state shared with the driver thread.
    waker: Arc<Waker>,
    /// The underlying event. This is `Some` until the value is dropped, at
    /// which point the event is handed over to the driver.
    event: Option<Event>,
}
214
215impl AsyncEvent {
216    /// Construct an asynchronous event associated with the current handle. The
217    /// constructed event has the initial state specified by `initial_state`.
218    ///
219    /// # Panics
220    ///
221    /// Panics unless an audio runtime is available.
222    ///
223    /// See [Runtime][crate::runtime::Runtime].
224    pub fn new(initial_state: bool) -> windows::Result<AsyncEvent> {
225        crate::runtime::with_events(|events| {
226            let event = Event::new(false, initial_state)?;
227            let handle = unsafe { event.raw_event() };
228
229            let waker = Arc::new(Waker {
230                ready: AtomicBool::new(false),
231                waker: AtomicWaker::new(),
232                handle,
233            });
234
235            events.shared.holders.lock().added.push(waker.clone());
236            events.shared.parker.set();
237
238            Ok(AsyncEvent {
239                shared: events.shared.clone(),
240                waker,
241                event: Some(event),
242            })
243        })
244    }
245
246    /// Wait for the specified event handle to become set.
247    pub async fn wait(&self) {
248        use std::future::Future;
249        use std::pin::Pin;
250        use std::task::{Context, Poll};
251
252        return WaitFor {
253            shared: &*self.shared,
254            waker: &*self.waker,
255        }
256        .await;
257
258        struct WaitFor<'a> {
259            shared: &'a Shared,
260            waker: &'a Waker,
261        }
262
263        impl Future for WaitFor<'_> {
264            type Output = ();
265
266            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
267                if !self.shared.running.load(Ordering::Acquire) {
268                    panic!("background thread panicked");
269                }
270
271                if self.waker.ready.load(Ordering::Acquire) {
272                    return Poll::Ready(());
273                }
274
275                self.waker.waker.register_by_ref(cx.waker());
276                Poll::Pending
277            }
278        }
279    }
280
281    /// Set the current event handle.
282    pub fn set(&self) {
283        self.event.as_ref().unwrap().set();
284    }
285}
286
287impl RawEvent for AsyncEvent {
288    unsafe fn raw_event(&self) -> ss::HANDLE {
289        self.event.as_ref().unwrap().raw_event()
290    }
291}
292
293impl Drop for AsyncEvent {
294    fn drop(&mut self) {
295        let event = self.event.take().unwrap();
296        self.shared.holders.lock().removed.push(event);
297        self.shared.parker.set();
298    }
299}
300
// NOTE(review): these impls assert that an `AsyncEvent` may be moved to and
// shared between threads. Nothing in this file shows which field prevents the
// compiler from deriving that on its own — confirm which one it is and that
// the underlying event handle is safe to use from any thread.
unsafe impl Send for AsyncEvent {}
unsafe impl Sync for AsyncEvent {}