1use std::{
4 cell::RefCell,
5 collections::VecDeque,
6 pin::Pin,
7 rc::Rc,
8 task::{Context, Poll, Waker},
9};
10
11use futures::stream::FusedStream;
12use wasm_bindgen::{convert::FromWasmAbi, prelude::Closure, JsCast};
13use web_sys::EventTarget;
14
15use crate::{closure, JsError};
16
17pub trait When: AsRef<EventTarget> + Sized {
19 fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
21 self: &Rc<Self>,
22 event_type: &'static str,
23 callback: F,
24 ) -> Result<EventListener<Self, E>, JsError>;
25}
26
27pub trait Stream: When {
29 fn listen<E: FromWasmAbi + 'static>(
31 self: &Rc<Self>,
32 event_type: &'static str,
33 ) -> Result<EventStream<Self, E>, JsError>;
34}
35
36#[derive(Debug)]
40pub struct EventListener<T, E>
41where
42 T: AsRef<EventTarget>,
43{
44 event_type: &'static str,
45 target: Rc<T>,
46 closure: Closure<dyn FnMut(E)>,
47}
48
49impl<T, E> Drop for EventListener<T, E>
50where
51 T: AsRef<EventTarget>,
52{
53 fn drop(&mut self) {
54 let _ = self
55 .target
56 .as_ref()
57 .as_ref()
58 .remove_event_listener_with_callback(
59 self.event_type,
60 self.closure.as_ref().unchecked_ref(),
61 );
62 }
63}
64
65impl<T> When for T
66where
67 T: AsRef<EventTarget>,
68{
69 fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
70 self: &Rc<Self>,
71 event_type: &'static str,
72 callback: F,
73 ) -> Result<EventListener<Self, E>, JsError> {
74 let closure = closure!(callback);
75 self.as_ref()
76 .as_ref()
77 .add_event_listener_with_callback(event_type, closure.as_ref().unchecked_ref())?;
78 Ok(EventListener {
79 event_type,
80 target: self.clone(),
81 closure,
82 })
83 }
84}
85
86#[derive(Debug)]
88pub struct EventStream<T, E>
89where
90 T: When,
91{
92 state: Rc<RefCell<State<E>>>,
93 listener: Option<EventListener<T, E>>,
94}
95
96impl<T, E> EventStream<T, E>
97where
98 T: AsRef<EventTarget>,
99{
100 pub fn stop(&mut self) {
104 self.listener = None;
105 if let Some(waker) = &self.state.borrow().waker {
106 waker.wake_by_ref();
107 }
108 }
109}
110
/// State shared between an `EventStream` and the callback of its listener.
#[derive(Debug)]
struct State<E> {
    /// Events pushed by the listener callback that the stream has not yielded yet.
    queue: VecDeque<E>,
    /// Waker stored by `poll_next` when it found the queue empty; used to wake
    /// the polling task when an event arrives or the stream is stopped.
    waker: Option<Waker>,
}
116
117impl<T, E> Unpin for EventStream<T, E> where T: AsRef<EventTarget> {}
118
119impl<T, E> futures::Stream for EventStream<T, E>
120where
121 T: AsRef<EventTarget>,
122{
123 type Item = E;
124
125 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 let mut state = self.state.borrow_mut();
127 if let Some(event) = state.queue.pop_front() {
128 Poll::Ready(Some(event))
129 } else if self.listener.is_none() {
130 Poll::Ready(None)
131 } else {
132 let new_waker = cx.waker();
133 if let Some(waker) = &mut state.waker {
134 if !waker.will_wake(new_waker) {
135 state.waker = Some(new_waker.clone());
136 }
137 } else {
138 state.waker = Some(new_waker.clone());
139 }
140 Poll::Pending
141 }
142 }
143}
144
145impl<T, E> FusedStream for EventStream<T, E>
146where
147 T: AsRef<EventTarget>,
148{
149 fn is_terminated(&self) -> bool {
150 self.listener.is_none() && self.state.borrow().queue.is_empty()
151 }
152}
153
154impl<T> Stream for T
155where
156 T: When,
157{
158 fn listen<E: FromWasmAbi + 'static>(
159 self: &Rc<Self>,
160 event_type: &'static str,
161 ) -> Result<EventStream<Self, E>, JsError> {
162 let state = Rc::new(RefCell::new(State {
163 queue: VecDeque::new(),
164 waker: None,
165 }));
166 let state_clone = state.clone();
167 let listener = self.when(event_type, move |event| {
168 let mut state = state_clone.borrow_mut();
169 state.queue.push_back(event);
170 if let Some(waker) = &state.waker {
171 waker.wake_by_ref();
172 }
173 })?;
174 let event_stream = EventStream {
175 state,
176 listener: Some(listener),
177 };
178 Ok(event_stream)
179 }
180}
181
// Browser-side tests for the listener guard and the event stream.
#[cfg(test)]
mod tests {
    use std::{cell::Cell, rc::Rc, time::Duration};

    use futures::StreamExt;
    use wasm_bindgen_test::wasm_bindgen_test;
    use web_sys::MouseEvent;

    use crate::{
        body,
        event::{EventStream, Stream, When},
        sleep, spawn,
    };

    /// A callback registered with `when` runs when the target is clicked.
    #[wasm_bindgen_test]
    async fn test_event_listener() {
        // NOTE(review): `body()` is a crate helper; presumably it returns the
        // document body element — confirm against its definition.
        let body = Rc::new(body());

        let result = Rc::new(Cell::new(None));
        let result_clone = result.clone();
        // The guard must stay alive until the click, otherwise the listener
        // would be removed again when it is dropped.
        let _listener = body
            .when("click", move |_: MouseEvent| {
                result_clone.set(Some("Done!"));
            })
            .unwrap();
        body.click();
        sleep(Duration::from_secs_f32(1.1)).await;

        assert_eq!(result.take().unwrap(), "Done!");
    }

    /// A stream yields events in order and ends once it is stopped and drained.
    #[wasm_bindgen_test]
    async fn test_event_stream() {
        let body = Rc::new(body());

        // Part 1: a spawned task waits for two clicks on its own stream and
        // then stops it.
        let body_clone = body.clone();
        let handle = spawn(async move {
            let mut stream: EventStream<_, MouseEvent> = body_clone.listen("click").unwrap();
            stream.next().await.unwrap();
            stream.next().await.unwrap();
            stream.stop();
        });
        // NOTE(review): the pause presumably gives the spawned task time to
        // register its listener before the clicks are issued — confirm.
        sleep(Duration::from_secs_f32(0.1)).await;
        body.click();
        body.click();
        let _ = handle.await;

        // Part 2: three clicks are buffered before `stop`; the two clicks
        // issued afterwards must not reach the stream.
        let mut stream: EventStream<_, MouseEvent> = body.listen("click").unwrap();
        body.click();
        body.click();
        body.click();
        stream.stop();
        body.click();
        body.click();

        // Number the yielded events; exactly the three buffered ones are expected.
        let mut c = 0;
        assert_eq!(
            stream
                .map(move |_: MouseEvent| {
                    c += 1;
                    c
                })
                .collect::<Vec<i32>>()
                .await,
            vec![1, 2, 3]
        );
    }
}