js_utils/
event.rs

1//! Event-related utilities.
2
3use std::{
4    cell::RefCell,
5    collections::VecDeque,
6    pin::Pin,
7    rc::Rc,
8    task::{Context, Poll, Waker},
9};
10
11use futures::stream::FusedStream;
12use wasm_bindgen::{convert::FromWasmAbi, prelude::Closure, JsCast};
13use web_sys::EventTarget;
14
15use crate::{closure, JsError};
16
17/// Trait for listening to events with a callback.
18pub trait When: AsRef<EventTarget> + Sized {
19    /// Run `callback` when given event type occurs.
20    fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
21        self: &Self,
22        event_type: &'static str,
23        callback: F,
24    ) -> Result<EventListener<Self, E>, JsError>;
25}
26
27/// Trait for creating event streams.
28pub trait Stream: When {
29    /// Create stream of given event type.
30    fn listen<E: FromWasmAbi + 'static>(
31        self: &Self,
32        event_type: &'static str,
33    ) -> Result<EventStream<Self, E>, JsError>;
34}
35
36/// Listener of events.
37///
38/// Drop to remove event listener.
39#[derive(Debug)]
40pub struct EventListener<T, E>
41where
42    T: AsRef<EventTarget>,
43{
44    event_type: &'static str,
45    target: T,
46    closure: Closure<dyn FnMut(E)>,
47}
48
49impl<T, E> Drop for EventListener<T, E>
50where
51    T: AsRef<EventTarget>,
52{
53    fn drop(&mut self) {
54        let _ = self
55            .target
56            .as_ref()
57            .remove_event_listener_with_callback(
58                self.event_type,
59                self.closure.as_ref().unchecked_ref(),
60            );
61    }
62}
63
64impl<T> When for T
65where
66    T: AsRef<EventTarget> + Clone,
67{
68    fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
69        self: &Self,
70        event_type: &'static str,
71        callback: F,
72    ) -> Result<EventListener<Self, E>, JsError> {
73        let closure = closure!(callback);
74        self.as_ref()
75            .add_event_listener_with_callback(event_type, closure.as_ref().unchecked_ref())?;
76        Ok(EventListener {
77            event_type,
78            target: self.clone(),
79            closure,
80        })
81    }
82}
83
84/// Stream of events.
85#[derive(Debug)]
86pub struct EventStream<T, E>
87where
88    T: When,
89{
90    state: Rc<RefCell<State<E>>>,
91    listener: Option<EventListener<T, E>>,
92}
93
94impl<T, E> EventStream<T, E>
95where
96    T: AsRef<EventTarget> + Clone,
97{
98    /// Stop listening to events.
99    ///
100    /// This means stream will terminate as soon as all received before events are consumed.
101    pub fn stop(&mut self) {
102        self.listener = None;
103        if let Some(waker) = &self.state.borrow().waker {
104            waker.wake_by_ref();
105        }
106    }
107}
108
/// Data shared between an event stream and the callback feeding it.
#[derive(Debug)]
struct State<E> {
    /// Events received but not yet handed out by the stream, oldest first.
    queue: VecDeque<E>,
    /// Waker of the task that last polled the stream while it had nothing to yield.
    waker: Option<Waker>,
}
114
115impl<T, E> Unpin for EventStream<T, E> where T: AsRef<EventTarget> + Clone {}
116
117impl<T, E> futures::Stream for EventStream<T, E>
118where
119    T: AsRef<EventTarget> + Clone,
120{
121    type Item = E;
122
123    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124        let mut state = self.state.borrow_mut();
125        if let Some(event) = state.queue.pop_front() {
126            Poll::Ready(Some(event))
127        } else if self.listener.is_none() {
128            Poll::Ready(None)
129        } else {
130            let new_waker = cx.waker();
131            if let Some(waker) = &mut state.waker {
132                if !waker.will_wake(new_waker) {
133                    state.waker = Some(new_waker.clone());
134                }
135            } else {
136                state.waker = Some(new_waker.clone());
137            }
138            Poll::Pending
139        }
140    }
141}
142
143impl<T, E> FusedStream for EventStream<T, E>
144where
145    T: AsRef<EventTarget> + Clone,
146{
147    fn is_terminated(&self) -> bool {
148        self.listener.is_none() && self.state.borrow().queue.is_empty()
149    }
150}
151
152impl<T> Stream for T
153where
154    T: When,
155{
156    fn listen<E: FromWasmAbi + 'static>(
157        self: &Self,
158        event_type: &'static str,
159    ) -> Result<EventStream<Self, E>, JsError> {
160        let state = Rc::new(RefCell::new(State {
161            queue: VecDeque::new(),
162            waker: None,
163        }));
164        let state_clone = state.clone();
165        let listener = self.when(event_type, move |event| {
166            let mut state = state_clone.borrow_mut();
167            state.queue.push_back(event);
168            if let Some(waker) = &state.waker {
169                waker.wake_by_ref();
170            }
171        })?;
172        let event_stream = EventStream {
173            state,
174            listener: Some(listener),
175        };
176        Ok(event_stream)
177    }
178}
179
#[cfg(test)]
mod tests {
    use std::{cell::Cell, rc::Rc, time::Duration};

    use futures::StreamExt;
    use wasm_bindgen_test::wasm_bindgen_test;
    use web_sys::MouseEvent;

    use crate::{
        body,
        event::{EventStream, Stream, When},
        sleep, spawn,
    };

    /// A callback registered through `when` runs when the target is clicked.
    #[wasm_bindgen_test]
    async fn test_event_listener() {
        let target = body();
        let outcome = Rc::new(Cell::new(None));

        // Keep the listener alive until the end of the test; dropping it unregisters it.
        let _listener = {
            let outcome = Rc::clone(&outcome);
            target
                .when("click", move |_: MouseEvent| {
                    outcome.set(Some("Done!"));
                })
                .unwrap()
        };

        target.click();
        sleep(Duration::from_secs_f32(1.1)).await;

        assert_eq!(outcome.take().unwrap(), "Done!");
    }

    /// A stream delivers clicks to a waiting task and, once stopped, yields only the events
    /// received before the stop.
    #[wasm_bindgen_test]
    async fn test_event_stream() {
        let target = body();

        // First part: a spawned task waits for two clicks that are produced afterwards.
        let consumer = {
            let target = target.clone();
            spawn(async move {
                let mut clicks: EventStream<_, MouseEvent> = target.listen("click").unwrap();
                for _ in 0..2 {
                    clicks.next().await.unwrap();
                }
                clicks.stop();
            })
        };
        sleep(Duration::from_secs_f32(0.1)).await;
        target.click();
        target.click();
        let _ = consumer.await;

        // Second part: three clicks before `stop`, two after; only the first three count.
        let mut clicks: EventStream<_, MouseEvent> = target.listen("click").unwrap();
        for _ in 0..3 {
            target.click();
        }
        clicks.stop();
        for _ in 0..2 {
            target.click();
        }

        let mut seen = 0;
        let numbered = clicks
            .map(move |_: MouseEvent| {
                seen += 1;
                seen
            })
            .collect::<Vec<i32>>()
            .await;
        assert_eq!(numbered, vec![1, 2, 3]);
    }
}