extern crate alloc;
use alloc::sync::Arc;
use core::fmt::{Debug, Formatter};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::sync::{Condvar, Mutex, RwLock};
#[derive(Default)]
pub enum EventualStatus<T> {
#[default]
NotReady,
Running,
CompleteEmpty,
Complete(T),
}
impl<T> Debug for EventualStatus<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
EventualStatus::NotReady => write!(f, "NotReady"),
EventualStatus::Running => write!(f, "Running"),
EventualStatus::CompleteEmpty => write!(f, "CompleteEmpty"),
EventualStatus::Complete(_) => write!(f, "Complete"),
}
}
}
impl<T> From<Option<T>> for EventualStatus<T> {
fn from(value: Option<T>) -> Self {
match value {
None => EventualStatus::CompleteEmpty,
Some(v) => EventualStatus::Complete(v),
}
}
}
impl<T> From<EventualStatus<T>> for Option<T> {
fn from(value: EventualStatus<T>) -> Self {
if let EventualStatus::Complete(v) = value {
return Some(v);
}
None
}
}
impl<T: Clone> Clone for EventualStatus<T> {
fn clone(&self) -> Self {
match self {
EventualStatus::NotReady => EventualStatus::NotReady,
EventualStatus::Running => EventualStatus::Running,
EventualStatus::CompleteEmpty => EventualStatus::CompleteEmpty,
EventualStatus::Complete(c) => EventualStatus::Complete(c.clone()),
}
}
}
impl<T> EventualStatus<T> {
pub fn take(&mut self) -> Option<T> {
if !self.is_complete() {
return None;
}
core::mem::replace(self, EventualStatus::CompleteEmpty).into()
}
pub fn is_complete(&self) -> bool {
if let EventualStatus::NotReady = self {
return false;
} else if let EventualStatus::Running = self {
return false;
}
true
}
pub fn is_pending(&self) -> bool {
!self.is_complete()
}
}
struct EventualInner<T> {
condvar: Condvar,
guard: Mutex<()>,
val: RwLock<EventualStatus<Arc<T>>>,
}
impl<T> Default for EventualInner<T> {
fn default() -> Self {
EventualInner {
condvar: Default::default(),
guard: Default::default(),
val: RwLock::new(EventualStatus::NotReady),
}
}
}
impl<T> EventualInner<T> {
fn new(status: EventualStatus<Arc<T>>) -> Self {
EventualInner {
condvar: Default::default(),
guard: Default::default(),
val: RwLock::new(status),
}
}
}
#[derive(Clone)]
pub struct Eventual<T> {
inner: Arc<EventualInner<T>>,
}
impl<T> Default for Eventual<T> {
fn default() -> Self {
Eventual {
inner: Arc::new(EventualInner::default()),
}
}
}
impl<T> Eventual<T> {
pub fn new_loaded(val: T) -> Self {
Eventual {
inner: Arc::new(EventualInner::new(EventualStatus::Complete(Arc::new(val)))),
}
}
pub fn set(&self, val: Option<T>) {
if let Ok(mut write) = self.inner.val.write() {
let val = val.map(Arc::new);
*write = val.into();
}
if self.is_ready() {
self.inner.condvar.notify_all();
}
}
pub fn set_shared(&self, val: Arc<T>) {
if let Ok(mut write) = self.inner.val.write() {
*write = EventualStatus::Complete(val);
self.inner.condvar.notify_all()
}
}
pub fn get(&self) -> EventualStatus<Arc<T>> {
if let Ok(read) = self.inner.val.read() {
return read.clone();
}
EventualStatus::NotReady
}
pub fn take(&self) -> Option<Arc<T>> {
if let Ok(mut write) = self.inner.val.write() {
return write.take();
}
None
}
pub fn start(&self) {
if let Ok(mut write) = self.inner.val.write() {
match *write {
EventualStatus::NotReady | EventualStatus::CompleteEmpty => {
*write = EventualStatus::Running;
}
_ => {
}
}
}
}
pub fn is_ready(&self) -> bool {
if let Ok(read) = self.inner.val.read() {
return read.is_complete();
}
false
}
pub fn is_pending(&self) -> bool {
if let Ok(read) = self.inner.val.read() {
return read.is_pending();
}
true
}
pub fn block_until_ready(&self) -> EventualStatus<Arc<T>> {
match self.get() {
EventualStatus::CompleteEmpty => return EventualStatus::CompleteEmpty,
EventualStatus::Complete(v) => return EventualStatus::Complete(v),
_ => {}
}
if let Ok(guard) = self.inner.guard.lock() {
let _unused = self.inner.condvar.wait_while(guard, |()| self.is_pending());
}
self.get()
}
}
impl<T> Future for Eventual<T> {
type Output = EventualStatus<Arc<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let status = self.get();
if status.is_complete() {
return Poll::Ready(status);
}
cx.waker().wake_by_ref();
Poll::Pending
}
}