veilid_tools/must_join_single_future.rs
1use super::*;
2
3use core::task::Poll;
4use futures_util::poll;
5
6#[derive(Debug)]
7struct MustJoinSingleFutureInner<T>
8where
9 T: 'static,
10{
11 locked: bool,
12 join_handle: Option<MustJoinHandle<T>>,
13}
14
15/// Spawns a single background processing task idempotently, possibly returning the return value of the previously executed background task
16/// This does not queue, just ensures that no more than a single copy of the task is running at a time, but allowing tasks to be retriggered
17#[derive(Debug, Clone)]
18pub struct MustJoinSingleFuture<T>
19where
20 T: 'static,
21{
22 inner: Arc<Mutex<MustJoinSingleFutureInner<T>>>,
23}
24
25impl<T> Default for MustJoinSingleFuture<T>
26where
27 T: 'static,
28{
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl<T> MustJoinSingleFuture<T>
35where
36 T: 'static,
37{
38 #[must_use]
39 pub fn new() -> Self {
40 Self {
41 inner: Arc::new(Mutex::new(MustJoinSingleFutureInner {
42 locked: false,
43 join_handle: None,
44 })),
45 }
46 }
47
48 fn try_lock(&self) -> Result<Option<MustJoinHandle<T>>, ()> {
49 let mut inner = self.inner.lock();
50 if inner.locked {
51 // If already locked error out
52 return Err(());
53 }
54 inner.locked = true;
55 // If we got the lock, return what we have for a join handle if anything
56 Ok(inner.join_handle.take())
57 }
58
59 fn unlock(&self, jh: Option<MustJoinHandle<T>>) {
60 let mut inner = self.inner.lock();
61 assert!(inner.locked);
62 assert!(inner.join_handle.is_none());
63 inner.locked = false;
64 inner.join_handle = jh;
65 }
66
67 /// Check the result and take it if there is one
68 pub async fn check(&self) -> Result<Option<T>, ()> {
69 let mut out: Option<T> = None;
70
71 // See if we have a result we can return
72 let maybe_jh = match self.try_lock() {
73 Ok(v) => v,
74 Err(_) => {
75 // If we are already polling somewhere else, don't hand back a result
76 return Err(());
77 }
78 };
79 if let Some(mut jh) = maybe_jh {
80 // See if we finished, if so, return the value of the last execution
81 if let Poll::Ready(r) = poll!(&mut jh) {
82 out = Some(r);
83 // Task finished, unlock with nothing
84 self.unlock(None);
85 } else {
86 // Still running put the join handle back so we can check on it later
87 self.unlock(Some(jh));
88 }
89 } else {
90 // No task, unlock with nothing
91 self.unlock(None);
92 }
93
94 // Return the prior result if we have one
95 Ok(out)
96 }
97
98 /// Wait for the result and take it
99 pub async fn join(&self) -> Result<Option<T>, ()> {
100 let mut out: Option<T> = None;
101
102 // See if we have a result we can return
103 let maybe_jh = match self.try_lock() {
104 Ok(v) => v,
105 Err(_) => {
106 // If we are already polling somewhere else,
107 // that's an error because you can only join
108 // these things once
109 return Err(());
110 }
111 };
112 if let Some(jh) = maybe_jh {
113 // Wait for return value of the last execution
114 out = Some(jh.await);
115 // Task finished, unlock with nothing
116 } else {
117 // No task, unlock with nothing
118 }
119 self.unlock(None);
120
121 // Return the prior result if we have one
122 Ok(out)
123 }
124
125 // Possibly spawn the future possibly returning the value of the last execution
126 pub async fn single_spawn_local(
127 &self,
128 name: &str,
129 future: impl Future<Output = T> + 'static,
130 ) -> Result<(Option<T>, bool), ()> {
131 let mut out: Option<T> = None;
132
133 // See if we have a result we can return
134 let maybe_jh = match self.try_lock() {
135 Ok(v) => v,
136 Err(_) => {
137 // If we are already polling somewhere else, don't hand back a result
138 return Err(());
139 }
140 };
141 let mut run = true;
142
143 if let Some(mut jh) = maybe_jh {
144 // See if we finished, if so, return the value of the last execution
145 if let Poll::Ready(r) = poll!(&mut jh) {
146 out = Some(r);
147 // Task finished, unlock with a new task
148 } else {
149 // Still running, don't run again, unlock with the current join handle
150 run = false;
151 self.unlock(Some(jh));
152 }
153 }
154
155 // Run if we should do that
156 if run {
157 self.unlock(Some(spawn_local(name, future)));
158 }
159
160 // Return the prior result if we have one
161 Ok((out, run))
162 }
163}
164
165impl<T> MustJoinSingleFuture<T>
166where
167 T: 'static + Send,
168{
169 pub async fn single_spawn<F: Future<Output = T> + Send + 'static, C: FnOnce() -> F>(
170 &self,
171 name: &str,
172 future_callback: C,
173 ) -> Result<(Option<T>, bool), ()> {
174 let mut out: Option<T> = None;
175 // See if we have a result we can return
176 let maybe_jh = match self.try_lock() {
177 Ok(v) => v,
178 Err(_) => {
179 // If we are already polling somewhere else, don't hand back a result
180 return Err(());
181 }
182 };
183 let mut run = true;
184 if let Some(mut jh) = maybe_jh {
185 // See if we finished, if so, return the value of the last execution
186 if let Poll::Ready(r) = poll!(&mut jh) {
187 out = Some(r);
188 // Task finished, unlock with a new task
189 } else {
190 // Still running, don't run again, unlock with the current join handle
191 run = false;
192 self.unlock(Some(jh));
193 }
194 }
195 // Run if we should do that
196 if run {
197 self.unlock(Some(spawn(name, future_callback())));
198 }
199 // Return the prior result if we have one
200 Ok((out, run))
201 }
202}