js_utils/
event.rs

1//! Event-related utilities.
2
3use std::{
4    cell::RefCell,
5    collections::VecDeque,
6    pin::Pin,
7    rc::Rc,
8    task::{Context, Poll, Waker},
9};
10
11use futures::stream::FusedStream;
12use wasm_bindgen::{convert::FromWasmAbi, prelude::Closure, JsCast};
13use web_sys::EventTarget;
14
15use crate::{closure, JsError};
16
17/// Trait for listening to events with a callback.
18pub trait When: AsRef<EventTarget> + Sized {
19    /// Run `callback` when given event type occurs.
20    fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
21        self: &Rc<Self>,
22        event_type: &'static str,
23        callback: F,
24    ) -> Result<EventListener<Self, E>, JsError>;
25}
26
27/// Trait for creating event streams.
28pub trait Stream: When {
29    /// Create stream of given event type.
30    fn listen<E: FromWasmAbi + 'static>(
31        self: &Rc<Self>,
32        event_type: &'static str,
33    ) -> Result<EventStream<Self, E>, JsError>;
34}
35
36/// Listener of events.
37///
38/// Drop to remove event listener.
39#[derive(Debug)]
40pub struct EventListener<T, E>
41where
42    T: AsRef<EventTarget>,
43{
44    event_type: &'static str,
45    target: Rc<T>,
46    closure: Closure<dyn FnMut(E)>,
47}
48
49impl<T, E> Drop for EventListener<T, E>
50where
51    T: AsRef<EventTarget>,
52{
53    fn drop(&mut self) {
54        let _ = self
55            .target
56            .as_ref()
57            .as_ref()
58            .remove_event_listener_with_callback(
59                self.event_type,
60                self.closure.as_ref().unchecked_ref(),
61            );
62    }
63}
64
65impl<T> When for T
66where
67    T: AsRef<EventTarget>,
68{
69    fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
70        self: &Rc<Self>,
71        event_type: &'static str,
72        callback: F,
73    ) -> Result<EventListener<Self, E>, JsError> {
74        let closure = closure!(callback);
75        self.as_ref()
76            .as_ref()
77            .add_event_listener_with_callback(event_type, closure.as_ref().unchecked_ref())?;
78        Ok(EventListener {
79            event_type,
80            target: self.clone(),
81            closure,
82        })
83    }
84}
85
86/// Stream of events.
87#[derive(Debug)]
88pub struct EventStream<T, E>
89where
90    T: When,
91{
92    state: Rc<RefCell<State<E>>>,
93    listener: Option<EventListener<T, E>>,
94}
95
96impl<T, E> EventStream<T, E>
97where
98    T: AsRef<EventTarget>,
99{
100    /// Stop listening to events.
101    ///
102    /// This means stream will terminate as soon as all received before events are consumed.
103    pub fn stop(&mut self) {
104        self.listener = None;
105        if let Some(waker) = &self.state.borrow().waker {
106            waker.wake_by_ref();
107        }
108    }
109}
110
111#[derive(Debug)]
112struct State<E> {
113    queue: VecDeque<E>,
114    waker: Option<Waker>,
115}
116
117impl<T, E> Unpin for EventStream<T, E> where T: AsRef<EventTarget> {}
118
119impl<T, E> futures::Stream for EventStream<T, E>
120where
121    T: AsRef<EventTarget>,
122{
123    type Item = E;
124
125    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126        let mut state = self.state.borrow_mut();
127        if let Some(event) = state.queue.pop_front() {
128            Poll::Ready(Some(event))
129        } else if self.listener.is_none() {
130            Poll::Ready(None)
131        } else {
132            let new_waker = cx.waker();
133            if let Some(waker) = &mut state.waker {
134                if !waker.will_wake(new_waker) {
135                    state.waker = Some(new_waker.clone());
136                }
137            } else {
138                state.waker = Some(new_waker.clone());
139            }
140            Poll::Pending
141        }
142    }
143}
144
145impl<T, E> FusedStream for EventStream<T, E>
146where
147    T: AsRef<EventTarget>,
148{
149    fn is_terminated(&self) -> bool {
150        self.listener.is_none() && self.state.borrow().queue.is_empty()
151    }
152}
153
154impl<T> Stream for T
155where
156    T: When,
157{
158    fn listen<E: FromWasmAbi + 'static>(
159        self: &Rc<Self>,
160        event_type: &'static str,
161    ) -> Result<EventStream<Self, E>, JsError> {
162        let state = Rc::new(RefCell::new(State {
163            queue: VecDeque::new(),
164            waker: None,
165        }));
166        let state_clone = state.clone();
167        let listener = self.when(event_type, move |event| {
168            let mut state = state_clone.borrow_mut();
169            state.queue.push_back(event);
170            if let Some(waker) = &state.waker {
171                waker.wake_by_ref();
172            }
173        })?;
174        let event_stream = EventStream {
175            state,
176            listener: Some(listener),
177        };
178        Ok(event_stream)
179    }
180}
181
182#[cfg(test)]
183mod tests {
184    use std::{cell::Cell, rc::Rc, time::Duration};
185
186    use futures::StreamExt;
187    use wasm_bindgen_test::wasm_bindgen_test;
188    use web_sys::MouseEvent;
189
190    use crate::{
191        body,
192        event::{EventStream, Stream, When},
193        sleep, spawn,
194    };
195
196    #[wasm_bindgen_test]
197    async fn test_event_listener() {
198        let body = Rc::new(body());
199
200        let result = Rc::new(Cell::new(None));
201        let result_clone = result.clone();
202        let _listener = body
203            .when("click", move |_: MouseEvent| {
204                result_clone.set(Some("Done!"));
205            })
206            .unwrap();
207        body.click();
208        sleep(Duration::from_secs_f32(1.1)).await;
209
210        assert_eq!(result.take().unwrap(), "Done!");
211    }
212
213    #[wasm_bindgen_test]
214    async fn test_event_stream() {
215        let body = Rc::new(body());
216
217        let body_clone = body.clone();
218        let handle = spawn(async move {
219            let mut stream: EventStream<_, MouseEvent> = body_clone.listen("click").unwrap();
220            stream.next().await.unwrap();
221            stream.next().await.unwrap();
222            stream.stop();
223        });
224        sleep(Duration::from_secs_f32(0.1)).await;
225        body.click();
226        body.click();
227        let _ = handle.await;
228
229        let mut stream: EventStream<_, MouseEvent> = body.listen("click").unwrap();
230        body.click();
231        body.click();
232        body.click();
233        stream.stop();
234        body.click();
235        body.click();
236
237        let mut c = 0;
238        assert_eq!(
239            stream
240                .map(move |_: MouseEvent| {
241                    c += 1;
242                    c
243                })
244                .collect::<Vec<i32>>()
245                .await,
246            vec![1, 2, 3]
247        );
248    }
249}