use crate::errors::Error;
use crate::io::EnableDefaultIO;
use crate::just::Just;
use crate::just_done::JustDone;
use crate::just_error::JustError;
use crate::scope::{ScopeWrap, ScopeWrapSend};
use crate::stop_token::StopToken;
use crate::traits::{BindSender, OperationState, ReceiverOf, TypedSender, TypedSenderConnect};
use crate::tuple::Tuple;
use std::marker::PhantomData;
use std::ops::BitOr;
use threadpool::ThreadPool;
pub trait Scheduler: Eq + Clone + 'static {
const EXECUTION_BLOCKS_CALLER: bool;
type LocalScheduler: Scheduler<LocalScheduler = Self::LocalScheduler>;
type Sender: for<'a> TypedSender<Scheduler = Self::LocalScheduler, Value = ()>;
fn schedule(&self) -> Self::Sender;
fn schedule_value<'a, Tpl: 'a + Tuple>(&self, values: Tpl) -> Just<'a, Self, Tpl> {
Just::with_scheduler(self.clone(), values)
}
fn schedule_error<Tpl: Tuple>(&self, error: Error) -> JustError<Self, Tpl> {
JustError::<Self, Tpl>::new(error)
}
fn schedule_done<Tpl: Tuple>(&self) -> JustDone<Self, Tpl> {
JustDone::<Self, Tpl>::new()
}
fn lazy(&self) -> LazyScheduler<Self>
where
Self: Scheduler<LocalScheduler = Self>,
{
LazyScheduler { sch: self.clone() }
}
}
/// Stateless scheduler marker type.
///
/// It carries no data, so it is `Copy`, and it is `Debug` so it can appear in
/// diagnostics and in `assert_eq!` comparisons.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct ImmediateScheduler;
/// `ImmediateScheduler` is its own local scheduler and hands out
/// [`ImmediateSender`] values.
impl Scheduler for ImmediateScheduler {
    const EXECUTION_BLOCKS_CALLER: bool = true;

    type LocalScheduler = Self;
    type Sender = ImmediateSender;

    /// Returns a fresh [`ImmediateSender`]; no state is carried over.
    fn schedule(&self) -> ImmediateSender {
        ImmediateSender
    }
}
// Opts `ImmediateScheduler` into `EnableDefaultIO`. The impl body is empty, so
// it relies entirely on whatever the trait itself provides.
// NOTE(review): presumably this enables the crate's default I/O behaviour for
// this scheduler — confirm in `crate::io`.
impl EnableDefaultIO for ImmediateScheduler {}
/// Stateless sender type returned by `ImmediateScheduler::schedule`.
///
/// Derives `Debug` so the public type can be printed in diagnostics.
#[derive(Debug)]
pub struct ImmediateSender;
/// An `ImmediateSender` produces `()` and reports [`ImmediateScheduler`] as
/// its scheduler.
impl TypedSender for ImmediateSender {
    type Value = ();
    type Scheduler = ImmediateScheduler;
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType>
TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType> for ImmediateSender
where
ReceiverType: ReceiverOf<
<ImmediateSender as TypedSender>::Scheduler,
<ImmediateSender as TypedSender>::Value,
>,
StopTokenImpl: StopToken,
{
type Output<'scope> = ImmediateOperationState<'scope, ReceiverType>
where
'a:'scope,
ScopeImpl:'scope,
StopTokenImpl:'scope,
ReceiverType:'scope;
fn connect<'scope>(
self,
_: &ScopeImpl,
_: StopTokenImpl,
receiver: ReceiverType,
) -> Self::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope,
{
ImmediateOperationState {
phantom: PhantomData,
receiver,
}
}
}
/// Pipe syntax: `sender | binder` is `binder.bind(sender)`.
impl<BindSenderImpl> BitOr<BindSenderImpl> for ImmediateSender
where
    BindSenderImpl: BindSender<Self>,
{
    type Output = <BindSenderImpl as BindSender<Self>>::Output;

    /// Hands this sender to `binder` and returns whatever it produces.
    fn bitor(self, binder: BindSenderImpl) -> Self::Output {
        binder.bind(self)
    }
}
/// Operation state that holds a receiver accepting `(ImmediateScheduler, ())`.
///
/// The `'scope` lifetime is carried only through the `PhantomData` marker; no
/// reference is actually stored.
pub struct ImmediateOperationState<'scope, ReceiverType>
where
    ReceiverType: 'scope + ReceiverOf<ImmediateScheduler, ()>,
{
    /// Receiver that is signalled when the operation starts.
    receiver: ReceiverType,
    /// Ties the type to `'scope` without storing data.
    phantom: PhantomData<&'scope ()>,
}
impl<'scope, ReceiverType> OperationState<'scope> for ImmediateOperationState<'scope, ReceiverType>
where
ReceiverType: ReceiverOf<ImmediateScheduler, ()> + 'scope,
{
fn start(self) {
self.receiver.set_value(ImmediateScheduler {}, ());
}
}
/// Lets a `threadpool::ThreadPool` act as a scheduler that is its own local
/// scheduler.
impl Scheduler for ThreadPool {
    const EXECUTION_BLOCKS_CALLER: bool = false;

