use crate::errors::{Error, Result};
use crate::functor::{Closure, Functor, NoErrFunctor};
use crate::scheduler::{ImmediateScheduler, Scheduler, WithScheduler};
use crate::stop_token::{NeverStopToken, StopToken};
use crate::traits::{
BindSender, OperationState, Receiver, ReceiverOf, Sender, TypedSender, TypedSenderConnect,
};
use crate::tuple::Tuple;
use std::convert::From;
use std::marker::PhantomData;
use std::ops::BitOr;
pub struct UponError<'a, FnType, Sch, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
fn_impl: FnType,
sch: Sch,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, FnType, Out> From<FnType> for UponError<'a, FnType, ImmediateScheduler, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
{
fn from(fn_impl: FnType) -> Self {
Self {
fn_impl,
sch: ImmediateScheduler,
phantom: PhantomData,
}
}
}
type ClosureUponError<'a, FnType, Sch, Out> =
UponError<'a, Closure<'a, FnType, Result<Out>, Error>, Sch, Out>;
impl<'a, FnType, Out> From<FnType> for ClosureUponError<'a, FnType, ImmediateScheduler, Out>
where
FnType: 'a + FnOnce(Error) -> Result<Out>,
Out: 'a + Tuple,
{
fn from(fn_impl: FnType) -> Self {
Self::from(Closure::new(fn_impl))
}
}
type NoErrUponError<'a, FunctorType, Sch, Out> =
UponError<'a, NoErrFunctor<'a, FunctorType, Out, Error>, Sch, Out>;
impl<'a, FnImpl, Out> From<FnImpl> for NoErrUponError<'a, FnImpl, ImmediateScheduler, Out>
where
FnImpl: 'a + Functor<'a, Error, Output = Out>,
Out: 'a + Tuple,
{
fn from(fn_impl: FnImpl) -> Self {
Self::from(NoErrFunctor::new(fn_impl))
}
}
type NoErrClosureUponError<'a, FnImpl, Sch, Out> =
NoErrUponError<'a, Closure<'a, FnImpl, Out, Error>, Sch, Out>;
impl<'a, FnImpl, Out> From<FnImpl> for NoErrClosureUponError<'a, FnImpl, ImmediateScheduler, Out>
where
FnImpl: 'a + FnOnce(Error) -> Out,
Out: 'a + Tuple,
{
fn from(fn_impl: FnImpl) -> Self {
Self::from(Closure::new(fn_impl))
}
}
impl<'a, FnType, Sch, Out> WithScheduler<Sch, FnType> for UponError<'a, FnType, Sch, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnType) -> Self {
Self {
fn_impl,
sch,
phantom: PhantomData,
}
}
}
impl<'a, FnType, Sch, Out> WithScheduler<Sch, FnType> for ClosureUponError<'a, FnType, Sch, Out>
where
FnType: 'a + FnOnce(Error) -> Result<Out>,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnType) -> Self {
Self::with_scheduler(sch, Closure::new(fn_impl))
}
}
impl<'a, FnImpl, Sch, Out> WithScheduler<Sch, FnImpl> for NoErrUponError<'a, FnImpl, Sch, Out>
where
FnImpl: 'a + Functor<'a, Error, Output = Out>,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnImpl) -> Self {
Self::with_scheduler(sch, NoErrFunctor::new(fn_impl))
}
}
impl<'a, FnImpl, Sch, Out> WithScheduler<Sch, FnImpl>
for NoErrClosureUponError<'a, FnImpl, Sch, Out>
where
FnImpl: 'a + FnOnce(Error) -> Out,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnImpl) -> Self {
Self::with_scheduler(sch, Closure::new(fn_impl))
}
}
impl<'a, FnType, Sch, Out> Sender for UponError<'a, FnType, Sch, Out>
where
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
}
impl<'a, FnType, Sch, Out, NestedSender> BindSender<NestedSender>
for UponError<'a, FnType, Sch, Out>
where
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
type Output = UponErrorTS<'a, NestedSender, FnType, Sch, Out>;
fn bind(self, nested: NestedSender) -> Self::Output {
UponErrorTS {
nested,
fn_impl: self.fn_impl,
sch: self.sch,
phantom: PhantomData,
}
}
}
pub struct UponErrorTS<'a, NestedSender, FnType, Sch, Out>
where
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
nested: NestedSender,
fn_impl: FnType,
sch: Sch,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, NestedSender, FnType, Sch, Out> TypedSender
for UponErrorTS<'a, NestedSender, FnType, Sch, Out>
where
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
type Scheduler = Sch::LocalScheduler;
type Value = Out;
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, NestedSender, FnType, Sch, Out>
TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>
for UponErrorTS<'a, NestedSender, FnType, Sch, Out>
where
ReceiverType: ReceiverOf<Sch::LocalScheduler, Out>,
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>
+ TypedSenderConnect<
'a,
ScopeImpl,
StopTokenImpl,
ReceiverWrapper<'a, ScopeImpl, ReceiverType, FnType, Sch, Out>,
>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
ErrorReceiver<'a, ReceiverType, FnType, Sch::LocalScheduler, Out>,
>,
ScopeImpl: Clone,
StopTokenImpl: StopToken,
{
type Output<'scope> = NestedSender::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope;
fn connect<'scope>(
self,
scope: &ScopeImpl,
stop_token: StopTokenImpl,
receiver: ReceiverType,
) -> Self::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope,
{
let receiver = ReceiverWrapper {
nested: receiver,
fn_impl: self.fn_impl,
sch: self.sch,
phantom: PhantomData,
scope: scope.clone(),
};
self.nested.connect(scope, stop_token, receiver)
}
}
impl<'a, NestedSender, FnType, Sch, Out, BindSenderImpl> BitOr<BindSenderImpl>
for UponErrorTS<'a, NestedSender, FnType, Sch, Out>
where
BindSenderImpl: BindSender<Self>,
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
type Output = BindSenderImpl::Output;
fn bitor(self, rhs: BindSenderImpl) -> Self::Output {
rhs.bind(self)
}
}
pub struct ReceiverWrapper<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch::LocalScheduler, Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
ErrorReceiver<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out>,
>,
Out: 'a + Tuple,
{
nested: NestedReceiver,
fn_impl: FnType,
sch: Sch,
phantom: PhantomData<&'a fn() -> Out>,
scope: ScopeImpl,
}
impl<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out> Receiver
for ReceiverWrapper<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch::LocalScheduler, Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
ErrorReceiver<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out>,
>,
Out: 'a + Tuple,
{
fn set_done(self) {
self.nested.set_done()
}
fn set_error(self, error: Error) {
self.sch
.schedule()
.connect(
&self.scope,
NeverStopToken,
ErrorReceiver::<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out> {
nested: self.nested,
error,
fn_impl: self.fn_impl,
phantom: PhantomData,
},
)
.start()
}
}
impl<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out> ReceiverOf<Sch::LocalScheduler, Out>
for ReceiverWrapper<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch::LocalScheduler, Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
ErrorReceiver<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out>,
>,
Out: 'a + Tuple,
{
fn set_value(self, sch: Sch::LocalScheduler, v: Out) {
self.nested.set_value(sch, v);
}
}
pub struct ErrorReceiver<'a, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch, Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
Out: 'a + Tuple,
{
nested: NestedReceiver,
error: Error,
fn_impl: FnType,
phantom: PhantomData<&'a fn(Sch) -> Out>,
}
impl<'a, NestedReceiver, FnType, Sch, Out> Receiver
for ErrorReceiver<'a, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch, Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
Out: 'a + Tuple,
{
fn set_done(self) {
self.nested.set_done();
}
fn set_error(self, error: Error) {
self.nested.set_error(error);
}
}
impl<'a, NestedReceiver, FnType, Sch, Out> ReceiverOf<Sch, ()>
for ErrorReceiver<'a, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch, Out>,
FnType: 'a + Functor<'a, Error, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
Out: 'a + Tuple,
{
fn set_value(self, sch: Sch, _: ()) {
match self.fn_impl.tuple_invoke(self.error) {
Ok(v) => self.nested.set_value(sch, v),
Err(e) => self.nested.set_error(e),
}
}
}