use crate::errors::Error;
use crate::scheduler::Scheduler;
use crate::stop_token::{NeverStopToken, StopToken};
use crate::traits::{
BindSender, OperationState, Receiver, ReceiverOf, Sender, TypedSender, TypedSenderConnect,
};
use crate::tuple::Tuple;
use std::marker::PhantomData;
use std::ops::BitOr;
/// Sender adaptor that carries the scheduler value delivery should be moved onto.
///
/// By itself it only implements `Sender`; binding it to a typed sender through
/// `BindSender::bind` produces a `TransferTS`.
pub struct Transfer<'a, Sch: Scheduler> {
    /// Marker that mentions the lifetime parameter `'a`, which no data field uses.
    phantom: PhantomData<&'a fn() -> Sch>,
    /// Scheduler that is moved into the `TransferTS` created by `bind`.
    target_scheduler: Sch,
}
impl<'a, Sch: Scheduler> Transfer<'a, Sch> {
    /// Builds a `Transfer` that stores `target_scheduler` for later binding.
    pub fn new(target_scheduler: Sch) -> Self {
        Self {
            phantom: PhantomData,
            target_scheduler,
        }
    }
}
/// Marks `Transfer` as a sender; the implementation contains no items.
impl<'a, Sch: Scheduler> Sender for Transfer<'a, Sch> {}
impl<'a, NestedSender, Sch> BindSender<NestedSender> for Transfer<'a, Sch>
where
Sch: Scheduler,
NestedSender: TypedSender,
{
type Output = TransferTS<'a, NestedSender, Sch>;
fn bind(self, nested: NestedSender) -> Self::Output {
TransferTS {
phantom: PhantomData,
nested,
target_scheduler: self.target_scheduler,
}
}
}
/// Typed sender produced by binding a `Transfer` to a nested typed sender.
///
/// It owns both the nested sender and the scheduler that value delivery is
/// moved onto when the sender is connected.
pub struct TransferTS<'a, NestedSender: TypedSender, Sch: Scheduler> {
    /// Marker that mentions the lifetime parameter `'a`, which no data field uses.
    phantom: PhantomData<&'a i32>,
    /// Sender whose values are forwarded.
    nested: NestedSender,
    /// Scheduler moved into the receiver wrapper on `connect`.
    target_scheduler: Sch,
}
/// A `TransferTS` produces the same values as the nested sender, but reports
/// the target scheduler's local scheduler as the scheduler it completes on.
impl<'a, NestedSender, Sch> TypedSender for TransferTS<'a, NestedSender, Sch>
where
    Sch: Scheduler,
    NestedSender: TypedSender,
{
    type Value = <NestedSender as TypedSender>::Value;
    type Scheduler = <Sch as Scheduler>::LocalScheduler;
}
/// Connecting a `TransferTS` connects the nested sender to a `ReceiverWrapper`
/// placed in front of the caller's receiver.
///
/// The resulting operation state is the nested sender's own operation state.
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, NestedSender, Sch>
    TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType>
    for TransferTS<'a, NestedSender, Sch>
where
    ReceiverType: ReceiverOf<Sch::LocalScheduler, NestedSender::Value>,
    NestedSender: 'a
        + TypedSender
        + TypedSenderConnect<
            'a,
            ScopeImpl,
            StopTokenImpl,
            ReceiverWrapper<'a, ScopeImpl, ReceiverType, Sch, <NestedSender as TypedSender>::Value>,
        >,
    Sch: Scheduler,
    Sch::Sender: TypedSender
        + TypedSenderConnect<
            'a,
            ScopeImpl,
            NeverStopToken,
            ContinuingReceiverWrapper<ReceiverType, Sch::LocalScheduler, NestedSender::Value>,
        >,
    ScopeImpl: Clone,
    StopTokenImpl: StopToken,
{
    type Output<'scope> = NestedSender::Output<'scope>
    where
        'a: 'scope,
        ScopeImpl: 'scope,
        StopTokenImpl: 'scope,
        ReceiverType: 'scope;

    /// Wraps `nested` so that values are re-delivered via the target scheduler,
    /// then connects the nested sender to that wrapper.
    ///
    /// `stop_token` is passed to the nested sender only. The wrapper keeps its
    /// own clone of `scope`, which it uses when it later connects the
    /// scheduler's sender.
    fn connect<'scope>(
        self,
        scope: &ScopeImpl,
        stop_token: StopTokenImpl,
        nested: ReceiverType,
    ) -> Self::Output<'scope>
    where
        'a: 'scope,
        ScopeImpl: 'scope,
        StopTokenImpl: 'scope,
        ReceiverType: 'scope,
    {
        let hop_receiver = ReceiverWrapper {
            nested,
            target_scheduler: self.target_scheduler,
            phantom: PhantomData,
            // The wrapper outlives this borrow, so it needs an owned scope.
            scope: ScopeImpl::clone(scope),
        };
        let upstream = self.nested;
        upstream.connect(scope, stop_token, hop_receiver)
    }
}
/// Enables `sender | adaptor` chaining: the right-hand side is bound to this
/// sender and the bound result is returned.
impl<'a, NestedSender, Sch, BindSenderImpl> BitOr<BindSenderImpl>
    for TransferTS<'a, NestedSender, Sch>
