//!
//! A context allows a child's future to access its received
//! messages, parent and supervisor.
use crate::child_ref::ChildRef;
use crate::children_ref::ChildrenRef;
use crate::dispatcher::{BroadcastTarget, DispatcherType, NotificationType};
use crate::envelope::{Envelope, RefAddr, SignedMessage};
use crate::message::{Answer, BastionMessage, Message, Msg};
use crate::supervisor::SupervisorRef;
use crate::system::SYSTEM;
use async_mutex::Mutex;
use futures::pending;
#[cfg(feature = "scaling")]
use lever::table::lotable::LOTable;
use std::collections::VecDeque;
use std::fmt::{self, Display, Formatter};
use std::pin::Pin;
#[cfg(feature = "scaling")]
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tracing::{debug, trace};
use uuid::Uuid;
/// Identifier for a root supervisor and dead-letters children.
pub const NIL_ID: BastionId = BastionId(Uuid::nil());
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
/// An identifier used by supervisors, children groups and
/// their elements to identify themselves, using a v4 UUID.
///
/// A `BastionId` is unique to its attached element and is
/// reset when it is restarted. A special `BastionId` exists
/// for the "system supervisor" (the supervisor created by
/// the system at startup) which is a nil UUID
/// (00000000-0000-0000-0000-000000000000).
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx| {
/// async move {
/// let child_id: &BastionId = ctx.current().id();
/// // ...
/// # Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
pub struct BastionId(pub(crate) Uuid);
#[derive(Debug)]
/// A child's execution context, allowing its [`exec`] future
/// to receive messages and access a [`ChildRef`] referencing
/// it, a [`ChildrenRef`] referencing its children group and
/// a [`SupervisorRef`] referencing its supervisor.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // Get a `ChildRef` referencing the child executing
/// // this future...
/// let current: &ChildRef = ctx.current();
/// // Get a `ChildrenRef` referencing the children
/// // group of the child executing this future...
/// let parent: &ChildrenRef = ctx.parent();
/// // Try to get a `SupervisorRef` referencing the
/// // supervisor of the child executing this future...
/// let supervisor: Option<&SupervisorRef> = ctx.supervisor();
/// // Note that `supervisor` will be `None` because
/// // this child was created using `Bastion::children`,
/// // which made it supervised by the system supervisor
/// // (which users can't get a reference to).
///
/// // Try to receive a message...
/// let opt_msg: Option<SignedMessage> = ctx.try_recv().await;
/// // Wait for a message to be received...
/// let msg: SignedMessage = ctx.recv().await?;
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
pub struct BastionContext {
id: BastionId,
child: ChildRef,
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
}
#[derive(Debug)]
pub(crate) struct ContextState {
messages: VecDeque<SignedMessage>,
#[cfg(feature = "scaling")]
stats: Arc<AtomicU64>,
#[cfg(feature = "scaling")]
actor_stats: Arc<LOTable<BastionId, u32>>,
}
impl BastionId {
pub(crate) fn new() -> Self {
let uuid = Uuid::new_v4();
BastionId(uuid)
}
}
impl BastionContext {
pub(crate) fn new(
id: BastionId,
child: ChildRef,
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
) -> Self {
debug!("BastionContext({}): Creating.", id);
BastionContext {
id,
child,
children,
supervisor,
state,
}
}
/// Returns a [`ChildRef`] referencing the children group's
/// element that is linked to this `BastionContext`.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// let current: &ChildRef = ctx.current();
/// // Stop or kill the current element (note that this will
/// // only take effect after this future becomes "pending")...
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`ChildRef`]: children/struct.ChildRef.html
pub fn current(&self) -> &ChildRef {
&self.child
}
/// Returns a [`ChildrenRef`] referencing the children group
/// of the element that is linked to this `BastionContext`.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// let parent: &ChildrenRef = ctx.parent();
/// // Get the other elements of the group, broadcast message,
/// // or stop or kill the children group...
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`ChildrenRef`]: children/struct.ChildrenRef.html
pub fn parent(&self) -> &ChildrenRef {
&self.children
}
/// Returns a [`SupervisorRef`] referencing the supervisor
/// that supervises the element that is linked to this
/// `BastionContext` if it isn't the system supervisor
/// (ie. if the children group wasn't created using
/// [`Bastion::children`]).
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// // When calling the method from a children group supervised
/// // by a supervisor created by the user...
/// Bastion::supervisor(|sp| {
/// sp.children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // ...the method will return a SupervisorRef referencing the
/// // user-created supervisor...
/// let supervisor: Option<&SupervisorRef> = ctx.supervisor(); // Some
/// assert!(supervisor.is_some());
///
/// Ok(())
/// }
/// })
/// })
/// }).expect("Couldn't create the supervisor.");
///
/// // When calling the method from a children group supervised
/// // by the system's supervisor...
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // ...the method won't return a SupervisorRef...
/// let supervisor: Option<&SupervisorRef> = ctx.supervisor(); // None
/// assert!(supervisor.is_none());
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`SupervisorRef`]: supervisor/struct.SupervisorRef.html
/// [`Bastion::children`]: struct.Bastion.html#method.children
pub fn supervisor(&self) -> Option<&SupervisorRef> {
self.supervisor.as_ref()
}
/// Tries to retrieve asynchronously a message received by
/// the element this `BastionContext` is linked to.
///
/// If you need to wait (always asynchronously) until at
/// least one message can be retrieved, use [`recv`] instead.
///
/// This method returns [`SignedMessage`] if a message was available, or
/// `None` otherwise.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// let opt_msg: Option<SignedMessage> = ctx.try_recv().await;
/// // If a message was received by the element, `opt_msg` will
/// // be `Some(Msg)`, otherwise it will be `None`.
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`recv`]: #method.recv
/// [`SignedMessage`]: ../prelude/struct.SignedMessage.html
pub async fn try_recv(&self) -> Option<SignedMessage> {
debug!("BastionContext({}): Trying to receive message.", self.id);
let state = self.state.clone();
let mut guard = state.lock().await;
if let Some(msg) = guard.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
Some(msg)
} else {
trace!("BastionContext({}): Received no message.", self.id);
None
}
}
/// Retrieves asynchronously a message received by the element
/// this `BastionContext` is linked to and waits (always
/// asynchronously) for one if none has been received yet.
///
/// If you don't need to wait until at least one message
/// can be retrieved, use [`try_recv`] instead.
///
/// This method returns [`SignedMessage`] if it succeeded, or `Err(())`
/// otherwise.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // This will block until a message has been received...
/// let msg: SignedMessage = ctx.recv().await?;
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`try_recv`]: #method.try_recv
/// [`SignedMessage`]: ../prelude/struct.SignedMessage.html
pub async fn recv(&self) -> Result<SignedMessage, ()> {
debug!("BastionContext({}): Waiting to receive message.", self.id);
loop {
let state = self.state.clone();
let mut guard = state.lock().await;
if let Some(msg) = guard.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
return Ok(msg);
}
drop(guard);
pending!();
}
}
/// Returns [`RefAddr`] of the current `BastionContext`
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
///
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// ctx.tell(&ctx.signature(), "Hello to myself");
///
/// # Bastion::stop();
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`RefAddr`]: /prelude/struct.Answer.html
pub fn signature(&self) -> RefAddr {
RefAddr::new(
self.current().path().clone(),
self.current().sender().clone(),
)
}
/// Sends a message to the specified [`RefAddr`]
///
/// # Arguments
///
/// * `to` – the [`RefAddr`] to send the message to
/// * `msg` – The actual message to send
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // Wait for a message to be received...
/// let smsg: SignedMessage = ctx.recv().await?;
/// // Obtain address of this message sender...
/// let sender_addr = smsg.signature();
/// // And send something back
/// ctx.tell(&sender_addr, "Ack").expect("Unable to acknowledge");
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// ```
///
/// [`RefAddr`]: ../prelude/struct.RefAddr.html
pub fn tell<M: Message>(&self, to: &RefAddr, msg: M) -> Result<(), M> {
debug!(
"{:?}: Telling message: {:?} to: {:?}",
self.current().path(),
msg,
to.path()
);
let msg = BastionMessage::tell(msg);
let env = Envelope::new_with_sign(msg, self.signature());
// FIXME: panics?
to.sender()
.unbounded_send(env)
.map_err(|err| err.into_inner().into_msg().unwrap())
}
/// Sends a message from behalf of current context to the addr,
/// allowing to addr owner answer.
///
/// This method returns [`Answer`] if it succeeded, or `Err(msg)`
/// otherwise.
///
/// # Argument
///
/// * `msg` - The message to send.
///
/// # Example
///
/// ```
/// # use bastion::prelude::*;
/// #
/// # fn main() {
/// # Bastion::init();
/// // The message that will be "asked"...
/// const ASK_MSG: &'static str = "A message containing data (ask).";
/// // The message the will be "answered"...
/// const ANSWER_MSG: &'static str = "A message containing data (answer).";
///
/// # let children_ref =
/// // Create a new child...
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// // ...which will receive the message asked...
/// msg! { ctx.recv().await?,
/// msg: &'static str =!> {
/// assert_eq!(msg, ASK_MSG);
/// // Handle the message...
///
/// // ...and eventually answer to it...
/// answer!(ctx, ANSWER_MSG);
/// };
/// // This won't happen because this example
/// // only "asks" a `&'static str`...
/// _: _ => ();
/// }
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
///
/// # Bastion::children(|children| {
/// # children.with_exec(move |ctx: BastionContext| {
/// # let child_ref = children_ref.elems()[0].clone();
/// # async move {
/// // Later, the message is "asked" to the child...
/// let answer: Answer = ctx.ask(&child_ref.addr(), ASK_MSG).expect("Couldn't send the message.");
///
/// // ...and the child's answer is received...
/// msg! { answer.await.expect("Couldn't receive the answer."),
/// msg: &'static str => {
/// assert_eq!(msg, ANSWER_MSG);
/// // Handle the answer...
/// };
/// // This won't happen because this example
/// // only answers a `&'static str`...
/// _: _ => ();
/// }
/// #
/// # Ok(())
/// # }
/// # })
/// # }).unwrap();
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
///
/// [`Answer`]: /message/struct.Answer.html
pub fn ask<M: Message>(&self, to: &RefAddr, msg: M) -> Result<Answer, M> {
debug!(
"{:?}: Asking message: {:?} to: {:?}",
self.current().path(),
msg,
to
);
let (msg, answer) = BastionMessage::ask(msg);
let env = Envelope::new_with_sign(msg, self.signature());
// FIXME: panics?
to.sender()
.unbounded_send(env)
.map_err(|err| err.into_inner().into_msg().unwrap())?;
Ok(answer)
}
/// Sends the notification to each declared dispatcher of the actor.
///
/// # Argument
///
/// * `dispatchers` - Vector of dispatcher names to which need to
/// deliver a notification.
/// * `notification_type` - The type of the notification to send.
///
pub fn notify(&self, dispatchers: &[DispatcherType], notification_type: NotificationType) {
let global_dispatcher = SYSTEM.dispatcher();
let from_actor = self.current();
global_dispatcher.notify(from_actor, dispatchers, notification_type);
}
/// Sends the broadcasted message to the target group(s).
///
/// # Argument
///
/// * `target` - Defines the message receivers in according with
/// the [`BroadcastTarget`] value.
/// * `message` - The broadcasted message.
///
/// [`BroadcastTarget`]: ../dispatcher/enum.DispatcherType.html
pub fn broadcast_message<M: Message>(&self, target: BroadcastTarget, message: M) {
let msg = Arc::new(SignedMessage {
msg: Msg::broadcast(message),
sign: self.signature(),
});
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.broadcast_message(target, &msg);
}
}
impl ContextState {
pub(crate) fn new() -> Self {
ContextState {
messages: VecDeque::new(),
#[cfg(feature = "scaling")]
stats: Arc::new(AtomicU64::new(0)),
#[cfg(feature = "scaling")]
actor_stats: Arc::new(LOTable::new()),
}
}
#[cfg(feature = "scaling")]
pub(crate) fn set_stats(&mut self, stats: Arc<AtomicU64>) {
self.stats = stats;
}
#[cfg(feature = "scaling")]
pub(crate) fn set_actor_stats(&mut self, actor_stats: Arc<LOTable<BastionId, u32>>) {
self.actor_stats = actor_stats;
}
#[cfg(feature = "scaling")]
pub(crate) fn stats(&self) -> Arc<AtomicU64> {
self.stats.clone()
}
#[cfg(feature = "scaling")]
pub(crate) fn actor_stats(&self) -> Arc<LOTable<BastionId, u32>> {
self.actor_stats.clone()
}
pub(crate) fn push_message(&mut self, msg: Msg, sign: RefAddr) {
self.messages.push_back(SignedMessage::new(msg, sign))
}
pub(crate) fn pop_message(&mut self) -> Option<SignedMessage> {
self.messages.pop_front()
}
#[cfg(feature = "scaling")]
pub(crate) fn mailbox_size(&self) -> u32 {
self.messages.len() as _
}
}
impl Display for BastionId {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}