use crate::child_ref::ChildRef;
use crate::children_ref::ChildrenRef;
use crate::dispatcher::{BroadcastTarget, DispatcherType, NotificationType};
use crate::envelope::{Envelope, RefAddr, SignedMessage};
use crate::message::{Answer, BastionMessage, Message, Msg};
use crate::supervisor::SupervisorRef;
use crate::{prelude::ReceiveError, system::SYSTEM};
use crossbeam_queue::SegQueue;
use futures::pending;
use futures::FutureExt;
use futures_timer::Delay;
#[cfg(feature = "scaling")]
use lever::table::lotable::LOTable;
use std::fmt::{self, Display, Formatter};
use std::pin::Pin;
#[cfg(feature = "scaling")]
use std::sync::atomic::AtomicU64;
use std::{sync::Arc, time::Duration};
use tracing::{debug, trace};
use uuid::Uuid;
pub const NIL_ID: BastionId = BastionId(Uuid::nil());
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub struct BastionId(pub(crate) Uuid);
#[derive(Debug)]
pub struct BastionContext {
id: BastionId,
child: ChildRef,
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Pin<Box<ContextState>>>,
}
#[derive(Debug)]
pub(crate) struct ContextState {
messages: SegQueue<SignedMessage>,
#[cfg(feature = "scaling")]
stats: Arc<AtomicU64>,
#[cfg(feature = "scaling")]
actor_stats: Arc<LOTable<BastionId, u32>>,
}
impl BastionId {
pub(crate) fn new() -> Self {
let uuid = Uuid::new_v4();
BastionId(uuid)
}
}
impl BastionContext {
pub(crate) fn new(
id: BastionId,
child: ChildRef,
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Arc<Pin<Box<ContextState>>>,
) -> Self {
debug!("BastionContext({}): Creating.", id);
BastionContext {
id,
child,
children,
supervisor,
state,
}
}
pub fn current(&self) -> &ChildRef {
&self.child
}
pub fn parent(&self) -> &ChildrenRef {
&self.children
}
pub fn supervisor(&self) -> Option<&SupervisorRef> {
self.supervisor.as_ref()
}
pub async fn try_recv(&self) -> Option<SignedMessage> {
Delay::new(Duration::from_millis(0)).await;
trace!("BastionContext({}): Trying to receive message.", self.id);
if let Some(msg) = self.state.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
Some(msg)
} else {
trace!("BastionContext({}): Received no message.", self.id);
None
}
}
pub async fn recv(&self) -> Result<SignedMessage, ()> {
debug!("BastionContext({}): Waiting to receive message.", self.id);
loop {
if let Some(msg) = self.state.pop_message() {
trace!("BastionContext({}): Received message: {:?}", self.id, msg);
return Ok(msg);
}
pending!();
}
}
pub async fn try_recv_timeout(&self, timeout: Duration) -> Result<SignedMessage, ReceiveError> {
debug!(
"BastionContext({}): Waiting to receive message within {} milliseconds.",
self.id,
timeout.as_millis()
);
futures::select! {
message = self.recv().fuse() => {
message.map_err(|_| ReceiveError::Other)
},
_duration = Delay::new(timeout).fuse() => {
Err(ReceiveError::Timeout(timeout))
}
}
}
pub fn signature(&self) -> RefAddr {
RefAddr::new(
self.current().path().clone(),
self.current().sender().clone(),
)
}
pub fn tell<M: Message>(&self, to: &RefAddr, msg: M) -> Result<(), M> {
debug!(
"{:?}: Telling message: {:?} to: {:?}",
self.current().path(),
msg,
to.path()
);
let msg = BastionMessage::tell(msg);
let env = Envelope::new_with_sign(msg, self.signature());
to.sender()
.unbounded_send(env)
.map_err(|err| err.into_inner().into_msg().unwrap())
}
pub fn ask<M: Message>(&self, to: &RefAddr, msg: M) -> Result<Answer, M> {
debug!(
"{:?}: Asking message: {:?} to: {:?}",
self.current().path(),
msg,
to
);
let (msg, answer) = BastionMessage::ask(msg, self.signature());
let env = Envelope::new_with_sign(msg, self.signature());
to.sender()
.unbounded_send(env)
.map_err(|err| err.into_inner().into_msg().unwrap())?;
Ok(answer)
}
pub fn notify(&self, dispatchers: &[DispatcherType], notification_type: NotificationType) {
let global_dispatcher = SYSTEM.dispatcher();
let from_actor = self.current();
global_dispatcher.notify(from_actor, dispatchers, notification_type);
}
pub fn broadcast_message<M: Message>(&self, target: BroadcastTarget, message: M) {
let msg = Arc::new(SignedMessage {
msg: Msg::broadcast(message),
sign: self.signature(),
});
let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.broadcast_message(target, &msg);
}
}
impl ContextState {
pub(crate) fn new() -> Self {
ContextState {
messages: SegQueue::new(),
#[cfg(feature = "scaling")]
stats: Arc::new(AtomicU64::new(0)),
#[cfg(feature = "scaling")]
actor_stats: Arc::new(LOTable::new()),
}
}
#[cfg(feature = "scaling")]
pub(crate) fn set_stats(&mut self, stats: Arc<AtomicU64>) {
self.stats = stats;
}
#[cfg(feature = "scaling")]
pub(crate) fn set_actor_stats(&mut self, actor_stats: Arc<LOTable<BastionId, u32>>) {
self.actor_stats = actor_stats;
}
#[cfg(feature = "scaling")]
pub(crate) fn stats(&self) -> Arc<AtomicU64> {
self.stats.clone()
}
#[cfg(feature = "scaling")]
pub(crate) fn actor_stats(&self) -> Arc<LOTable<BastionId, u32>> {
self.actor_stats.clone()
}
pub(crate) fn push_message(&self, msg: Msg, sign: RefAddr) {
self.messages.push(SignedMessage::new(msg, sign))
}
pub(crate) fn pop_message(&self) -> Option<SignedMessage> {
self.messages.pop()
}
#[cfg(feature = "scaling")]
pub(crate) fn mailbox_size(&self) -> u32 {
self.messages.len() as _
}
}
impl Display for BastionId {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}
#[cfg(test)]
mod context_tests {
use super::*;
use crate::prelude::*;
use crate::Bastion;
use std::panic;
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
#[tokio::test]
async fn test_context() {
super::test_context()
}
}
#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_tests {
#[test]
fn test_context() {
super::test_context()
}
}
fn test_context() {
Bastion::init();
Bastion::start();
test_recv();
test_try_recv();
test_try_recv_fail();
test_try_recv_timeout();
test_try_recv_timeout_fail();
}
fn test_recv() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
msg! { ctx.recv().await?,
ref msg: &'static str => {
assert_eq!(msg, &"test recv");
};
msg: _ => { panic!("didn't receive the expected message {:?}", msg);};
}
Ok(())
})
})
.expect("Couldn't create the children group.");
children
.broadcast("test recv")
.expect("couldn't send message");
}
fn test_try_recv() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
Delay::new(std::time::Duration::from_millis(1)).await;
msg! { ctx.try_recv().await.expect("no message"),
ref msg: &'static str => {
assert_eq!(msg, &"test try recv");
};
_: _ => { panic!("didn't receive the expected message");};
}
Ok(())
})
})
.expect("Couldn't create the children group.");
children
.broadcast("test try recv")
.expect("couldn't send message");
}
fn test_try_recv_fail() {
Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
assert!(ctx.try_recv().await.is_none());
Ok(())
})
})
.expect("Couldn't create the children group.");
}
fn test_try_recv_timeout() {
let children =
Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
msg! { ctx.try_recv_timeout(std::time::Duration::from_millis(5)).await.expect("recv_timeout failed"),
ref msg: &'static str => {
assert_eq!(msg, &"test recv timeout");
};
_: _ => { panic!("didn't receive the expected message");};
}
Ok(())
})
})
.expect("Couldn't create the children group.");
children
.broadcast("test recv timeout")
.expect("couldn't send message");
}
fn test_try_recv_timeout_fail() {
let children = Bastion::children(|children| {
children.with_exec(|ctx: BastionContext| async move {
assert!(ctx
.try_recv_timeout(std::time::Duration::from_millis(1))
.await
.is_err());
Ok(())
})
})
.expect("Couldn't create the children group.");
run!(async { Delay::new(std::time::Duration::from_millis(2)).await });
children.broadcast("test recv timeout").unwrap();
}
}