#![allow(private_bounds)]
use std::sync::Arc;
use crate::command::{Command, ControlFlow};
use crate::domain::SharedDomainData;
use crate::task_status::TaskStatus;
#[cfg(feature = "error-stack")]
use crate::error::BoxError;
#[cfg(feature = "error-stack")]
use crate::error::DbuffErr;
#[cfg(feature = "error-stack")]
use crate::error::ErasedError;
#[cfg(feature = "error-stack")]
use error_stack::Report;
/// Error value carried through a chain once a step has failed.
///
/// Reference-counted so the same failure can be handed to several consumers
/// without copying the underlying error.
type ChainError = Arc<dyn std::error::Error + Send + Sync>;

/// Optional user callback invoked with a failed step's error and mutable
/// access to the domain data (`error-stack` flavour: receives a `Report`).
#[cfg(feature = "error-stack")]
type ErrorHandler<D> = Option<Arc<dyn Fn(Report<DbuffErr>, &mut D) + Send + Sync>>;

/// Optional user callback invoked with a failed step's error and mutable
/// access to the domain data (plain flavour: receives a boxed `std` error).
#[cfg(not(feature = "error-stack"))]
type ErrorHandler<D> =
    Option<Arc<dyn Fn(Box<dyn std::error::Error + Send + Sync>, &mut D) + Send + Sync>>;
/// Handle to a chain that has been spawned onto the runtime.
///
/// Awaiting the handle yields the [`ControlFlow`] produced by the spawned task,
/// or the `JoinError` reported by tokio if the task did not run to completion.
pub struct ChainHandle {
    /// Join handle of the spawned task that drives the chain.
    inner: tokio::task::JoinHandle<ControlFlow>,
}

impl std::future::Future for ChainHandle {
    type Output = Result<ControlFlow, tokio::task::JoinError>;

    /// Polls the wrapped join handle.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // `JoinHandle` is `Unpin`, which makes `ChainHandle` `Unpin` as well, so the
        // field can be re-pinned safely; no `unsafe` pin projection is required.
        let this = self.get_mut();
        std::future::Future::poll(std::pin::Pin::new(&mut this.inner), cx)
    }
}

impl std::ops::Deref for ChainHandle {
    type Target = tokio::task::JoinHandle<ControlFlow>;

    /// Exposes the underlying join handle's `&self` methods.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// Bound required of a command's error type when the `error-stack` feature is
/// enabled: anything printable (`Debug` + `Display`) that is `Send + Sync + 'static`.
#[cfg(feature = "error-stack")]
#[allow(private_bounds)]
pub(crate) trait CmdError: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {}

/// Every type meeting the bounds is a `CmdError`; nothing implements it by hand.
#[cfg(feature = "error-stack")]
impl<T: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static> CmdError for T {}

/// Bound required of a command's error type without the `error-stack` feature:
/// a standard `Error` that is `Send + Sync + 'static`.
#[cfg(not(feature = "error-stack"))]
#[allow(private_bounds)]
pub(crate) trait CmdError: std::error::Error + Send + Sync + 'static {}

/// Every type meeting the bounds is a `CmdError`; nothing implements it by hand.
#[cfg(not(feature = "error-stack"))]
impl<T: std::error::Error + Send + Sync + 'static> CmdError for T {}
/// Owned wrapper around a shared error so it can be passed to the error handler
/// as a `Box<dyn Error + Send + Sync>` while the chain keeps its own `Arc`.
///
/// Formatting and the `source()` chain are forwarded to the wrapped error.
#[cfg(not(feature = "error-stack"))]
struct ArcErrorWrapper(Arc<dyn std::error::Error + Send + Sync>);

#[cfg(not(feature = "error-stack"))]
impl std::fmt::Display for ArcErrorWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fully qualified: a bare `.fmt` on `dyn Error` could mean `Debug` or `Display`.
        std::fmt::Display::fmt(&self.0, f)
    }
}

#[cfg(not(feature = "error-stack"))]
impl std::fmt::Debug for ArcErrorWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fully qualified for the same reason as the `Display` impl.
        std::fmt::Debug::fmt(&self.0, f)
    }
}

