use std::cell::{OnceCell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
/// Unsized trait-object type: a future yielding `T` that is `Send` and may borrow data for `'a`.
pub type SendFuture<'a, T> = dyn Future<Output = T> + Send + 'a;
/// A [`SendFuture`] trait object held behind an `Arc`.
// NOTE(review): `Future::poll` takes `Pin<&mut Self>`, and an `Arc` on its own only hands out
// shared references — confirm how this alias is meant to be driven.
pub type SharedSendFuture<'a, T> = Arc<SendFuture<'a, T>>;
/// Pinned, boxed, `Send` future yielding `T`.
// NOTE(review): despite the name this is a `Box` (single owner), not a reference-counted
// pointer — confirm the naming is intended.
pub type SharedFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Pinned, boxed future yielding `T`, without a `Send` bound.
pub type LocalFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// A [`LocalFuture`] whose output is `()`.
pub type LocalVoidFuture<'a> = LocalFuture<'a, ()>;
/// Errors reported by the completable task types in this module.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// boxed as `Box<dyn Error>` or wrapped by other error types, and
/// `PartialEq`/`Eq` so callers can compare variants directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TaskError {
    /// Acquiring (or re-acquiring after a wait) the mutex that guards the
    /// task state failed.
    LockingError,
    /// The task was flagged as complete but no value was left to take.
    NotCompletedError,
}

impl std::fmt::Display for TaskError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TaskError::LockingError => "failed to lock the task state",
            TaskError::NotCompletedError => "task has no result available",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TaskError {}
/// Completion state of a task: the stored value plus a sticky "completed" flag.
struct CompletableTaskInner<T> {
    /// Holds the completion value from `try_set` until `take` removes it.
    result: OnceCell<T>,
    /// Set by `try_set` and never cleared afterwards, so it stays `true`
    /// even once the value has been taken.
    is_complete: bool,
}

impl<T> CompletableTaskInner<T> {
    /// Creates an empty state that has not been completed yet.
    pub fn new() -> Self {
        Self {
            is_complete: false,
            result: OnceCell::new(),
        }
    }

    /// Stores `value` and marks the state as complete.
    ///
    /// Hands `value` back as `Err` when the state was already completed
    /// (or when the cell refuses the value because it is already populated).
    pub fn try_set(&mut self, value: T) -> Result<(), T> {
        if self.is_complete {
            Err(value)
        } else {
            let stored = self.result.set(value);
            // The flag is raised regardless of whether the cell accepted the value.
            self.is_complete = true;
            stored
        }
    }

    /// Removes and returns the stored value, if any; the completion flag is
    /// left untouched.
    pub fn take(&mut self) -> Option<T> {
        // Swap in a fresh empty cell and unwrap the old one.
        std::mem::take(&mut self.result).into_inner()
    }
}
pub struct CompletableTask<T> {
inner: Arc<(Mutex<CompletableTaskInner<T>>, Condvar)>,
}
impl<T> CompletableTask<T> {
pub fn new() -> CompletableTask<T> {
let inner = CompletableTaskInner::new();
CompletableTask {
inner: Arc::new((Mutex::new(inner), Condvar::new())),
}
}
pub fn try_complete(&self, value: T) -> Result<(), T> {
let arc = self.inner.clone();
let Ok(mut inner) = arc.0.lock() else {
return Err(value);
};
if inner.is_complete {
return Err(value);
}
inner.try_set(value)?;
arc.1.notify_all();
Ok(())
}
pub fn is_complete(&self) -> Result<bool, TaskError> {
let arc = self.inner.clone();
let Ok(inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
Ok(inner.is_complete)
}
pub fn try_take(&self) -> Result<Poll<T>, TaskError> {
let arc = self.inner.clone();
let Ok(mut inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
if let Some(val) = inner.take() {
return Ok(Poll::Ready(val));
}
Ok(Poll::Pending)
}
pub fn take_blocking(&self) -> Result<T, TaskError> {
let arc = self.inner.clone();
let Ok(inner) = arc.0.lock() else {
return Err(TaskError::LockingError);
};
let Ok(mut res) = arc.1.wait_while(inner, |v| !v.is_complete) else {
return Err(TaskError::LockingError);
};
if let Some(val) = res.result.take() {
return Ok(val);
}
Err(TaskError::NotCompletedError)
}
}
impl<T> Default for CompletableTask<T> {
fn default() -> Self {
CompletableTask::new()
}
}
/// Single-threaded completable slot: an `Option<T>` shared through
/// `Rc<RefCell<_>>`, so clones all observe the same value.
pub struct LocalCompletableTask<T> {
    result: Rc<RefCell<Option<T>>>,
}

impl<T> Clone for LocalCompletableTask<T> {
    /// Returns another handle to the same slot (no `T: Clone` needed).
    fn clone(&self) -> Self {
        Self {
            result: Rc::clone(&self.result),
        }
    }
}

impl<T> LocalCompletableTask<T> {
    /// Creates a task with an empty slot.
    pub fn new() -> Self {
        let slot = RefCell::new(None);
        Self {
            result: Rc::new(slot),
        }
    }

    /// Stores `value` when the slot is empty; otherwise hands `value` back as `Err`.
    ///
    /// Because [`LocalCompletableTask::get`] empties the slot, a task can be
    /// completed again after its value has been retrieved.
    pub fn try_complete(&self, value: T) -> Result<(), T> {
        let mut slot = self.result.borrow_mut();
        if slot.is_none() {
            *slot = Some(value);
            Ok(())
        } else {
            Err(value)
        }
    }

    /// Moves the value out of the slot: `Poll::Ready(value)` when one is
    /// stored, `Poll::Pending` when the slot is empty.
    pub fn get(&self) -> Poll<T> {
        match self.result.borrow_mut().take() {
            Some(value) => Poll::Ready(value),
            None => Poll::Pending,
        }
    }
}

impl<T> Default for LocalCompletableTask<T> {
    /// Equivalent to [`LocalCompletableTask::new`].
    fn default() -> Self {
        Self::new()
    }
}