1use std::{
4 cell::RefCell,
5 collections::VecDeque,
6 pin::Pin,
7 rc::Rc,
8 task::{Context, Poll, Waker},
9};
10
11use futures::stream::FusedStream;
12use wasm_bindgen::{convert::FromWasmAbi, prelude::Closure, JsCast};
13use web_sys::EventTarget;
14
15use crate::{closure, JsError};
16
17pub trait When: AsRef<EventTarget> + Sized {
19 fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
21 self: &Self,
22 event_type: &'static str,
23 callback: F,
24 ) -> Result<EventListener<Self, E>, JsError>;
25}
26
27pub trait Stream: When {
29 fn listen<E: FromWasmAbi + 'static>(
31 self: &Self,
32 event_type: &'static str,
33 ) -> Result<EventStream<Self, E>, JsError>;
34}
35
36#[derive(Debug)]
40pub struct EventListener<T, E>
41where
42 T: AsRef<EventTarget>,
43{
44 event_type: &'static str,
45 target: T,
46 closure: Closure<dyn FnMut(E)>,
47}
48
49impl<T, E> Drop for EventListener<T, E>
50where
51 T: AsRef<EventTarget>,
52{
53 fn drop(&mut self) {
54 let _ = self
55 .target
56 .as_ref()
57 .remove_event_listener_with_callback(
58 self.event_type,
59 self.closure.as_ref().unchecked_ref(),
60 );
61 }
62}
63
64impl<T> When for T
65where
66 T: AsRef<EventTarget> + Clone,
67{
68 fn when<E: FromWasmAbi + 'static, F: FnMut(E) + 'static>(
69 self: &Self,
70 event_type: &'static str,
71 callback: F,
72 ) -> Result<EventListener<Self, E>, JsError> {
73 let closure = closure!(callback);
74 self.as_ref()
75 .add_event_listener_with_callback(event_type, closure.as_ref().unchecked_ref())?;
76 Ok(EventListener {
77 event_type,
78 target: self.clone(),
79 closure,
80 })
81 }
82}
83
84#[derive(Debug)]
86pub struct EventStream<T, E>
87where
88 T: When,
89{
90 state: Rc<RefCell<State<E>>>,
91 listener: Option<EventListener<T, E>>,
92}
93
94impl<T, E> EventStream<T, E>
95where
96 T: AsRef<EventTarget> + Clone,
97{
98 pub fn stop(&mut self) {
102 self.listener = None;
103 if let Some(waker) = &self.state.borrow().waker {
104 waker.wake_by_ref();
105 }
106 }
107}
108
109#[derive(Debug)]
110struct State<E> {
111 queue: VecDeque<E>,
112 waker: Option<Waker>,
113}
114
115impl<T, E> Unpin for EventStream<T, E> where T: AsRef<EventTarget> + Clone {}
116
117impl<T, E> futures::Stream for EventStream<T, E>
118where
119 T: AsRef<EventTarget> + Clone,
120{
121 type Item = E;
122
123 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124 let mut state = self.state.borrow_mut();
125 if let Some(event) = state.queue.pop_front() {
126 Poll::Ready(Some(event))
127 } else if self.listener.is_none() {
128 Poll::Ready(None)
129 } else {
130 let new_waker = cx.waker();
131 if let Some(waker) = &mut state.waker {
132 if !waker.will_wake(new_waker) {
133 state.waker = Some(new_waker.clone());
134 }
135 } else {
136 state.waker = Some(new_waker.clone());
137 }
138 Poll::Pending
139 }
140 }
141}
142
143impl<T, E> FusedStream for EventStream<T, E>
144where
145 T: AsRef<EventTarget> + Clone,
146{
147 fn is_terminated(&self) -> bool {
148 self.listener.is_none() && self.state.borrow().queue.is_empty()
149 }
150}
151
152impl<T> Stream for T
153where
154 T: When,
155{
156 fn listen<E: FromWasmAbi + 'static>(
157 self: &Self,
158 event_type: &'static str,
159 ) -> Result<EventStream<Self, E>, JsError> {
160 let state = Rc::new(RefCell::new(State {
161 queue: VecDeque::new(),
162 waker: None,
163 }));
164 let state_clone = state.clone();
165 let listener = self.when(event_type, move |event| {
166 let mut state = state_clone.borrow_mut();
167 state.queue.push_back(event);
168 if let Some(waker) = &state.waker {
169 waker.wake_by_ref();
170 }
171 })?;
172 let event_stream = EventStream {
173 state,
174 listener: Some(listener),
175 };
176 Ok(event_stream)
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use std::{cell::Cell, rc::Rc, time::Duration};
183
184 use futures::StreamExt;
185 use wasm_bindgen_test::wasm_bindgen_test;
186 use web_sys::MouseEvent;
187
188 use crate::{
189 body,
190 event::{EventStream, Stream, When},
191 sleep, spawn,
192 };
193
194 #[wasm_bindgen_test]
195 async fn test_event_listener() {
196 let body = body();
197
198 let result = Rc::new(Cell::new(None));
199 let result_clone = result.clone();
200 let _listener = body
201 .when("click", move |_: MouseEvent| {
202 result_clone.set(Some("Done!"));
203 })
204 .unwrap();
205 body.click();
206 sleep(Duration::from_secs_f32(1.1)).await;
207
208 assert_eq!(result.take().unwrap(), "Done!");
209 }
210
211 #[wasm_bindgen_test]
212 async fn test_event_stream() {
213 let body = body();
214
215 let body_clone = body.clone();
216 let handle = spawn(async move {
217 let mut stream: EventStream<_, MouseEvent> = body_clone.listen("click").unwrap();
218 stream.next().await.unwrap();
219 stream.next().await.unwrap();
220 stream.stop();
221 });
222 sleep(Duration::from_secs_f32(0.1)).await;
223 body.click();
224 body.click();
225 let _ = handle.await;
226
227 let mut stream: EventStream<_, MouseEvent> = body.listen("click").unwrap();
228 body.click();
229 body.click();
230 body.click();
231 stream.stop();
232 body.click();
233 body.click();
234
235 let mut c = 0;
236 assert_eq!(
237 stream
238 .map(move |_: MouseEvent| {
239 c += 1;
240 c
241 })
242 .collect::<Vec<i32>>()
243 .await,
244 vec![1, 2, 3]
245 );
246 }
247}