1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
// SPDX-License-Identifier: MIT
// Copyright 2023 IROX Contributors
//
use std::cell::{OnceCell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Waker};
pub type LocalFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub type LocalVoidFuture<'a> = LocalFuture<'a, ()>;
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TaskError {
/// Mutex locking failed, probably due to panic
LockingError,
/// Task was not completed
NotCompletedError,
/// Executor cannot accept new tasks because it's stopping
ExecutorStoppingError,
/// There was an error sending the task to a worker.
ExchangerError,
}
struct CompletableTaskInner<T> {
result: OnceCell<T>,
is_complete: bool,
waker: Option<Waker>,
}
impl<T> CompletableTaskInner<T> {
pub fn new() -> Self {
CompletableTaskInner {
result: OnceCell::new(),
is_complete: false,
waker: None,
}
}
pub fn try_set(&mut self, value: T) -> Result<(), T> {
if self.is_complete {
return Err(value);
};
let oldval = self.result.set(value);
self.is_complete = true;
if let Some(waker) = self.waker.take() {
waker.wake();
}
oldval?;
Ok(())
}
pub fn take(&mut self) -> Option<T> {
self.result.take()
}
}
///
/// A `CompletableTask` is a one-time-use shuttle struct to enable tasks/threads
/// to provide the result of an compute operation. Once the task is completed,
/// any additional attempts to complete the task results in an error.
///
/// This is thread-safe equivalent to [`OnceCell<T>`], but combines the ability
/// to block the current thread until the task completes.
#[derive(Clone)]
pub struct CompletableTask<T> {
inner: Arc<(Mutex<CompletableTaskInner<T>>, Condvar)>,
}
impl<T> CompletableTask<T> {
///
/// Creates a new [`CompletableTask`]
pub fn new() -> CompletableTask<T> {
let inner = CompletableTaskInner::new();
CompletableTask {
inner: Arc::new((Mutex::new(inner), Condvar::new())),
}
}
///
/// Attempt to complete this task with the specified value.
///
/// Returns `Ok(())` if the task was successfully completed.
/// Returns `Err(value)` with the provided value if:
/// * The task has already completed
/// * Any errors in locking or mutex poisoning prevented the completion
pub fn try_complete(&self, value: T) -> Result<(), T> {
let arc = self.inner.clone();
let Ok(mut inner) = arc.0.lock() else {
return Err(value);
};
if inner.is_complete {
return Err(value);
}
inner.try_set(value)?;
arc.1.notify_all();
Ok(())
}
///
/// Checks if the task has been completed.
///
/// * Returns `Ok(true)` if the task has been completed
/// * Returns `Ok(false)` if the task has NOT been completed
/// * Returns `Err(())` if any errors in locking prevented the checks
pub fn is_complete(&self) -> Result<bool, TaskError> {
let arc = self.inner.clone();
let Ok(inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
Ok(inner.is_complete)
}
///
/// Gets the result of the operation if it has been set. Does NOT block until
/// the task is complete. Use [`CompletableTask::take_blocking`] for blocking requests.
///
/// Returns `Ok(Poll::Ready(T))` if the task has been completed
/// Returns `Ok(Poll::Pending))` if the task has NOT been completed
/// Returns `Err(())` if the underlying mutex has been poisoned and is corrupt.
pub fn try_take(&self) -> Result<Poll<T>, TaskError> {
let arc = self.inner.clone();
let Ok(mut inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
if let Some(val) = inner.take() {
return Ok(Poll::Ready(val));
}
Ok(Poll::Pending)
}
///
/// Gets the result of the operation, blocking until the operation is complete.
///
/// Returns `Ok(T)` if the operation completed,
/// Returns `Err(())` if any error happens.
pub fn take_blocking(&self) -> Result<T, TaskError> {
let arc = self.inner.clone();
let Ok(inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
let Ok(mut res) = arc.1.wait_while(inner, |v| !v.is_complete) else {
return Err(TaskError::LockingError);
};
if let Some(val) = res.result.take() {
return Ok(val);
}
Err(TaskError::NotCompletedError)
}
/// If this is a future, sets the waker to be notified
pub(crate) fn set_waker(&self, waker: Waker) -> Result<(), TaskError> {
let arc = self.inner.clone();
let Ok(mut inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
if let Some(waker) = inner.waker.replace(waker) {
// clean out the old waker, and wake it up.
waker.wake();
}
Ok(())
}
}
impl<T> Default for CompletableTask<T> {
fn default() -> Self {
CompletableTask::new()
}
}
impl<T> Future for CompletableTask<T> {
type Output = Result<T, TaskError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Err(e) = self.set_waker(cx.waker().clone()) {
return Poll::Ready(Err(e));
}
match self.try_take() {
Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
Ok(Poll::Pending) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
}
}
///
/// Local, Current thread version of [`CompletableTask`] that uses a [`Rc`] instead of
/// an [`Arc`] for inner storage.
pub struct LocalCompletableTask<T> {
result: Rc<RefCell<Option<T>>>,
}
impl<T> Clone for LocalCompletableTask<T> {
fn clone(&self) -> Self {
LocalCompletableTask {
result: self.result.clone(),
}
}
}
impl<T> LocalCompletableTask<T> {
/// Creates a new, uncompleted task.
pub fn new() -> Self {
LocalCompletableTask {
result: Rc::new(RefCell::new(None)),
}
}
///
/// Attempts to complete this task. This will only actually fail if the
/// task has already been completed. In this case, the original value will
/// be returned back as the 'Error' type.
pub fn try_complete(&self, value: T) -> Result<(), T> {
let res = self.result.clone();
if res.borrow().is_some() {
return Err(value);
}
if let Some(t) = res.replace(Some(value)) {
return Err(t);
}
Ok(())
}
///
/// Returns the current status of this task. If the task is complete, returns
/// [`Poll::Ready(T)`], otherwise returns [`Poll::Pending`]
pub fn get(&self) -> Poll<T> {
if let Some(v) = self.result.take() {
return Poll::Ready(v);
}
Poll::Pending
}
}
impl<T> Default for LocalCompletableTask<T> {
fn default() -> Self {
LocalCompletableTask::new()
}
}