1use std::cell::Cell;
22use std::future::{Future, IntoFuture};
23use std::mem;
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::task::{Context, Poll, Waker};
27
28use futures_lite::{future, Stream};
29use slab::Slab;
30
31use crate::sync::{MutexGuard, ThreadSafety, __private::*};
32
33pub struct Handler<T: Event, TS: ThreadSafety> {
54 state: TS::OnceLock<Box<TS::Mutex<State<T>>>>,
61}
62
63struct State<T: Event> {
64 listeners: Slab<Listener>,
68
69 directs: Vec<DirectListener<T>>,
71
72 head_and_tail: Option<(usize, usize)>,
74
75 waker: Option<Waker>,
77
78 instance: Option<T::Clonable>,
80}
81
82type DirectListener<T> =
83 Box<dyn FnMut(&mut <T as Event>::Unique<'_>) -> DirectFuture + Send + 'static>;
84type DirectFuture = Pin<Box<dyn Future<Output = bool> + Send + 'static>>;
85
86impl<T: Event, TS: ThreadSafety> Handler<T, TS> {
87 pub(crate) fn new() -> Self {
88 Self {
89 state: TS::OnceLock::new(),
90 }
91 }
92
93 pub(crate) async fn run_with(&self, event: &mut T::Unique<'_>) {
94 let state = match self.state.get() {
96 Some(state) => state,
97 None => return,
98 };
99
100 let mut state_lock = Some(state.lock().unwrap());
102 if self.run_direct_listeners(&mut state_lock, event).await {
103 return;
104 }
105
106 {
108 let state = state_lock.get_or_insert_with(|| state.lock().unwrap());
109
110 let head = match state.head_and_tail {
112 Some((head, _)) => head,
113 None => return,
114 };
115
116 state.instance = Some(T::downgrade(event));
118
119 if let Some(waker) = state.notify(head) {
121 waker.wake();
122 }
123 }
124
125 future::poll_fn(|cx| {
127 let mut state = state_lock.take().unwrap_or_else(|| state.lock().unwrap());
128
129 if state.head_and_tail.is_none() {
131 return Poll::Ready(());
132 }
133
134 if state.instance.is_none() {
136 return Poll::Ready(());
137 }
138
139 if let Some(waker) = &state.waker {
141 if waker.will_wake(cx.waker()) {
142 return Poll::Pending;
143 }
144 }
145
146 state.waker = Some(cx.waker().clone());
148 Poll::Pending
149 })
150 .await
151 }
152
153 async fn run_direct_listeners(
154 &self,
155 state: &mut Option<MutexGuard<'_, State<T>, TS>>,
156 event: &mut T::Unique<'_>,
157 ) -> bool {
158 struct RestoreDirects<'a, T: Event, TS: ThreadSafety> {
160 state: &'a Handler<T, TS>,
161 directs: Vec<DirectListener<T>>,
162 }
163
164 impl<T: Event, TS: ThreadSafety> Drop for RestoreDirects<'_, T, TS> {
165 fn drop(&mut self) {
166 let mut directs = mem::take(&mut self.directs);
167 self.state
168 .state()
169 .lock()
170 .unwrap()
171 .directs
172 .append(&mut directs);
173 }
174 }
175
176 let state_ref = state.as_mut().unwrap();
178 if state_ref.directs.is_empty() {
179 return false;
180 }
181
182 let mut directs = RestoreDirects {
184 directs: mem::take(&mut state_ref.directs),
185 state: self,
186 };
187
188 *state = None;
190
191 for direct in &mut directs.directs {
193 if direct(event).await {
194 return true;
195 }
196 }
197
198 false
199 }
200
201 pub fn wait(&self) -> Waiter<'_, T, TS> {
203 Waiter::new(self)
204 }
205
206 pub fn wait_direct_async<
208 Fut: Future<Output = bool> + Send + 'static,
209 F: FnMut(&mut T::Unique<'_>) -> Fut + Send + 'static,
210 >(
211 &self,
212 mut f: F,
213 ) {
214 let mut state = self.state().lock().unwrap();
215 state.directs.push(Box::new(move |u| Box::pin(f(u))))
216 }
217
218 pub fn wait_direct(&self, mut f: impl FnMut(&mut T::Unique<'_>) -> bool + Send + 'static) {
220 self.wait_direct_async(move |u| std::future::ready(f(u)))
221 }
222
223 fn state(&self) -> &TS::Mutex<State<T>> {
225 self.state
226 .get_or_init(|| Box::new(TS::Mutex::new(State::new())))
227 }
228}
229
230impl<T: Event, TS: ThreadSafety> Unpin for Handler<T, TS> {}
231
232impl<'a, T: Event, TS: ThreadSafety> IntoFuture for &'a Handler<T, TS> {
233 type IntoFuture = Waiter<'a, T, TS>;
234 type Output = T::Clonable;
235
236 fn into_future(self) -> Self::IntoFuture {
237 self.wait()
238 }
239}
240
241pub struct Waiter<'a, T: Event, TS: ThreadSafety> {
243 handler: &'a Handler<T, TS>,
245
246 index: usize,
248}
249
250impl<T: Event, TS: ThreadSafety> Unpin for Waiter<'_, T, TS> {}
251
252impl<'a, T: Event, TS: ThreadSafety> Waiter<'a, T, TS> {
253 pub(crate) fn new(handler: &'a Handler<T, TS>) -> Self {
255 let state = handler.state();
257
258 let index = state.lock().unwrap().insert();
260 Self { handler, index }
261 }
262
263 fn notify_next(&mut self, mut state: MutexGuard<'_, State<T>, TS>) {
264 if let Some(next) = state.listeners[self.index].next.get() {
265 if let Some(waker) = state.notify(next) {
267 waker.wake();
268 }
269 } else {
270 state.instance = None;
272 if let Some(waker) = state.waker.take() {
273 waker.wake();
274 }
275 }
276 }
277
278 pub async fn hold(&mut self) -> HoldGuard<'_, 'a, T, TS> {
280 let event = future::poll_fn(|cx| {
282 let mut state = self.handler.state().lock().unwrap();
283
284 if state.take_notification(self.index) {
286 let event = match state.instance.clone() {
287 Some(event) => event,
288 None => return Poll::Pending,
289 };
290
291 return Poll::Ready(event);
293 }
294
295 state.register_waker(self.index, cx.waker());
297 Poll::Pending
298 })
299 .await;
300
301 HoldGuard {
302 waiter: self,
303 event: Some(event),
304 }
305 }
306}
307
308impl<T: Event, TS: ThreadSafety> Future for Waiter<'_, T, TS> {
309 type Output = T::Clonable;
310
311 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
312 match self.poll_next(cx) {
313 Poll::Ready(Some(event)) => Poll::Ready(event),
314 Poll::Ready(None) => panic!("event handler was dropped"),
315 Poll::Pending => Poll::Pending,
316 }
317 }
318}
319
320impl<T: Event, TS: ThreadSafety> Stream for Waiter<'_, T, TS> {
321 type Item = T::Clonable;
322
323 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
324 let mut state = self.handler.state.get().unwrap().lock().unwrap();
325
326 if state.take_notification(self.index) {
328 let event = match state.instance.clone() {
329 Some(event) => event,
330 None => return Poll::Pending,
331 };
332
333 self.notify_next(state);
335
336 return Poll::Ready(Some(event));
338 }
339
340 state.register_waker(self.index, cx.waker());
342
343 Poll::Pending
344 }
345
346 fn size_hint(&self) -> (usize, Option<usize>) {
347 (usize::MAX, None)
348 }
349}
350
351impl<'a, T: Event, TS: ThreadSafety> Drop for Waiter<'a, T, TS> {
352 fn drop(&mut self) {
353 let mut state = self.handler.state().lock().unwrap();
354
355 let listener = state.remove(self.index);
357
358 if listener.notified.get() {
360 self.notify_next(state);
361 }
362 }
363}
364
365pub struct HoldGuard<'waiter, 'handler, T: Event, TS: ThreadSafety> {
367 waiter: &'waiter mut Waiter<'handler, T, TS>,
369
370 event: Option<T::Clonable>,
372}
373
374impl<T: Event, TS: ThreadSafety> Deref for HoldGuard<'_, '_, T, TS> {
375 type Target = T::Clonable;
376
377 fn deref(&self) -> &Self::Target {
378 self.event.as_ref().unwrap()
379 }
380}
381
382impl<T: Event, TS: ThreadSafety> DerefMut for HoldGuard<'_, '_, T, TS> {
383 fn deref_mut(&mut self) -> &mut Self::Target {
384 self.event.as_mut().unwrap()
385 }
386}
387
388impl<T: Event, TS: ThreadSafety> HoldGuard<'_, '_, T, TS> {
389 pub fn into_inner(mut self) -> T::Clonable {
391 self.event.take().unwrap()
392 }
393}
394
395impl<T: Event, TS: ThreadSafety> Drop for HoldGuard<'_, '_, T, TS> {
396 fn drop(&mut self) {
397 self.waiter
399 .notify_next(self.waiter.handler.state().lock().unwrap());
400 }
401}
402
403impl<T: Event> State<T> {
404 fn new() -> Self {
406 Self {
407 listeners: Slab::new(),
408 directs: Vec::new(),
409 head_and_tail: None,
410 waker: None,
411 instance: None,
412 }
413 }
414
415 fn insert(&mut self) -> usize {
417 let listener = Listener {
419 next: Cell::new(None),
420 prev: Cell::new(self.head_and_tail.map(|(_, tail)| tail)),
421 waker: Cell::new(None),
422 notified: Cell::new(false),
423 };
424
425 let index = self.listeners.insert(listener);
427
428 match &mut self.head_and_tail {
430 Some((_head, tail)) => {
431 self.listeners[*tail].next.set(Some(index));
432 *tail = index;
433 }
434
435 None => {
436 self.head_and_tail = Some((index, index));
437 }
438 }
439
440 index
441 }
442
443 fn remove(&mut self, index: usize) -> Listener {
445 let listener = self.listeners.remove(index);
447
448 match &mut self.head_and_tail {
450 Some((head, tail)) => {
451 if *head == index && *tail == index {
452 self.head_and_tail = None;
453 } else if *head == index {
454 self.head_and_tail = Some((listener.next.get().unwrap(), *tail));
455 } else if *tail == index {
456 self.head_and_tail = Some((*head, listener.prev.get().unwrap()));
457 }
458 }
459
460 None => panic!("invalid listener list: head and tail are both None"),
461 }
462
463 if let Some(next) = listener.next.get() {
465 self.listeners[next].prev.set(listener.prev.get());
466 }
467
468 if let Some(prev) = listener.prev.get() {
469 self.listeners[prev].next.set(listener.next.get());
470 }
471
472 listener
473 }
474
475 fn take_notification(&mut self, index: usize) -> bool {
477 self.listeners[index].notified.replace(false)
478 }
479
480 fn register_waker(&mut self, index: usize, waker: &Waker) {
482 let listener = &mut self.listeners[index];
483
484 let current_waker = listener.waker.take();
486 match current_waker {
487 Some(current_waker) if current_waker.will_wake(waker) => {
488 listener.waker.replace(Some(current_waker));
489 }
490 _ => {
491 listener.waker.replace(Some(waker.clone()));
492 }
493 }
494 }
495
496 fn notify(&mut self, index: usize) -> Option<Waker> {
498 if self.listeners[index].notified.replace(true) {
500 return None;
501 }
502
503 self.listeners[index].waker.replace(None)
505 }
506}
507
/// One entry in the listener list.
struct Listener {
    /// Slab key of the entry after this one, if any.
    next: Cell<Option<usize>>,

    /// Slab key of the entry before this one, if any.
    prev: Cell<Option<usize>>,

    /// Waker to use when this listener is notified.
    waker: Cell<Option<Waker>>,

    /// Set when an event has been handed to this listener and not yet
    /// consumed.
    notified: Cell<bool>,
}
522
/// Describes the two forms an event takes while it is being dispatched.
pub trait Event {
    /// Owned, clonable form of the event.
    type Clonable: Clone + 'static;

    /// Form of the event that is passed around by mutable reference; it may
    /// borrow data for `'a`.
    type Unique<'a>: 'a;

    /// Produces the clonable form from the unique one.
    fn downgrade(unique: &mut Self::Unique<'_>) -> Self::Clonable;
}

/// Every `Clone + 'static` type is its own unique and clonable form.
impl<T> Event for T
where
    T: Clone + 'static,
{
    type Clonable = T;
    type Unique<'a> = T;

    fn downgrade(unique: &mut Self::Unique<'_>) -> Self::Clonable {
        Clone::clone(&*unique)
    }
}
539
/// Runs the wrapped closure when the value is dropped.
struct CallOnDrop<F>(F)
where
    F: FnMut();

impl<F> Drop for CallOnDrop<F>
where
    F: FnMut(),
{
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}