1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
use super::*;

use core::task::Poll;
use futures_util::poll;

#[derive(Debug)]
struct MustJoinSingleFutureInner<T>
where
    T: 'static,
{
    locked: bool,
    join_handle: Option<MustJoinHandle<T>>,
}

/// Spawns a single background processing task idempotently, possibly returning the return value of the previously executed background task
/// This does not queue, just ensures that no more than a single copy of the task is running at a time, but allowing tasks to be retriggered
#[derive(Debug, Clone)]
pub struct MustJoinSingleFuture<T>
where
    T: 'static,
{
    inner: Arc<Mutex<MustJoinSingleFutureInner<T>>>,
}

impl<T> Default for MustJoinSingleFuture<T>
where
    T: 'static,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<T> MustJoinSingleFuture<T>
where
    T: 'static,
{
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(MustJoinSingleFutureInner {
                locked: false,
                join_handle: None,
            })),
        }
    }

    fn try_lock(&self) -> Result<Option<MustJoinHandle<T>>, ()> {
        let mut inner = self.inner.lock();
        if inner.locked {
            // If already locked error out
            return Err(());
        }
        inner.locked = true;
        // If we got the lock, return what we have for a join handle if anything
        Ok(inner.join_handle.take())
    }

    fn unlock(&self, jh: Option<MustJoinHandle<T>>) {
        let mut inner = self.inner.lock();
        assert!(inner.locked);
        assert!(inner.join_handle.is_none());
        inner.locked = false;
        inner.join_handle = jh;
    }

    /// Check the result and take it if there is one
    pub async fn check(&self) -> Result<Option<T>, ()> {
        let mut out: Option<T> = None;

        // See if we have a result we can return
        let maybe_jh = match self.try_lock() {
            Ok(v) => v,
            Err(_) => {
                // If we are already polling somewhere else, don't hand back a result
                return Err(());
            }
        };
        if maybe_jh.is_some() {
            let mut jh = maybe_jh.unwrap();

            // See if we finished, if so, return the value of the last execution
            if let Poll::Ready(r) = poll!(&mut jh) {
                out = Some(r);
                // Task finished, unlock with nothing
                self.unlock(None);
            } else {
                // Still running put the join handle back so we can check on it later
                self.unlock(Some(jh));
            }
        } else {
            // No task, unlock with nothing
            self.unlock(None);
        }

        // Return the prior result if we have one
        Ok(out)
    }

    /// Wait for the result and take it
    pub async fn join(&self) -> Result<Option<T>, ()> {
        let mut out: Option<T> = None;

        // See if we have a result we can return
        let maybe_jh = match self.try_lock() {
            Ok(v) => v,
            Err(_) => {
                // If we are already polling somewhere else,
                // that's an error because you can only join
                // these things once
                return Err(());
            }
        };
        if maybe_jh.is_some() {
            let jh = maybe_jh.unwrap();
            // Wait for return value of the last execution
            out = Some(jh.await);
            // Task finished, unlock with nothing
        } else {
            // No task, unlock with nothing
        }
        self.unlock(None);

        // Return the prior result if we have one
        Ok(out)
    }

    // Possibly spawn the future possibly returning the value of the last execution
    pub async fn single_spawn_local(
        &self,
        future: impl Future<Output = T> + 'static,
    ) -> Result<(Option<T>, bool), ()> {
        let mut out: Option<T> = None;

        // See if we have a result we can return
        let maybe_jh = match self.try_lock() {
            Ok(v) => v,
            Err(_) => {
                // If we are already polling somewhere else, don't hand back a result
                return Err(());
            }
        };
        let mut run = true;

        if maybe_jh.is_some() {
            let mut jh = maybe_jh.unwrap();

            // See if we finished, if so, return the value of the last execution
            if let Poll::Ready(r) = poll!(&mut jh) {
                out = Some(r);
                // Task finished, unlock with a new task
            } else {
                // Still running, don't run again, unlock with the current join handle
                run = false;
                self.unlock(Some(jh));
            }
        }

        // Run if we should do that
        if run {
            self.unlock(Some(spawn_local(future)));
        }

        // Return the prior result if we have one
        Ok((out, run))
    }
}

impl<T> MustJoinSingleFuture<T>
where
    T: 'static + Send,
{
    pub async fn single_spawn(
        &self,
        future: impl Future<Output = T> + Send + 'static,
    ) -> Result<(Option<T>, bool), ()> {
        let mut out: Option<T> = None;
        // See if we have a result we can return
        let maybe_jh = match self.try_lock() {
            Ok(v) => v,
            Err(_) => {
                // If we are already polling somewhere else, don't hand back a result
                return Err(());
            }
        };
        let mut run = true;
        if maybe_jh.is_some() {
            let mut jh = maybe_jh.unwrap();
            // See if we finished, if so, return the value of the last execution
            if let Poll::Ready(r) = poll!(&mut jh) {
                out = Some(r);
                // Task finished, unlock with a new task
            } else {
                // Still running, don't run again, unlock with the current join handle
                run = false;
                self.unlock(Some(jh));
            }
        }
        // Run if we should do that
        if run {
            self.unlock(Some(spawn(future)));
        }
        // Return the prior result if we have one
        Ok((out, run))
    }
}