    type LocalScheduler = Self;
    type Sender = ThreadPoolSender;

    /// Returns a sender that owns a clone of this pool handle.
    fn schedule(&self) -> ThreadPoolSender {
        let pool = self.clone();
        ThreadPoolSender { pool }
    }
}
// Opts `ThreadPool` into `EnableDefaultIO`. The impl body is empty, so it
// relies entirely on whatever the trait itself provides.
// NOTE(review): presumably this enables the crate's default I/O behaviour for
// this scheduler — confirm in `crate::io`.
impl EnableDefaultIO for ThreadPool {}
/// Sender that owns a `ThreadPool` handle.
pub struct ThreadPoolSender {
    /// Pool handle passed on to the operation state when connected.
    pool: ThreadPool,
}
/// A `ThreadPoolSender` produces `()` and reports `ThreadPool` as its
/// scheduler.
impl TypedSender for ThreadPoolSender {
    type Scheduler = ThreadPool;
    type Value = ();
}
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType>
TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType> for ThreadPoolSender
where
ReceiverType: Send
+ ReceiverOf<
<ThreadPoolSender as TypedSender>::Scheduler,
<ThreadPoolSender as TypedSender>::Value,
>,
ScopeImpl: ScopeWrapSend<<ThreadPoolSender as TypedSender>::Scheduler, ReceiverType>,
StopTokenImpl: StopToken,
{
type Output<'scope> = ThreadPoolOperationState<'scope, ScopeImpl::WrapSendOutput>
where
'a:'scope, ScopeImpl:'scope,
StopTokenImpl: 'scope,
ReceiverType:'scope;
fn connect<'scope>(
self,
scope: &ScopeImpl,
_: StopTokenImpl,
receiver: ReceiverType,
) -> Self::Output<'scope>
where
'a: 'scope,
ScopeImpl: 'scope,
StopTokenImpl: 'scope,
ReceiverType: 'scope,
{
ThreadPoolOperationState {
pool: self.pool,
receiver: scope.wrap_send(receiver),
phantom: PhantomData,
}
}
}
/// Pipe syntax: `sender | binder` is `binder.bind(sender)`.
impl<BindSenderImpl> BitOr<BindSenderImpl> for ThreadPoolSender
where
    BindSenderImpl: BindSender<Self>,
{
    type Output = <BindSenderImpl as BindSender<Self>>::Output;

    /// Hands this sender to `binder` and returns whatever it produces.
    fn bitor(self, binder: BindSenderImpl) -> Self::Output {
        binder.bind(self)
    }
}
/// Operation state pairing a `ThreadPool` handle with a receiver.
///
/// The receiver must be `Send + 'static` and accept `(ThreadPool, ())`. The
/// `'a` lifetime is carried only through the `PhantomData` marker.
pub struct ThreadPoolOperationState<'a, Receiver>
where
    Receiver: 'static + Send + ReceiverOf<ThreadPool, ()>,
{
    /// Ties the type to `'a` without storing data.
    phantom: PhantomData<&'a i32>,
    /// Pool the work is submitted to on start.
    pool: ThreadPool,
    /// Receiver signalled from the submitted job.
    receiver: Receiver,
}
impl<'a, Receiver> OperationState<'a> for ThreadPoolOperationState<'a, Receiver>
where
    Receiver: 'static + Send + ReceiverOf<ThreadPool, ()>,
{
    /// Submits a job to the pool via `execute`; the job calls the receiver's
    /// `set_value` with a clone of the pool handle and the unit value.
    fn start(self) {
        let Self { pool, receiver, .. } = self;
        // The job needs its own handle because `execute` borrows `pool`.
        let handle = pool.clone();
        pool.execute(move || receiver.set_value(handle, ()));
    }
}
/// Constructor trait for types built from a scheduler plus one argument.
pub trait WithScheduler<Sch: Scheduler, Arg> {
    /// Builds `Self` from the scheduler `sch` and the argument `arg`.
    fn with_scheduler(sch: Sch, arg: Arg) -> Self;
}
/// Wrapper around a scheduler `Sch` that is its own `LocalScheduler`.
#[derive(PartialEq, Eq, Clone)]
pub struct LazyScheduler<Sch: Scheduler<LocalScheduler = Sch>> {
    /// The wrapped scheduler.
    sch: Sch,
}
impl<Sch> Scheduler for LazyScheduler<Sch>
where
Sch: Scheduler<LocalScheduler = Sch>,
{
const EXECUTION_BLOCKS_CALLER: bool = Sch::EXECUTION_BLOCKS_CALLER;
type LocalScheduler = Sch;
type Sender = LazySchedulerTS<Sch>;
fn schedule(&self) -> Self::Sender {
Self::Sender {
sch: self.sch.clone(),
}
}
}
/// `LazyScheduler<Sch>` implements `EnableDefaultIO` exactly when the wrapped
/// scheduler does. The body is empty, so only the trait's own provisions apply.
impl<Sch> EnableDefaultIO for LazyScheduler<Sch>
where
    Sch: EnableDefaultIO + Scheduler<LocalScheduler = Sch>,
{
}
/// Sender type that owns a scheduler `Sch`.
pub struct LazySchedulerTS<Sch: Scheduler<LocalScheduler = Sch>> {
    /// Scheduler passed on to the operation state when connected.
    sch: Sch,
}
/// A `LazySchedulerTS<Sch>` produces `()` and reports `Sch` as its scheduler.
impl<Sch> TypedSender for LazySchedulerTS<Sch>
where
    Sch: Scheduler<LocalScheduler = Sch>,
{
    type Value = ();
    type Scheduler = Sch;
}
/// Connects a [`LazySchedulerTS`] to a receiver that accepts `(Sch, ())`.
///
/// The scope and the stop token are required by the bounds but are not used
/// in the body of `connect`.
impl<'a, ScopeImpl, StopTokenImpl, ReceiverType, Sch>
    TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverType> for LazySchedulerTS<Sch>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    StopTokenImpl: StopToken,
    ScopeImpl: ScopeWrap<Sch, ReceiverType>,
    ReceiverType: ReceiverOf<Sch, ()>,
{
    type Output<'scope> = LazySchedulerOperationState<'scope, Sch, ReceiverType>
    where
        'a: 'scope,
        ScopeImpl: 'scope,
        StopTokenImpl: 'scope,
        ReceiverType: 'scope;

    /// Moves the scheduler and `receiver` into a
    /// [`LazySchedulerOperationState`]; nothing runs until it is started.
    fn connect<'scope>(
        self,
        _: &ScopeImpl,
        _: StopTokenImpl,
        receiver: ReceiverType,
    ) -> Self::Output<'scope>
    where
        'a: 'scope,
        ScopeImpl: 'scope,
        StopTokenImpl: 'scope,
        ReceiverType: 'scope,
    {
        let Self { sch } = self;
        LazySchedulerOperationState {
            phantom: PhantomData,
            sch,
            receiver,
        }
    }
}
/// Operation state pairing a scheduler `Sch` with a receiver that accepts
/// `(Sch, ())`.
///
/// The `'scope` lifetime is carried only through the `PhantomData` marker.
pub struct LazySchedulerOperationState<'scope, Sch, ReceiverType>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    ReceiverType: 'scope + ReceiverOf<Sch, ()>,
{
    /// Ties the type to `'scope` without storing data.
    phantom: PhantomData<&'scope i32>,
    /// Scheduler handed to the receiver on start.
    sch: Sch,
    /// Receiver signalled on start.
    receiver: ReceiverType,
}
impl<'scope, Sch, ReceiverType> OperationState<'scope>
    for LazySchedulerOperationState<'scope, Sch, ReceiverType>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    ReceiverType: 'scope + ReceiverOf<Sch, ()>,
{
    /// Calls the receiver's `set_value` directly, in the body of `start`,
    /// passing the stored scheduler and the unit value. No `schedule()` call
    /// is made on `Sch` here.
    fn start(self) {
        let Self { sch, receiver, .. } = self;
        receiver.set_value(sch, ());
    }
}