use crate::errors::Error;
use crate::scheduler::Scheduler;
use crate::scope::{detached_scope, ScopeDataSendPtr, ScopeImpl};
use crate::stop_token::{NeverStopToken, StopToken};
use crate::traits::{OperationState, Receiver, ReceiverOf, TypedSenderConnect};
use crate::tuple::Tuple;
pub trait StartDetached {
fn start_detached(self);
}
impl<T> StartDetached for T
where
T: StartDetachedWithStopToken<NeverStopToken>,
{
fn start_detached(self) {
self.start_detached_with_stop_token(NeverStopToken)
}
}
pub trait StartDetachedWithStopToken<StopTokenImpl>
where
StopTokenImpl: StopToken,
{
fn start_detached_with_stop_token(self, stop_token: StopTokenImpl);
}
impl<StopTokenImpl, T> StartDetachedWithStopToken<StopTokenImpl> for T
where
StopTokenImpl: StopToken,
T: TypedSenderConnect<'static, ScopeImpl<ScopeDataSendPtr>, StopTokenImpl, DiscardingReceiver>,
{
fn start_detached_with_stop_token(self, stop_token: StopTokenImpl) {
detached_scope(move |scope: &ScopeImpl<ScopeDataSendPtr>| {
self.connect(scope, stop_token, DiscardingReceiver { completed: false })
.start();
})
}
}
pub struct DiscardingReceiver {
completed: bool,
}
impl Receiver for DiscardingReceiver {
fn set_done(mut self) {
self.completed = true;
}
fn set_error(mut self, error: Error) {
self.completed = true;
panic!("detached completion failed with error: {:?}", error);
}
}
impl<Sch, Tpl> ReceiverOf<Sch, Tpl> for DiscardingReceiver
where
Sch: Scheduler<LocalScheduler = Sch>,
Tpl: Tuple,
{
fn set_value(mut self, _: Sch, _: Tpl) {
self.completed = true;
}
}
impl Drop for DiscardingReceiver {
fn drop(&mut self) {
if !self.completed {
panic!("start_detached operation did not complete with a signal")
}
}
}
#[cfg(test)]
mod test {
use super::StartDetached;
use crate::errors::{new_error, ErrorForTesting, Result};
use crate::just::Just;
use crate::scheduler::{ImmediateScheduler, Scheduler};
use crate::then::Then;
use std::sync::mpsc;
#[test]
fn handles_value() {
let (tx, rx) = mpsc::channel();
(Just::from((String::from("dcba"),))
| Then::from(|(x,): (String,)| (x.chars().rev().collect::<String>(),))
| Then::from(move |(x,)| tx.send(x).map_err(new_error)))
.start_detached();
assert_eq!(
String::from("abcd"),
rx.recv().expect("value callback was invoked")
);
}
#[test]
fn handles_done() {
ImmediateScheduler
.schedule_done::<(i32, i32, i32)>()
.start_detached();
}
#[test]
#[should_panic]
fn handles_error() {
(Just::from((String::from("dcba"),))
| Then::from(|(x,): (String,)| (x.chars().rev().collect::<String>(),))
| Then::from(move |(_,)| -> Result<()> {
Err(new_error(ErrorForTesting::from(
"start_detached error signal test",
)))
}))
.start_detached();
}
}