use crate::broadcast::{Broadcast, Parent};
use crate::children::Children;
use crate::children_ref::ChildrenRef;
use crate::config::Config;
use crate::context::{BastionContext, BastionId};
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Message};
use crate::path::BastionPathElement;
use crate::supervisor::{Supervisor, SupervisorRef};
use crate::system::SYSTEM;
use core::future::Future;
use tracing::{debug, trace};
use std::fmt::{self, Debug, Formatter};
distributed_api! {
use std::sync::Arc;
use crate::distributed::*;
use artillery_core::cluster::ap::*;
}
/// Namespace-like type that groups the system-wide entry points of this
/// module (initialization, creating supervisors and children groups,
/// broadcasting, and starting/stopping/killing the system).
///
/// Every function in the `impl Bastion` block of this file is an associated
/// function: none of them takes a `self` receiver, so no value of this type
/// is needed to call them.
pub struct Bastion {
// Private unit field: because it is not `pub`, code outside this module
// cannot build a `Bastion` with a struct literal.
_priv: (),
}
impl Bastion {
    /// Initializes the system with the default configuration.
    ///
    /// Shorthand for calling [`Bastion::init_with`] with `Config::default()`.
    pub fn init() {
        Self::init_with(Config::default())
    }

    /// Initializes the system with the given `config`.
    ///
    /// If `config.backtraces().is_hide()` is true, the process-wide panic
    /// hook is replaced with one that does nothing. Afterwards the global
    /// `SYSTEM` lazy static is forced to initialize.
    pub fn init_with(config: Config) {
        debug!("Bastion: Initializing with config: {:?}", config);

        let hide_backtraces = config.backtraces().is_hide();
        if hide_backtraces {
            debug!("Bastion: Hiding backtraces.");
            // The replacement hook ignores the panic information entirely.
            std::panic::set_hook(Box::new(|_panic_info| {}));
        }

        lazy_static::initialize(&SYSTEM);
    }

    /// Builds a new supervisor, lets `init` configure it, and sends it to
    /// the system for deployment.
    ///
    /// The supervisor is created on a fresh `Broadcast` whose parent is
    /// `Parent::system()` and whose path element carries a new `BastionId`.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the deployment envelope cannot be handed to
    /// the system sender.
    pub fn supervisor<S>(init: S) -> Result<SupervisorRef, ()>
    where
        S: FnOnce(Supervisor) -> Supervisor,
    {
        debug!("Bastion: Creating supervisor.");

        let sup_bcast = Broadcast::new(
            Parent::system(),
            BastionPathElement::Supervisor(BastionId::new()),
        );
        debug!("Bastion: Initializing Supervisor({}).", sup_bcast.id());

        let configured = init(Supervisor::new(sup_bcast));
        debug!("Supervisor({}): Initialized.", configured.id());

        // The reference has to be taken before the supervisor itself is
        // moved into the deployment message below.
        let reference = configured.as_ref();
        debug!("Bastion: Deploying Supervisor({}).", configured.id());

        let envelope = Envelope::new(
            BastionMessage::deploy_supervisor(configured),
            SYSTEM.path().clone(),
            SYSTEM.sender().clone(),
        );
        trace!("Bastion: Sending envelope: {:?}", envelope);

        if SYSTEM.sender().unbounded_send(envelope).is_err() {
            return Err(());
        }
        Ok(reference)
    }

    /// Creates a children group by delegating to `SYSTEM.supervisor()`,
    /// passing `init` through to its `children` method unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error that `children` call reports.
    pub fn children<C>(init: C) -> Result<ChildrenRef, ()>
    where
        C: FnOnce(Children) -> Children,
    {
        debug!("Bastion: Creating children group.");

        let system_supervisor = SYSTEM.supervisor();
        system_supervisor.children(init)
    }

    /// Creates a children group with a redundancy of `1` that uses `action`
    /// as its exec closure.
    ///
    /// This is a convenience wrapper around [`Bastion::children`].
    pub fn spawn<I, F>(action: I) -> Result<ChildrenRef, ()>
    where
        I: Fn(BastionContext) -> F + Send + 'static,
        F: Future<Output = Result<(), ()>> + Send + 'static,
    {
        Self::children(|group| {
            let single = group.with_redundancy(1);
            single.with_exec(action)
        })
    }

