use std::collections::HashMap;
use std::sync::Mutex;
use super::actor_cell::ActorCell;
use super::messages::SupervisionEvent;
use crate::ActorId;
#[derive(Default, Debug)]
pub(crate) struct SupervisionTree {
children: Mutex<Option<HashMap<ActorId, ActorCell>>>,
supervisor: Mutex<Option<ActorCell>>,
#[cfg(feature = "monitors")]
monitors: Mutex<Option<HashMap<ActorId, ActorCell>>>,
}
impl SupervisionTree {
pub(crate) fn insert_child(&self, child: ActorCell) {
let mut guard = self.children.lock().unwrap();
if let Some(map) = &mut *(guard) {
map.insert(child.get_id(), child);
} else {
*guard = Some(HashMap::from_iter([(child.get_id(), child)]));
}
}
pub(crate) fn remove_child(&self, child: ActorId) {
let mut guard = self.children.lock().unwrap();
if let Some(map) = &mut *(guard) {
map.remove(&child);
}
}
pub(crate) fn set_supervisor(&self, parent: ActorCell) {
*(self.supervisor.lock().unwrap()) = Some(parent);
}
pub(crate) fn clear_supervisor(&self) {
*(self.supervisor.lock().unwrap()) = None;
}
pub(crate) fn try_get_supervisor(&self) -> Option<ActorCell> {
self.supervisor.lock().unwrap().clone()
}
#[cfg(feature = "monitors")]
pub(crate) fn set_monitor(&self, who: ActorCell) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
map.insert(who.get_id(), who);
} else {
*guard = Some(HashMap::from_iter([(who.get_id(), who)]))
}
}
#[cfg(feature = "monitors")]
pub(crate) fn remove_monitor(&self, who: ActorId) {
let mut guard = self.monitors.lock().unwrap();
if let Some(map) = &mut *guard {
map.remove(&who);
if map.is_empty() {
*guard = None;
}
}
}
pub(crate) fn terminate_all_children(&self) {
let mut guard = self.children.lock().unwrap();
let cells = if let Some(map) = &mut *guard {
map.values().cloned().collect()
} else {
vec![]
};
*guard = None;
drop(guard);
for cell in cells {
cell.terminate();
cell.clear_supervisor();
}
}
pub(crate) fn stop_all_children(&self, reason: Option<String>) {
self.for_each_child(|cell| {
cell.stop(reason.clone());
});
}
pub(crate) fn drain_all_children(&self) {
self.for_each_child(|cell| {
_ = cell.drain();
});
}
pub(crate) async fn stop_all_children_and_wait(
&self,
reason: Option<String>,
timeout: Option<crate::concurrency::Duration>,
) {
let cells = self.get_children();
let mut js = crate::concurrency::JoinSet::new();
for cell in cells {
let lreason = reason.clone();
let ltimeout = timeout;
js.spawn(async move { cell.stop_and_wait(lreason, ltimeout).await });
}
while let Some(res) = js.join_next().await {
#[cfg(any(
feature = "async-std",
all(target_arch = "wasm32", target_os = "unknown")
))]
if res.is_err() {
panic!("JoinSet join error");
}
#[cfg(not(any(
feature = "async-std",
all(target_arch = "wasm32", target_os = "unknown")
)))]
{
match res {
Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
Err(err) => panic!("{err}"),
_ => {}
}
}
}
}
pub(crate) async fn drain_all_children_and_wait(
&self,
timeout: Option<crate::concurrency::Duration>,
) {
let cells = self.get_children();
let mut js = crate::concurrency::JoinSet::new();
for cell in cells {
let ltimeout = timeout;
js.spawn(async move { cell.drain_and_wait(ltimeout).await });
}
while let Some(res) = js.join_next().await {
#[cfg(any(
feature = "async-std",
all(target_arch = "wasm32", target_os = "unknown")
))]
if res.is_err() {
panic!("JoinSet join error");
}
#[cfg(not(any(
feature = "async-std",
all(target_arch = "wasm32", target_os = "unknown")
)))]
{
match res {
Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
Err(err) => panic!("{err}"),
_ => {}
}
}
}
}
pub(crate) fn is_child_of(&self, id: ActorId) -> bool {
if let Some(parent) = &*(self.supervisor.lock().unwrap()) {
parent.get_id() == id
} else {
false
}
}
pub(crate) fn get_children(&self) -> Vec<ActorCell> {
let guard = self.children.lock().unwrap();
if let Some(map) = &*guard {
map.values().cloned().collect()
} else {
vec![]
}
}
pub(crate) fn for_each_child<F>(&self, mut f: F)
where
F: FnMut(&ActorCell),
{
let guard = self.children.lock().unwrap();
if let Some(map) = &*guard {
for cell in map.values() {
f(cell);
}
}
}
pub(crate) fn notify_supervisor(&self, evt: SupervisionEvent) {
#[cfg(feature = "monitors")]
let monitor_targets: Vec<ActorCell> = {
let guard = self.monitors.lock().unwrap();
if let Some(monitors) = &*guard {
monitors.values().cloned().collect()
} else {
Vec::new()
}
};
let supervisor_target = {
let guard = self.supervisor.lock().unwrap();
(*guard).clone()
};
#[cfg(feature = "monitors")]
if !monitor_targets.is_empty() {
for monitor in monitor_targets.iter() {
let monitor_evt = evt.clone_no_data();
if monitor.send_supervisor_evt(monitor_evt).is_err() {
let mut guard = self.monitors.lock().unwrap();
if let Some(monitors) = &mut *guard {
monitors.remove(&monitor.get_id());
}
}
}
}
if let Some(parent) = supervisor_target {
_ = parent.send_supervisor_evt(evt);
}
}
#[cfg(test)]
pub(crate) fn get_num_children(&self) -> usize {
let guard = self.children.lock().unwrap();
if let Some(map) = &*guard {
map.len()
} else {
0
}
}
#[cfg(test)]
pub(crate) fn get_num_parents(&self) -> usize {
usize::from(self.supervisor.lock().unwrap().is_some())
}
}