async_winit/
handler.rs

1/*
2
3`async-winit` is free software: you can redistribute it and/or modify it under the terms of one of
4the following licenses:
5
6* GNU Lesser General Public License as published by the Free Software Foundation, either
7  version 3 of the License, or (at your option) any later version.
8* Mozilla Public License as published by the Mozilla Foundation, version 2.
9
10`async-winit` is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
11the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
12Public License and the Patron License for more details.
13
14You should have received a copy of the GNU Lesser General Public License and the Mozilla
15Public License along with `async-winit`. If not, see <https://www.gnu.org/licenses/>.
16
17*/
18
19//! Handle incoming events.
20
21use std::cell::Cell;
22use std::future::{Future, IntoFuture};
23use std::mem;
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::task::{Context, Poll, Waker};
27
28use futures_lite::{future, Stream};
29use slab::Slab;
30
31use crate::sync::{MutexGuard, ThreadSafety, __private::*};
32
/// An event handler.
///
/// This type is used to receive events from the GUI system. Whenever an event occurs, it is sent to
/// all of the listeners of the corresponding event type. The listeners can then process the event
/// asynchronously.
///
/// There are four ways to listen to events:
///
/// - Awaiting the [`Waiter`] returned by `wait()` (or awaiting `&Handler` directly), which waits
///   for a single instance of the event. However, there is a race condition where it can miss
///   events in multithreaded environments: an event that occurs between the time one event is
///   received and the time the next waiter is registered is not seen. To avoid this, use one of
///   the other methods. However, this method is the most efficient.
/// - Keeping one [`Waiter`] alive and polling it as a `Stream`, which asynchronously iterates over
///   events.
/// - Using the `wait_direct[_async]()` function, which runs a closure in the event handler. This is
///   good for use cases like drawing. The closure's `bool` result tells the handler whether to
///   stop delivering the event to anyone else.
/// - Using `Waiter::hold()`, which forces the event handler to stop until the returned
///   [`HoldGuard`] has been dropped, i.e. until the event has been completely processed. This is
///   good for use cases like handling suspends.
///
/// This type does not allocate unless you use any waiting functions; therefore, you only pay overhead
/// for events that you use.
pub struct Handler<T: Event, TS: ThreadSafety> {
    /// State of the handler.
    ///
    /// `State` is around sixteen words plus the size of `T::Clonable`, and we store around 25 of
    /// them per instance of `window::Registration`. In the interest of not blowing up the size of
    /// `Registration`, we allocate this on the heap. Also, since sometimes the event will not ever
    /// be used, we use a `OnceLock` to avoid allocating the state until it is needed.
    state: TS::OnceLock<Box<TS::Mutex<State<T>>>>,
}
62
/// The lock-protected bookkeeping shared by a [`Handler`] and all of its [`Waiter`]s.
struct State<T: Event> {
    /// Listeners for the event.
    ///
    /// These form a doubly linked list: every entry stores the slab indices of its neighbours.
    listeners: Slab<Listener>,

    /// List of direct listeners, i.e. the closures registered through `wait_direct[_async]()`.
    directs: Vec<DirectListener<T>>,

    /// The slab indices of the head and tail of the linked list, or `None` if the list is empty.
    head_and_tail: Option<(usize, usize)>,

    /// The waker of the top-level task that is running `Handler::run_with` and waiting for the
    /// listener chain to finish.
    waker: Option<Waker>,

