use crate::errors::{Error, Result};
use crate::functor::{Closure, Functor, NoErrFunctor};
use crate::scheduler::Scheduler;
use crate::stop_token::StopToken;
use crate::traits::{BindSender, Receiver, ReceiverOf, Sender, TypedSender, TypedSenderConnect};
use crate::tuple::Tuple;
use std::marker::PhantomData;
use std::ops::BitOr;
pub struct Then<'a, FnType, Out, ArgTuple>
where
FnType: 'a + Functor<'a, ArgTuple, Output = Result<Out>>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn_impl: FnType,
phantom: PhantomData<&'a fn(ArgTuple) -> Out>,
}
impl<'a, FnType, Out, ArgTuple> From<FnType> for Then<'a, FnType, Out, ArgTuple>
where
FnType: 'a + Functor<'a, ArgTuple, Output = Result<Out>>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn from(fn_impl: FnType) -> Self {
Then {
fn_impl,
phantom: PhantomData,
}
}
}
type ClosureThen<'a, FnType, Out, ArgTuple> =
Then<'a, Closure<'a, FnType, Result<Out>, ArgTuple>, Out, ArgTuple>;
impl<'a, FnType, ArgTuple, Out> From<FnType> for ClosureThen<'a, FnType, Out, ArgTuple>
where
FnType: 'a + FnOnce(ArgTuple) -> Result<Out>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn from(fn_impl: FnType) -> Self {
Self::from(Closure::new(fn_impl))
}
}
type NoErrThen<'a, FunctorType, Out, ArgTuple> =
Then<'a, NoErrFunctor<'a, FunctorType, Out, ArgTuple>, Out, ArgTuple>;
impl<'a, FnImpl, Out, ArgTuple> From<FnImpl> for NoErrThen<'a, FnImpl, Out, ArgTuple>
where
FnImpl: 'a + Functor<'a, ArgTuple, Output = Out>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn from(fn_impl: FnImpl) -> Self {
Self::from(NoErrFunctor::new(fn_impl))
}
}
type NoErrClosureThen<'a, FnImpl, Out, ArgTuple> =
NoErrThen<'a, Closure<'a, FnImpl, Out, ArgTuple>, Out, ArgTuple>;
impl<'a, FnImpl, Out, ArgTuple> From<FnImpl> for NoErrClosureThen<'a, FnImpl, Out, ArgTuple>
where
FnImpl: 'a + FnOnce(ArgTuple) -> Out,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn from(fn_impl: FnImpl) -> Self {
Self::from(Closure::new(fn_impl))
}
}
impl<'a, FnType, Out: Tuple, ArgTuple: Tuple> Sender for Then<'a, FnType, Out, ArgTuple> where
FnType: 'a + Functor<'a, ArgTuple, Output = Result<Out>>
{
}
impl<'a, FnType, Out, NestedSender> BindSender<NestedSender>
for Then<'a, FnType, Out, <NestedSender as TypedSender>::Value>
where
NestedSender: TypedSender,
FnType: 'a + Functor<'a, NestedSender::Value, Output = Result<Out>>,
NestedSender::Value: Tuple,
Out: 'a + Tuple,
{
type Output = ThenSender<'a, NestedSender, FnType, Out>;
fn bind(self, nested: NestedSender) -> Self::Output {
ThenSender {
nested,
fn_impl: self.fn_impl,
phantom: PhantomData,
}
}
}
impl<'a, NestedSender, FnType, Out, BindSenderImpl> BitOr<BindSenderImpl>
for ThenSender<'a, NestedSender, FnType, Out>
where
BindSenderImpl: BindSender<ThenSender<'a, NestedSender, FnType, Out>>,
NestedSender: TypedSender,
FnType: 'a + Functor<'a, NestedSender::Value, Output = Result<Out>>,
NestedSender::Value: Tuple,
Out: 'a + Tuple,
{
type Output = BindSenderImpl::Output;
fn bitor(self, rhs: BindSenderImpl) -> Self::Output {
rhs.bind(self)
}
}
pub struct ThenSender<'a, NestedSender, FnType, Out>
where
NestedSender: TypedSender,
FnType: 'a + Functor<'a, NestedSender::Value, Output = Result<Out>>,
Out: 'a + Tuple,
{
nested: NestedSender,
fn_impl: FnType,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, NestedSender, FnType, Out> TypedSender for ThenSender<'a, NestedSender, FnType, Out>
where
NestedSender: TypedSender,
FnType: 'a + Functor<'a, NestedSender::Value, Output = Result<Out>>,
Out: 'a + Tuple,
{
type Value = Out;
type Scheduler = NestedSender::Scheduler;
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverImpl, NestedSender, FnType, Out>
TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverImpl>
for ThenSender<'a, NestedSender, FnType, Out>
where
ReceiverImpl: ReceiverOf<Self::Scheduler, Out>,
NestedSender: TypedSender
+ TypedSenderConnect<
'a,
ScopeImpl,
StopTokenImpl,
ThenWrappedReceiver<
'a,
ReceiverImpl,
FnType,
Self::Scheduler,
Out,
<NestedSender as TypedSender>::Value,
>,
>,
NestedSender::Value: 'a,
FnType: 'a + Functor<'a, NestedSender::Value, Output = Result<Out>>,
Out: 'a + Tuple,
StopTokenImpl: StopToken,
{
type Output<'scope> = NestedSender::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverImpl: 'scope;
fn connect<'scope>(
self,
scope: &ScopeImpl,
stop_token: StopTokenImpl,
receiver: ReceiverImpl,
) -> Self::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverImpl: 'scope,
{
let wrapped_receiver = ThenWrappedReceiver {
nested: receiver,
fn_impl: self.fn_impl,
phantom: PhantomData,
};
self.nested.connect(scope, stop_token, wrapped_receiver)
}
}
pub struct ThenWrappedReceiver<'a, ReceiverImpl, FnType, Sch, Out, ArgTuple>
where
ReceiverImpl: ReceiverOf<Sch, Out>,
FnType: 'a + Functor<'a, ArgTuple, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
nested: ReceiverImpl,
fn_impl: FnType,
phantom: PhantomData<&'a fn(Sch, ArgTuple) -> Out>,
}
impl<'a, ReceiverImpl, FnType, Sch, ArgTuple, Out> Receiver
for ThenWrappedReceiver<'a, ReceiverImpl, FnType, Sch, Out, ArgTuple>
where
ReceiverImpl: ReceiverOf<Sch, Out>,
FnType: 'a + Functor<'a, ArgTuple, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn set_done(self) {
self.nested.set_done();
}
fn set_error(self, error: Error) {
self.nested.set_error(error);
}
}
impl<'a, ReceiverImpl, FnType, Sch, ArgTuple, Out> ReceiverOf<Sch, ArgTuple>
for ThenWrappedReceiver<'a, ReceiverImpl, FnType, Sch, Out, ArgTuple>
where
ReceiverImpl: ReceiverOf<Sch, Out>,
FnType: 'a + Functor<'a, ArgTuple, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
ArgTuple: Tuple,
Out: 'a + Tuple,
{
fn set_value(self, sch: Sch, values: ArgTuple) {
match self.fn_impl.tuple_invoke(values) {
Ok(v) => self.nested.set_value(sch, v),
Err(e) => self.nested.set_error(e),
};
}
}
#[cfg(test)]
mod tests {
use super::Then;
use crate::errors::{new_error, ErrorForTesting, Result};
use crate::just::Just;
use crate::just_error::JustError;
use crate::scheduler::ImmediateScheduler;
use crate::sync_wait::SyncWait;
#[test]
fn it_works() {
assert_eq!(
Some((6, 7, 8)),
(Just::from((4, 5, 6)) | Then::from(|(x, y, z)| (x + 2, y + 2, z + 2)))
.sync_wait()
.expect("should succeed")
)
}
#[test]
fn it_works_with_errors() {
assert_eq!(
Some((6, 7, 8)),
(Just::from((4, 5, 6)) | Then::from(|(x, y, z)| Ok((x + 2, y + 2, z + 2))))
.sync_wait()
.expect("should succeed")
)
}
#[test]
fn errors_from_preceding_sender_are_propagated() {
match (JustError::<ImmediateScheduler, ()>::from(new_error(ErrorForTesting::from("error")))
| Then::from(|()| -> (i32, i32) { panic!("expect this function to not be invoked") }))
.sync_wait()
{
Ok(_) => panic!("expected an error"),
Err(e) => {
assert_eq!(
ErrorForTesting::from("error"),
*e.downcast_ref::<ErrorForTesting>().unwrap()
);
}
}
}
#[test]
fn errors_from_functor_are_propagated() {
match (Just::from(())
| Then::from(|()| -> Result<()> { Err(Box::new(ErrorForTesting::from("error"))) }))
.sync_wait()
{
Ok(_) => panic!("expected an error"),
Err(e) => {
assert_eq!(
ErrorForTesting::from("error"),
*e.downcast_ref::<ErrorForTesting>().unwrap()
);
}
}
}
}