use crate::errors::{Error, Result};
use crate::functor::{Closure, Functor, NoErrFunctor};
use crate::stop_token::StopToken;
use crate::traits::{
BindSender, OperationState, Receiver, ReceiverOf, Sender, TypedSender, TypedSenderConnect,
};
use std::marker::PhantomData;
use std::ops::BitOr;
pub struct LetError<'a, FnType, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
fn_impl: FnType,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, FnType, Out> From<FnType> for LetError<'a, FnType, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
fn from(fn_impl: FnType) -> Self {
LetError {
fn_impl,
phantom: PhantomData,
}
}
}
type ClosureLetError<'a, FnType, Out> = LetError<'a, Closure<'a, FnType, Result<Out>, Error>, Out>;
impl<'a, FnType, Out> From<FnType> for ClosureLetError<'a, FnType, Out>
where
FnType: 'a + FnOnce(Error) -> Result<Out>,
Out: TypedSender,
{
fn from(fn_impl: FnType) -> Self {
Self::from(Closure::new(fn_impl))
}
}
type NoErrLetError<'a, FnType, Out> = LetError<'a, NoErrFunctor<'a, FnType, Out, Error>, Out>;
impl<'a, FnType, Out> From<FnType> for NoErrLetError<'a, FnType, Out>
where
FnType: 'a + Functor<'a, Error, Output = Out>,
Out: TypedSender,
{
fn from(fn_impl: FnType) -> Self {
Self::from(NoErrFunctor::new(fn_impl))
}
}
type NoErrClosureLetError<'a, FnType, Out> =
NoErrLetError<'a, Closure<'a, FnType, Out, Error>, Out>;
impl<'a, FnType, Out> From<FnType> for NoErrClosureLetError<'a, FnType, Out>
where
FnType: 'a + FnOnce(Error) -> Out,
Out: TypedSender,
{
fn from(fn_impl: FnType) -> Self {
Self::from(Closure::new(fn_impl))
}
}
impl<'a, FnType, Out> Sender for LetError<'a, FnType, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
}
impl<'a, FnType, Out, NestedSender> BindSender<NestedSender> for LetError<'a, FnType, Out>
where
NestedSender: TypedSender<Scheduler = Out::Scheduler, Value = Out::Value>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
type Output = LetErrorSender<'a, NestedSender, FnType, Out>;
fn bind(self, nested: NestedSender) -> Self::Output {
LetErrorSender {
nested,
fn_impl: self.fn_impl,
phantom: PhantomData,
}
}
}
pub struct LetErrorSender<'a, NestedSender, FnType, Out>
where
NestedSender: TypedSender<Scheduler = Out::Scheduler, Value = Out::Value>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
nested: NestedSender,
fn_impl: FnType,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, FnType, Out, NestedSender> TypedSender for LetErrorSender<'a, NestedSender, FnType, Out>
where
NestedSender: TypedSender<Scheduler = Out::Scheduler, Value = Out::Value>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
type Value = Out::Value;
type Scheduler = Out::Scheduler;
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out, NestedSender>
TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>
for LetErrorSender<'a, NestedSender, FnType, Out>
where
NestedSender: TypedSender<Scheduler = Out::Scheduler, Value = Out::Value>
+ TypedSenderConnect<
'a,
ScopeImpl,
StopTokenImpl,
ReceiverWrapper<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out>,
>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender + TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>,
ReceiverType: ReceiverOf<Out::Scheduler, Out::Value>,
ScopeImpl: Clone,
StopTokenImpl: StopToken,
{
type Output<'scope> = NestedSender::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope;
fn connect<'scope>(
self,
scope: &ScopeImpl,
stop_token: StopTokenImpl,
receiver: ReceiverType,
) -> Self::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope,
{
let receiver = ReceiverWrapper {
receiver,
fn_impl: self.fn_impl,
phantom: PhantomData,
scope: scope.clone(),
stop_token: stop_token.clone(),
};
self.nested.connect(scope, stop_token, receiver)
}
}
impl<'a, BindSenderImpl, NestedSender, FnType, Out> BitOr<BindSenderImpl>
for LetErrorSender<'a, NestedSender, FnType, Out>
where
BindSenderImpl: BindSender<Self>,
NestedSender: TypedSender<Scheduler = Out::Scheduler, Value = Out::Value>,
FnType: Functor<'a, Error, Output = Result<Out>>,
Out: TypedSender,
{
type Output = BindSenderImpl::Output;
fn bitor(self, rhs: BindSenderImpl) -> Self::Output {
rhs.bind(self)
}
}
pub struct ReceiverWrapper<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out>
where
ReceiverType: ReceiverOf<Out::Scheduler, Out::Value>,
FnType: Functor<'a, Error, Output = Result<Out>>,
Out: TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>,
StopTokenImpl: StopToken,
{
receiver: ReceiverType,
fn_impl: FnType,
phantom: PhantomData<&'a fn() -> Out>,
scope: ScopeImpl,
stop_token: StopTokenImpl,
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out> Receiver
for ReceiverWrapper<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out>
where
ReceiverType: ReceiverOf<Out::Scheduler, Out::Value>,
FnType: Functor<'a, Error, Output = Result<Out>>,
Out: TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>,
StopTokenImpl: StopToken,
{
fn set_done(self) {
self.receiver.set_done();
}
fn set_error(self, error: Error) {
match self.fn_impl.tuple_invoke(error) {
Ok(sender) => sender
.connect(&self.scope, self.stop_token, self.receiver)
.start(),
Err(error) => self.receiver.set_error(error),
};
}
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out> ReceiverOf<Out::Scheduler, Out::Value>
for ReceiverWrapper<'a, ScopeImpl, StopTokenImpl, ReceiverType, FnType, Out>
where
ReceiverType: ReceiverOf<Out::Scheduler, Out::Value>,
FnType: Functor<'a, Error, Output = Result<Out>>,
Out: TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>,
StopTokenImpl: StopToken,
{
fn set_value(self, sch: Out::Scheduler, value: Out::Value) {
self.receiver.set_value(sch, value);
}
}
#[cfg(test)]
mod tests {
use super::LetError;
use crate::errors::{new_error, Error, ErrorForTesting};
use crate::just::Just;
use crate::scheduler::{ImmediateScheduler, Scheduler};
use crate::sync_wait::SyncWait;
#[test]
fn it_works() {
assert_eq!(
Some((String::from("yay"),)),
(ImmediateScheduler.schedule_error::<(String,)>(new_error(ErrorForTesting::from(
"this error will be consumed"
))) | LetError::from(|error: Error| {
assert_eq!(
ErrorForTesting::from("this error will be consumed"),
*error.downcast_ref::<ErrorForTesting>().unwrap()
);
ImmediateScheduler.schedule_value((String::from("yay"),))
}))
.sync_wait()
.expect("should succeed")
);
}
#[test]
fn it_works_with_errors() {
assert_eq!(
ErrorForTesting::from("this error will be returned"),
*(ImmediateScheduler.schedule_error::<(String,)>(new_error(ErrorForTesting::from(
"this error will be consumed"
))) | LetError::from(|error: Error| {
match error.downcast_ref::<ErrorForTesting>() {
Some(_) => Err(new_error(ErrorForTesting::from(
"this error will be returned",
))),
None => Ok(ImmediateScheduler.schedule_value((String::from("nay"),))),
}
}))
.sync_wait()
.expect_err("should return an error")
.downcast_ref::<ErrorForTesting>()
.unwrap()
);
}
#[test]
fn it_cascades_done() {
assert_eq!(
None,
(ImmediateScheduler.schedule_done::<(String,)>()
| LetError::from(|_: Error| -> Just<ImmediateScheduler, (String,)> {
panic!("should not be called!");
}))
.sync_wait()
.expect("should succeed")
);
}
#[test]
fn it_cascades_value() {
assert_eq!(
Some((String::from("yay"),)),
(ImmediateScheduler.schedule_value((String::from("yay"),))
| LetError::from(|_: Error| -> Just<ImmediateScheduler, (String,)> {
panic!("should not be called!");
}))
.sync_wait()
.expect("should succeed")
);
}
}