use crate::errors::{Error, Result};
use crate::functor::{NoArgClosure, NoArgFunctor, NoErrNoArgFunctor};
use crate::scheduler::{ImmediateScheduler, Scheduler, WithScheduler};
use crate::stop_token::{NeverStopToken, StopToken};
use crate::traits::{
BindSender, OperationState, Receiver, ReceiverOf, Sender, TypedSender, TypedSenderConnect,
};
use crate::tuple::Tuple;
use std::convert::From;
use std::marker::PhantomData;
use std::ops::BitOr;
pub struct UponDone<'a, FnType, Sch, Out>
where
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
fn_impl: FnType,
sch: Sch,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, FnType, Out> From<FnType> for UponDone<'a, FnType, ImmediateScheduler, Out>
where
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
{
fn from(fn_impl: FnType) -> Self {
Self {
fn_impl,
sch: ImmediateScheduler,
phantom: PhantomData,
}
}
}
type ClosureUponDone<'a, FnType, Sch, Out> =
UponDone<'a, NoArgClosure<'a, FnType, Result<Out>>, Sch, Out>;
impl<'a, FnType, Out> From<FnType> for ClosureUponDone<'a, FnType, ImmediateScheduler, Out>
where
FnType: 'a + FnOnce() -> Result<Out>,
Out: 'a + Tuple,
{
fn from(fn_impl: FnType) -> Self {
Self::from(NoArgClosure::new(fn_impl))
}
}
type NoErrUponDone<'a, FunctorType, Sch, Out> =
UponDone<'a, NoErrNoArgFunctor<'a, FunctorType, Out>, Sch, Out>;
impl<'a, FnImpl, Out> From<FnImpl> for NoErrUponDone<'a, FnImpl, ImmediateScheduler, Out>
where
FnImpl: 'a + NoArgFunctor<'a, Output = Out>,
Out: 'a + Tuple,
{
fn from(fn_impl: FnImpl) -> Self {
Self::from(NoErrNoArgFunctor::new(fn_impl))
}
}
type NoErrClosureUponDone<'a, FnImpl, Sch, Out> =
NoErrUponDone<'a, NoArgClosure<'a, FnImpl, Out>, Sch, Out>;
impl<'a, FnImpl, Out> From<FnImpl> for NoErrClosureUponDone<'a, FnImpl, ImmediateScheduler, Out>
where
FnImpl: 'a + FnOnce() -> Out,
Out: 'a + Tuple,
{
fn from(fn_impl: FnImpl) -> Self {
Self::from(NoArgClosure::new(fn_impl))
}
}
impl<'a, FnType, Sch, Out> WithScheduler<Sch, FnType> for UponDone<'a, FnType, Sch, Out>
where
FnType: NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnType) -> Self {
Self {
fn_impl,
sch,
phantom: PhantomData,
}
}
}
impl<'a, FnType, Sch, Out> WithScheduler<Sch, FnType> for ClosureUponDone<'a, FnType, Sch, Out>
where
FnType: 'a + FnOnce() -> Result<Out>,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnType) -> Self {
Self::with_scheduler(sch, NoArgClosure::new(fn_impl))
}
}
impl<'a, FnImpl, Sch, Out> WithScheduler<Sch, FnImpl> for NoErrUponDone<'a, FnImpl, Sch, Out>
where
FnImpl: 'a + NoArgFunctor<'a, Output = Out>,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnImpl) -> Self {
Self::with_scheduler(sch, NoErrNoArgFunctor::new(fn_impl))
}
}
impl<'a, FnImpl, Sch, Out> WithScheduler<Sch, FnImpl> for NoErrClosureUponDone<'a, FnImpl, Sch, Out>
where
FnImpl: 'a + FnOnce() -> Out,
Sch: Scheduler,
Out: 'a + Tuple,
{
fn with_scheduler(sch: Sch, fn_impl: FnImpl) -> Self {
Self::with_scheduler(sch, NoArgClosure::new(fn_impl))
}
}
impl<'a, FnType, Sch, Out> Sender for UponDone<'a, FnType, Sch, Out>
where
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
}
impl<'a, FnType, Sch, Out, NestedSender> BindSender<NestedSender> for UponDone<'a, FnType, Sch, Out>
where
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
type Output = UponDoneTS<'a, NestedSender, FnType, Sch, Out>;
fn bind(self, nested: NestedSender) -> Self::Output {
UponDoneTS {
nested,
fn_impl: self.fn_impl,
sch: self.sch,
phantom: PhantomData,
}
}
}
pub struct UponDoneTS<'a, NestedSender, FnType, Sch, Out>
where
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
nested: NestedSender,
fn_impl: FnType,
sch: Sch,
phantom: PhantomData<&'a fn() -> Out>,
}
impl<'a, NestedSender, FnType, Sch, Out> TypedSender
for UponDoneTS<'a, NestedSender, FnType, Sch, Out>
where
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
type Scheduler = Sch::LocalScheduler;
type Value = Out;
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, NestedSender, FnType, Sch, Out>
TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>
for UponDoneTS<'a, NestedSender, FnType, Sch, Out>
where
ReceiverType: ReceiverOf<Sch::LocalScheduler, Out>,
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>
+ TypedSenderConnect<
'a,
ScopeImpl,
StopTokenImpl,
ReceiverWrapper<'a, ScopeImpl, ReceiverType, FnType, Sch, Out>,
>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
DoneReceiver<'a, ReceiverType, FnType, Sch::LocalScheduler, Out>,
>,
ScopeImpl: Clone,
StopTokenImpl: StopToken,
{
type Output<'scope> = NestedSender::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope;
fn connect<'scope>(
self,
scope: &ScopeImpl,
stop_token: StopTokenImpl,
receiver: ReceiverType,
) -> Self::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope,
{
let receiver = ReceiverWrapper {
nested: receiver,
fn_impl: self.fn_impl,
sch: self.sch,
phantom: PhantomData,
scope: scope.clone(),
};
self.nested.connect(scope, stop_token, receiver)
}
}
impl<'a, NestedSender, FnType, Sch, Out, BindSenderImpl> BitOr<BindSenderImpl>
for UponDoneTS<'a, NestedSender, FnType, Sch, Out>
where
BindSenderImpl: BindSender<Self>,
NestedSender: TypedSender<Scheduler = Sch::LocalScheduler, Value = Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Out: 'a + Tuple,
Sch: Scheduler,
{
type Output = BindSenderImpl::Output;
fn bitor(self, rhs: BindSenderImpl) -> Self::Output {
rhs.bind(self)
}
}
pub struct ReceiverWrapper<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch::LocalScheduler, Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
DoneReceiver<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out>,
>,
Out: 'a + Tuple,
{
nested: NestedReceiver,
fn_impl: FnType,
sch: Sch,
phantom: PhantomData<&'a fn() -> Out>,
scope: ScopeImpl,
}
impl<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out> Receiver
for ReceiverWrapper<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch::LocalScheduler, Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
DoneReceiver<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out>,
>,
Out: 'a + Tuple,
{
fn set_done(self) {
self.sch
.schedule()
.connect(
&self.scope,
NeverStopToken,
DoneReceiver::<NestedReceiver, FnType, Sch::LocalScheduler, Out> {
nested: self.nested,
fn_impl: self.fn_impl,
phantom: PhantomData,
},
)
.start()
}
fn set_error(self, error: Error) {
self.nested.set_error(error);
}
}
impl<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out> ReceiverOf<Sch::LocalScheduler, Out>
for ReceiverWrapper<'a, ScopeImpl, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch::LocalScheduler, Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler,
Sch::Sender: TypedSenderConnect<
'a,
ScopeImpl,
NeverStopToken,
DoneReceiver<'a, NestedReceiver, FnType, Sch::LocalScheduler, Out>,
>,
Out: 'a + Tuple,
{
fn set_value(self, sch: Sch::LocalScheduler, v: Out) {
self.nested.set_value(sch, v);
}
}
pub struct DoneReceiver<'a, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch, Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
Out: 'a + Tuple,
{
nested: NestedReceiver,
fn_impl: FnType,
phantom: PhantomData<&'a fn(Sch) -> Out>,
}
impl<'a, NestedReceiver, FnType, Sch, Out> Receiver
for DoneReceiver<'a, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch, Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
Out: 'a + Tuple,
{
fn set_done(self) {
self.nested.set_done();
}
fn set_error(self, error: Error) {
self.nested.set_error(error);
}
}
impl<'a, NestedReceiver, FnType, Sch, Out> ReceiverOf<Sch, ()>
for DoneReceiver<'a, NestedReceiver, FnType, Sch, Out>
where
NestedReceiver: ReceiverOf<Sch, Out>,
FnType: 'a + NoArgFunctor<'a, Output = Result<Out>>,
Sch: Scheduler<LocalScheduler = Sch>,
Out: 'a + Tuple,
{
fn set_value(self, sch: Sch, _: ()) {
match self.fn_impl.tuple_invoke() {
Ok(v) => self.nested.set_value(sch, v),
Err(e) => self.nested.set_error(e),
}
}
}