event_source/
future.rs

1/*
2 * Created on Thu Sep 07 2023
3 *
4 * Copyright (c) storycraft. Licensed under the MIT Licence.
5 */
6
7use core::{
8    future::Future,
9    mem,
10    pin::Pin,
11    task::{Context, Poll, Waker},
12};
13
14use higher_kinded_types::ForLifetime;
15use sync_wrapper::SyncWrapper;
16use unique::Unique;
17
18use crate::{sealed::Sealed, types::Node, EventSource};
19
20pin_project_lite::pin_project!(
21    #[derive(Debug)]
22    #[must_use = "futures do nothing unless you `.await` or poll them"]
23    /// Future created with [`EventSource::on`]
24    pub struct EventFnFuture<'a, F, T: ForLifetime> {
25        source: &'a EventSource<T>,
26
27        #[pin]
28        listener: Sealed<F>,
29
30        #[pin]
31        node: Node<T>,
32    }
33
34    impl<F, T: ForLifetime> PinnedDrop for EventFnFuture<'_, F, T> {
35        fn drop(this: Pin<&mut Self>) {
36            let project = this.project();
37            let node = match project.node.initialized_mut() {
38                Some(initialized) => initialized,
39                None => return,
40            };
41
42            let _ = node.reset(&mut project.source.list.lock());
43        }
44    }
45);
46
47impl<'a, T: ForLifetime, F> EventFnFuture<'a, F, T> {
48    pub(super) const fn new(source: &'a EventSource<T>, listener: F) -> Self {
49        Self {
50            source,
51            listener: Sealed::new(listener),
52            node: pin_list::Node::new(),
53        }
54    }
55}
56
57impl<'a, T: ForLifetime, F: FnMut(T::Of<'_>, &mut ControlFlow) + Send> Future
58    for EventFnFuture<'a, F, T>
59{
60    type Output = ();
61
62    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63        let mut this = self.project();
64
65        let mut list = this.source.list.lock();
66        let node = {
67            let initialized = match this.node.as_mut().initialized_mut() {
68                Some(initialized) => initialized,
69                None => list.push_back(
70                    this.node,
71                    ListenerItem::new(
72                        Unique::new(this.listener.get_ptr_mut().as_ptr() as _).unwrap(),
73                    ),
74                    (),
75                ),
76            };
77
78            initialized.protected_mut(&mut list).unwrap()
79        };
80
81        if node.done {
82            return Poll::Ready(());
83        }
84
85        node.update_waker(cx.waker());
86
87        Poll::Pending
88    }
89}
90
91type DynClosure<'closure, T> =
92    dyn for<'a, 'b> FnMut(<T as ForLifetime>::Of<'a>, &'b mut ControlFlow) + Send + 'closure;
93
/// List entry for a single registered listener.
#[derive(Debug)]
pub struct ListenerItem<T: ForLifetime> {
    /// Set to `true` by `poll` once the closure has marked the listener as finished.
    done: bool,
    /// Waker saved by `update_waker`; taken and woken by `poll` when `done` becomes `true`.
    waker: Option<Waker>,
    /// Pointer to the listener closure, stored with its lifetime erased to `'static`.
    /// Whether the pointee is still alive is not tracked here (see the safety contract of `poll`).
    closure_ptr: SyncWrapper<Unique<DynClosure<'static, T>>>,
}
100
101impl<T: ForLifetime> ListenerItem<T> {
102    fn new(closure: Unique<DynClosure<T>>) -> Self {
103        Self {
104            done: false,
105            waker: None,
106
107            // SAFETY: Extend lifetime and manage manually, see ListenerItem::poll for safety requirement
108            closure_ptr: SyncWrapper::new(unsafe { mem::transmute::<Unique<_>, Unique<_>>(closure) }),
109        }
110    }
111
112    fn update_waker(&mut self, waker: &Waker) {
113        match self.waker {
114            Some(ref waker) if waker.will_wake(waker) => (),
115
116            _ => {
117                self.waker = Some(waker.clone());
118            }
119        }
120    }
121
122    /// # Safety
123    /// Calling this method is only safe if pointer to closure is valid
124    pub unsafe fn poll(&mut self, event: T::Of<'_>) -> bool {
125        let mut flow = ControlFlow {
126            done: self.done,
127            propagation: true,
128        };
129
130        self.closure_ptr.get_mut().as_mut()(event, &mut flow);
131
132        if flow.done && !self.done {
133            self.done = true;
134
135            if let Some(waker) = self.waker.take() {
136                waker.wake();
137            }
138        }
139
140        flow.propagation
141    }
142}
143
/// Handle passed to a listener so it can steer how the current event is handled
#[derive(Debug)]
pub struct ControlFlow {
    /// `true` once the listener is marked as finished
    done: bool,
    /// `false` once propagation of the current event has been stopped
    propagation: bool,
}

impl ControlFlow {
    /// Prevent the current event from propagating any further
    pub fn stop_propagation(&mut self) {
        // Idempotent: clearing an already cleared flag changes nothing
        self.propagation = false;
    }

    /// Tell whether the listener has already been marked as finished
    pub const fn done(&self) -> bool {
        self.done
    }

    /// Flag the listener as finished
    pub fn set_done(&mut self) {
        // Idempotent: setting an already set flag changes nothing
        self.done = true;
    }
}