1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/*
 * Created on Thu Sep 07 2023
 *
 * Copyright (c) storycraft. Licensed under the MIT Licence.
 */

use core::{
    future::Future,
    mem,
    pin::Pin,
    task::{Context, Poll, Waker},
};

use higher_kinded_types::ForLifetime;
use unique::Unique;

use crate::{sealed::Sealed, types::Node, EventSource};

pin_project_lite::pin_project!(
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    /// Future created with [`EventSource::on`]
    ///
    /// Holds the listener closure together with the list node that links it
    /// into the source's listener list. The node is only linked once the
    /// future has been polled.
    pub struct EventFnFuture<'a, F, T: ForLifetime> {
        // Event source whose listener list this future joins.
        source: &'a EventSource<T>,

        // Listener closure; pinned because the list entry stores a raw pointer to it.
        #[pin]
        listener: Sealed<F>,

        // List node for this listener; stays uninitialized until the first poll.
        #[pin]
        node: Node<T>,
    }

    impl<F, T: ForLifetime> PinnedDrop for EventFnFuture<'_, F, T> {
        fn drop(this: Pin<&mut Self>) {
            let fields = this.project();

            // A node that was never initialized has nothing to clean up. Otherwise
            // reset it under the list lock and discard whatever the reset hands back.
            if let Some(linked) = fields.node.initialized_mut() {
                let _ = linked.reset(&mut fields.source.list.lock());
            }
        }
    }
);

impl<'a, T: ForLifetime, F> EventFnFuture<'a, F, T> {
    /// Build a listener future for `source` wrapping the `listener` closure.
    ///
    /// The list node starts out fresh (not linked); it is pushed onto the
    /// source's list by the `Future` implementation when first polled.
    pub(super) const fn new(source: &'a EventSource<T>, listener: F) -> Self {
        let listener = Sealed::new(listener);
        let node = pin_list::Node::new();

        Self {
            source,
            listener,
            node,
        }
    }
}

impl<'a, T: ForLifetime, F: FnMut(T::Of<'_>, &mut ControlFlow) + Send + Sync> Future
    for EventFnFuture<'a, F, T>
{
    type Output = ();

    /// Drive the listener future.
    ///
    /// On the first poll the node is pushed to the back of the source's list
    /// together with a [`ListenerItem`] pointing at the pinned closure. The
    /// future resolves once that item has been marked done; until then the
    /// current task's waker is recorded on the item and `Pending` is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut fields = self.project();
        // The list lock is held for the whole poll.
        let mut guard = fields.source.list.lock();

        let linked = match fields.node.as_mut().initialized_mut() {
            Some(existing) => existing,
            None => {
                // Type-erase the pinned closure into a raw pointer for the list entry.
                let closure = Unique::new(fields.listener.get_ptr_mut().as_ptr() as _).unwrap();
                guard.push_back(fields.node, ListenerItem::new(closure), ())
            }
        };

        let item = linked.protected_mut(&mut guard).unwrap();

        if item.done {
            Poll::Ready(())
        } else {
            item.update_waker(cx.waker());
            Poll::Pending
        }
    }
}

/// Type-erased listener closure.
///
/// Callable with an event of any lifetime (`T::Of<'a>`) and a mutable
/// [`ControlFlow`]; the trait object is `Send + Sync` and bounded by `'closure`.
type DynClosure<'closure, T> =
    dyn for<'a, 'b> FnMut(<T as ForLifetime>::Of<'a>, &'b mut ControlFlow) + Send + Sync + 'closure;

/// Per-listener state stored in the event source's list.
#[derive(Debug)]
pub struct ListenerItem<T: ForLifetime> {
    /// Set once the closure has marked its `ControlFlow` as done; the owning
    /// future completes when it observes this flag.
    done: bool,
    /// Waker of the task that last polled the owning future, if any; taken and
    /// woken when the listener becomes done.
    waker: Option<Waker>,
    /// Pointer to the listener closure with its lifetime erased to `'static`;
    /// validity must be upheld manually by whoever calls `poll`.
    closure_ptr: Unique<DynClosure<'static, T>>,
}

impl<T: ForLifetime> ListenerItem<T> {
    /// Create a not-yet-done item with no waker, pointing at `closure`.
    ///
    /// The closure's lifetime is erased to `'static`; callers of
    /// [`ListenerItem::poll`] are responsible for the pointer still being valid.
    fn new(closure: Unique<DynClosure<T>>) -> Self {
        Self {
            done: false,
            waker: None,

            // SAFETY: Extend lifetime and manage manually, see ListenerItem::poll for safety requirement
            closure_ptr: unsafe { mem::transmute::<Unique<_>, Unique<_>>(closure) },
        }
    }

    /// Remember `waker` as the one to wake when the listener finishes.
    ///
    /// The stored waker is kept when it would already wake the same task as
    /// `waker`; otherwise it is replaced by a clone of `waker`.
    fn update_waker(&mut self, waker: &Waker) {
        match self.waker {
            // The binding must not be named `waker`: shadowing the parameter would
            // compare the stored waker with itself and never refresh it.
            Some(ref current) if current.will_wake(waker) => (),

            _ => {
                self.waker = Some(waker.clone());
            }
        }
    }

    /// Invoke the listener closure with `event`.
    ///
    /// The closure receives a [`ControlFlow`] whose `done` flag starts at this
    /// item's current state and whose propagation flag starts enabled. The
    /// closure is invoked even if the item is already done. When the closure
    /// marks the flow as done for the first time, the item becomes done and the
    /// stored waker (if any) is taken and woken.
    ///
    /// Returns `true` if the event should keep propagating, `false` if the
    /// closure stopped propagation.
    ///
    /// # Safety
    /// Calling this method is only safe if pointer to closure is valid
    pub unsafe fn poll(&mut self, event: T::Of<'_>) -> bool {
        let mut flow = ControlFlow {
            done: self.done,
            propagation: true,
        };

        self.closure_ptr.as_mut()(event, &mut flow);

        // Only the first transition to done wakes the task.
        if flow.done && !self.done {
            self.done = true;

            if let Some(waker) = self.waker.take() {
                waker.wake();
            }
        }

        flow.propagation
    }
}

#[derive(Debug)]
/// Control current listener's behaviour
///
/// Handed to a listener closure on every event so it can stop the event from
/// propagating further or declare itself finished.
pub struct ControlFlow {
    // Whether the listener has been marked as finished.
    done: bool,
    // Whether the current event should continue propagating.
    propagation: bool,
}

impl ControlFlow {
    /// Prevent the current event from propagating any further.
    ///
    /// Calling this more than once has no additional effect.
    pub fn stop_propagation(&mut self) {
        self.propagation = false;
    }

    /// Returns `true` when the listener has already been marked as finished.
    pub const fn done(&self) -> bool {
        self.done
    }

    /// Flag the listener as finished.
    ///
    /// Calling this more than once has no additional effect.
    pub fn set_done(&mut self) {
        self.done = true;
    }
}