use crate::errors::Error;
use crate::scheduler::Scheduler;
use crate::stop_token::StopToken;
use crate::traits::{BindSender, Receiver, ReceiverOf, Sender, TypedSender, TypedSenderConnect};
use crate::tuple::Tuple;
use std::ops::BitOr;
/// Sender adapter that turns a value signal into a done signal when a stop
/// has been requested.
///
/// Pipe it onto a typed sender (`sender | StopIfRequested`) to obtain a
/// [`StopIfRequestedTS`]. When the wrapped chain delivers a value, the stop
/// token handed to `connect` is consulted: if a stop was requested the
/// downstream receiver gets `set_done`, otherwise the value is forwarded.
/// Error and done signals always pass through unchanged.
///
/// The type carries no state, so it is cheap to copy and can be reused
/// across several chains.
#[derive(Debug, Default, Clone, Copy)]
pub struct StopIfRequested;
// Marker impl: tags `StopIfRequested` as a `Sender`; no items are provided here.
impl Sender for StopIfRequested {}
/// Attaches the adapter to an upstream typed sender.
///
/// Binding consumes both the adapter and the upstream sender and yields a
/// [`StopIfRequestedTS`] that owns the upstream sender.
impl<TS: TypedSender> BindSender<TS> for StopIfRequested {
    type Output = StopIfRequestedTS<TS>;

    /// Wraps `ts` so that its value signal is subject to the stop check.
    fn bind(self, ts: TS) -> Self::Output {
        StopIfRequestedTS::<TS> { ts }
    }
}
/// Typed sender produced by binding [`StopIfRequested`] to an upstream sender.
///
/// It owns the upstream sender and exposes the same scheduler and value
/// types; the stop check itself is installed when the sender is connected.
pub struct StopIfRequestedTS<TS: TypedSender> {
    /// Upstream sender whose signals are filtered.
    ts: TS,
}
/// Enables `sender | adapter` chaining on top of a [`StopIfRequestedTS`].
///
/// The right-hand side is any adapter that can bind to this sender; the
/// result is whatever that adapter's `bind` produces.
impl<BindSenderImpl, TS> BitOr<BindSenderImpl> for StopIfRequestedTS<TS>
where
    BindSenderImpl: BindSender<Self>,
    TS: TypedSender,
{
    type Output = <BindSenderImpl as BindSender<Self>>::Output;

    /// Hands `self` to `rhs` for binding.
    fn bitor(self, rhs: BindSenderImpl) -> Self::Output {
        <BindSenderImpl as BindSender<Self>>::bind(rhs, self)
    }
}
/// The adapter does not alter what is sent or where: both associated types
/// are taken verbatim from the upstream sender.
impl<TS: TypedSender> TypedSender for StopIfRequestedTS<TS> {
    type Scheduler = <TS as TypedSender>::Scheduler;
    type Value = <TS as TypedSender>::Value;
}
/// Connects the upstream sender through a [`ReceiverWrapper`].
///
/// The wrapper is given a clone of the stop token so it can check for a stop
/// request when a value arrives; the original token is forwarded to the
/// upstream sender unchanged. The resulting operation state is exactly the
/// upstream sender's.
impl<'a, ScopeImpl, StopTokenImpl, Rcv, TS> TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, Rcv>
    for StopIfRequestedTS<TS>