where
    NestedSender: TypedSender,
    Sch: Scheduler,
    BindSenderImpl: BindSender<TransferTS<'a, NestedSender, Sch>>,
{
    type Output = <BindSenderImpl as BindSender<Self>>::Output;

    /// Hands `self` to `rhs.bind` and returns whatever the binding produces.
    fn bitor(self, rhs: BindSenderImpl) -> Self::Output {
        <BindSenderImpl as BindSender<Self>>::bind(rhs, self)
    }
}
/// Receiver placed between the nested sender and the caller's receiver.
///
/// It keeps everything needed to re-deliver a value through the target
/// scheduler: the wrapped receiver, the scheduler, and a scope.
pub struct ReceiverWrapper<'a, ScopeImpl, NestedReceiver, Sch: Scheduler, Value: 'a + Tuple>
where
    NestedReceiver: ReceiverOf<Sch::LocalScheduler, Value>,
{
    /// Receiver that ultimately gets the value, done, or error signal.
    nested: NestedReceiver,
    /// Scheduler asked for a sender when a value arrives.
    target_scheduler: Sch,
    /// Marker that mentions `'a` and `Value`, which no data field uses.
    phantom: PhantomData<&'a fn(Value) -> Value>,
    /// Scope passed when connecting the scheduler's sender.
    scope: ScopeImpl,
}
/// Done and error signals go straight to the wrapped receiver; they are not
/// routed through the target scheduler.
impl<'a, ScopeImpl, NestedReceiver, Sch, Value> Receiver
    for ReceiverWrapper<'a, ScopeImpl, NestedReceiver, Sch, Value>
where
    Sch: Scheduler,
    Value: 'a + Tuple,
    NestedReceiver: ReceiverOf<Sch::LocalScheduler, Value>,
{
    /// Forwards the done signal to the wrapped receiver.
    fn set_done(self) {
        let Self { nested, .. } = self;
        nested.set_done()
    }

    /// Forwards `error` unchanged to the wrapped receiver.
    fn set_error(self, error: Error) {
        let Self { nested, .. } = self;
        nested.set_error(error)
    }
}
/// Value delivery: instead of handing the value to the wrapped receiver
/// directly, a scheduling operation is started on the target scheduler and the
/// value travels inside the receiver connected to it.
impl<'a, ScopeImpl, PreviousScheduler, NestedReceiver, Sch, Value>
    ReceiverOf<PreviousScheduler, Value>
    for ReceiverWrapper<'a, ScopeImpl, NestedReceiver, Sch, Value>
where
    NestedReceiver: ReceiverOf<Sch::LocalScheduler, Value>,
    Sch: Scheduler,
    PreviousScheduler: Scheduler<LocalScheduler = PreviousScheduler>,
    Value: Tuple,
    Sch::Sender: TypedSenderConnect<
        'a,
        ScopeImpl,
        NeverStopToken,
        ContinuingReceiverWrapper<NestedReceiver, Sch::LocalScheduler, Value>,
    >,
{
    /// Discards the scheduler the value arrived on, asks the target scheduler
    /// for a sender, connects it to a `ContinuingReceiverWrapper` holding
    /// `values` and the wrapped receiver, and starts the resulting operation.
    ///
    /// The scheduling operation is connected with `NeverStopToken`.
    fn set_value(self, _: PreviousScheduler, values: Value) {
        let hop_sender = self.target_scheduler.schedule();
        // Carries the payload across the hop until the scheduler's sender completes.
        let continuation = ContinuingReceiverWrapper {
            nested: self.nested,
            phantom: PhantomData,
            values,
        };
        hop_sender
            .connect(&self.scope, NeverStopToken, continuation)
            .start();
    }
}
/// Receiver connected to the target scheduler's sender.
///
/// It stores the values produced earlier together with the receiver that
/// should get them once the scheduler's sender delivers its `()` value.
struct ContinuingReceiverWrapper<NestedReceiver, Sch, Value: Tuple>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    NestedReceiver: ReceiverOf<Sch, Value>,
{
    /// Receiver that gets the stored values, or a done/error signal.
    nested: NestedReceiver,
    /// Marker that mentions `Sch`, which no data field uses.
    phantom: PhantomData<Sch>,
    /// Payload held until the scheduler's sender completes with a value.
    values: Value,
}
/// Done and error signals from the scheduler's sender are passed on to the
/// wrapped receiver; the stored values are dropped in that case.
impl<NestedReceiver, Sch, Value> Receiver for ContinuingReceiverWrapper<NestedReceiver, Sch, Value>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    Value: Tuple,
    NestedReceiver: ReceiverOf<Sch, Value>,
{
    /// Forwards the done signal to the wrapped receiver.
    fn set_done(self) {
        let Self { nested, .. } = self;
        nested.set_done()
    }

    /// Forwards `error` unchanged to the wrapped receiver.
    fn set_error(self, error: Error) {
        let Self { nested, .. } = self;
        nested.set_error(error)
    }
}
/// Receives the `()` value from the scheduler's sender and turns it into
/// delivery of the stored values.
impl<NestedReceiver, Sch, Value> ReceiverOf<Sch, ()>
    for ContinuingReceiverWrapper<NestedReceiver, Sch, Value>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    Value: Tuple,
    NestedReceiver: ReceiverOf<Sch, Value>,
{
    /// Passes the stored values to the wrapped receiver, together with the
    /// scheduler `sch` supplied by the caller.
    fn set_value(self, sch: Sch, _unit: ()) {
        let Self { nested, values, .. } = self;
        nested.set_value(sch, values)
    }
}