    /// The currently active event.
    ///
    /// This is set when `run_with` starts notifying listeners and cleared again once the last
    /// listener in the chain is done with the event.
    instance: Option<T::Clonable>,
}
81
/// A type-erased direct listener. It is called with the unique form of the event and returns a
/// future that resolves to `true` if the event should not be delivered any further.
type DirectListener<T> =
    Box<dyn FnMut(&mut <T as Event>::Unique<'_>) -> DirectFuture + Send + 'static>;
/// The type-erased future returned by a [`DirectListener`].
type DirectFuture = Pin<Box<dyn Future<Output = bool> + Send + 'static>>;
85
86impl<T: Event, TS: ThreadSafety> Handler<T, TS> {
87    pub(crate) fn new() -> Self {
88        Self {
89            state: TS::OnceLock::new(),
90        }
91    }
92
93    pub(crate) async fn run_with(&self, event: &mut T::Unique<'_>) {
94        // If the state hasn't been created yet, return.
95        let state = match self.state.get() {
96            Some(state) => state,
97            None => return,
98        };
99
100        // Run the direct listeners.
101        let mut state_lock = Some(state.lock().unwrap());
102        if self.run_direct_listeners(&mut state_lock, event).await {
103            return;
104        }
105
106        // Set up the listeners to run.
107        {
108            let state = state_lock.get_or_insert_with(|| state.lock().unwrap());
109
110            // If there are no listeners, return.
111            let head = match state.head_and_tail {
112                Some((head, _)) => head,
113                None => return,
114            };
115
116            // Set up the state.
117            state.instance = Some(T::downgrade(event));
118
119            // Notify the first entry in the list.
120            if let Some(waker) = state.notify(head) {
121                waker.wake();
122            }
123        }
124
125        // Wait for the listeners to finish running.
126        future::poll_fn(|cx| {
127            let mut state = state_lock.take().unwrap_or_else(|| state.lock().unwrap());
128
129            // If there are no listeners, return.
130            if state.head_and_tail.is_none() {
131                return Poll::Ready(());
132            }
133
134            // If the waking is over, return.
135            if state.instance.is_none() {
136                return Poll::Ready(());
137            }
138
139            // If we don't need to set the waker, stop right now.
140            if let Some(waker) = &state.waker {
141                if waker.will_wake(cx.waker()) {
142                    return Poll::Pending;
143                }
144            }
145
146            // Set the waker and return.
147            state.waker = Some(cx.waker().clone());
148            Poll::Pending
149        })
150        .await
151    }
152
153    async fn run_direct_listeners(
154        &self,
155        state: &mut Option<MutexGuard<'_, State<T>, TS>>,
156        event: &mut T::Unique<'_>,
157    ) -> bool {
158        /// Guard to restore direct listeners event a
159        struct RestoreDirects<'a, T: Event, TS: ThreadSafety> {
160            state: &'a Handler<T, TS>,
161            directs: Vec<DirectListener<T>>,
162        }
163
164        impl<T: Event, TS: ThreadSafety> Drop for RestoreDirects<'_, T, TS> {
165            fn drop(&mut self) {
166                let mut directs = mem::take(&mut self.directs);
167                self.state
168                    .state()
169                    .lock()
170                    .unwrap()
171                    .directs
172                    .append(&mut directs);
173            }
174        }
175
176        // If there are not indirect listeners, skip this part entirely.
177        let state_ref = state.as_mut().unwrap();
178        if state_ref.directs.is_empty() {
179            return false;
180        }
181
182        // Take out the direct listeners.
183        let mut directs = RestoreDirects {
184            directs: mem::take(&mut state_ref.directs),
185            state: self,
186        };
187
188        // Make sure the mutex isn't locked while we call user code.
189        *state = None;
190
191        // Iterate over the direct listeners.
192        for direct in &mut directs.directs {
193            if direct(event).await {
194                return true;
195            }
196        }
197
198        false
199    }
200
201    /// Wait for the next event.
202    pub fn wait(&self) -> Waiter<'_, T, TS> {
203        Waiter::new(self)
204    }
205
206    /// Register an async closure be called when the event is received.
207    pub fn wait_direct_async<
208        Fut: Future<Output = bool> + Send + 'static,
209        F: FnMut(&mut T::Unique<'_>) -> Fut + Send + 'static,
210    >(
211        &self,
212        mut f: F,
213    ) {
214        let mut state = self.state().lock().unwrap();
215        state.directs.push(Box::new(move |u| Box::pin(f(u))))
216    }
217
218    /// Register a closure be called when the event is received.
219    pub fn wait_direct(&self, mut f: impl FnMut(&mut T::Unique<'_>) -> bool + Send + 'static) {
220        self.wait_direct_async(move |u| std::future::ready(f(u)))
221    }
222
223    /// Get the inner state.
224    fn state(&self) -> &TS::Mutex<State<T>> {
225        self.state
226            .get_or_init(|| Box::new(TS::Mutex::new(State::new())))
227    }
228}
229
// `Handler` is marked `Unpin` unconditionally, whatever `TS::OnceLock` happens to be.
impl<T: Event, TS: ThreadSafety> Unpin for Handler<T, TS> {}
231
232impl<'a, T: Event, TS: ThreadSafety> IntoFuture for &'a Handler<T, TS> {
233    type IntoFuture = Waiter<'a, T, TS>;
234    type Output = T::Clonable;
235
236    fn into_future(self) -> Self::IntoFuture {
237        self.wait()
238    }
239}
240
/// Waits for an event to be received.
///
/// A waiter registers a listener with its handler when it is created and removes it again when
/// it is dropped. It can be awaited as a `Future` for a single event, polled as a `Stream` of
/// events, or used through `hold()` to keep the handler from moving on.
pub struct Waiter<'a, T: Event, TS: ThreadSafety> {
    /// The event handler this waiter is registered with.
    handler: &'a Handler<T, TS>,

    /// The index of our listener in the handler's listener slab.
    index: usize,
}
249
// `Waiter` only stores a reference and an index; it is marked `Unpin` unconditionally.
impl<T: Event, TS: ThreadSafety> Unpin for Waiter<'_, T, TS> {}
251
252impl<'a, T: Event, TS: ThreadSafety> Waiter<'a, T, TS> {
253    /// Create a new waiter.
254    pub(crate) fn new(handler: &'a Handler<T, TS>) -> Self {
255        // Get the inner state.
256        let state = handler.state();
257
258        // Insert the listener.
259        let index = state.lock().unwrap().insert();
260        Self { handler, index }
261    }
262
263    fn notify_next(&mut self, mut state: MutexGuard<'_, State<T>, TS>) {
264        if let Some(next) = state.listeners[self.index].next.get() {
265            // Notify the next listener.
266            if let Some(waker) = state.notify(next) {
267                waker.wake();
268            }
269        } else {
270            // We're done with the chain, notify the top-level task.
271            state.instance = None;
272            if let Some(waker) = state.waker.take() {
273                waker.wake();
274            }
275        }
276    }
277
278    /// Wait for a guard that prevents the event from moving on.
279    pub async fn hold(&mut self) -> HoldGuard<'_, 'a, T, TS> {
280        // Wait for the event.
281        let event = future::poll_fn(|cx| {
282            let mut state = self.handler.state().lock().unwrap();
283
284            // See if we are notified.
285            if state.take_notification(self.index) {
286                let event = match state.instance.clone() {
287                    Some(event) => event,
288                    None => return Poll::Pending,
289                };
290
291                // Return the event.
292                return Poll::Ready(event);
293            }
294
295            // Register the waker and sleep.
296            state.register_waker(self.index, cx.waker());
297            Poll::Pending
298        })
299        .await;
300
301        HoldGuard {
302            waiter: self,
303            event: Some(event),
304        }
305    }
306}
307
308impl<T: Event, TS: ThreadSafety> Future for Waiter<'_, T, TS> {
309    type Output = T::Clonable;
310
311    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312        match self.poll_next(cx) {
313            Poll::Ready(Some(event)) => Poll::Ready(event),
314            Poll::Ready(None) => panic!("event handler was dropped"),
315            Poll::Pending => Poll::Pending,
316        }
317    }
318}
319
320impl<T: Event, TS: ThreadSafety> Stream for Waiter<'_, T, TS> {
321    type Item = T::Clonable;
322
323    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
324        let mut state = self.handler.state.get().unwrap().lock().unwrap();
325
326        // See if we are notified.
327        if state.take_notification(self.index) {
328            let event = match state.instance.clone() {
329                Some(event) => event,
330                None => return Poll::Pending,
331            };
332
333            // Notify the next listener in the chain.
334            self.notify_next(state);
335
336            // Return the event.
337            return Poll::Ready(Some(event));
338        }
339
340        // Register the waker.
341        state.register_waker(self.index, cx.waker());
342
343        Poll::Pending
344    }
345
346    fn size_hint(&self) -> (usize, Option<usize>) {
347        (usize::MAX, None)
348    }
349}
350
351impl<'a, T: Event, TS: ThreadSafety> Drop for Waiter<'a, T, TS> {
352    fn drop(&mut self) {
353        let mut state = self.handler.state().lock().unwrap();
354
355        // Remove the listener.
356        let listener = state.remove(self.index);
357
358        // Notify the next listener if we are notified.
359        if listener.notified.get() {
360            self.notify_next(state);
361        }
362    }
363}
364
/// A guard that notifies the next listener when dropped.
///
/// Returned by `Waiter::hold()`. It dereferences to the event that was received.
pub struct HoldGuard<'waiter, 'handler, T: Event, TS: ThreadSafety> {
    /// The waiter that received the event.
    waiter: &'waiter mut Waiter<'handler, T, TS>,