#[cfg(not(feature = "error-stack"))]
impl std::error::Error for ArcErrorWrapper {
    /// Forwards to the wrapped error so the cause chain is not lost by wrapping.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        std::error::Error::source(&*self.0)
    }
}
// Newtype over a shared `ErasedError` that implements `Display`, `Debug` and
// `std::error::Error`, so the erased value can be used where a standard error
// is required (only compiled with the `error-stack` feature).
#[cfg(feature = "error-stack")]
struct DebugDisplayAdapter(Arc<dyn ErasedError>);
#[cfg(feature = "error-stack")]
impl std::fmt::Display for DebugDisplayAdapter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// NOTE(review): which `fmt` this resolves to depends on how `ErasedError` is
// declared (not visible here) — confirm it reaches the Display formatting.
self.0.fmt(f)
}
}
#[cfg(feature = "error-stack")]
impl std::fmt::Debug for DebugDisplayAdapter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// NOTE(review): same call expression as the `Display` impl — confirm this one
// reaches the Debug formatting rather than the same target.
self.0.fmt(f)
}
}
// No `source()` override: the adapter reports no underlying cause.
#[cfg(feature = "error-stack")]
impl std::error::Error for DebugDisplayAdapter {}
pub struct DomainExecutor<D, S> {
pub(crate) domain: SharedDomainData<D>,
pub(crate) services: S,
pub(crate) rt: tokio::runtime::Handle,
pub(crate) error_handler: ErrorHandler<D>,
}
impl<D, S> DomainExecutor<D, S>
where
D: Clone + Send + Sync + 'static,
S: Clone + Send + 'static,
{
#[cfg(feature = "error-stack")]
#[must_use = "the error handler must be consumed as part of the chain"]
pub fn on_error(
mut self,
f: impl Fn(Report<DbuffErr>, &mut D) + Send + Sync + 'static,
) -> Self {
self.error_handler = Some(Arc::new(f));
self
}
#[cfg(not(feature = "error-stack"))]
#[must_use = "the error handler must be consumed as part of the chain"]
pub fn on_error(
mut self,
f: impl Fn(Box<dyn std::error::Error + Send + Sync>, &mut D) + Send + Sync + 'static,
) -> Self {
self.error_handler = Some(Arc::new(f));
self
}
pub fn exec<C>(
self,
cmd: C,
setter: impl Fn(&mut D, &C::Output) + Send + Sync + 'static,
) -> DomainChain<D, C::Output, S>
where
C: Command<S> + 'static,
C::Output: Clone + Send + 'static,
C::Error: CmdError,
{
let domain = self.domain.clone();
let services = self.services.clone();
let err_handler = self.error_handler.clone();
let setter = Arc::new(setter);
let future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<C::Output, ChainError>> + Send + 'static>,
> = Box::pin(async move {
match cmd.execute(services).await {
Ok(output) => {
let setter = setter;
let output_for_setter = output.clone();
domain.modify(move |d| setter(d, &output_for_setter));
Ok(output)
}
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &err_handler {
let h = h.clone();
#[cfg(feature = "error-stack")]
domain.modify(move |d| {
h(
Report::new(BoxError(Box::new(DebugDisplayAdapter(shared))))
.change_context(DbuffErr),
d,
);
});
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
domain.modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
pub fn exec_discard<C>(self, cmd: C) -> DomainChain<D, (), S>
where
C: Command<S> + 'static,
C::Error: CmdError,
{
let services = self.services.clone();
let err_handler = self.error_handler.clone();
let domain = self.domain.clone();
let future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), ChainError>> + Send + 'static>,
> = Box::pin(async move {
match cmd.execute(services).await {
Ok(_) => Ok(()),
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &err_handler {
let h = h.clone();
#[cfg(feature = "error-stack")]
domain.modify(move |d| {
h(
Report::new(BoxError(Box::new(DebugDisplayAdapter(shared))))
.change_context(DbuffErr),
d,
);
});
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
domain.modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
}
pub struct DomainChain<D, T, S> {
future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<T, ChainError>> + Send + 'static>,
>,
domain: SharedDomainData<D>,
services: S,
rt: tokio::runtime::Handle,
error_handler: ErrorHandler<D>,
}
impl<D, T, S> DomainChain<D, T, S>
where
D: Clone + Send + Sync + 'static,
T: Send + 'static,
S: Clone + Send + 'static,
{
pub fn exec<C>(
self,
cmd: C,
setter: impl Fn(&mut D, &C::Output) + Send + Sync + 'static,
) -> DomainChain<D, C::Output, S>
where
C: Command<S> + 'static,
C::Output: Clone + Send + 'static,
C::Error: CmdError,
{
let domain = self.domain.clone();
let services = self.services.clone();
let err_handler = self.error_handler.clone();
let setter = Arc::new(setter);
let future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<C::Output, ChainError>> + Send + 'static>,
> = Box::pin(async move {
self.future.await?;
match cmd.execute(services).await {
Ok(output) => {
let setter = setter;
let output_for_setter = output.clone();
domain.modify(move |d| setter(d, &output_for_setter));
Ok(output)
}
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &err_handler {
let h = h.clone();
#[cfg(feature = "error-stack")]
domain.modify(move |d| {
h(
Report::new(BoxError(Box::new(DebugDisplayAdapter(shared))))
.change_context(DbuffErr),
d,
);
});
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
domain.modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
pub fn exec_discard<C>(self, cmd: C) -> DomainChain<D, (), S>
where
C: Command<S> + 'static,
C::Error: CmdError,
{
let services = self.services.clone();
let err_handler = self.error_handler.clone();
let domain = self.domain.clone();
let future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), ChainError>> + Send + 'static>,
> = Box::pin(async move {
self.future.await?;
match cmd.execute(services).await {
Ok(_) => Ok(()),
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &err_handler {
let h = h.clone();
#[cfg(feature = "error-stack")]
domain.modify(move |d| {
h(
Report::new(BoxError(Box::new(DebugDisplayAdapter(shared))))
.change_context(DbuffErr),
d,
);
});
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
domain.modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
pub fn then<C, F>(
self,
factory: F,
setter: impl Fn(&mut D, &C::Output) + Send + Sync + 'static,
) -> DomainChain<D, C::Output, S>
where
C: Command<S> + 'static,
C::Output: Clone + Send + 'static,
C::Error: CmdError,
F: FnOnce(&T) -> C + Send + 'static,
{
let domain = self.domain.clone();
let services = self.services.clone();
let err_handler = self.error_handler.clone();
let setter = Arc::new(setter);
let future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<C::Output, ChainError>> + Send + 'static>,
> = Box::pin(async move {
let prev = self.future.await?;
let cmd = factory(&prev);
match cmd.execute(services).await {
Ok(output) => {
let setter = setter;
let output_for_setter = output.clone();
domain.modify(move |d| setter(d, &output_for_setter));
Ok(output)
}
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &err_handler {
let h = h.clone();
#[cfg(feature = "error-stack")]
domain.modify(move |d| {
h(
Report::new(BoxError(Box::new(DebugDisplayAdapter(shared))))
.change_context(DbuffErr),
d,
);
});
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
domain.modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
#[must_use = "the tracked builder must be consumed to take effect"]
pub fn tracked(
self,
status_setter: impl Fn(&mut D, TaskStatus<T>) + Send + Sync + 'static,
) -> Self
where
T: Clone + Send + 'static,
{
let domain = self.domain.clone();
let setter = Arc::new(status_setter);
let future: std::pin::Pin<
Box<dyn std::future::Future<Output = Result<T, ChainError>> + Send + 'static>,
> = Box::pin(async move {
{
let setter = setter.clone();
domain.modify(move |d| setter(d, TaskStatus::Pending));
}
match self.future.await {
Ok(output) => {
let output_for_status = output.clone();
let setter = setter.clone();
domain.modify(move |d| setter(d, TaskStatus::Resolved(output_for_status)));
Ok(output)
}
Err(e) => {
let setter = setter.clone();
let e_for_status = e.clone();
domain.modify(move |d| setter(d, TaskStatus::Error(e_for_status)));
Err(e)
}
}
});
DomainChain {
future,
domain: self.domain,
services: self.services,
rt: self.rt,
error_handler: self.error_handler,
}
}
#[must_use = "the chain must be consumed with `.go()` to spawn execution"]
pub fn go(self) -> ChainHandle {
ChainHandle {
inner: self.rt.spawn(async move {
if self.future.await.is_ok() {
ControlFlow::Continue
} else {
ControlFlow::Break
}
}),
}
}
pub fn go_detach(self) {
self.rt.spawn(async move {
if self.future.await.is_ok() {
ControlFlow::Continue
} else {
ControlFlow::Break
}
});
}
}
macro_rules! impl_parallel {
($n:tt; $($cmd:ident, $setter:ident, $idx:tt),+ $(,)?) => {
paste::paste! {
#[allow(non_snake_case)]
impl<D, S> DomainExecutor<D, S>
where
D: Clone + Send + Sync + 'static,
S: Clone + Send + 'static,
{
pub fn [<exec_parallel_ $n>]<$($cmd, $setter),+>(
self,
$($cmd: ($cmd, $setter)),+
) -> DomainChain<D, (), S>
where
$($cmd: Command<S> + 'static,)+
$($cmd::Output: Clone + Send + 'static,)+
$($cmd::Error: CmdError,)+
$($setter: Fn(&mut D, &$cmd::Output) + Send + Sync + 'static,)+
{
let domain = self.domain.clone();
let services = self.services.clone();
let rt = self.rt.clone();
let err_handler = self.error_handler.clone();
$(
let ($cmd, $setter) = $cmd;
let $setter = Arc::new($setter);
let [<domain_ $idx>] = domain.clone();
let [<services_ $idx>] = services.clone();
let _ = rt.clone();
let [<err_handler_ $idx>] = err_handler.clone();
)+
let future: std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ChainError>> + Send + 'static>> =
Box::pin(async move {
let results = tokio::join!(
$(
async move {
match $cmd.execute([<services_ $idx>]).await {
Ok(output) => {
let o = output.clone();
[<domain_ $idx>].modify(move |d| $setter(d, &o));
Ok(true)
}
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &[<err_handler_ $idx>] {
let h = h.clone();
#[cfg(feature = "error-stack")]
[<domain_ $idx>].modify(move |d| h(Report::new(BoxError(Box::new(DebugDisplayAdapter(shared)))).change_context(DbuffErr), d));
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
[<domain_ $idx>].modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
},
)+
);
let mut first_error: Option<ChainError> = None;
$(
if let Err(e) = &results.$idx {
if first_error.is_none() {
first_error = Some(e.clone());
}
}
)+
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
}
#[allow(non_snake_case)]
impl<D, T, S> DomainChain<D, T, S>
where
D: Clone + Send + Sync + 'static,
T: Send + 'static,
S: Clone + Send + 'static,
{
pub fn [<exec_parallel_ $n>]<$($cmd, $setter),+>(
self,
$($cmd: ($cmd, $setter)),+
) -> DomainChain<D, (), S>
where
$($cmd: Command<S> + 'static,)+
$($cmd::Output: Clone + Send + 'static,)+
$($cmd::Error: CmdError,)+
$($setter: Fn(&mut D, &$cmd::Output) + Send + Sync + 'static,)+
{
let domain = self.domain.clone();
let services = self.services.clone();
let rt = self.rt.clone();
let err_handler = self.error_handler.clone();
let prev = self.future;
$(
let ($cmd, $setter) = $cmd;
let $setter = Arc::new($setter);
let [<domain_ $idx>] = domain.clone();
let [<services_ $idx>] = services.clone();
let _ = rt.clone();
let [<err_handler_ $idx>] = err_handler.clone();
)+
let future: std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ChainError>> + Send + 'static>> =
Box::pin(async move {
prev.await?;
let results = tokio::join!(
$(
async move {
match $cmd.execute([<services_ $idx>]).await {
Ok(output) => {
let o = output.clone();
[<domain_ $idx>].modify(move |d| $setter(d, &o));
Ok(true)
}
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &[<err_handler_ $idx>] {
let h = h.clone();
#[cfg(feature = "error-stack")]
[<domain_ $idx>].modify(move |d| h(Report::new(BoxError(Box::new(DebugDisplayAdapter(shared)))).change_context(DbuffErr), d));
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
[<domain_ $idx>].modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
},
)+
);
let mut first_error: Option<ChainError> = None;
$(
if let Err(e) = &results.$idx {
if first_error.is_none() {
first_error = Some(e.clone());
}
}
)+
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
}
#[allow(non_snake_case)]
impl<D, S> DomainExecutor<D, S>
where
D: Clone + Send + Sync + 'static,
S: Clone + Send + 'static,
{
pub fn [<exec_parallel_discard_ $n>]<$($cmd),+>(
self,
$($cmd: $cmd),+
) -> DomainChain<D, (), S>
where
$($cmd: Command<S> + 'static,)+
$($cmd::Error: CmdError,)+
{
let domain = self.domain.clone();
let services = self.services.clone();
let rt = self.rt.clone();
let err_handler = self.error_handler.clone();
$(
let [<domain_ $idx>] = domain.clone();
let [<services_ $idx>] = services.clone();
let _ = rt.clone();
let [<err_handler_ $idx>] = err_handler.clone();
)+
let future: std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ChainError>> + Send + 'static>> =
Box::pin(async move {
let results = tokio::join!(
$(
async move {
match $cmd.execute([<services_ $idx>]).await {
Ok(_) => Ok(true),
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &[<err_handler_ $idx>] {
let h = h.clone();
#[cfg(feature = "error-stack")]
[<domain_ $idx>].modify(move |d| h(Report::new(BoxError(Box::new(DebugDisplayAdapter(shared)))).change_context(DbuffErr), d));
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
[<domain_ $idx>].modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
},
)+
);
let mut first_error: Option<ChainError> = None;
$(
if let Err(e) = &results.$idx {
if first_error.is_none() {
first_error = Some(e.clone());
}
}
)+
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
}
#[allow(non_snake_case)]
impl<D, T, S> DomainChain<D, T, S>
where
D: Clone + Send + Sync + 'static,
T: Send + 'static,
S: Clone + Send + 'static,
{
pub fn [<exec_parallel_discard_ $n>]<$($cmd),+>(
self,
$($cmd: $cmd),+
) -> DomainChain<D, (), S>
where
$($cmd: Command<S> + 'static,)+
$($cmd::Error: CmdError,)+
{
let domain = self.domain.clone();
let services = self.services.clone();
let rt = self.rt.clone();
let err_handler = self.error_handler.clone();
let prev = self.future;
$(
let [<domain_ $idx>] = domain.clone();
let [<services_ $idx>] = services.clone();
let _ = rt.clone();
let [<err_handler_ $idx>] = err_handler.clone();
)+
let future: std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), ChainError>> + Send + 'static>> =
Box::pin(async move {
prev.await?;
let results = tokio::join!(
$(
async move {
match $cmd.execute([<services_ $idx>]).await {
Ok(_) => Ok(true),
Err(e) => {
#[cfg(feature = "error-stack")]
let shared: Arc<dyn ErasedError> = Arc::new(e);
#[cfg(feature = "error-stack")]
let err: ChainError = Arc::new(DebugDisplayAdapter(shared.clone()));
#[cfg(not(feature = "error-stack"))]
let err: ChainError = Arc::new(e);
if let Some(h) = &[<err_handler_ $idx>] {
let h = h.clone();
#[cfg(feature = "error-stack")]
[<domain_ $idx>].modify(move |d| h(Report::new(BoxError(Box::new(DebugDisplayAdapter(shared)))).change_context(DbuffErr), d));
#[cfg(not(feature = "error-stack"))]
{
let err = err.clone();
[<domain_ $idx>].modify(move |d| h(Box::new(ArcErrorWrapper(err)), d));
}
}
Err(err)
}
}
},
)+
);
let mut first_error: Option<ChainError> = None;
$(
if let Err(e) = &results.$idx {
if first_error.is_none() {
first_error = Some(e.clone());
}
}
)+
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
});
DomainChain {
future,
domain: self.domain.clone(),
services: self.services.clone(),
rt: self.rt.clone(),
error_handler: self.error_handler,
}
}
}
}
};
}
impl_parallel!(2; C0, S0, 0, C1, S1, 1);
impl_parallel!(3; C0, S0, 0, C1, S1, 1, C2, S2, 2);
impl_parallel!(4; C0, S0, 0, C1, S1, 1, C2, S2, 2, C3, S3, 3);
impl_parallel!(5; C0, S0, 0, C1, S1, 1, C2, S2, 2, C3, S3, 3, C4, S4, 4);