async_winit/filter.rs
1/*
2
3`async-winit` is free software: you can redistribute it and/or modify it under the terms of one of
4the following licenses:
5
6* GNU Lesser General Public License as published by the Free Software Foundation, either
7 version 3 of the License, or (at your option) any later version.
8* Mozilla Public License as published by the Mozilla Foundation, version 2.
9
10`async-winit` is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
11the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
12Public License and the Patron License for more details.
13
14You should have received a copy of the GNU Lesser General Public License and the Mozilla
15Public License along with `async-winit`. If not, see <https://www.gnu.org/licenses/>.
16
17*/
18
19//! Filters, or the mechanism used internally by the event loop.
20//!
21//! This module is exposed such that it is possible to integrate `async-winit` easily with existing
22//! `winit` applications. The `Filter` type can be provided events, and will send those events to this
23//! library's event handlers.
24
25use std::cell::Cell;
26use std::cmp;
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::task::{Context, Poll, Wake, Waker};
32use std::time::Instant;
33
34use futures_lite::prelude::*;
35use parking::Parker;
36
37use crate::event_loop::Wakeup;
38use crate::reactor::Reactor;
39use crate::sync::ThreadSafety;
40
41use winit::event::Event;
42use winit::event_loop::{ControlFlow, EventLoop, EventLoopProxy, EventLoopWindowTarget};
43
44/// Either a function returned, or an associated future returned first.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
46pub enum ReturnOrFinish<O, T> {
47 /// The function returned.
48 Output(O),
49
50 /// The associated future returned first.
51 FutureReturned(T),
52}
53
54/// The filter for passing events to `async` contexts.
55///
56/// This type takes events and passes them to the event handlers. It also handles the `async` contexts
57/// that are waiting for events.
58pub struct Filter<TS: ThreadSafety> {
59 /// The deadline to wait until.
60 deadline: Option<Instant>,
61
62 /// The wakers to wake up later.
63 wakers: Vec<Waker>,
64
65 /// The parker to use for posting.
66 parker: Parker,
67
68 /// The notifier.
69 notifier: Arc<ReactorWaker>,
70
71 /// The waker version of `notifier`.
72 ///
73 /// Keeping it cached like this is more efficient than creating a new waker every time.
74 notifier_waker: Waker,
75
76 /// A waker connected to `parker`.
77 ///
78 /// Again, it is more efficient to keep it cached like this.
79 parker_waker: Waker,
80
81 /// The future has indicated that it wants to yield.
82 yielding: bool,
83
84 /// The reactor.
85 reactor: TS::Rc<Reactor<TS>>,
86}
87
88impl<TS: ThreadSafety> Filter<TS> {
89 /// Create a new filter from an event loop.
90 ///
91 /// The future is polled once before returning to set up event handlers.
92 pub fn new(inner: &EventLoop<Wakeup>) -> Filter<TS> {
93 let reactor = Reactor::<TS>::get();
94
95 // Create a waker to wake us up.
96 let notifier = Arc::new(ReactorWaker {
97 proxy: Mutex::new(inner.create_proxy()),
98 notified: AtomicBool::new(true),
99 awake: AtomicBool::new(false),
100 });
101 let notifier_waker = Waker::from(notifier.clone());
102 reactor.set_proxy(notifier.clone());
103
104 // Parker/unparker pair.
105 let (parker, unparker) = parking::pair();
106 let parker_waker = Waker::from(Arc::new(EventPostWaker {
107 reactor_waker: notifier.clone(),
108 unparker,
109 }));
110
111 Filter {
112 deadline: None,
113 wakers: vec![],
114 parker,
115 notifier,
116 notifier_waker,
117 parker_waker,
118 yielding: false,
119 reactor,
120 }
121 }
122
123 /// Handle an event.
124 ///
125 /// This function will block on the future if it is in the holding pattern.
126 pub fn handle_event<F>(
127 &mut self,
128 future: Pin<&mut F>,
129 event: Event<'_, Wakeup>,
130 elwt: &EventLoopWindowTarget<Wakeup>,
131 flow: &mut ControlFlow,
132 ) -> ReturnOrFinish<(), F::Output>
133 where
134 F: Future,
135 {
136 // Create a future that can be polled freely.
137 let output = Cell::new(ReturnOrFinish::Output(()));
138 let future = {
139 let output = &output;
140 async move {
141 output.set(ReturnOrFinish::FutureReturned(future.await));
142 }
143 };
144 futures_lite::pin!(future);
145
146 // Some events have special meanings.
147 let about_to_sleep = match &event {
148 Event::NewEvents(_) => {
149 // Stop yielding now.
150 self.yielding = false;
151
152 // We were previously asleep and are now awake.
153 self.notifier.awake.store(true, Ordering::SeqCst);
154
155 // Figure out how long we should wait for.
156 self.deadline = self.reactor.process_timers(&mut self.wakers);
157
158 // We are not about to fall asleep.
159 false
160 }
161
162 Event::RedrawEventsCleared => {
163 // We are about to fall asleep, so make sure that the future knows it.
164 self.notifier.awake.store(false, Ordering::SeqCst);
165
166 // We are about to fall asleep.
167 true
168 }
169
170 _ => {
171 // We are not about to fall asleep.
172 false
173 }
174 };
175
176 // Notify the reactor with our event.
177 let notifier = self.reactor.post_event(event);
178 futures_lite::pin!(notifier);
179
180 // Try to poll it once.
181 let mut cx = Context::from_waker(&self.parker_waker);
182 if notifier.as_mut().poll(&mut cx).is_pending() {
183 // We've hit a point where the future is interested, stop yielding.
184 self.yielding = false;
185
186 // Poll the future in parallel with the user's future.
187 let driver = future.as_mut().or(notifier);
188 futures_lite::pin!(driver);
189
190 // Drain the request queue before anything else.
191 self.reactor.drain_loop_queue(elwt);
192
193 // Block on the parker/unparker pair.
194 loop {
195 if let Poll::Ready(()) = driver.as_mut().poll(&mut cx) {
196 break;
197 }
198
199 // Drain the incoming queue of requests.
200 self.reactor.drain_loop_queue(elwt);
201
202 // Handle timers.
203 let deadline = {
204 let current_deadline = self.reactor.process_timers(&mut self.wakers);
205
206 match (current_deadline, self.deadline) {
207 (None, None) => None,
208 (Some(x), None) | (None, Some(x)) => Some(x),
209 (Some(a), Some(b)) => Some(cmp::min(a, b)),
210 }
211 };
212
213 // Wake any wakers that need to be woken.
214 for waker in self.wakers.drain(..) {
215 waker.wake();
216 }
217
218 // Park the thread until it is notified, or until the timeout.
219 match deadline {
220 None => self.parker.park(),
221 Some(deadline) => {
222 self.parker.park_deadline(deadline);
223 }
224 }
225 }
226 }
227
228 // If the future is still notified, we should poll it.
229 while !self.yielding && self.notifier.notified.swap(false, Ordering::SeqCst) {
230 let mut cx = Context::from_waker(&self.notifier_waker);
231 if future.as_mut().poll(&mut cx).is_pending() {
232 // If the future is *still* notified, it's probably calling future::yield_now(), which
233 // indicates that it wants to stop hogging the event loop. Indicate that we should stop
234 // polling it until we get NewEvents.
235 if self.notifier.notified.load(Ordering::SeqCst) {
236 self.yielding = true;
237 }
238
239 // Drain the incoming queue of requests.
240 self.reactor.drain_loop_queue(elwt);
241 }
242 }
243
244 // Wake everything up if we're about to sleep.
245 if about_to_sleep {
246 self.reactor.drain_loop_queue(elwt);
247 for waker in self.wakers.drain(..) {
248 waker.wake();
249 }
250 }
251
252 // Set the control flow.
253 if let Some(code) = self.reactor.exit_requested() {
254 // The user wants to exit.
255 flow.set_exit_with_code(code);
256 } else if self.yielding {
257 // The future wants to be polled again as soon as possible.
258 flow.set_poll();
259 } else if let Some(deadline) = self.deadline {
260 // The future wants to be polled again when the deadline is reached.
261 flow.set_wait_until(deadline);
262 } else {
263 // The future wants to poll.
264 flow.set_wait();
265 }
266
267 // Return the output if any.
268 output.replace(ReturnOrFinish::Output(()))
269 }
270}
271
272pub(crate) struct ReactorWaker {
273 /// The proxy used to wake up the event loop.
274 proxy: Mutex<EventLoopProxy<Wakeup>>,
275
276 /// Whether or not we are already notified.
277 notified: AtomicBool,
278
279 /// Whether or not the reactor is awake.
280 ///
281 /// The reactor is awake when we don't
282 awake: AtomicBool,
283}
284
285impl ReactorWaker {
286 pub(crate) fn notify(&self) {
287 // If we are already notified, don't notify again.
288 if self.notified.swap(true, Ordering::SeqCst) {
289 return;
290 }
291
292 // If we are currently polling the event loop, don't notify.
293 if self.awake.load(Ordering::SeqCst) {
294 return;
295 }
296
297 // Wake up the reactor.
298 self.proxy
299 .lock()
300 .unwrap()
301 .send_event(Wakeup { _private: () })
302 .ok();
303 }
304}
305
306impl Wake for ReactorWaker {
307 fn wake(self: Arc<Self>) {
308 self.notify()
309 }
310
311 fn wake_by_ref(self: &Arc<Self>) {
312 self.notify()
313 }
314}
315
316struct EventPostWaker {
317 /// The underlying reactor waker.
318 reactor_waker: Arc<ReactorWaker>,
319
320 /// The unparker for the notifier.
321 unparker: parking::Unparker,
322}
323
324impl Wake for EventPostWaker {
325 fn wake(self: Arc<Self>) {
326 self.reactor_waker.notify();
327 self.unparker.unpark();
328 }
329
330 fn wake_by_ref(self: &Arc<Self>) {
331 self.reactor_waker.notify();
332 self.unparker.unpark();
333 }
334}