irox_threading/
task.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2023 IROX Contributors
3//
4
5use std::cell::{OnceCell, RefCell};
6use std::future::Future;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::{Arc, Condvar, Mutex};
10use std::task::{Context, Poll, Waker};
11
12pub type LocalFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
13pub type LocalVoidFuture<'a> = LocalFuture<'a, ()>;
14
15#[derive(Debug, Copy, Clone, Eq, PartialEq)]
16pub enum TaskError {
17    /// Mutex locking failed, probably due to panic
18    LockingError,
19
20    /// Task was not completed
21    NotCompletedError,
22
23    /// Executor cannot accept new tasks because it's stopping
24    ExecutorStoppingError,
25
26    /// There was an error sending the task to a worker.
27    ExchangerError,
28}
29
30struct CompletableTaskInner<T> {
31    result: OnceCell<T>,
32    is_complete: bool,
33    waker: Option<Waker>,
34}
35impl<T> CompletableTaskInner<T> {
36    pub fn new() -> Self {
37        CompletableTaskInner {
38            result: OnceCell::new(),
39            is_complete: false,
40            waker: None,
41        }
42    }
43
44    pub fn try_set(&mut self, value: T) -> Result<(), T> {
45        if self.is_complete {
46            return Err(value);
47        };
48        let oldval = self.result.set(value);
49        self.is_complete = true;
50        if let Some(waker) = self.waker.take() {
51            waker.wake();
52        }
53        oldval?;
54
55        Ok(())
56    }
57
58    pub fn take(&mut self) -> Option<T> {
59        self.result.take()
60    }
61}
62
63///
64/// A `CompletableTask` is a one-time-use shuttle struct to enable tasks/threads
65/// to provide the result of an compute operation.  Once the task is completed,
66/// any additional attempts to complete the task results in an error.
67///
68/// This is thread-safe equivalent to [`OnceCell<T>`], but combines the ability
69/// to block the current thread until the task completes.
70#[derive(Clone)]
71pub struct CompletableTask<T> {
72    inner: Arc<(Mutex<CompletableTaskInner<T>>, Condvar)>,
73}
74
75impl<T> CompletableTask<T> {
76    ///
77    /// Creates a new [`CompletableTask`]
78    pub fn new() -> CompletableTask<T> {
79        let inner = CompletableTaskInner::new();
80        CompletableTask {
81            inner: Arc::new((Mutex::new(inner), Condvar::new())),
82        }
83    }
84
85    ///
86    /// Attempt to complete this task with the specified value.
87    ///
88    /// Returns `Ok(())` if the task was successfully completed.
89    /// Returns `Err(value)` with the provided value if:
90    /// * The task has already completed
91    /// * Any errors in locking or mutex poisoning prevented the completion
92    pub fn try_complete(&self, value: T) -> Result<(), T> {
93        let arc = self.inner.clone();
94        let Ok(mut inner) = arc.0.lock() else {
95            return Err(value);
96        };
97        if inner.is_complete {
98            return Err(value);
99        }
100        inner.try_set(value)?;
101
102        arc.1.notify_all();
103        Ok(())
104    }
105
106    ///
107    /// Checks if the task has been completed.
108    ///
109    /// * Returns `Ok(true)` if the task has been completed
110    /// * Returns `Ok(false)` if the task has NOT been completed
111    /// * Returns `Err(())` if any errors in locking prevented the checks
112    pub fn is_complete(&self) -> Result<bool, TaskError> {
113        let arc = self.inner.clone();
114        let Ok(inner) = arc.0.lock() else {
115            return Err(TaskError::LockingError);
116        };
117        Ok(inner.is_complete)
118    }
119
120    ///
121    /// Gets the result of the operation if it has been set.  Does NOT block until
122    /// the task is complete.  Use [`CompletableTask::take_blocking`] for blocking requests.
123    ///
124    /// Returns `Ok(Poll::Ready(T))` if the task has been completed
125    /// Returns `Ok(Poll::Pending))` if the task has NOT been completed
126    /// Returns `Err(())` if the underlying mutex has been poisoned and is corrupt.
127    pub fn try_take(&self) -> Result<Poll<T>, TaskError> {
128        let arc = self.inner.clone();
129        let Ok(mut inner) = arc.0.lock() else {
130            return Err(TaskError::LockingError);
131        };
132        if let Some(val) = inner.take() {
133            return Ok(Poll::Ready(val));
134        }
135        Ok(Poll::Pending)
136    }
137
138    ///
139    /// Gets the result of the operation, blocking until the operation is complete.
140    ///
141    /// Returns `Ok(T)` if the operation completed,
142    /// Returns `Err(())` if any error happens.
143    pub fn take_blocking(&self) -> Result<T, TaskError> {
144        let arc = self.inner.clone();
145        let Ok(inner) = arc.0.lock() else {
146            return Err(TaskError::LockingError);
147        };
148        let Ok(mut res) = arc.1.wait_while(inner, |v| !v.is_complete) else {
149            return Err(TaskError::LockingError);
150        };
151        if let Some(val) = res.result.take() {
152            return Ok(val);
153        }
154        Err(TaskError::NotCompletedError)
155    }
156
157    /// If this is a future, sets the waker to be notified
158    pub(crate) fn set_waker(&self, waker: Waker) -> Result<(), TaskError> {
159        let arc = self.inner.clone();
160        let Ok(mut inner) = arc.0.lock() else {
161            return Err(TaskError::LockingError);
162        };
163        if let Some(waker) = inner.waker.replace(waker) {
164            // clean out the old waker, and wake it up.
165            waker.wake();
166        }
167
168        Ok(())
169    }
170}
171
172impl<T> Default for CompletableTask<T> {
173    fn default() -> Self {
174        CompletableTask::new()
175    }
176}
177
178impl<T> Future for CompletableTask<T> {
179    type Output = Result<T, TaskError>;
180
181    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
182        if let Err(e) = self.set_waker(cx.waker().clone()) {
183            return Poll::Ready(Err(e));
184        }
185
186        match self.try_take() {
187            Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
188            Ok(Poll::Pending) => Poll::Pending,
189            Err(e) => Poll::Ready(Err(e)),
190        }
191    }
192}
193
194///
195/// Local, Current thread version of [`CompletableTask`] that uses a [`Rc`] instead of
196/// an [`Arc`] for inner storage.
197pub struct LocalCompletableTask<T> {
198    result: Rc<RefCell<Option<T>>>,
199}
200
201impl<T> Clone for LocalCompletableTask<T> {
202    fn clone(&self) -> Self {
203        LocalCompletableTask {
204            result: self.result.clone(),
205        }
206    }
207}
208
209impl<T> LocalCompletableTask<T> {
210    /// Creates a new, uncompleted task.
211    pub fn new() -> Self {
212        LocalCompletableTask {
213            result: Rc::new(RefCell::new(None)),
214        }
215    }
216
217    ///
218    /// Attempts to complete this task.  This will only actually fail if the
219    /// task has already been completed.  In this case, the original value will
220    /// be returned back as the 'Error' type.
221    pub fn try_complete(&self, value: T) -> Result<(), T> {
222        let res = self.result.clone();
223        if res.borrow().is_some() {
224            return Err(value);
225        }
226        if let Some(t) = res.replace(Some(value)) {
227            return Err(t);
228        }
229        Ok(())
230    }
231
232    ///
233    /// Returns the current status of this task.  If the task is complete, returns
234    /// [`Poll::Ready(T)`], otherwise returns [`Poll::Pending`]
235    pub fn get(&self) -> Poll<T> {
236        if let Some(v) = self.result.take() {
237            return Poll::Ready(v);
238        }
239        Poll::Pending
240    }
241}
242
243impl<T> Default for LocalCompletableTask<T> {
244    fn default() -> Self {
245        LocalCompletableTask::new()
246    }
247}