audio_device/runtime/
events.rs1use crate::loom::sync::atomic::{AtomicBool, Ordering};
2use crate::loom::sync::{Arc, Mutex};
3use crate::loom::thread;
4use crate::runtime::atomic_waker::AtomicWaker;
5use crate::windows::{Event, RawEvent};
6use crate::Result;
7use std::io;
8use std::mem;
9use windows_sys::Windows::Win32::SystemServices as ss;
10use windows_sys::Windows::Win32::WindowsProgramming as wp;
11
12struct Waker {
14 ready: AtomicBool,
15 waker: AtomicWaker,
16 handle: ss::HANDLE,
17}
18
19struct Shared {
20 running: AtomicBool,
21 holders: Mutex<Holders>,
22 parker: Event,
23}
24
25#[derive(Default)]
26struct Holders {
27 added: Vec<Arc<Waker>>,
28 removed: Vec<Event>,
29}
30
31pub struct EventsDriver {
39 thread: Option<thread::JoinHandle<()>>,
40 shared: Arc<Shared>,
41}
42
43impl EventsDriver {
44 pub fn new() -> Result<Self> {
47 let shared = Arc::new(Shared {
48 running: AtomicBool::new(true),
49 holders: Mutex::new(Holders::default()),
50 parker: Event::new(false, false)?,
51 });
52
53 let thread = thread::spawn({
54 let shared = shared.clone();
55 || Driver::start(shared)
56 });
57
58 let handle = Self {
59 thread: Some(thread),
60 shared,
61 };
62
63 Ok(handle)
64 }
65
66 pub fn join(mut self) {
73 self.inner_join();
74 }
75
76 fn inner_join(&mut self) {
77 if let Some(thread) = self.thread.take() {
78 self.shared.running.store(false, Ordering::Release);
79 self.shared.parker.set();
80
81 if thread.join().is_err() {
82 panic!("event handler thread panicked");
83 }
84 }
85 }
86}
87
88impl Drop for EventsDriver {
89 fn drop(&mut self) {
90 let _ = self.inner_join();
91 }
92}
93
94struct Driver {
95 events: Vec<ss::HANDLE>,
96 wakers: Vec<Arc<Waker>>,
97 shared: Arc<Shared>,
98}
99
100impl Driver {
101 fn run(mut self) {
102 let guard = PanicGuard {
103 shared: &*self.shared,
104 wakers: &mut self.wakers,
105 };
106
107 while self.shared.running.load(Ordering::Acquire) {
108 let result = unsafe {
109 ss::WaitForMultipleObjects(
110 self.events.len() as u32,
111 self.events.as_ptr(),
112 ss::FALSE,
113 wp::INFINITE,
114 )
115 };
116
117 match result {
118 ss::WAIT_RETURN_CAUSE::WAIT_ABANDONED_0 => panic!("wait abandoned"),
119 ss::WAIT_RETURN_CAUSE::WAIT_TIMEOUT => panic!("timed out"),
120 ss::WAIT_RETURN_CAUSE::WAIT_FAILED => {
121 panic!("wait failed: {}", io::Error::last_os_error())
122 }
123 other => {
124 let base = ss::WAIT_RETURN_CAUSE::WAIT_OBJECT_0.0;
125 let other = other.0;
126
127 if other < base {
128 panic!("other out of bounds; other = {}", other);
129 }
130
131 let index = (other - base) as usize;
132
133 if !(index < self.events.len()) {
134 panic!("wakeup out of bounds; index = {}", index);
135 }
136
137 if index > 0 {
140 if let Some(waker) = guard.wakers.get(index - 1) {
141 waker.ready.store(true, Ordering::Release);
142 waker.waker.wake();
143 }
144
145 continue;
146 }
147 }
148 }
149
150 let mut holders = self.shared.holders.lock();
151 let mut added = mem::replace(&mut holders.added, Vec::new());
152
153 for waker in added.drain(..) {
154 self.events.push(waker.handle);
155 guard.wakers.push(waker);
156 }
157
158 holders.added = added;
159
160 let mut removed = mem::replace(&mut holders.removed, Vec::new());
161
162 for event in removed.drain(..) {
163 let removed = unsafe { event.raw_event().0 };
164
165 if let Some(index) = guard.wakers.iter().position(|w| w.handle.0 == removed) {
166 guard.wakers.swap_remove(index);
167 self.events.swap_remove(index + 1);
168 }
169 }
170
171 holders.removed = removed;
172 }
173
174 mem::forget(guard);
175
176 struct PanicGuard<'a> {
179 shared: &'a Shared,
180 wakers: &'a mut Vec<Arc<Waker>>,
181 }
182
183 impl Drop for PanicGuard<'_> {
184 fn drop(&mut self) {
185 self.shared.running.store(false, Ordering::Release);
186
187 for waker in self.wakers.iter() {
189 waker.waker.wake();
190 }
191 }
192 }
193 }
194
195 fn start(shared: Arc<Shared>) {
196 let state = Driver {
197 events: vec![unsafe { shared.parker.raw_event() }],
198 wakers: vec![],
199 shared,
200 };
201
202 state.run()
203 }
204}
205
206pub struct AsyncEvent {
210 shared: Arc<Shared>,
211 waker: Arc<Waker>,
212 event: Option<Event>,
213}
214
215impl AsyncEvent {
216 pub fn new(initial_state: bool) -> windows::Result<AsyncEvent> {
225 crate::runtime::with_events(|events| {
226 let event = Event::new(false, initial_state)?;
227 let handle = unsafe { event.raw_event() };
228
229 let waker = Arc::new(Waker {
230 ready: AtomicBool::new(false),
231 waker: AtomicWaker::new(),
232 handle,
233 });
234
235 events.shared.holders.lock().added.push(waker.clone());
236 events.shared.parker.set();
237
238 Ok(AsyncEvent {
239 shared: events.shared.clone(),
240 waker,
241 event: Some(event),
242 })
243 })
244 }
245
246 pub async fn wait(&self) {
248 use std::future::Future;
249 use std::pin::Pin;
250 use std::task::{Context, Poll};
251
252 return WaitFor {
253 shared: &*self.shared,
254 waker: &*self.waker,
255 }
256 .await;
257
258 struct WaitFor<'a> {
259 shared: &'a Shared,
260 waker: &'a Waker,
261 }
262
263 impl Future for WaitFor<'_> {
264 type Output = ();
265
266 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
267 if !self.shared.running.load(Ordering::Acquire) {
268 panic!("background thread panicked");
269 }
270
271 if self.waker.ready.load(Ordering::Acquire) {
272 return Poll::Ready(());
273 }
274
275 self.waker.waker.register_by_ref(cx.waker());
276 Poll::Pending
277 }
278 }
279 }
280
281 pub fn set(&self) {
283 self.event.as_ref().unwrap().set();
284 }
285}
286
287impl RawEvent for AsyncEvent {
288 unsafe fn raw_event(&self) -> ss::HANDLE {
289 self.event.as_ref().unwrap().raw_event()
290 }
291}
292
293impl Drop for AsyncEvent {
294 fn drop(&mut self) {
295 let event = self.event.take().unwrap();
296 self.shared.holders.lock().removed.push(event);
297 self.shared.parker.set();
298 }
299}
300
301unsafe impl Send for AsyncEvent {}
302unsafe impl Sync for AsyncEvent {}