use std::any::Any;
use std::fmt::{Debug};
use std::sync::{Arc, Mutex};
use dashmap::DashMap;
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{error, info, instrument};
use crate::actor::{Actor, ExitReason};
use crate::address::Addr;
use crate::message::BroadcastMessage;
use crate::supervision::{SuperVisionAction, SupervisionStrategy};
use crate::testing::TestActor;
/// Owner of a set of named actors and of the Tokio tasks that drive them.
pub struct ActorSystem {
    /// Maps each registered actor name to the address stored for it.
    registry: DashMap<String, Addr>,
    /// Join handles of the tasks started by `spawn` and `spawn_with_supervision`;
    /// `stop` aborts every handle collected here.
    join_handles: Mutex<Vec<JoinHandle<()>>>,
}
#[derive(Error, Debug)]
pub enum ActorSystemError {
#[error("An actor with the same name already exists in the registry!")]
ActorNameAlreadyInUse,
#[error("This actor has not been spawned yet!")]
ActorNotSpawnedYet
}
impl ActorSystem {
/// Creates an actor system with an empty registry and no tracked tasks.
///
/// The system is returned inside an [`Arc`]; the spawning methods take
/// `self: &Arc<Self>` and clone that handle into the actors and their tasks.
#[instrument]
pub fn new() -> Arc<Self> {
    let system = Self {
        registry: DashMap::new(),
        join_handles: Mutex::new(Vec::new()),
    };
    Arc::new(system)
}
#[instrument(skip(self, actor), fields(actor_name = %name))]
pub fn spawn<S: Send>(self: &Arc<Self>, mut actor: Actor<S>, name: String) -> Result<(), ActorSystemError> {
let name_backup = name.clone();
if self.registry.contains_key(&name) {
error!("Actor with same name already exists in this actor system!");
return Err(ActorSystemError::ActorNameAlreadyInUse);
}
actor.set_actor_sys(self.clone());
self.registry.insert(name, actor.get_addr());
let sys_ref = self.clone();
let run_handle = tokio::spawn(async move {
let actor_exit_reason = actor.run().await;
match actor_exit_reason {
_ => {
info!("Actor without supervision died! Cleaning up resources and removing actor {} from system", &name_backup);
sys_ref.registry.remove(&name_backup);
return;
}
}
});
let mut join_h = self.join_handles.lock().unwrap();
join_h.push(run_handle);
Ok(())
}
#[instrument(skip(self, actor, supervision_strategy), fields(actor_name = %name))]
pub fn spawn_with_supervision<S: Send + Clone>(self: &Arc<Self>, mut actor: Actor<S>, mut supervision_strategy: Box<dyn SupervisionStrategy<S> + Send>, name: String) -> Result<(), ActorSystemError> {
if self.registry.contains_key(&name) {
error!("Actor with same name already exists in this actor system!");
return Err(ActorSystemError::ActorNameAlreadyInUse);
}
actor.set_actor_sys(self.clone());
info!("Creating backup of actors initial state and behavior");
let actor_backup = actor.create_backup();
let name_backup = name.clone();
self.registry.insert(name, actor.get_addr());
let sys_ref = self.clone();
let join_handle = tokio::spawn(async move {
loop {
let actor_exit_reason = actor.run().await;
info!("Actor exited run loop with reason: {:?}", actor_exit_reason);
let supervision_action = supervision_strategy.apply(actor_exit_reason, &actor_backup, &mut actor);
info!("Supervision action: {:?}", &supervision_action);
match supervision_action {
SuperVisionAction::Exit => {
info!("Cleaning up resources and removing actor {} from system", &name_backup);
sys_ref.registry.remove(&name_backup);
return;
}
SuperVisionAction::Restart => {
info!("Trying to restart the actor with its initial state and behavior");
}
SuperVisionAction::RestartDelayed(delay) => {
info!("Trying to restart the actor with its initial state and behavior after a delay of {}ms", delay.as_millis());
sleep(delay).await;
}
}
}
});
let mut join_h = self.join_handles.lock().unwrap();
join_h.push(join_handle);
Ok(())
}
/// Shuts the system down: empties the registry, then aborts every tracked task.
///
/// # Panics
///
/// Panics if the mutex guarding the join handles is poisoned.
pub fn stop(self: &Arc<Self>) {
    self.registry.clear();
    let handles = self.join_handles.lock().unwrap();
    handles.iter().for_each(|task| task.abort());
}
#[instrument(skip_all)]
pub async fn start(&self) {
while self.registry.len() > 0 {
}
}
/// Looks up the address registered under `name`.
///
/// Returns a clone of the stored [`Addr`], or `None` when the registry has no
/// entry for `name`.
pub fn query(self: &Arc<Self>, name: &str) -> Option<Addr> {
    self.registry.get(name).map(|entry| entry.value().clone())
}
/// Sends `msg` to every address currently in the registry, without a sender.
///
/// The message is wrapped once in a [`BroadcastMessage`]; each recipient is
/// sent whatever `get_message` returns for it.
pub fn broadcast_tell<M: Send + Any + Clone>(&self, msg: M) {
    let broadcast_msg = BroadcastMessage::without_sender(msg);
    // Entries are only read, so use the shared iterator: `iter_mut` would take
    // exclusive shard locks and block concurrent lookups during the broadcast.
    for entry in self.registry.iter() {
        entry.value().send(broadcast_msg.get_message());
    }
}
/// Sends `msg` to every address currently in the registry, with `reply_to`
/// attached as the sender.
///
/// The message is wrapped once in a [`BroadcastMessage`]; each recipient is
/// sent whatever `get_message` returns for it.
pub fn broadcast_ask<M: Send + Any + Clone>(&self, msg: M, reply_to: Addr) {
    let broadcast_msg = BroadcastMessage::with_sender(msg, reply_to);
    // Entries are only read, so use the shared iterator: `iter_mut` would take
    // exclusive shard locks and block concurrent lookups during the broadcast.
    for entry in self.registry.iter() {
        entry.value().send(broadcast_msg.get_message());
    }
}
pub async fn spawn_test<S: Send>(self: &Arc<Self>, mut actor: Actor<TestActor<S>>) -> bool {
let name = "test_actor".to_string();
let name_backup = name.clone();
actor.set_actor_sys(self.clone());
self.registry.insert(name, actor.get_addr());
let sys_ref = self.clone();
let test_result = tokio::spawn(async move {
let actor_exit_reason = actor.run().await;
match actor_exit_reason {
ExitReason::Kill => {
info!("ActorTest {} passed successfully", &name_backup);
sys_ref.registry.remove(&name_backup);
return true;
}
ExitReason::Restart => {
return false;
}
ExitReason::Error => {
info!("ActorTest {} failed with error.", &name_backup);
sys_ref.registry.remove(&name_backup);
return false;
}
}
}).await;
match test_result {
Ok(res) => {
return res;
}
Err(_) => {
return false;
}
}
}
}