    /// The event we just received. Only `None` after `into_inner()` has taken it out.
    event: Option<T::Clonable>,
}
373
374impl<T: Event, TS: ThreadSafety> Deref for HoldGuard<'_, '_, T, TS> {
375    type Target = T::Clonable;
376
377    fn deref(&self) -> &Self::Target {
378        self.event.as_ref().unwrap()
379    }
380}
381
382impl<T: Event, TS: ThreadSafety> DerefMut for HoldGuard<'_, '_, T, TS> {
383    fn deref_mut(&mut self) -> &mut Self::Target {
384        self.event.as_mut().unwrap()
385    }
386}
387
388impl<T: Event, TS: ThreadSafety> HoldGuard<'_, '_, T, TS> {
389    /// Get the event.
390    pub fn into_inner(mut self) -> T::Clonable {
391        self.event.take().unwrap()
392    }
393}
394
395impl<T: Event, TS: ThreadSafety> Drop for HoldGuard<'_, '_, T, TS> {
396    fn drop(&mut self) {
397        // Tell the waiter to notify the next listener.
398        self.waiter
399            .notify_next(self.waiter.handler.state().lock().unwrap());
400    }
401}
402
403impl<T: Event> State<T> {
404    /// Get a fresh state instance.
405    fn new() -> Self {
406        Self {
407            listeners: Slab::new(),
408            directs: Vec::new(),
409            head_and_tail: None,
410            waker: None,
411            instance: None,
412        }
413    }
414
415    /// Insert a new listener into the list.
416    fn insert(&mut self) -> usize {
417        // Create the listener.
418        let listener = Listener {
419            next: Cell::new(None),
420            prev: Cell::new(self.head_and_tail.map(|(_, tail)| tail)),
421            waker: Cell::new(None),
422            notified: Cell::new(false),
423        };
424
425        // Insert the listener into the list.
426        let index = self.listeners.insert(listener);
427
428        // Update the head and tail.
429        match &mut self.head_and_tail {
430            Some((_head, tail)) => {
431                self.listeners[*tail].next.set(Some(index));
432                *tail = index;
433            }
434
435            None => {
436                self.head_and_tail = Some((index, index));
437            }
438        }
439
440        index
441    }
442
443    /// Remove a listener from the list.
444    fn remove(&mut self, index: usize) -> Listener {
445        // Get the listener.
446        let listener = self.listeners.remove(index);
447
448        // Update the head and tail.
449        match &mut self.head_and_tail {
450            Some((head, tail)) => {
451                if *head == index && *tail == index {
452                    self.head_and_tail = None;
453                } else if *head == index {
454                    self.head_and_tail = Some((listener.next.get().unwrap(), *tail));
455                } else if *tail == index {
456                    self.head_and_tail = Some((*head, listener.prev.get().unwrap()));
457                }
458            }
459
460            None => panic!("invalid listener list: head and tail are both None"),
461        }
462
463        // Update the next and previous listeners.
464        if let Some(next) = listener.next.get() {
465            self.listeners[next].prev.set(listener.prev.get());
466        }
467
468        if let Some(prev) = listener.prev.get() {
469            self.listeners[prev].next.set(listener.next.get());
470        }
471
472        listener
473    }
474
475    /// Take out the notification.
476    fn take_notification(&mut self, index: usize) -> bool {
477        self.listeners[index].notified.replace(false)
478    }
479
480    /// Register a waker.
481    fn register_waker(&mut self, index: usize, waker: &Waker) {
482        let listener = &mut self.listeners[index];
483
484        // If the listener's waker is the same as ours, no need to clone.
485        let current_waker = listener.waker.take();
486        match current_waker {
487            Some(current_waker) if current_waker.will_wake(waker) => {
488                listener.waker.replace(Some(current_waker));
489            }
490            _ => {
491                listener.waker.replace(Some(waker.clone()));
492            }
493        }
494    }
495
496    /// Notify the listener.
497    fn notify(&mut self, index: usize) -> Option<Waker> {
498        // If the listener is already notified, return.
499        if self.listeners[index].notified.replace(true) {
500            return None;
501        }
502
503        // Return the waker.
504        self.listeners[index].waker.replace(None)
505    }
506}
507
/// A registered listener in the event handler.
///
/// Listeners live in the slab inside `State` and are chained together by slab index.
struct Listener {
    /// The slab index of the next listener in the list, if any.
    next: Cell<Option<usize>>,

    /// The slab index of the previous listener in the list, if any.
    prev: Cell<Option<usize>>,

    /// The waker for the listener; taken out when the listener is notified.
    waker: Cell<Option<Waker>>,

    /// Whether or not this listener is notified.
    notified: Cell<bool>,
}
522
/// The type of event that can be sent over a [`Handler`].
///
/// An event comes in two forms: the unique form that direct listeners receive by mutable
/// reference, and the clonable form that is handed to every waiter.
pub trait Event {
    /// The form of the event that is cloned for each waiter.
    type Clonable: Clone + 'static;

    /// The form of the event that is passed to direct listeners.
    type Unique<'a>: 'a;

    /// Produce the clonable form from the unique form.
    fn downgrade(unique: &mut Self::Unique<'_>) -> Self::Clonable;
}

/// Every `Clone + 'static` type is an event whose two forms are the type itself.
impl<T: Clone + 'static> Event for T {
    type Clonable = T;
    type Unique<'a> = T;

    fn downgrade(unique: &mut Self::Unique<'_>) -> Self::Clonable {
        T::clone(unique)
    }
}
539
/// Runs the wrapped closure when the value is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        let Self(callback) = self;
        callback();
    }
}