use atomic_take::AtomicTake;
use indexmap::IndexSet;
use rustc_hash::{FxHashMap, FxHasher};
use std::{
any::Any,
future::Future,
hash::BuildHasherDefault,
num::NonZeroUsize,
sync::{Arc, Mutex},
};
use tokio::{
sync::{mpsc, oneshot, watch},
task::JoinHandle,
};
use crate::{
address::Address,
errors::{
PermissionDeniedError, PuppetAlreadyExist, PuppetCannotHandleMessage,
PuppetDoesNotExistError, PuppetError, PuppetOperationError, PuppetSendCommandError,
PuppetSendMessageError, ResourceAlreadyExist,
},
executor::{self, DedicatedExecutor},
message::{
Envelope, Mailbox, Message, Postman, ServiceCommand, ServiceMailbox, ServicePacket,
ServicePostman,
},
pid::{Id, Pid},
prelude::CriticalError,
puppet::{Context, Handler, Puppet, PuppetHandle, PuppetStatus, ResponseFor},
};
pub type BoxedAny = Box<dyn Any + Send + Sync>;
pub type StatusChannels = (watch::Sender<PuppetStatus>, watch::Receiver<PuppetStatus>);
type FxIndexSet<T> = IndexSet<T, BuildHasherDefault<FxHasher>>;
#[derive(Clone, Debug)]
pub struct Puppeteer {
pub(crate) message_postmans: Arc<Mutex<FxHashMap<Pid, BoxedAny>>>,
pub(crate) service_postmans: Arc<Mutex<FxHashMap<Pid, ServicePostman>>>,
pub(crate) statuses: Arc<Mutex<FxHashMap<Pid, StatusChannels>>>,
pub(crate) master_to_puppets: Arc<Mutex<FxHashMap<Pid, FxIndexSet<Pid>>>>,
pub(crate) puppet_to_master: Arc<Mutex<FxHashMap<Pid, Pid>>>,
pub(crate) resources: Arc<Mutex<FxHashMap<Id, Arc<Mutex<BoxedAny>>>>>,
pub(crate) failure_tx: Arc<AtomicTake<oneshot::Sender<CriticalError>>>,
pub(crate) failure_rx: Arc<AtomicTake<oneshot::Receiver<CriticalError>>>,
pub(crate) executor: DedicatedExecutor,
}
impl Default for Puppeteer {
fn default() -> Self {
Self::new()
}
}
#[allow(clippy::expect_used)]
impl Puppeteer {
#[must_use]
pub fn new() -> Self {
let (tx, rx) = oneshot::channel();
let cpus = NonZeroUsize::new(num_cpus::get()).expect("Failed to get number of CPUs");
let executor = DedicatedExecutor::new(cpus);
Self {
message_postmans: Arc::default(),
service_postmans: Arc::default(),
statuses: Arc::default(),
master_to_puppets: Arc::default(),
puppet_to_master: Arc::default(),
resources: Arc::default(),
executor,
failure_tx: Arc::new(AtomicTake::new(tx)),
failure_rx: Arc::new(AtomicTake::new(rx)),
}
}
pub async fn on_unrecoverable_failure<F, Fut>(&self, f: F)
where
F: FnOnce(Puppeteer, CriticalError) -> Fut + Send,
Fut: Future<Output = ()> + Send,
{
if let Some(rx) = self.failure_rx.take() {
if let Ok(err) = rx.await {
f(self.clone(), err).await;
}
}
}
pub async fn wait_for_unrecoverable_failure(self) -> CriticalError {
self.failure_rx.take().unwrap().await.unwrap()
}
pub(crate) fn register_puppet_by_pid<P>(
&self,
master: Pid,
postman: Postman<P>,
service_postman: ServicePostman,
status_tx: watch::Sender<PuppetStatus>,
status_rx: watch::Receiver<PuppetStatus>,
) -> Result<(), PuppetError>
where
P: Puppet,
{
let puppet = Pid::new::<P>();
if self.is_puppet_exists_by_pid(puppet) {
return Err(PuppetAlreadyExist::new(puppet).into());
}
if !self.is_puppet_exists_by_pid(master) && master != puppet {
return Err(PuppetDoesNotExistError::new(master).into());
}
self.message_postmans
.lock()
.expect("Failed to acquire mutex lock")
.insert(puppet, Box::new(postman));
self.service_postmans
.lock()
.expect("Failed to acquire mutex lock")
.insert(puppet, service_postman);
self.master_to_puppets
.lock()
.expect("Failed to acquire mutex lock")
.entry(master)
.or_default()
.insert(puppet);
self.puppet_to_master
.lock()
.expect("Failed to acquire mutex lock")
.insert(puppet, master);
self.statuses
.lock()
.expect("Failed to acquire mutex lock")
.insert(puppet, (status_tx, status_rx));
Ok(())
}
#[must_use]
pub fn is_puppet_exists<P>(&self) -> bool
where
P: Puppet,
{
let puppet = Pid::new::<P>();
self.is_puppet_exists_by_pid(puppet)
}
pub(crate) fn is_puppet_exists_by_pid(&self, puppet: Pid) -> bool {
self.get_puppet_master_by_pid(puppet).is_some()
}
#[must_use]
pub(crate) fn get_postman<P>(&self) -> Option<Postman<P>>
where
P: Puppet,
{
let puppet = Pid::new::<P>();
self.message_postmans
.lock()
.expect("Failed to acquire mutex lock")
.get(&puppet)
.and_then(|boxed| boxed.downcast_ref::<Postman<P>>())
.cloned()
}
pub(crate) fn get_service_postman_by_pid(&self, puppet: Pid) -> Option<ServicePostman> {
self.service_postmans
.lock()
.expect("Failed to acquire mutex lock")
.get(&puppet)
.cloned()
}
pub(crate) fn set_status_by_pid(&self, puppet: Pid, status: PuppetStatus) {
self.statuses
.lock()
.expect("Failed to acquire mutex lock")
.get(&puppet)
.map(|(tx, _)| {
tx.send_if_modified(|current| {
if *current == status {
false
} else {
*current = status;
true
}
})
});
}
#[must_use]
pub fn subscribe_puppet_status<P>(&self) -> Option<watch::Receiver<PuppetStatus>>
where
P: Puppet,
{
let puppet = Pid::new::<P>();
self.subscribe_puppet_status_by_pid(puppet)
}
pub(crate) fn subscribe_puppet_status_by_pid(
&self,
puppet: Pid,
) -> Option<watch::Receiver<PuppetStatus>> {
self.statuses
.lock()
.expect("Failed to acquire mutex lock")
.get(&puppet)
.map(|(_, rx)| rx.clone())
}
#[must_use]
pub fn get_puppet_status<P>(&self) -> Option<PuppetStatus>
where
P: Puppet,
{
let puppet = Pid::new::<P>();
self.get_puppet_status_by_pid(puppet)
}
pub(crate) fn get_puppet_status_by_pid(&self, puppet: Pid) -> Option<PuppetStatus> {
self.statuses
.lock()
.expect("Failed to acquire mutex lock")
.get(&puppet)
.map(|(_, rx)| *rx.borrow())
}
pub(crate) fn puppet_has_puppet_by_pid(&self, master: Pid, puppet: Pid) -> Option<bool> {
self.master_to_puppets
.lock()
.expect("Failed to acquire mutex lock")
.get(&master)
.map(|puppets| puppets.contains(&puppet))
}
pub(crate) fn puppet_has_permission_by_pid(&self, master: Pid, puppet: Pid) -> Option<bool> {
self.puppet_has_puppet_by_pid(master, puppet)
}
pub(crate) fn get_puppet_master_by_pid(&self, puppet: Pid) -> Option<Pid> {
self.puppet_to_master
.lock()
.expect("Failed to acquire mutex lock")
.get(&puppet)
.copied()
}
pub fn set_puppet_master<P, O, M>(&self) -> Result<(), PuppetOperationError>
where
P: Puppet,
O: Puppet,
M: Puppet,
{
let old_master = Pid::new::<O>();
let new_master = Pid::new::<M>();
let puppet = Pid::new::<P>();
self.set_puppet_master_by_pid(old_master, new_master, puppet)
}
pub(crate) fn set_puppet_master_by_pid(
&self,
old_master: Pid,
new_master: Pid,
puppet: Pid,
) -> Result<(), PuppetOperationError> {
match self.puppet_has_permission_by_pid(old_master, puppet) {
None => Err(PuppetDoesNotExistError::new(puppet).into()),
Some(false) => {
Err(PermissionDeniedError::new(old_master, puppet)
.with_message("Cannot change master of another master puppet")
.into())
}
Some(true) => {
self.master_to_puppets
.lock()
.expect("Failed to acquire mutex lock")
.get_mut(&old_master)
.expect("Old master has no puppets")
.shift_remove(&puppet);
self.master_to_puppets
.lock()
.expect("Failed to acquire mutex lock")
.entry(new_master)
.or_default()
.insert(puppet);
self.puppet_to_master
.lock()
.expect("Failed to acquire mutex lock")
.insert(puppet, new_master);
Ok(())
}
}
}
pub(crate) fn get_puppets_by_pid(&self, master: Pid) -> Option<FxIndexSet<Pid>> {
self.master_to_puppets
.lock()
.expect("Failed to acquire mutex lock")
.get(&master)
.cloned()
}
pub(crate) fn detach_puppet_by_pid(
&self,
master: Pid,
puppet: Pid,
) -> Result<(), PuppetOperationError> {
match self.puppet_has_permission_by_pid(master, puppet) {
None => Err(PuppetDoesNotExistError::new(puppet).into()),
Some(false) => {
Err(PermissionDeniedError::new(master, puppet)
.with_message("Cannot detach puppet from another master")
.into())
}
Some(true) => {
self.set_puppet_master_by_pid(master, puppet, puppet)?;
Ok(())
}
}
}
pub fn delete_puppet<O, P>(&self) -> Result<(), PuppetOperationError>
where
O: Puppet,
P: Puppet,
{
let master = Pid::new::<O>();
let puppet = Pid::new::<P>();
self.delete_puppet_by_pid(master, puppet)
}
pub(crate) fn delete_puppet_by_pid(
&self,
master: Pid,
puppet: Pid,
) -> Result<(), PuppetOperationError> {
match self.puppet_has_permission_by_pid(master, puppet) {
None => Err(PuppetDoesNotExistError::new(puppet).into()),
Some(false) => {
Err(PermissionDeniedError::new(master, puppet)
.with_message("Cannot delete puppet of another master")
.into())
}
Some(true) => {
self.message_postmans
.lock()
.expect("Failed to acquire mutex lock")
.remove(&puppet);
self.statuses
.lock()
.expect("Failed to acquire mutex lock")
.remove(&puppet);
self.master_to_puppets
.lock()
.expect("Failed to acquire mutex lock")
.get_mut(&master)
.expect("Master has no puppets")
.shift_remove(&puppet);
self.puppet_to_master
.lock()
.expect("Failed to acquire mutex lock")
.remove(&puppet);
Ok(())
}
}
}
pub fn send<P, E>(&self, message: E) -> Result<(), PuppetSendMessageError>
where
P: Handler<E>,
E: Message,
{
if let Some(postman) = self.get_postman::<P>() {
Ok(postman.send(message)?)
} else {
Err(PuppetDoesNotExistError::new(Pid::new::<P>()).into())
}
}
pub async fn ask<P, E>(&self, message: E) -> Result<ResponseFor<P, E>, PuppetSendMessageError>
where
P: Handler<E>,
E: Message,
{
if let Some(postman) = self.get_postman::<P>() {
Ok(postman.send_and_await_response::<E>(message, None).await?)
} else {
Err(PuppetDoesNotExistError::new(Pid::new::<P>()).into())
}
}
pub(crate) async fn ask_with_timeout<P, E>(
&self,
message: E,
duration: std::time::Duration,
) -> Result<ResponseFor<P, E>, PuppetSendMessageError>
where
P: Handler<E>,
E: Message,
{
if let Some(postman) = self.get_postman::<P>() {
Ok(postman
.send_and_await_response::<E>(message, Some(duration))
.await?)
} else {
Err(PuppetDoesNotExistError::new(Pid::new::<P>()).into())
}
}
pub(crate) async fn send_command_by_pid(
&self,
master: Pid,
puppet: Pid,
command: ServiceCommand,
) -> Result<(), PuppetSendCommandError> {
match self.puppet_has_permission_by_pid(master, puppet) {
None => Err(PuppetDoesNotExistError::new(puppet).into()),
Some(false) => {
Err(PermissionDeniedError::new(master, puppet)
.with_message("Cannot send command to puppet of another master")
.into())
}
Some(true) => {
let Some(serivce_address) = self.get_service_postman_by_pid(puppet) else {
return Err(PuppetDoesNotExistError::new(puppet).into());
};
Ok(serivce_address
.send_and_await_response(puppet, command, None)
.await?)
}
}
}
pub async fn spawn<P, M>(&self, puppet: P) -> Result<Address<P>, PuppetError>
where
P: Puppet,
M: Puppet,
{
let master_pid = Pid::new::<M>();
self.spawn_puppet_by_pid(puppet, master_pid).await
}
#[allow(clippy::impl_trait_in_params)]
pub async fn spawn_self<P>(&self, puppet: P) -> Result<Address<P>, PuppetError>
where
P: Puppet,
{
self.spawn::<P, P>(puppet).await
}
pub(crate) async fn spawn_puppet_by_pid<P>(
&self,
mut puppet: P,
master_pid: Pid,
) -> Result<Address<P>, PuppetError>
where
P: Puppet,
{
let puppet_pid = Pid::new::<P>();
if !self.is_puppet_exists_by_pid(master_pid) && master_pid != puppet_pid {
return Err(PuppetDoesNotExistError::new(master_pid).into());
}
let pid = Pid::new::<P>();
let (status_tx, status_rx) = watch::channel::<PuppetStatus>(PuppetStatus::Inactive);
let (message_tx, message_rx) = mpsc::unbounded_channel::<Box<dyn Envelope<P>>>();
let (command_tx, command_rx) = mpsc::channel::<ServicePacket>(1);
let postman = Postman::new(message_tx);
let service_postman = ServicePostman::new(command_tx);
self.register_puppet_by_pid::<P>(
master_pid,
postman.clone(),
service_postman,
status_tx,
status_rx.clone(),
)?;
let ctx = Context::<P>::new(self.clone());
let handle = PuppetHandle {
status_rx: status_rx.clone(),
message_rx: Mailbox::new(message_rx),
command_rx: ServiceMailbox::new(command_rx),
};
let address = Address {
pid,
status_rx,
message_tx: postman,
pptr: self.clone(),
};
puppet.on_init(&ctx).await?;
ctx.start(&mut puppet, false).await?;
tokio::spawn(run_puppet_loop(puppet, ctx, handle));
Ok(address)
}
pub fn add_resource<T>(&self, resource: T) -> Result<(), ResourceAlreadyExist>
where
T: Send + Sync + Clone + 'static,
{
let id = Id::new::<T>();
if let std::collections::hash_map::Entry::Vacant(e) = self
.resources
.lock()
.expect("Failed to acquire mutex lock")
.entry(id)
{
e.insert(Arc::new(Mutex::new(Box::new(resource))));
Ok(())
} else {
Err(ResourceAlreadyExist { id })
}
}
#[must_use]
pub fn get_resource<T>(&self) -> Option<T>
where
T: Send + Sync + Clone + 'static,
{
let resource = {
let id = Id::new::<T>();
let resources_guard = self.resources.lock().expect("Failed to acquire mutex lock");
Arc::clone(resources_guard.get(&id)?)
};
let boxed = resource
.lock()
.expect("Failed to acquire mutex lock on resource");
let any_ref = boxed.downcast_ref::<T>()?;
Some(any_ref.clone())
}
pub fn with_resource<T, F, R>(&self, f: F) -> Option<R>
where
T: Send + Sync + Clone + 'static,
F: FnOnce(&T) -> R,
{
let resource = {
let id = Id::new::<T>();
let resources_guard = self.resources.lock().expect("Failed to acquire mutex lock");
Arc::clone(resources_guard.get(&id)?)
};
let boxed = resource
.lock()
.expect("Failed to acquire mutex lock on resource");
let any_ref = boxed.downcast_ref::<T>()?;
Some(f(any_ref))
}
pub fn with_expected_resource<T, F, R>(&self, f: F) -> R
where
T: Send + Sync + Clone + 'static,
F: FnOnce(&T) -> R,
{
let resource = {
let id = Id::new::<T>();
let resources_guard = self.resources.lock().expect("Failed to acquire mutex lock");
Arc::clone(resources_guard.get(&id).expect("Resource doesn't exist"))
};
let boxed = resource
.lock()
.expect("Failed to acquire mutex lock on resource");
let any_ref = boxed
.downcast_ref::<T>()
.expect("Failed to downcast resource");
f(any_ref)
}
pub fn with_resource_mut<T, F, R>(&self, f: F) -> Option<R>
where
T: Send + Sync + Clone + 'static,
F: FnOnce(&mut T) -> R,
{
let resource = {
let id = Id::new::<T>();
let resources_guard = self.resources.lock().expect("Failed to acquire mutex lock");
Arc::clone(resources_guard.get(&id)?)
};
let mut boxed = resource
.lock()
.expect("Failed to acquire mutex lock on resource");
let any_mut = boxed.downcast_mut::<T>()?;
Some(f(any_mut))
}
pub fn with_expected_resource_mut<T, F, R>(&self, f: F) -> Option<R>
where
T: Send + Sync + Clone + 'static,
F: FnOnce(&mut T) -> R,
{
let resource = {
let id = Id::new::<T>();
let resources_guard = self.resources.lock().expect("Failed to acquire mutex lock");
Arc::clone(resources_guard.get(&id).expect("Resource doesn't exist"))
};
let mut boxed = resource
.lock()
.expect("Failed to acquire mutex lock on resource");
let any_mut = boxed
.downcast_mut::<T>()
.expect("Failed to downcast resource");
Some(f(any_mut))
}
#[must_use]
pub fn expect_resource<T>(&self) -> T
where
T: Send + Sync + Clone + 'static,
{
self.get_resource::<T>().expect("Resource doesn't exist")
}
pub fn spawn_task<F, Fut, O>(&self, task: F) -> JoinHandle<O>
where
F: FnOnce(Self) -> Fut + Send + 'static,
Fut: Future<Output = O> + Send + 'static,
O: Send + 'static,
{
tokio::spawn(task(self.clone()))
}
pub fn spawn_heavy_task<F, Fut, O>(&self, task: F) -> executor::Job<O>
where
F: FnOnce(Self) -> Fut + Send + 'static,
Fut: Future<Output = O> + Send + 'static,
O: Send + 'static,
{
let cloned_self = self.clone();
self.executor.spawn(async move { task(cloned_self).await })
}
}
pub(crate) async fn run_puppet_loop<P>(
mut puppet: P,
mut ctx: Context<P>,
mut handle: PuppetHandle<P>,
) where
P: Puppet,
{
let mut puppet_status = handle.status_rx;
loop {
tokio::select! {
Ok(()) = puppet_status.changed() => {
if matches!(*puppet_status.borrow(), PuppetStatus::Inactive
| PuppetStatus::Failed) {
tracing::info!(puppet = %ctx.pid, "Stopping loop due to puppet status change");
break;
}
}
Some(mut service_packet) = handle.command_rx.recv() => {
if matches!(*puppet_status.borrow(), PuppetStatus::Active) {
if let Err(err) = service_packet.handle_command(&mut puppet, &mut ctx).await {
tracing::error!(puppet = %ctx.pid, "Failed to handle command: {}", err);
}
} else {
tracing::debug!(puppet = %ctx.pid, "Ignoring command due to non-Active puppet status");
let status = *puppet_status.borrow();
let error_response = PuppetCannotHandleMessage::new(ctx.pid, status).into();
service_packet.reply_error(error_response);
}
}
Some(mut envelope) = handle.message_rx.recv() => {
let status = *puppet_status.borrow();
if matches!(status, PuppetStatus::Active) {
envelope.handle_message(&mut puppet, &mut ctx).await;
} else {
tracing::debug!(puppet = %ctx.pid, "Ignoring message due to non-Active puppet status");
envelope.reply_error(&ctx, PuppetCannotHandleMessage::new(ctx.pid, status).into()).await;
}
}
else => {
tracing::debug!(puppet = %ctx.pid, "Stopping loop due to closed channels");
break;
}
}
}
}
#[allow(unused_variables, clippy::unwrap_used)]
#[cfg(test)]
mod tests {
use std::time::Duration;
use executor::ConcurrentExecutor;
use crate::{executor::SequentialExecutor, supervision::strategy::OneForAll};
use super::*;
#[derive(Debug, Clone, Default)]
struct MasterActor {
failures: usize,
}
impl Puppet for MasterActor {
type Supervision = OneForAll;
async fn reset(&self, ctx: &Context<Self>) -> Result<Self, CriticalError> {
println!("Resetting MasterActor");
Err(CriticalError::new(ctx.pid, "Failed to reset MasterActor"))
}
}
#[derive(Debug, Clone, Default)]
struct PuppetActor {
failures: usize,
}
impl Puppet for PuppetActor {
type Supervision = OneForAll;
async fn reset(&self, ctx: &Context<Self>) -> Result<Self, CriticalError> {
Ok(Self {
failures: self.failures,
})
}
}
#[derive(Debug)]
struct MasterMessage;
#[derive(Debug)]
struct PuppetMessage;
#[derive(Debug)]
struct MasterFailingMessage;
#[derive(Debug)]
struct PuppetFailingMessage;
impl Handler<MasterMessage> for MasterActor {
type Response = ();
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
_msg: MasterMessage,
_ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
Ok(())
}
}
impl Handler<PuppetMessage> for PuppetActor {
type Response = ();
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
_msg: PuppetMessage,
_ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
Ok(())
}
}
impl Handler<MasterFailingMessage> for MasterActor {
type Response = ();
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
_msg: MasterFailingMessage,
ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
println!("Handling MasterFailingMessage. Failures: {}", self.failures);
if self.failures < 3 {
self.failures += 1;
Err(PuppetError::critical(
ctx.pid,
"Failed to handle MasterFailingMessage",
))
} else {
Ok(())
}
}
}
impl Handler<PuppetFailingMessage> for PuppetActor {
type Response = ();
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
_msg: PuppetFailingMessage,
ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
println!("Handling MasterFailingMessage. Failures: {}", self.failures);
if self.failures < 3 {
self.failures += 1;
Err(PuppetError::critical(
ctx.pid,
"Failed to handle PuppetFailingMessage",
))
} else {
Ok(())
}
}
}
pub fn register_puppet<P, M>(pptr: &Puppeteer) -> Result<(), PuppetError>
where
P: Puppet,
M: Puppet,
{
let (message_tx, _message_rx) = mpsc::unbounded_channel::<Box<dyn Envelope<P>>>();
let (service_tx, _service_rx) = mpsc::channel::<ServicePacket>(1);
let (status_tx, status_rx) = watch::channel::<PuppetStatus>(PuppetStatus::Inactive);
let postman = Postman::new(message_tx);
let service_postman = ServicePostman::new(service_tx);
let master_pid = Pid::new::<M>();
pptr.register_puppet_by_pid(master_pid, postman, service_postman, status_tx, status_rx)
}
#[tokio::test]
async fn test_register() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
assert!(res.is_err());
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
assert!(res.is_err());
}
#[tokio::test]
async fn test_is_puppet_exists() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
assert!(pptr.is_puppet_exists::<PuppetActor>());
assert!(!pptr.is_puppet_exists::<MasterActor>());
}
#[tokio::test]
async fn test_get_postman() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
assert!(pptr.get_postman::<PuppetActor>().is_some());
assert!(pptr.get_postman::<MasterActor>().is_none());
}
#[tokio::test]
async fn test_get_service_postman_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
let puppet_pid = Pid::new::<PuppetActor>();
assert!(pptr.get_service_postman_by_pid(puppet_pid).is_some());
let master_pid = Pid::new::<MasterActor>();
assert!(pptr.get_service_postman_by_pid(master_pid).is_none());
}
#[tokio::test]
async fn test_get_status_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
let puppet_pid = Pid::new::<PuppetActor>();
assert_eq!(
pptr.get_puppet_status_by_pid(puppet_pid),
Some(PuppetStatus::Inactive)
);
let master_pid = Pid::new::<MasterActor>();
assert!(pptr.get_puppet_status_by_pid(master_pid).is_none());
}
#[tokio::test]
async fn test_set_status_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
let puppet_pid = Pid::new::<PuppetActor>();
pptr.set_status_by_pid(puppet_pid, PuppetStatus::Active);
assert_eq!(
pptr.get_puppet_status_by_pid(puppet_pid),
Some(PuppetStatus::Active)
);
}
#[tokio::test]
async fn test_subscribe_status_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
let puppet_pid = Pid::new::<PuppetActor>();
let rx = pptr.subscribe_puppet_status_by_pid(puppet_pid).unwrap();
pptr.set_status_by_pid(puppet_pid, PuppetStatus::Active);
assert_eq!(*rx.borrow(), PuppetStatus::Active);
}
#[tokio::test]
async fn test_has_puppet_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
assert!(res.is_ok());
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
assert!(res.is_ok());
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
assert!(pptr
.puppet_has_puppet_by_pid(master_pid, puppet_pid)
.is_some());
let master_pid = Pid::new::<PuppetActor>();
assert!(pptr
.puppet_has_puppet_by_pid(master_pid, puppet_pid)
.is_none());
}
#[tokio::test]
async fn test_has_permission_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
assert!(res.is_ok());
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
assert!(res.is_ok());
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
assert!(pptr
.puppet_has_permission_by_pid(master_pid, puppet_pid)
.is_some());
let master_pid = Pid::new::<PuppetActor>();
assert!(pptr
.puppet_has_permission_by_pid(master_pid, puppet_pid)
.is_none());
}
#[tokio::test]
async fn test_get_master_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
assert!(res.is_ok());
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
assert!(res.is_ok());
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
assert_eq!(pptr.get_puppet_master_by_pid(puppet_pid), Some(master_pid));
assert_eq!(pptr.get_puppet_master_by_pid(master_pid), Some(master_pid));
}
#[tokio::test]
async fn test_set_master_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
assert!(res.is_ok());
let res = register_puppet::<PuppetActor, PuppetActor>(&pptr);
assert!(res.is_ok());
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
assert!(pptr
.set_puppet_master_by_pid(puppet_pid, master_pid, puppet_pid)
.is_ok());
assert_eq!(pptr.get_puppet_master_by_pid(puppet_pid), Some(master_pid));
assert_eq!(pptr.get_puppet_master_by_pid(master_pid), Some(master_pid));
}
#[tokio::test]
async fn test_get_puppets_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
res.unwrap();
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
res.unwrap();
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
let puppets = pptr.get_puppets_by_pid(master_pid).unwrap();
assert_eq!(puppets.len(), 2);
assert!(puppets.contains(&puppet_pid));
}
#[tokio::test]
async fn test_detach_puppet_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
res.unwrap();
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
res.unwrap();
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
assert!(pptr.detach_puppet_by_pid(master_pid, puppet_pid).is_ok());
assert_eq!(pptr.get_puppet_master_by_pid(puppet_pid), Some(puppet_pid));
assert_eq!(pptr.get_puppet_master_by_pid(master_pid), Some(master_pid));
}
#[tokio::test]
async fn test_delete_puppet_by_pid() {
let pptr = Puppeteer::new();
let res = register_puppet::<MasterActor, MasterActor>(&pptr);
res.unwrap();
let res = register_puppet::<PuppetActor, MasterActor>(&pptr);
res.unwrap();
let master_pid = Pid::new::<MasterActor>();
let puppet_pid = Pid::new::<PuppetActor>();
assert!(pptr.delete_puppet_by_pid(master_pid, puppet_pid).is_ok());
assert!(pptr.get_postman::<PuppetActor>().is_none());
assert!(pptr.get_postman::<MasterActor>().is_some());
}
#[tokio::test]
async fn test_spawn() {
let pptr = Puppeteer::new();
let res = pptr.spawn::<_, PuppetActor>(PuppetActor::default()).await;
res.unwrap();
let res = pptr.spawn::<_, MasterActor>(PuppetActor::default()).await;
res.unwrap_err();
}
#[tokio::test]
async fn test_send() {
let pptr = Puppeteer::new();
let res = pptr.spawn::<_, PuppetActor>(PuppetActor::default()).await;
res.unwrap();
let res = pptr.send::<PuppetActor, PuppetMessage>(PuppetMessage);
res.unwrap();
let res = pptr.send::<MasterActor, MasterMessage>(MasterMessage);
assert!(res.is_err());
}
#[tokio::test]
async fn test_ask() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(PuppetActor::default()).await;
res.unwrap();
let res = pptr.ask::<PuppetActor, _>(PuppetMessage).await;
res.unwrap();
let res = pptr.ask::<MasterActor, _>(MasterMessage).await;
assert!(res.is_err());
}
#[tokio::test]
async fn test_ask_with_timeout() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(PuppetActor::default()).await;
assert!(res.is_ok());
let res = pptr
.ask_with_timeout::<PuppetActor, _>(PuppetMessage, Duration::from_secs(1))
.await;
assert!(res.is_ok());
let res = pptr
.ask_with_timeout::<MasterActor, _>(MasterMessage, Duration::from_secs(1))
.await;
assert!(res.is_err());
}
#[tokio::test]
async fn self_mutate_puppet() {
#[derive(Debug, Clone, Default)]
pub struct CounterPuppet {
counter: i32,
}
impl Puppet for CounterPuppet {
type Supervision = OneForAll;
async fn reset(&self, _ctx: &Context<Self>) -> Result<Self, CriticalError> {
Ok(CounterPuppet::default())
}
}
#[derive(Debug)]
pub struct IncrementCounter;
impl Handler<IncrementCounter> for CounterPuppet {
type Response = i32;
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
_msg: IncrementCounter,
ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
println!("Counter: {}", self.counter);
if self.counter < 10 {
self.counter += 1;
ctx.send::<Self, _>(IncrementCounter)?;
} else {
ctx.send::<Self, _>(DebugCounterPuppet)?;
}
Ok(self.counter)
}
}
#[derive(Debug)]
pub struct DebugCounterPuppet;
impl Handler<DebugCounterPuppet> for CounterPuppet {
type Response = i32;
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
_msg: DebugCounterPuppet,
_ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
Ok(self.counter)
}
}
let pptr = Puppeteer::new();
let address = pptr.spawn_self(CounterPuppet::default()).await.unwrap();
address.send(IncrementCounter).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let x = address.ask(DebugCounterPuppet).await.unwrap();
assert_eq!(x, 10);
}
#[tokio::test]
#[allow(unused_assignments)]
async fn test_successful_recovery_after_failure() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(PuppetActor::default()).await;
res.unwrap();
let mut success = false;
loop {
let res = pptr.ask::<PuppetActor, _>(PuppetFailingMessage).await;
if res.is_ok() {
success = true;
break;
}
}
assert!(success);
tokio::time::sleep(Duration::from_secs(1)).await;
}
#[tokio::test]
#[allow(unused_assignments)]
async fn test_failed_recovery_after_failure() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(MasterActor::default()).await;
res.unwrap();
let mut success = false;
loop {
let res = pptr.ask::<MasterActor, _>(MasterFailingMessage).await;
if res.is_err() {
success = true;
break;
}
}
assert!(success);
tokio::time::sleep(Duration::from_secs(1)).await;
}
#[tokio::test]
async fn test_wait_for_unrecoverable_failure() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(MasterActor::default()).await;
assert!(res.is_ok());
for _ in 0..3 {
let _ = pptr.ask::<MasterActor, _>(MasterFailingMessage).await;
}
let err = pptr.wait_for_unrecoverable_failure().await;
assert_eq!(err.puppet, Pid::new::<MasterActor>());
assert_eq!(err.message, "Failed to reset MasterActor");
}
#[tokio::test]
#[should_panic(expected = "Unrecoverable error encountered")]
async fn test_wait_for_unrecoverable_failure_panic() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(MasterActor::default()).await;
assert!(res.is_ok());
for _ in 0..3 {
let _ = pptr.ask::<MasterActor, _>(MasterFailingMessage).await;
}
let err = pptr.wait_for_unrecoverable_failure().await;
panic!("Unrecoverable error encountered: {err:?}");
}
#[tokio::test]
async fn test_wait_for_unrecoverable_failure_timeout() {
let pptr = Puppeteer::new();
let res = pptr.spawn_self(PuppetActor::default()).await;
assert!(res.is_ok());
let result = tokio::time::timeout(
Duration::from_millis(100),
pptr.wait_for_unrecoverable_failure(),
)
.await;
assert!(result.is_err());
}
#[tokio::test]
#[should_panic(expected = "Unrecoverable error encountered")]
async fn test_unrecoverable_error_inside_puppet() {
#[derive(Debug, Clone, Default)]
struct UnrecoverablePuppet;
#[derive(Debug)]
struct UnrecoverableMessage;
impl Puppet for UnrecoverablePuppet {
type Supervision = OneForAll;
async fn reset(&self, ctx: &Context<Self>) -> Result<Self, CriticalError> {
Err(CriticalError::new(
ctx.pid,
"Failed to reset UnrecoverablePuppet",
))
}
}
impl Handler<UnrecoverableMessage> for UnrecoverablePuppet {
type Response = ();
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
msg: UnrecoverableMessage,
ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
Err(ctx.critical_error("Unrecoverable error"))
}
}
let pptr = Puppeteer::new();
let res = pptr.spawn_self(UnrecoverablePuppet).await;
assert!(res.is_ok());
pptr.send::<UnrecoverablePuppet, _>(UnrecoverableMessage)
.unwrap();
let result = pptr.wait_for_unrecoverable_failure().await;
panic!("Unrecoverable error encountered: {result:?}");
}
#[tokio::test]
async fn test_puppet_self_send_msg() {
#[derive(Debug, Clone, Default)]
struct SelfSendPuppet {
i: i32,
}
#[derive(Debug)]
struct SelfSendMessage;
impl Puppet for SelfSendPuppet {
type Supervision = OneForAll;
}
impl Handler<SelfSendMessage> for SelfSendPuppet {
type Response = i32;
type Executor = SequentialExecutor;
async fn handle_message(
&mut self,
msg: SelfSendMessage,
ctx: &Context<Self>,
) -> Result<Self::Response, PuppetError> {
if self.i < 10 {
dbg!(self.i);
self.i += 1;
Ok(ctx.ask::<SelfSendPuppet, _>(SelfSendMessage).await?)
} else {
dbg!("finish");
Ok(self.i)
}
}
}
let pptr = Puppeteer::new();
let res = pptr.spawn_self(SelfSendPuppet { i: 0 }).await;
assert!(res.is_ok());
pptr.send::<SelfSendPuppet, _>(SelfSendMessage).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
}