Skip to main content

veilid_tools/
must_join_single_future.rs

1use super::*;
2
3use core::task::Poll;
4use futures_util::poll;
5
6#[derive(Debug)]
7struct MustJoinSingleFutureInner<T>
8where
9    T: 'static,
10{
11    locked: bool,
12    join_handle: Option<MustJoinHandle<T>>,
13}
14
15/// Spawns a single background processing task idempotently, possibly returning the return value of the previously executed background task
16/// This does not queue, just ensures that no more than a single copy of the task is running at a time, but allowing tasks to be retriggered
17#[derive(Debug, Clone)]
18pub struct MustJoinSingleFuture<T>
19where
20    T: 'static,
21{
22    inner: Arc<Mutex<MustJoinSingleFutureInner<T>>>,
23}
24
25impl<T> Default for MustJoinSingleFuture<T>
26where
27    T: 'static,
28{
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl<T> MustJoinSingleFuture<T>
35where
36    T: 'static,
37{
38    #[must_use]
39    pub fn new() -> Self {
40        Self {
41            inner: Arc::new(Mutex::new(MustJoinSingleFutureInner {
42                locked: false,
43                join_handle: None,
44            })),
45        }
46    }
47
48    fn try_lock(&self) -> Result<Option<MustJoinHandle<T>>, ()> {
49        let mut inner = self.inner.lock();
50        if inner.locked {
51            // If already locked error out
52            return Err(());
53        }
54        inner.locked = true;
55        // If we got the lock, return what we have for a join handle if anything
56        Ok(inner.join_handle.take())
57    }
58
59    fn unlock(&self, jh: Option<MustJoinHandle<T>>) {
60        let mut inner = self.inner.lock();
61        assert!(inner.locked);
62        assert!(inner.join_handle.is_none());
63        inner.locked = false;
64        inner.join_handle = jh;
65    }
66
67    /// Check the result and take it if there is one
68    pub async fn check(&self) -> Result<Option<T>, ()> {
69        let mut out: Option<T> = None;
70
71        // See if we have a result we can return
72        let maybe_jh = match self.try_lock() {
73            Ok(v) => v,
74            Err(_) => {
75                // If we are already polling somewhere else, don't hand back a result
76                return Err(());
77            }
78        };
79        if let Some(mut jh) = maybe_jh {
80            // See if we finished, if so, return the value of the last execution
81            if let Poll::Ready(r) = poll!(&mut jh) {
82                out = Some(r);
83                // Task finished, unlock with nothing
84                self.unlock(None);
85            } else {
86                // Still running put the join handle back so we can check on it later
87                self.unlock(Some(jh));
88            }
89        } else {
90            // No task, unlock with nothing
91            self.unlock(None);
92        }
93
94        // Return the prior result if we have one
95        Ok(out)
96    }
97
98    /// Wait for the result and take it
99    pub async fn join(&self) -> Result<Option<T>, ()> {
100        let mut out: Option<T> = None;
101
102        // See if we have a result we can return
103        let maybe_jh = match self.try_lock() {
104            Ok(v) => v,
105            Err(_) => {
106                // If we are already polling somewhere else,
107                // that's an error because you can only join
108                // these things once
109                return Err(());
110            }
111        };
112        if let Some(jh) = maybe_jh {
113            // Wait for return value of the last execution
114            out = Some(jh.await);
115            // Task finished, unlock with nothing
116        } else {
117            // No task, unlock with nothing
118        }
119        self.unlock(None);
120
121        // Return the prior result if we have one
122        Ok(out)
123    }
124
125    // Possibly spawn the future possibly returning the value of the last execution
126    pub async fn single_spawn_local(
127        &self,
128        name: &str,
129        future: impl Future<Output = T> + 'static,
130    ) -> Result<(Option<T>, bool), ()> {
131        let mut out: Option<T> = None;
132
133        // See if we have a result we can return
134        let maybe_jh = match self.try_lock() {
135            Ok(v) => v,
136            Err(_) => {
137                // If we are already polling somewhere else, don't hand back a result
138                return Err(());
139            }
140        };
141        let mut run = true;
142
143        if let Some(mut jh) = maybe_jh {
144            // See if we finished, if so, return the value of the last execution
145            if let Poll::Ready(r) = poll!(&mut jh) {
146                out = Some(r);
147                // Task finished, unlock with a new task
148            } else {
149                // Still running, don't run again, unlock with the current join handle
150                run = false;
151                self.unlock(Some(jh));
152            }
153        }
154
155        // Run if we should do that
156        if run {
157            self.unlock(Some(spawn_local(name, future)));
158        }
159
160        // Return the prior result if we have one
161        Ok((out, run))
162    }
163}
164
165impl<T> MustJoinSingleFuture<T>
166where
167    T: 'static + Send,
168{
169    pub async fn single_spawn<F: Future<Output = T> + Send + 'static, C: FnOnce() -> F>(
170        &self,
171        name: &str,
172        future_callback: C,
173    ) -> Result<(Option<T>, bool), ()> {
174        let mut out: Option<T> = None;
175        // See if we have a result we can return
176        let maybe_jh = match self.try_lock() {
177            Ok(v) => v,
178            Err(_) => {
179                // If we are already polling somewhere else, don't hand back a result
180                return Err(());
181            }
182        };
183        let mut run = true;
184        if let Some(mut jh) = maybe_jh {
185            // See if we finished, if so, return the value of the last execution
186            if let Poll::Ready(r) = poll!(&mut jh) {
187                out = Some(r);
188                // Task finished, unlock with a new task
189            } else {
190                // Still running, don't run again, unlock with the current join handle
191                run = false;
192                self.unlock(Some(jh));
193            }
194        }
195        // Run if we should do that
196        if run {
197            self.unlock(Some(spawn(name, future_callback())));
198        }
199        // Return the prior result if we have one
200        Ok((out, run))
201    }
202}