1use core::{
8 future::Future,
9 mem,
10 pin::Pin,
11 task::{Context, Poll, Waker},
12};
13
14use higher_kinded_types::ForLifetime;
15use sync_wrapper::SyncWrapper;
16use unique::Unique;
17
18use crate::{sealed::Sealed, types::Node, EventSource};
19
20pin_project_lite::pin_project!(
21 #[derive(Debug)]
22 #[must_use = "futures do nothing unless you `.await` or poll them"]
23 pub struct EventFnFuture<'a, F, T: ForLifetime> {
25 source: &'a EventSource<T>,
26
27 #[pin]
28 listener: Sealed<F>,
29
30 #[pin]
31 node: Node<T>,
32 }
33
34 impl<F, T: ForLifetime> PinnedDrop for EventFnFuture<'_, F, T> {
35 fn drop(this: Pin<&mut Self>) {
36 let project = this.project();
37 let node = match project.node.initialized_mut() {
38 Some(initialized) => initialized,
39 None => return,
40 };
41
42 let _ = node.reset(&mut project.source.list.lock());
43 }
44 }
45);
46
47impl<'a, T: ForLifetime, F> EventFnFuture<'a, F, T> {
48 pub(super) const fn new(source: &'a EventSource<T>, listener: F) -> Self {
49 Self {
50 source,
51 listener: Sealed::new(listener),
52 node: pin_list::Node::new(),
53 }
54 }
55}
56
57impl<'a, T: ForLifetime, F: FnMut(T::Of<'_>, &mut ControlFlow) + Send> Future
58 for EventFnFuture<'a, F, T>
59{
60 type Output = ();
61
62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63 let mut this = self.project();
64
65 let mut list = this.source.list.lock();
66 let node = {
67 let initialized = match this.node.as_mut().initialized_mut() {
68 Some(initialized) => initialized,
69 None => list.push_back(
70 this.node,
71 ListenerItem::new(
72 Unique::new(this.listener.get_ptr_mut().as_ptr() as _).unwrap(),
73 ),
74 (),
75 ),
76 };
77
78 initialized.protected_mut(&mut list).unwrap()
79 };
80
81 if node.done {
82 return Poll::Ready(());
83 }
84
85 node.update_waker(cx.waker());
86
87 Poll::Pending
88 }
89}
90
91type DynClosure<'closure, T> =
92 dyn for<'a, 'b> FnMut(<T as ForLifetime>::Of<'a>, &'b mut ControlFlow) + Send + 'closure;
93
94#[derive(Debug)]
95pub struct ListenerItem<T: ForLifetime> {
96 done: bool,
97 waker: Option<Waker>,
98 closure_ptr: SyncWrapper<Unique<DynClosure<'static, T>>>,
99}
100
101impl<T: ForLifetime> ListenerItem<T> {
102 fn new(closure: Unique<DynClosure<T>>) -> Self {
103 Self {
104 done: false,
105 waker: None,
106
107 closure_ptr: SyncWrapper::new(unsafe { mem::transmute::<Unique<_>, Unique<_>>(closure) }),
109 }
110 }
111
112 fn update_waker(&mut self, waker: &Waker) {
113 match self.waker {
114 Some(ref waker) if waker.will_wake(waker) => (),
115
116 _ => {
117 self.waker = Some(waker.clone());
118 }
119 }
120 }
121
122 pub unsafe fn poll(&mut self, event: T::Of<'_>) -> bool {
125 let mut flow = ControlFlow {
126 done: self.done,
127 propagation: true,
128 };
129
130 self.closure_ptr.get_mut().as_mut()(event, &mut flow);
131
132 if flow.done && !self.done {
133 self.done = true;
134
135 if let Some(waker) = self.waker.take() {
136 waker.wake();
137 }
138 }
139
140 flow.propagation
141 }
142}
143
144#[derive(Debug)]
145pub struct ControlFlow {
147 done: bool,
148 propagation: bool,
149}
150
151impl ControlFlow {
152 pub fn stop_propagation(&mut self) {
154 if self.propagation {
155 self.propagation = false;
156 }
157 }
158
159 pub const fn done(&self) -> bool {
161 self.done
162 }
163
164 pub fn set_done(&mut self) {
166 if !self.done {
167 self.done = true;
168 }
169 }
170}