use tokio::sync::watch;
use crate::failure::cause::Cause;
use crate::failure::exit::Exit;
use crate::kernel::{Effect, box_future};
use crate::runtime::{FiberId, Never};
#[derive(Clone, Debug)]
pub struct Deferred<A, E> {
tx: watch::Sender<Option<Exit<A, E>>>,
rx: watch::Receiver<Option<Exit<A, E>>>,
}
impl<A, E> Deferred<A, E>
where
A: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
pub fn make() -> Effect<Deferred<A, E>, Never, ()> {
Effect::new(|_r| {
let (tx, rx) = watch::channel(None);
Ok(Deferred { tx, rx })
})
}
pub fn wait(&self) -> Effect<A, Cause<E>, ()> {
let mut rx = self.rx.clone();
Effect::new_async(move |_r| {
box_future(async move {
loop {
if let Some(exit) = rx.borrow().clone() {
return match exit {
Exit::Success(a) => Ok(a),
Exit::Failure(c) => Err(c),
};
}
if rx.changed().await.is_err() {
return Err(Cause::die("deferred: all senders dropped"));
}
}
})
})
}
pub fn wait_future(
&self,
) -> impl std::future::Future<Output = Result<A, Cause<E>>> + Send + 'static
where
A: Clone + Send + Sync + 'static,
E: Clone + Send + Sync + 'static,
{
let mut rx = self.rx.clone();
async move {
loop {
if let Some(exit) = rx.borrow().clone() {
return match exit {
Exit::Success(a) => Ok(a),
Exit::Failure(c) => Err(c),
};
}
if rx.changed().await.is_err() {
return Err(Cause::die("deferred: all senders dropped"));
}
}
}
}
#[inline]
pub fn try_succeed(&self, value: A) -> bool
where
A: Clone,
{
self.tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::succeed(value.clone()));
true
} else {
false
}
})
}
#[inline]
pub fn try_fail_cause(&self, cause: Cause<E>) -> bool
where
E: Clone,
{
self.tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::Failure(cause.clone()));
true
} else {
false
}
})
}
pub fn poll(&self) -> Effect<Option<Exit<A, E>>, Never, ()> {
let rx = self.rx.clone();
Effect::new(move |_r| Ok(rx.borrow().clone()))
}
pub fn is_done(&self) -> Effect<bool, Never, ()> {
let rx = self.rx.clone();
Effect::new(move |_r| Ok(rx.borrow().is_some()))
}
pub fn succeed(&self, value: A) -> Effect<bool, Never, ()> {
let tx = self.tx.clone();
Effect::new(move |_r| {
Ok(tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::succeed(value.clone()));
true
} else {
false
}
}))
})
}
pub fn fail(&self, error: E) -> Effect<bool, Never, ()> {
let tx = self.tx.clone();
Effect::new(move |_r| {
Ok(tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::fail(error.clone()));
true
} else {
false
}
}))
})
}
pub fn fail_cause(&self, cause: Cause<E>) -> Effect<bool, Never, ()> {
let tx = self.tx.clone();
Effect::new(move |_r| {
Ok(tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::Failure(cause.clone()));
true
} else {
false
}
}))
})
}
pub fn interrupt(&self) -> Effect<bool, Never, ()> {
let tx = self.tx.clone();
Effect::new(move |_r| {
Ok(tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::interrupt(FiberId::fresh()));
true
} else {
false
}
}))
})
}
pub fn complete<R>(&self, eff: Effect<A, E, R>) -> Effect<bool, E, R> {
let tx = self.tx.clone();
Effect::new_async(move |r| {
box_future(async move {
let value = eff.run(r).await?;
Ok(tx.send_if_modified(|slot| {
if slot.is_none() {
*slot = Some(Exit::succeed(value));
true
} else {
false
}
}))
})
})
}
pub fn unsafe_done(&self, exit: Exit<A, E>) -> Effect<(), Never, ()> {
let tx = self.tx.clone();
Effect::new(move |_r| {
let _ = tx.send(Some(exit));
Ok(())
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::succeed;
#[tokio::test]
async fn deferred_wait_suspends_until_succeed() {
let d = Deferred::<u8, ()>::make().run(&mut ()).await.unwrap();
let d2 = d.clone();
let (out, _) = tokio::join!(async { d.wait().run(&mut ()).await }, async {
tokio::task::yield_now().await;
d2.succeed(7).run(&mut ()).await.unwrap();
},);
assert_eq!(out, Ok(7));
}
#[tokio::test]
async fn deferred_second_succeed_returns_false() {
let d = Deferred::<i32, ()>::make().run(&mut ()).await.unwrap();
assert!(d.succeed(1).run(&mut ()).await.unwrap());
assert!(!d.succeed(2).run(&mut ()).await.unwrap());
}
#[tokio::test]
async fn deferred_fail_propagates_error() {
let d = Deferred::<u8, &str>::make().run(&mut ()).await.unwrap();
assert!(d.fail("boom").run(&mut ()).await.unwrap());
let out = d.wait().run(&mut ()).await;
assert_eq!(out, Err(Cause::fail("boom")));
}
#[tokio::test]
async fn deferred_interrupt_delivers_interrupt_cause() {
let d = Deferred::<u8, ()>::make().run(&mut ()).await.unwrap();
assert!(d.interrupt().run(&mut ()).await.unwrap());
let out = d.wait().run(&mut ()).await;
match out {
Err(Cause::Interrupt(_)) => {}
o => panic!("expected interrupt cause, got {o:?}"),
}
}
#[tokio::test]
async fn deferred_poll_none_before_complete() {
let d = Deferred::<u16, ()>::make().run(&mut ()).await.unwrap();
assert_eq!(d.poll().run(&mut ()).await.unwrap(), None);
d.succeed(42).run(&mut ()).await.unwrap();
assert_eq!(
d.poll().run(&mut ()).await.unwrap(),
Some(Exit::succeed(42_u16))
);
}
#[tokio::test]
async fn deferred_second_fail_cause_returns_false() {
let d = Deferred::<(), ()>::make().run(&mut ()).await.unwrap();
assert!(d.fail_cause(Cause::die("a")).run(&mut ()).await.unwrap());
assert!(!d.fail_cause(Cause::die("b")).run(&mut ()).await.unwrap());
}
#[tokio::test]
async fn deferred_complete_runs_effect_and_sets_success() {
let d = Deferred::<u8, ()>::make().run(&mut ()).await.unwrap();
let eff = succeed::<u8, (), ()>(9);
assert!(d.complete(eff).run(&mut ()).await.unwrap());
assert_eq!(d.wait().run(&mut ()).await, Ok(9));
}
#[tokio::test]
async fn deferred_unsafe_done_overwrites_prior() {
let d = Deferred::<u8, ()>::make().run(&mut ()).await.unwrap();
d.succeed(1).run(&mut ()).await.unwrap();
d.unsafe_done(Exit::succeed(2)).run(&mut ()).await.unwrap();
assert_eq!(d.wait().run(&mut ()).await, Ok(2));
}
}