async_winit/
filter.rs

/*

`async-winit` is free software: you can redistribute it and/or modify it under the terms of one of
the following licenses:

* GNU Lesser General Public License as published by the Free Software Foundation, either
  version 3 of the License, or (at your option) any later version.
* Mozilla Public License as published by the Mozilla Foundation, version 2.

`async-winit` is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
Public License and the Mozilla Public License for more details.

You should have received a copy of the GNU Lesser General Public License and the Mozilla
Public License along with `async-winit`. If not, see <https://www.gnu.org/licenses/>.

*/
18
//! Filters, the mechanism used internally by the event loop.
//!
//! This module is exposed so that `async-winit` can be integrated easily with existing `winit`
//! applications. The `Filter` type can be fed events, and will forward those events to this
//! library's event handlers.
24
25use std::cell::Cell;
26use std::cmp;
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::task::{Context, Poll, Wake, Waker};
32use std::time::Instant;
33
34use futures_lite::prelude::*;
35use parking::Parker;
36
37use crate::event_loop::Wakeup;
38use crate::reactor::Reactor;
39use crate::sync::ThreadSafety;
40
41use winit::event::Event;
42use winit::event_loop::{ControlFlow, EventLoop, EventLoopProxy, EventLoopWindowTarget};
43
/// Outcome of running a function alongside an associated future: either the function
/// produced its result, or the future completed first.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ReturnOrFinish<O, T> {
    /// The function ran to its end and produced this value.
    Output(O),

    /// The associated future completed before the function returned; this is its output.
    FutureReturned(T),
}
53
54/// The filter for passing events to `async` contexts.
55///
56/// This type takes events and passes them to the event handlers. It also handles the `async` contexts
57/// that are waiting for events.
58pub struct Filter<TS: ThreadSafety> {
59    /// The deadline to wait until.
60    deadline: Option<Instant>,
61
62    /// The wakers to wake up later.
63    wakers: Vec<Waker>,
64
65    /// The parker to use for posting.
66    parker: Parker,
67
68    /// The notifier.
69    notifier: Arc<ReactorWaker>,
70
71    /// The waker version of `notifier`.
72    ///
73    /// Keeping it cached like this is more efficient than creating a new waker every time.
74    notifier_waker: Waker,
75
76    /// A waker connected to `parker`.
77    ///
78    /// Again, it is more efficient to keep it cached like this.
79    parker_waker: Waker,
80
81    /// The future has indicated that it wants to yield.
82    yielding: bool,
83
84    /// The reactor.
85    reactor: TS::Rc<Reactor<TS>>,
86}
87
88impl<TS: ThreadSafety> Filter<TS> {
89    /// Create a new filter from an event loop.
90    ///
91    /// The future is polled once before returning to set up event handlers.
92    pub fn new(inner: &EventLoop<Wakeup>) -> Filter<TS> {
93        let reactor = Reactor::<TS>::get();
94
95        // Create a waker to wake us up.
96        let notifier = Arc::new(ReactorWaker {
97            proxy: Mutex::new(inner.create_proxy()),
98            notified: AtomicBool::new(true),
99            awake: AtomicBool::new(false),
100        });
101        let notifier_waker = Waker::from(notifier.clone());
102        reactor.set_proxy(notifier.clone());
103
104        // Parker/unparker pair.
105        let (parker, unparker) = parking::pair();
106        let parker_waker = Waker::from(Arc::new(EventPostWaker {
107            reactor_waker: notifier.clone(),
108            unparker,
109        }));
110
111        Filter {
112            deadline: None,
113            wakers: vec![],
114            parker,
115            notifier,
116            notifier_waker,
117            parker_waker,
118            yielding: false,
119            reactor,
120        }
121    }
122
123    /// Handle an event.
124    ///
125    /// This function will block on the future if it is in the holding pattern.
126    pub fn handle_event<F>(
127        &mut self,
128        future: Pin<&mut F>,
129        event: Event<'_, Wakeup>,
130        elwt: &EventLoopWindowTarget<Wakeup>,
131        flow: &mut ControlFlow,
132    ) -> ReturnOrFinish<(), F::Output>
133    where
134        F: Future,
135    {
136        // Create a future that can be polled freely.
137        let output = Cell::new(ReturnOrFinish::Output(()));
138        let future = {
139            let output = &output;
140            async move {
141                output.set(ReturnOrFinish::FutureReturned(future.await));
142            }
143        };
144        futures_lite::pin!(future);
145
146        // Some events have special meanings.
147        let about_to_sleep = match &event {
148            Event::NewEvents(_) => {
149                // Stop yielding now.
150                self.yielding = false;
151
152                // We were previously asleep and are now awake.
153                self.notifier.awake.store(true, Ordering::SeqCst);
154
155                // Figure out how long we should wait for.
156                self.deadline = self.reactor.process_timers(&mut self.wakers);
157
158                // We are not about to fall asleep.
159                false
160            }
161
162            Event::RedrawEventsCleared => {
163                // We are about to fall asleep, so make sure that the future knows it.
164                self.notifier.awake.store(false, Ordering::SeqCst);
165
166                // We are about to fall asleep.
167                true
168            }
169
170            _ => {
171                // We are not about to fall asleep.
172                false
173            }
174        };
175
176        // Notify the reactor with our event.
177        let notifier = self.reactor.post_event(event);
178        futures_lite::pin!(notifier);
179
180        // Try to poll it once.
181        let mut cx = Context::from_waker(&self.parker_waker);
182        if notifier.as_mut().poll(&mut cx).is_pending() {
183            // We've hit a point where the future is interested, stop yielding.
184            self.yielding = false;
185
186            // Poll the future in parallel with the user's future.
187            let driver = future.as_mut().or(notifier);
188            futures_lite::pin!(driver);
189
190            // Drain the request queue before anything else.
191            self.reactor.drain_loop_queue(elwt);
192
193            // Block on the parker/unparker pair.
194            loop {
195                if let Poll::Ready(()) = driver.as_mut().poll(&mut cx) {
196                    break;
197                }
198
199                // Drain the incoming queue of requests.
200                self.reactor.drain_loop_queue(elwt);
201
202                // Handle timers.
203                let deadline = {
204                    let current_deadline = self.reactor.process_timers(&mut self.wakers);
205
206                    match (current_deadline, self.deadline) {
207                        (None, None) => None,
208                        (Some(x), None) | (None, Some(x)) => Some(x),
209                        (Some(a), Some(b)) => Some(cmp::min(a, b)),
210                    }
211                };
212
213                // Wake any wakers that need to be woken.
214                for waker in self.wakers.drain(..) {
215                    waker.wake();
216                }
217
218                // Park the thread until it is notified, or until the timeout.
219                match deadline {
220                    None => self.parker.park(),
221                    Some(deadline) => {
222                        self.parker.park_deadline(deadline);
223                    }
224                }
225            }
226        }
227
228        // If the future is still notified, we should poll it.
229        while !self.yielding && self.notifier.notified.swap(false, Ordering::SeqCst) {
230            let mut cx = Context::from_waker(&self.notifier_waker);
231            if future.as_mut().poll(&mut cx).is_pending() {
232                // If the future is *still* notified, it's probably calling future::yield_now(), which
233                // indicates that it wants to stop hogging the event loop. Indicate that we should stop
234                // polling it until we get NewEvents.
235                if self.notifier.notified.load(Ordering::SeqCst) {
236                    self.yielding = true;
237                }
238
239                // Drain the incoming queue of requests.
240                self.reactor.drain_loop_queue(elwt);
241            }
242        }
243
244        // Wake everything up if we're about to sleep.
245        if about_to_sleep {
246            self.reactor.drain_loop_queue(elwt);
247            for waker in self.wakers.drain(..) {
248                waker.wake();
249            }
250        }
251
252        // Set the control flow.
253        if let Some(code) = self.reactor.exit_requested() {
254            // The user wants to exit.
255            flow.set_exit_with_code(code);
256        } else if self.yielding {
257            // The future wants to be polled again as soon as possible.
258            flow.set_poll();
259        } else if let Some(deadline) = self.deadline {
260            // The future wants to be polled again when the deadline is reached.
261            flow.set_wait_until(deadline);
262        } else {
263            // The future wants to poll.
264            flow.set_wait();
265        }
266
267        // Return the output if any.
268        output.replace(ReturnOrFinish::Output(()))
269    }
270}
271
272pub(crate) struct ReactorWaker {
273    /// The proxy used to wake up the event loop.
274    proxy: Mutex<EventLoopProxy<Wakeup>>,
275
276    /// Whether or not we are already notified.
277    notified: AtomicBool,
278
279    /// Whether or not the reactor is awake.
280    ///
281    /// The reactor is awake when we don't
282    awake: AtomicBool,
283}
284
285impl ReactorWaker {
286    pub(crate) fn notify(&self) {
287        // If we are already notified, don't notify again.
288        if self.notified.swap(true, Ordering::SeqCst) {
289            return;
290        }
291
292        // If we are currently polling the event loop, don't notify.
293        if self.awake.load(Ordering::SeqCst) {
294            return;
295        }
296
297        // Wake up the reactor.
298        self.proxy
299            .lock()
300            .unwrap()
301            .send_event(Wakeup { _private: () })
302            .ok();
303    }
304}
305
306impl Wake for ReactorWaker {
307    fn wake(self: Arc<Self>) {
308        self.notify()
309    }
310
311    fn wake_by_ref(self: &Arc<Self>) {
312        self.notify()
313    }
314}
315
316struct EventPostWaker {
317    /// The underlying reactor waker.
318    reactor_waker: Arc<ReactorWaker>,
319
320    /// The unparker for the notifier.
321    unparker: parking::Unparker,
322}
323
324impl Wake for EventPostWaker {
325    fn wake(self: Arc<Self>) {
326        self.reactor_waker.notify();
327        self.unparker.unpark();
328    }
329
330    fn wake_by_ref(self: &Arc<Self>) {
331        self.reactor_waker.notify();
332        self.unparker.unpark();
333    }
334}