    distributed_api! {
        /// Forwards `cluster_config` and `action` to `cluster_actor`.
        ///
        /// NOTE(review): only present when `distributed_api!` emits its
        /// contents — presumably behind a feature gate; confirm against the
        /// macro definition.
        #[allow(missing_docs)]
        pub fn distributed<I, F>(cluster_config: &'static ArtilleryAPClusterConfig, action: I) -> Result<ChildrenRef, ()>
        where
            I: Fn(Arc<DistributedContext>) -> F + Send + Sync + 'static,
            F: Future<Output = Result<(), ()>> + Send + 'static,
        {
            cluster_actor(cluster_config, action)
        }
    }

    /// Wraps `msg` in a broadcast message and sends it through the system
    /// sender, in an envelope built by `Envelope::from_dead_letters`.
    ///
    /// # Errors
    ///
    /// If the sender rejects the envelope, the original message is recovered
    /// from it and handed back as `Err(msg)`.
    ///
    /// # Panics
    ///
    /// Panics if the rejected envelope's `into_msg()` yields `None`.
    pub fn broadcast<M: Message>(msg: M) -> Result<(), M> {
        debug!("Bastion: Broadcasting message: {:?}", msg);

        let envelope = Envelope::from_dead_letters(BastionMessage::broadcast(msg));
        trace!("Bastion: Sending envelope: {:?}", envelope);

        match SYSTEM.sender().unbounded_send(envelope) {
            Ok(()) => Ok(()),
            Err(rejected) => Err(rejected.into_inner().into_msg().unwrap()),
        }
    }

    /// Sends a `start` message to the system through the system sender.
    ///
    /// The outcome of the send is deliberately ignored.
    pub fn start() {
        debug!("Bastion: Starting.");

        let envelope = Envelope::from_dead_letters(BastionMessage::start());
        trace!("Bastion: Sending envelope: {:?}", envelope);

        // Best effort: a failed send is not reported to the caller.
        let _ = SYSTEM.sender().unbounded_send(envelope);
    }

    /// Sends a `stop` message to the system through the system sender.
    ///
    /// The outcome of the send is deliberately ignored.
    pub fn stop() {
        debug!("Bastion: Stopping.");

        let envelope = Envelope::from_dead_letters(BastionMessage::stop());
        trace!("Bastion: Sending envelope: {:?}", envelope);

        // Best effort: a failed send is not reported to the caller.
        let _ = SYSTEM.sender().unbounded_send(envelope);
    }

    /// Sends a `kill` message to the system, cancels the system handle if
    /// one is still stored, and then calls `SYSTEM.notify_stopped()`.
    ///
    /// The outcome of sending the message is ignored. The stored handle is
    /// taken out from behind its lock by an async block driven through
    /// `crate::executor::run`.
    pub fn kill() {
        debug!("Bastion: Killing.");

        let envelope = Envelope::from_dead_letters(BastionMessage::kill());
        trace!("Bastion: Sending envelope: {:?}", envelope);
        let _ = SYSTEM.sender().unbounded_send(envelope);

        // `take()` leaves `None` behind, so the handle can only be
        // cancelled once.
        let handle_slot = SYSTEM.handle();
        let taken = crate::executor::run(async { handle_slot.lock().await.take() });
        match taken {
            Some(running) => {
                debug!("Bastion: Cancelling system handle.");
                running.cancel();
            }
            None => {}
        }

        SYSTEM.notify_stopped();
    }

    /// Calls `SYSTEM.wait_until_stopped()`.
    ///
    /// NOTE(review): presumably blocks the calling thread until
    /// `SYSTEM.notify_stopped()` has been called (as `kill` does) — confirm
    /// against the system implementation.
    pub fn block_until_stopped() {
        debug!("Bastion: Blocking until system is stopped.");
        SYSTEM.wait_until_stopped();
    }
}
impl Debug for Bastion {
    /// Writes the bare type name `Bastion`; no fields are added to the
    /// debug output.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("Bastion");
        repr.finish()
    }
}