/// Implements `$crate::Worker` for a type from a bare `run` body.
///
/// Invocation shape:
///
/// ```text
/// impl_worker!(WorkerType, ErrorType => { /* body of run */ });
/// ```
///
/// The expansion is an `#[async_trait::async_trait]` impl that sets the
/// `Error` associated type to the given error type and uses the supplied block
/// verbatim as the body of
/// `async fn run(&mut self) -> Result<(), Self::Error>`, so the block must
/// evaluate to that `Result`.
///
/// `async_trait` is referenced by a plain path (not through `$crate`), so the
/// invoking crate must be able to resolve it.
///
/// NOTE(review): the receiver is spelled `self` inside the macro definition;
/// because of `macro_rules!` hygiene, a `self` written in the caller's block
/// presumably does not refer to it. `impl_worker_stateful!` takes the receiver
/// name explicitly for bodies that need it — confirm before relying on either.
#[macro_export]
macro_rules! impl_worker {
    ($ty:ty, $err:ty => $body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $ty {
            type Error = $err;

            async fn run(&mut self) -> Result<(), Self::Error> $body
        }
    };
}
/// Implements `$crate::Worker` for a type whose `run` body names its receiver.
///
/// Invocation shape:
///
/// ```text
/// impl_worker_stateful!(WorkerType, ErrorType => |self| { /* body of run */ });
/// ```
///
/// The identifier between the bars is spliced in as the method receiver
/// (`&mut <ident>`), and the block becomes the body of
/// `async fn run(..) -> Result<(), Self::Error>` inside an
/// `#[async_trait::async_trait]` impl. Since the receiver token comes from the
/// call site, the body can refer to it by the same token.
///
/// `async_trait` is referenced by a plain path (not through `$crate`), so the
/// invoking crate must be able to resolve it.
#[macro_export]
macro_rules! impl_worker_stateful {
    ($ty:ty, $err:ty => |$recv:ident| $body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $ty {
            type Error = $err;

            async fn run(&mut $recv) -> Result<(), Self::Error> $body
        }
    };
}
/// Implements `$crate::Worker` for a type that drains a `mailbox` field.
///
/// Invocation shape:
///
/// ```text
/// impl_worker_mailbox!(WorkerType, ErrorType => |self, msg| { /* handle msg */ });
/// ```
///
/// The generated `run` repeatedly awaits `<receiver>.mailbox.recv()` and, for
/// every `Some(<msg>)` it yields, executes the supplied block with the message
/// bound to the second identifier. When `recv()` yields anything other than
/// `Some(..)`, the loop ends and `run` returns `Ok(())`.
///
/// The worker type must therefore expose a field named `mailbox` whose
/// `recv()` result can be awaited to an `Option`-like value matched by
/// `Some(..)`.
///
/// `async_trait` is referenced by a plain path (not through `$crate`), so the
/// invoking crate must be able to resolve it.
#[macro_export]
macro_rules! impl_worker_mailbox {
    ($ty:ty, $err:ty => |$recv:ident, $item:ident| $handler:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $ty {
            type Error = $err;

            async fn run(&mut $recv) -> Result<(), Self::Error> {
                // The handler block is the loop body; it runs once per message.
                while let Some($item) = $recv.mailbox.recv().await $handler

                Ok(())
            }
        }
    };
}
/// Builds a `$crate::StatefulSupervisorSpec` from a declarative description.
///
/// Invocation shape (fields must appear in this order):
///
/// ```text
/// stateful_supervision_tree! {
///     name: <expr>,
///     strategy: <ident>,
///     intensity: (<expr>, <expr>),          // optional
///     workers: [(<expr>, <expr>, <ident>), ...],
///     supervisors: [<expr>, ...],
/// }
/// ```
///
/// * `name` is passed to `StatefulSupervisorSpec::new`.
/// * `strategy` is appended to `$crate::RestartStrategy::` and passed to
///   `with_restart_strategy`.
/// * `intensity`, when present, becomes
///   `$crate::RestartIntensity { max_restarts, within_seconds }` and is passed
///   to `with_restart_intensity`. When the field is omitted that call is not
///   made at all.
/// * each `workers` entry `(id, factory, policy)` becomes
///   `.with_worker(id, factory, $crate::RestartPolicy::policy)`.
/// * each `supervisors` entry becomes `.with_supervisor(entry)`.
///
/// The builder calls are chained in the order listed above: all workers first,
/// then all supervisors. Both lists may be empty, and a trailing comma is
/// accepted inside each list as well as after the closing `supervisors: [...]`.
///
/// The macro evaluates to the finished spec value.
#[macro_export]
macro_rules! stateful_supervision_tree {
    // Variant with an explicit restart intensity.
    (
        name: $name:expr,
        strategy: $strategy:ident,
        intensity: ($max:expr, $secs:expr),
        workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
        supervisors: [ $($sup:expr),* $(,)? ] $(,)?
    ) => {
        {
            let spec = $crate::StatefulSupervisorSpec::new($name)
                .with_restart_strategy($crate::RestartStrategy::$strategy)
                .with_restart_intensity($crate::RestartIntensity {
                    max_restarts: $max,
                    within_seconds: $secs,
                })
                $(
                    .with_worker($id, $factory, $crate::RestartPolicy::$policy)
                )*
                $(
                    .with_supervisor($sup)
                )*;
            spec
        }
    };
    // Variant without `intensity:`; `with_restart_intensity` is never called.
    (
        name: $name:expr,
        strategy: $strategy:ident,
        workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
        supervisors: [ $($sup:expr),* $(,)? ] $(,)?
    ) => {
        {
            let spec = $crate::StatefulSupervisorSpec::new($name)
                .with_restart_strategy($crate::RestartStrategy::$strategy)
                $(
                    .with_worker($id, $factory, $crate::RestartPolicy::$policy)
                )*
                $(
                    .with_supervisor($sup)
                )*;
            spec
        }
    };
}
/// Builds a `$crate::SupervisorSpec` from a declarative description.
///
/// Invocation shape (fields must appear in this order):
///
/// ```text
/// supervision_tree! {
///     name: <expr>,
///     strategy: <ident>,
///     intensity: (<expr>, <expr>),          // optional
///     workers: [(<expr>, <expr>, <ident>), ...],
///     supervisors: [<expr>, ...],
/// }
/// ```
///
/// * `name` is passed to `SupervisorSpec::new`.
/// * `strategy` is appended to `$crate::RestartStrategy::` and passed to
///   `with_restart_strategy`.
/// * `intensity`, when present, becomes
///   `$crate::RestartIntensity { max_restarts, within_seconds }` and is passed
///   to `with_restart_intensity`. When the field is omitted that call is not
///   made at all.
/// * each `workers` entry `(id, factory, policy)` becomes
///   `.with_worker(id, factory, $crate::RestartPolicy::policy)`.
/// * each `supervisors` entry becomes `.with_supervisor(entry)`.
///
/// The builder calls are chained in the order listed above: all workers first,
/// then all supervisors. Both lists may be empty, and a trailing comma is
/// accepted inside each list as well as after the closing `supervisors: [...]`.
///
/// The macro evaluates to the finished spec value.
#[macro_export]
macro_rules! supervision_tree {
    // Variant with an explicit restart intensity.
    (
        name: $name:expr,
        strategy: $strategy:ident,
        intensity: ($max:expr, $secs:expr),
        workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
        supervisors: [ $($sup:expr),* $(,)? ] $(,)?
    ) => {
        {
            let spec = $crate::SupervisorSpec::new($name)
                .with_restart_strategy($crate::RestartStrategy::$strategy)
                .with_restart_intensity($crate::RestartIntensity {
                    max_restarts: $max,
                    within_seconds: $secs,
                })
                $(
                    .with_worker($id, $factory, $crate::RestartPolicy::$policy)
                )*
                $(
                    .with_supervisor($sup)
                )*;
            spec
        }
    };
    // Variant without `intensity:`; `with_restart_intensity` is never called.
    (
        name: $name:expr,
        strategy: $strategy:ident,
        workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
        supervisors: [ $($sup:expr),* $(,)? ] $(,)?
    ) => {
        {
            let spec = $crate::SupervisorSpec::new($name)
                .with_restart_strategy($crate::RestartStrategy::$strategy)
                $(
                    .with_worker($id, $factory, $crate::RestartPolicy::$policy)
                )*
                $(
                    .with_supervisor($sup)
                )*;
            spec
        }
    };
}
/// Exposes a supervisor handle over a transport and spawns the listener task.
///
/// Invocation shapes:
///
/// ```text
/// serve_supervisor!(tcp,  <handle expr>, <address expr>)
/// serve_supervisor!(unix, <handle expr>, <path expr>)
/// ```
///
/// The handle is wrapped with `$crate::distributed::SupervisorServer::new`
/// right away. The server is then moved into an `async move` block that awaits
/// `listen_tcp(..)` or `listen_unix(..)`, and that block is handed to
/// `tokio::spawn`. The macro evaluates to whatever `tokio::spawn` returns.
///
/// The address/path expression is written inside the `async move` block, so it
/// is evaluated when the spawned future runs, and anything it mentions is moved
/// into that future.
///
/// `tokio` is referenced by a plain path (not through `$crate`), so the
/// invoking crate must be able to resolve it.
#[macro_export]
macro_rules! serve_supervisor {
    (tcp, $sup:expr, $bind:expr) => {{
        let srv = $crate::distributed::SupervisorServer::new($sup);
        let serve = async move { srv.listen_tcp($bind).await };
        tokio::spawn(serve)
    }};
    (unix, $sup:expr, $socket:expr) => {{
        let srv = $crate::distributed::SupervisorServer::new($sup);
        let serve = async move { srv.listen_unix($socket).await };
        tokio::spawn(serve)
    }};
}
/// Shorthand for connecting to a remote supervisor over a chosen transport.
///
/// Invocation shapes:
///
/// ```text
/// connect_supervisor!(tcp,  <address expr>)
/// connect_supervisor!(unix, <path expr>)
/// ```
///
/// Expands to a call of
/// `$crate::distributed::RemoteSupervisorHandle::connect_tcp(..)` or
/// `$crate::distributed::RemoteSupervisorHandle::connect_unix(..)` with the
/// given expression as the only argument. The macro adds nothing else: no
/// `.await` and no error handling, so the caller receives exactly what the
/// selected function returns.
#[macro_export]
macro_rules! connect_supervisor {
    (tcp, $endpoint:expr) => {
        $crate::distributed::RemoteSupervisorHandle::connect_tcp($endpoint)
    };
    (unix, $socket:expr) => {
        $crate::distributed::RemoteSupervisorHandle::connect_unix($socket)
    };
}
/// Builds a supervision tree, starts it, and serves it over a transport.
///
/// Invocation shape:
///
/// ```text
/// distributed_system! {
///     server: tcp @ <address expr> => {       // or: unix @ <path expr>
///         name: <expr>,
///         strategy: <ident>,
///         workers: [(<expr>, <expr>, <ident>), ...],
///         supervisors: [<expr>, ...],
///     }
/// }
/// ```
///
/// Steps performed by the expansion:
///
/// 1. The inner fields are forwarded to `supervision_tree!` to build the spec.
///    This macro has no `intensity:` field, so the spec is always built through
///    the arm of `supervision_tree!` that takes none.
/// 2. The spec is passed to `$crate::SupervisorHandle::start`.
/// 3. A clone of the resulting handle is wrapped in
///    `$crate::distributed::SupervisorServer::new`.
/// 4. `tokio::spawn` is called with an `async move` block that awaits
///    `listen_tcp(..)` or `listen_unix(..)` on that server.
///
/// The macro evaluates to the tuple `(handle, server_task)`, where
/// `server_task` is the value returned by `tokio::spawn`.
///
/// The address/path expression is written inside the `async move` block, so it
/// is evaluated when the spawned future runs, and anything it mentions is moved
/// into that future.
///
/// `tokio` is referenced by a plain path (not through `$crate`), so the
/// invoking crate must be able to resolve it.
#[macro_export]
macro_rules! distributed_system {
    (
        server: tcp @ $addr:expr => {
            name: $name:expr,
            strategy: $strategy:ident,
            workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
            supervisors: [ $($sup:expr),* $(,)? ] $(,)?
        }
    ) => {
        {
            // `$crate::` makes the nested macro resolve even when the caller
            // has not brought `supervision_tree!` into scope themselves.
            let spec = $crate::supervision_tree! {
                name: $name,
                strategy: $strategy,
                workers: [ $(($id, $factory, $policy)),* ],
                supervisors: [ $($sup),* ]
            };
            let handle = $crate::SupervisorHandle::start(spec);
            // The server gets its own clone so `handle` can be returned.
            let server = $crate::distributed::SupervisorServer::new(handle.clone());
            let server_task = tokio::spawn(async move {
                server.listen_tcp($addr).await
            });
            (handle, server_task)
        }
    };
    (
        server: unix @ $path:expr => {
            name: $name:expr,
            strategy: $strategy:ident,
            workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
            supervisors: [ $($sup:expr),* $(,)? ] $(,)?
        }
    ) => {
        {
            // `$crate::` makes the nested macro resolve even when the caller
            // has not brought `supervision_tree!` into scope themselves.
            let spec = $crate::supervision_tree! {
                name: $name,
                strategy: $strategy,
                workers: [ $(($id, $factory, $policy)),* ],
                supervisors: [ $($sup),* ]
            };
            let handle = $crate::SupervisorHandle::start(spec);
            // The server gets its own clone so `handle` can be returned.
            let server = $crate::distributed::SupervisorServer::new(handle.clone());
            let server_task = tokio::spawn(async move {
                server.listen_unix($path).await
            });
            (handle, server_task)
        }
    };
}