where
    StopTokenImpl: 'a + StopToken,
    TS: TypedSenderConnect<'a, ScopeImpl, StopTokenImpl, ReceiverWrapper<StopTokenImpl, Rcv>>,
    Rcv: ReceiverOf<<TS as TypedSender>::Scheduler, <TS as TypedSender>::Value>,
{
    type Output<'scope> = TS::Output<'scope>
    where 'a: 'scope, ScopeImpl: 'scope, Rcv: 'scope;

    /// Wraps `rcv` with the stop check and connects the upstream sender to it.
    fn connect<'scope>(
        self,
        scope: &ScopeImpl,
        stop_token: StopTokenImpl,
        rcv: Rcv,
    ) -> Self::Output<'scope>
    where
        'a: 'scope,
        ScopeImpl: 'scope,
        Rcv: 'scope,
    {
        // Clone first: `stop_token` itself is moved into the upstream connect call.
        let wrapper_token = stop_token.clone();
        let guarded_receiver = ReceiverWrapper {
            stop_token: wrapper_token,
            rcv,
        };
        let upstream = self.ts;
        upstream.connect(scope, stop_token, guarded_receiver)
    }
}
/// Receiver that sits between the upstream sender and the real receiver.
///
/// It keeps a stop token next to the wrapped receiver so that the value
/// signal can be replaced by a done signal once a stop has been requested.
pub struct ReceiverWrapper<StopTokenImpl: StopToken, Rcv: Receiver> {
    /// Token queried when a value arrives.
    stop_token: StopTokenImpl,
    /// Downstream receiver that ultimately gets the signal.
    rcv: Rcv,
}
/// Error and done signals are not filtered: both are handed straight to the
/// wrapped receiver without consulting the stop token.
impl<StopTokenImpl: StopToken, Rcv: Receiver> Receiver for ReceiverWrapper<StopTokenImpl, Rcv> {
    /// Forwards `error` to the wrapped receiver.
    fn set_error(self, error: Error) {
        Rcv::set_error(self.rcv, error)
    }

    /// Forwards the done signal to the wrapped receiver.
    fn set_done(self) {
        Rcv::set_done(self.rcv)
    }
}
/// Value signals are the only ones subject to the stop check.
impl<Sch, Value, StopTokenImpl, Rcv> ReceiverOf<Sch, Value> for ReceiverWrapper<StopTokenImpl, Rcv>
where
    Sch: Scheduler<LocalScheduler = Sch>,
    Value: Tuple,
    StopTokenImpl: StopToken,
    Rcv: ReceiverOf<Sch, Value>,
{
    /// Forwards `value` (with its scheduler) unless a stop was requested, in
    /// which case the value is dropped and the wrapped receiver gets
    /// `set_done` instead.
    fn set_value(self, sch: Sch, value: Value) {
        // Common path: nobody asked to stop, so the value goes through as-is.
        if !self.stop_token.stop_requested() {
            self.rcv.set_value(sch, value);
            return;
        }

        // A stop was requested: report completion-without-value downstream.
        self.rcv.set_done();
    }
}
#[cfg(test)]
mod tests {
    use super::StopIfRequested;
    use crate::errors::{new_error, ErrorForTesting};
    use crate::just::Just;
    use crate::just_done::JustDone;
    use crate::just_error::JustError;
    use crate::scheduler::ImmediateScheduler;
    use crate::sync_wait::SyncWait;

    /// A value sender piped through the adapter completes with a value:
    /// neither an error nor a cancelation is reported.
    #[test]
    fn it_propagates_value() {
        let sender = Just::from((1, 2)) | StopIfRequested;
        let outcome = sender.sync_wait();
        outcome.expect("no error").expect("no cancelation");
    }

    /// An error sender piped through the adapter still reports that same error.
    #[test]
    fn it_propagates_error() {
        const MESSAGE: &str = "this error will be passed through";

        let upstream =
            JustError::<ImmediateScheduler, (i32, i32)>::from(new_error(ErrorForTesting::from(
                MESSAGE,
            )));
        let sender = upstream | StopIfRequested;

        let error = sender.sync_wait().expect_err("should return the error");
        let actual = error.downcast_ref::<ErrorForTesting>().unwrap();
        assert_eq!(ErrorForTesting::from(MESSAGE), *actual);
    }

    /// A done sender piped through the adapter yields no value and no error.
    #[test]
    fn it_propagates_done() {
        let sender = JustDone::<ImmediateScheduler, (i32, i32)>::default() | StopIfRequested;
        let outcome = sender.sync_wait().expect("no error");
        assert!(outcome.is_none());
    }
}