use crate::actors::director::ActorsDirector;
use crate::actors::envelope::ManagerEnvelope;
use crate::actors::proxy::{ActorProxy, ActorReport};
use crate::system_director::SystemDirector;
use crate::Actor;
use async_channel::{unbounded as channel, Receiver, Sender};
use async_std::{sync::Arc, task};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::any::Any;
use std::any::TypeId;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
#[async_trait::async_trait]
pub(crate) trait Manager: Send + Sync + Debug {
fn end(&self);
fn get_type_id(&self) -> TypeId;
fn get_statistics(&self) -> ActorsManagerReport;
fn get_sender_as_any(&self) -> Box<dyn Any>;
fn is_empty(&self) -> bool;
fn remove_actor(&self, actor_id: Box<dyn Any + Send>);
}
#[derive(Debug)]
pub(crate) enum ActorManagerProxyCommand<A: Actor> {
Dispatch(Box<dyn ManagerEnvelope<Actor = A>>),
DispatchToAll(Box<dyn ManagerEnvelope<Actor = A>>),
EndActor(A::Id),
}
pub(crate) type ActorsManagerReport = Vec<ActorReport>;
#[derive(Debug)]
pub(crate) struct ActorsManager<A: Actor> {
actors: Arc<DashMap<A::Id, ActorProxy<A>>>,
sender: Sender<ActorManagerProxyCommand<A>>,
is_ending: Arc<AtomicBool>,
actors_director: ActorsDirector,
}
impl<A: Actor> ActorsManager<A> {
pub fn new(
actors_director: ActorsDirector,
system_director: SystemDirector,
innactivity_duration_until_end: Duration,
) -> ActorsManager<A> {
let (sender, receiver) = channel::<ActorManagerProxyCommand<A>>();
let actors = Arc::new(DashMap::new());
let is_ending = Arc::new(AtomicBool::new(false));
let manager = ActorsManager {
actors: actors.clone(),
sender,
is_ending: is_ending.clone(),
actors_director: actors_director.clone(),
};
task::spawn(actor_manager_loop(
receiver,
actors,
actors_director,
manager.clone(),
is_ending,
system_director,
innactivity_duration_until_end,
));
manager
}
pub(crate) fn end(&self) {
self.is_ending.store(true, Ordering::Relaxed);
for actor in self.actors.iter() {
actor.end();
}
}
pub(crate) async fn signal_actor_removed(&self) {
if !self.is_ready_to_be_removed() {
return;
}
let entry = self
.actors_director
.get_blocking_manager_entry(std::any::TypeId::of::<A>());
if !self.is_ready_to_be_removed() {
return;
}
if let Entry::Occupied(entry) = entry {
entry.remove();
self.actors_director.signal_manager_removed().await;
}
}
fn is_ready_to_be_removed(&self) -> bool {
self.is_ending.load(Ordering::Acquire) && self.actors.is_empty() && self.sender.is_empty()
}
pub(crate) fn get_sender(&self) -> Sender<ActorManagerProxyCommand<A>> {
self.sender.clone()
}
pub(crate) fn get_type_id(&self) -> TypeId {
TypeId::of::<A>()
}
pub(crate) fn get_statistics(&self) -> ActorsManagerReport {
let mut report = vec![];
for actor in self.actors.iter() {
report.push(actor.get_report());
}
report
}
pub(crate) fn remove_actor(&self, actor_id: A::Id) {
self.actors.remove(&actor_id);
}
pub(crate) fn get_blocking_actor_entry(&self, id: A::Id) -> Entry<A::Id, ActorProxy<A>> {
self.actors.entry(id)
}
fn is_empty(&self) -> bool {
self.actors.is_empty() && self.sender.is_empty()
}
}
async fn actor_manager_loop<'a, A: Actor>(
receiver: Receiver<ActorManagerProxyCommand<A>>,
actors: Arc<DashMap<A::Id, ActorProxy<A>>>,
actors_director: ActorsDirector,
manager: ActorsManager<A>,
is_ending: Arc<AtomicBool>,
system_director: SystemDirector,
innactivity_duration_until_end: Duration,
) {
while let Ok(command) = receiver.recv().await {
match command {
ActorManagerProxyCommand::Dispatch(command) => {
process_dispatch_command(
command,
&actors,
&actors_director,
&manager,
&is_ending,
&system_director,
&innactivity_duration_until_end,
)
.await;
}
ActorManagerProxyCommand::DispatchToAll(command) => {
process_dispatch_all_command(command, &actors).await;
}
ActorManagerProxyCommand::EndActor(actor_id) => {
process_end_actor_command(actor_id, &actors).await;
}
}
}
}
async fn process_end_actor_command<'a, A: Actor>(
actor_id: A::Id,
actors: &'a Arc<DashMap<A::Id, ActorProxy<A>>>,
) {
if let Some(actor) = actors.get_mut(&actor_id) {
actor.end();
}
}
async fn process_dispatch_command<'a, A: Actor>(
mut command: Box<dyn ManagerEnvelope<Actor = A>>,
actors: &'a Arc<DashMap<A::Id, ActorProxy<A>>>,
actors_director: &'a ActorsDirector,
manager: &'a ActorsManager<A>,
is_ending: &'a Arc<AtomicBool>,
system_director: &'a SystemDirector,
innactivity_duration_until_end: &'a Duration,
) {
let actor_id = command.get_actor_id();
if let Some(mut actor) = actors.get_mut(&actor_id) {
command.deliver(&mut actor).await;
return;
}
let mut actor = ActorProxy::<A>::new(
system_director.clone(),
actors_director.clone(),
manager.clone(),
actor_id.clone(),
*innactivity_duration_until_end,
);
command.deliver(&mut actor).await;
if is_ending.load(Ordering::Relaxed) {
actor.end();
}
actors.insert(actor_id, actor);
}
async fn process_dispatch_all_command<'a, A: Actor>(
mut command: Box<dyn ManagerEnvelope<Actor = A>>,
actors: &'a Arc<DashMap<A::Id, ActorProxy<A>>>,
) {
for mut actor in actors.iter_mut() {
command.deliver(&mut actor).await;
}
}
impl<A: Actor> Clone for ActorsManager<A> {
fn clone(&self) -> Self {
ActorsManager {
actors: self.actors.clone(),
sender: self.sender.clone(),
is_ending: self.is_ending.clone(),
actors_director: self.actors_director.clone(),
}
}
}
#[async_trait::async_trait]
impl<A: Actor> Manager for ActorsManager<A> {
fn end(&self) {
ActorsManager::<A>::end(self)
}
fn get_type_id(&self) -> TypeId {
ActorsManager::<A>::get_type_id(self)
}
fn get_statistics(&self) -> ActorsManagerReport {
ActorsManager::<A>::get_statistics(self)
}
fn is_empty(&self) -> bool {
ActorsManager::<A>::is_empty(self)
}
fn get_sender_as_any(&self) -> Box<dyn Any> {
Box::new(ActorsManager::<A>::get_sender(self))
}
fn remove_actor(&self, actor_id: Box<dyn Any + Send>) {
if let Ok(actor_id) = actor_id.downcast::<A::Id>() {
ActorsManager::<A>::remove_actor(self, *actor_id)
}
}
}