use std::{
any::Any,
future::Future,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
task::{Context, Poll},
};
use crate::{Cancellation, CancellationCause};
pub(crate) mod private;
use private::*;
#[must_use]
pub struct Promise<T> {
state: PromiseState<T>,
target: Arc<PromiseTarget<T>>,
dependencies: Vec<Box<dyn Any + Send + Sync>>,
}
impl<T> Promise<T> {
pub fn sneak_peek(&self) -> &PromiseState<T> {
&self.state
}
pub fn peek(&mut self) -> &PromiseState<T> {
self.update();
&self.state
}
pub fn take(&mut self) -> PromiseState<T> {
self.update();
self.state.take()
}
pub fn wait(&self) -> &Self {
if !self.state.is_pending() {
return self;
}
Self::impl_wait(&self.target, None);
self
}
pub fn interruptible_wait(&self, interrupter: &Interrupter) -> &Self
where
T: 'static,
{
if !self.state.is_pending() {
return self;
}
if let Some(interrupt) = interrupter.push(self.target.clone()) {
Self::impl_wait(&self.target, Some(interrupt));
}
self
}
pub fn wait_mut(&mut self) -> &mut Self {
if !self.state.is_pending() {
return self;
}
if let Some(mut guard) = Self::impl_wait(&self.target, None) {
Self::impl_try_take_result(&mut self.state, &mut guard.result);
}
self
}
pub fn interruptible_wait_mut(&mut self, interrupter: &Interrupter) -> &mut Self
where
T: 'static,
{
if !self.state.is_pending() {
return self;
}
if let Some(interrupt) = interrupter.push(self.target.clone()) {
if let Some(mut guard) = Self::impl_wait(&self.target, Some(interrupt)) {
Self::impl_try_take_result(&mut self.state, &mut guard.result);
}
}
self
}
pub fn update(&mut self) {
if self.state.is_pending() {
match self.target.inner.lock() {
Ok(mut guard) => {
self.state.update(guard.result.take());
}
Err(_) => {
self.state = PromiseState::make_poisoned();
}
}
}
}
}
impl<T: 'static + Send + Sync> Promise<Promise<T>> {
pub fn flatten(self) -> Promise<T> {
self.impl_flatten()
}
}
impl<T> Drop for Promise<T> {
fn drop(&mut self) {
if self.state.is_pending() {
let f = match self.target.inner.lock() {
Ok(mut guard) => guard.on_promise_drop.take(),
Err(_) => None,
};
if let Some(f) = f {
f();
}
}
}
}
impl<T: Unpin> Future for Promise<T> {
type Output = PromiseState<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let self_mut = self.get_mut();
let state = self_mut.take();
if state.is_pending() {
if let Ok(mut inner) = self_mut.target.inner.lock() {
inner.waker = Some(cx.waker().clone());
}
Poll::Pending
} else {
Poll::Ready(state)
}
}
}
#[derive(Debug, Clone)]
pub enum PromiseState<T> {
Available(T),
Pending,
Cancelled(Cancellation),
Disposed,
Taken,
}
impl<T> PromiseState<T> {
pub fn as_ref(&self) -> PromiseState<&T> {
match self {
Self::Available(value) => PromiseState::Available(value),
Self::Pending => PromiseState::Pending,
Self::Cancelled(cancellation) => PromiseState::Cancelled(cancellation.clone()),
Self::Disposed => PromiseState::Disposed,
Self::Taken => PromiseState::Taken,
}
}
pub fn available(self) -> Option<T> {
match self {
Self::Available(value) => Some(value),
_ => None,
}
}
pub fn is_available(&self) -> bool {
matches!(self, Self::Available(_))
}
pub fn is_pending(&self) -> bool {
matches!(self, Self::Pending)
}
pub fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled(_))
}
pub fn cancellation(&self) -> Option<&Cancellation> {
match self {
Self::Cancelled(cause) => Some(cause),
_ => None,
}
}
pub fn is_disposed(&self) -> bool {
matches!(self, Self::Disposed)
}
pub fn is_taken(&self) -> bool {
matches!(self, Self::Taken)
}
pub fn take(&mut self) -> PromiseState<T> {
let next_value = match self {
Self::Available(_) => Self::Taken,
Self::Pending => Self::Pending,
Self::Cancelled(cancellation) => Self::Cancelled(cancellation.clone()),
Self::Disposed => Self::Disposed,
Self::Taken => Self::Taken,
};
std::mem::replace(self, next_value)
}
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PromiseState<U> {
match self {
Self::Available(x) => PromiseState::Available(f(x)),
Self::Pending => PromiseState::Pending,
Self::Cancelled(cause) => PromiseState::Cancelled(cause),
Self::Disposed => PromiseState::Disposed,
Self::Taken => PromiseState::Taken,
}
}
pub fn then<U>(self, f: impl FnOnce(T) -> PromiseState<U>) -> PromiseState<U> {
self.map(f).flatten()
}
fn update(&mut self, result: Option<PromiseResult<T>>) {
match result {
Some(PromiseResult::Available(response)) => {
*self = PromiseState::Available(response);
}
Some(PromiseResult::Cancelled(cause)) => {
*self = PromiseState::Cancelled(cause);
}
Some(PromiseResult::Disposed) => {
*self = PromiseState::Disposed;
}
None => {
}
}
}
fn make_poisoned() -> Self {
Self::Cancelled(Cancellation::from_cause(
CancellationCause::PoisonedMutexInPromise,
))
}
}
impl<T> PromiseState<PromiseState<T>> {
pub fn flatten(self) -> PromiseState<T> {
match self {
Self::Available(x) => x,
Self::Pending => PromiseState::Pending,
Self::Cancelled(cause) => PromiseState::Cancelled(cause),
Self::Disposed => PromiseState::Disposed,
Self::Taken => PromiseState::Taken,
}
}
}
pub struct Interrupter {
inner: Arc<Mutex<InterrupterInner>>,
}
#[allow(clippy::arc_with_non_send_sync)]
impl Interrupter {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(InterrupterInner::new())),
}
}
pub fn interrupt(&self) {
let mut guard = match self.inner.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut inner = poisoned.into_inner();
*inner = InterrupterInner::new();
return;
}
};
guard.triggered = true;
for waiter in &*guard.waiters {
waiter.interrupt.store(true, Ordering::SeqCst);
waiter.interruptible.interrupt();
}
guard.waiters.clear();
}
pub fn reset(&self) {
match self.inner.lock() {
Ok(mut guard) => {
guard.triggered = false;
}
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = InterrupterInner::new();
}
}
}
fn push<T: 'static>(&self, target: Arc<PromiseTarget<T>>) -> Option<Arc<AtomicBool>> {
let mut guard = match self.inner.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let mut guard = poisoned.into_inner();
*guard = InterrupterInner::new();
guard
}
};
if guard.triggered {
return None;
}
let interruptee = Interruptee {
interrupt: Arc::new(AtomicBool::new(false)),
interruptible: target,
};
let interrupt = interruptee.interrupt.clone();
guard.waiters.push(interruptee);
Some(interrupt)
}
}
impl Default for Interrupter {
fn default() -> Self {
Interrupter::new()
}
}
#[cfg(test)]
mod tests {
use crate::prelude::*;
#[test]
fn test_promise_flatten() {
{
let (outer_sender, mut flat_promise) = {
let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
(outer_sender, outer_promise.flatten())
};
let (inner_sender, inner_promise) = Promise::<&str>::new();
assert!(outer_sender.send(inner_promise).is_ok());
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let (outer_sender, mut flat_promise) = {
let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
(outer_sender, outer_promise.flatten())
};
let (inner_sender, inner_promise) = Promise::<&str>::new();
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert!(flat_promise.peek().is_pending());
assert!(outer_sender.send(inner_promise).is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let (inner_sender, mut flat_promise) = {
let (outer_sender, mut outer_promise) = Promise::<Promise<&str>>::new();
assert!(outer_promise.peek().is_pending());
let (inner_sender, inner_promise) = Promise::<&str>::new();
assert!(outer_sender.send(inner_promise).is_ok());
(inner_sender, outer_promise.flatten())
};
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let (mut flat_promise, outer_sender, inner_promise) = {
let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
let (inner_sender, inner_promise) = Promise::<&str>::new();
assert!(inner_sender.send("hello").is_ok());
(outer_promise.flatten(), outer_sender, inner_promise)
};
assert!(flat_promise.peek().is_pending());
assert!(outer_sender.send(inner_promise).is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let mut flat_promise = {
let (outer_sender, mut outer_promise) = Promise::<Promise<&str>>::new();
assert!(outer_promise.peek().is_pending());
let (inner_sender, inner_promise) = Promise::<&str>::new();
assert!(outer_sender.send(inner_promise).is_ok());
assert!(inner_sender.send("hello").is_ok());
assert!(outer_promise.peek().is_available());
outer_promise.flatten()
};
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let mut flat_promise = {
let (outer_sender, outer_promise) = Promise::<Promise<&str>>::new();
let (inner_sender, inner_promise) = Promise::<&str>::new();
assert!(inner_sender.send("hello").is_ok());
assert!(outer_sender.send(inner_promise).is_ok());
outer_promise.flatten()
};
assert_eq!(flat_promise.take().available(), Some("hello"));
}
}
use super::Sender;
struct DoubleFlattenPairs {
outer_promise: Promise<Promise<Promise<&'static str>>>,
outer_sender: Sender<Promise<Promise<&'static str>>>,
mid_promise: Promise<Promise<&'static str>>,
mid_sender: Sender<Promise<&'static str>>,
inner_promise: Promise<&'static str>,
inner_sender: Sender<&'static str>,
}
impl DoubleFlattenPairs {
fn new() -> DoubleFlattenPairs {
let (outer_sender, outer_promise) = Promise::new();
let (mid_sender, mid_promise) = Promise::new();
let (inner_sender, inner_promise) = Promise::new();
Self {
outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
}
}
}
#[test]
fn test_promise_double_flatten() {
{
let DoubleFlattenPairs {
outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
let mut flat_promise = outer_promise.flatten().flatten();
assert!(flat_promise.peek().is_pending());
assert!(outer_sender.send(mid_promise).is_ok());
assert!(flat_promise.peek().is_pending());
assert!(mid_sender.send(inner_promise).is_ok());
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let DoubleFlattenPairs {
outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
let mut flat_promise = outer_promise.flatten();
assert!(flat_promise.peek().is_pending());
assert!(outer_sender.send(mid_promise).is_ok());
assert!(flat_promise.peek().is_pending());
let mut flat_promise = flat_promise.flatten();
assert!(flat_promise.peek().is_pending());
assert!(mid_sender.send(inner_promise).is_ok());
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let DoubleFlattenPairs {
outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
assert!(outer_sender.send(mid_promise).is_ok());
let mut flat_promise = outer_promise.flatten().flatten();
assert!(flat_promise.peek().is_pending());
assert!(mid_sender.send(inner_promise).is_ok());
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let DoubleFlattenPairs {
mut outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
assert!(outer_sender.send(mid_promise).is_ok());
assert!(outer_promise.peek().is_available());
assert!(mid_sender.send(inner_promise).is_ok());
let mut flat_promise = outer_promise.flatten().flatten();
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let DoubleFlattenPairs {
mut outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
assert!(outer_sender.send(mid_promise).is_ok());
assert!(outer_promise.peek().is_available());
assert!(mid_sender.send(inner_promise).is_ok());
assert!(inner_sender.send("hello").is_ok());
let mut flat_promise = outer_promise.flatten().flatten();
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let DoubleFlattenPairs {
outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
assert!(mid_sender.send(inner_promise).is_ok());
let mut flat_promise = outer_promise.flatten().flatten();
assert!(flat_promise.peek().is_pending());
assert!(inner_sender.send("hello").is_ok());
assert!(flat_promise.peek().is_pending());
assert!(outer_sender.send(mid_promise).is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
{
let DoubleFlattenPairs {
outer_promise,
outer_sender,
mid_promise,
mid_sender,
inner_promise,
inner_sender,
} = DoubleFlattenPairs::new();
assert!(inner_sender.send("hello").is_ok());
let mut flat_promise = outer_promise.flatten().flatten();
assert!(flat_promise.peek().is_pending());
assert!(outer_sender.send(mid_promise).is_ok());
assert!(flat_promise.peek().is_pending());
assert!(mid_sender.send(inner_promise).is_ok());
assert_eq!(flat_promise.take().available(), Some("hello"));
}
}
}