use std::collections::HashMap;
use std::collections::HashSet;
use dashmap::mapref::entry::Entry::Occupied;
use dashmap::DashMap;
use once_cell::sync::OnceCell;
use crate::ActorCell;
use crate::ActorId;
use crate::ActorStatus;
use crate::GroupName;
use crate::ScopeName;
use crate::SupervisionEvent;
pub const DEFAULT_SCOPE: &str = "__default_scope__";
pub const ALL_SCOPES_NOTIFICATION: &str = "__world_scope__";
pub const ALL_GROUPS_NOTIFICATION: &str = "__world_group_";
#[cfg(test)]
mod tests;
#[derive(Clone, Debug)]
pub enum GroupChangeMessage {
Join(ScopeName, GroupName, Vec<ActorCell>),
Leave(ScopeName, GroupName, Vec<ActorCell>),
}
impl GroupChangeMessage {
pub fn get_group(&self) -> GroupName {
match self {
Self::Join(_, name, _) => name.clone(),
Self::Leave(_, name, _) => name.clone(),
}
}
pub fn get_scope(&self) -> ScopeName {
match self {
Self::Join(scope, _, _) => scope.to_string(),
Self::Leave(scope, _, _) => scope.to_string(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ScopeGroupKey {
scope: ScopeName,
group: GroupName,
}
impl ScopeGroupKey {
pub fn get_scope(&self) -> ScopeName {
self.scope.to_owned()
}
pub fn get_group(&self) -> GroupName {
self.group.to_owned()
}
}
#[derive(Default)]
struct GroupState {
members: HashMap<ActorId, ActorCell>,
listeners: Vec<ActorCell>,
}
struct PgState {
map: DashMap<ScopeGroupKey, GroupState>,
index: DashMap<ScopeName, HashSet<GroupName>>,
world_listeners: DashMap<ScopeGroupKey, Vec<ActorCell>>,
}
static PG_MONITOR: OnceCell<PgState> = OnceCell::new();
fn get_monitor<'a>() -> &'a PgState {
PG_MONITOR.get_or_init(|| PgState {
map: DashMap::new(),
index: DashMap::new(),
world_listeners: DashMap::new(),
})
}
fn notify_world_listeners(
monitor: &PgState,
scope: &ScopeName,
group: &GroupName,
actors: &[ActorCell],
is_join: bool,
) {
let scoped_key = ScopeGroupKey {
scope: scope.to_owned(),
group: ALL_GROUPS_NOTIFICATION.to_owned(),
};
let global_key = ScopeGroupKey {
scope: ALL_SCOPES_NOTIFICATION.to_owned(),
group: ALL_GROUPS_NOTIFICATION.to_owned(),
};
for key in [scoped_key, global_key] {
let listeners = monitor
.world_listeners
.get(&key)
.map(|entry| entry.value().clone());
if let Some(listeners) = listeners {
let change = if is_join {
GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.to_vec())
} else {
GroupChangeMessage::Leave(scope.to_owned(), group.clone(), actors.to_vec())
};
for listener in &listeners {
let _ = listener
.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(change.clone()));
}
}
}
}
pub fn join(group: GroupName, actors: Vec<ActorCell>) {
join_scoped(DEFAULT_SCOPE.to_owned(), group, actors);
}
pub fn join_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
let key = ScopeGroupKey {
scope: scope.to_owned(),
group: group.to_owned(),
};
let monitor = get_monitor();
let actors: Vec<ActorCell> = actors
.into_iter()
.filter(|a| (a.get_status() as u8) <= (ActorStatus::Draining as u8))
.collect();
if actors.is_empty() {
return;
}
let listeners = {
let mut entry = monitor.map.entry(key).or_default();
let gs = entry.value_mut();
for actor in actors.iter() {
gs.members.insert(actor.get_id(), actor.clone());
}
gs.listeners.clone()
};
monitor
.index
.entry(scope.to_owned())
.or_default()
.insert(group.to_owned());
for listener in &listeners {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
GroupChangeMessage::Join(scope.to_owned(), group.clone(), actors.clone()),
));
}
notify_world_listeners(monitor, &scope, &group, &actors, true);
}
pub fn leave(group: GroupName, actors: Vec<ActorCell>) {
leave_scoped(DEFAULT_SCOPE.to_owned(), group, actors);
}
pub fn leave_scoped(scope: ScopeName, group: GroupName, actors: Vec<ActorCell>) {
let key = ScopeGroupKey {
scope: scope.to_owned(),
group: group.to_owned(),
};
let monitor = get_monitor();
let result = if let Occupied(mut occupied) = monitor.map.entry(key) {
let gs = occupied.get_mut();
for actor in actors.iter() {
gs.members.remove(&actor.get_id());
}
let listeners = gs.listeners.clone();
let all_members_left = gs.members.is_empty();
let fully_empty = all_members_left && gs.listeners.is_empty();
if fully_empty {
occupied.remove();
}
Some((listeners, all_members_left))
} else {
None
};
let Some((listeners, all_members_left)) = result else {
return;
};
if all_members_left {
if let Some(mut groups) = monitor.index.get_mut(&scope) {
groups.remove(&group);
if groups.is_empty() {
drop(groups);
monitor.index.remove(&scope);
}
}
}
for listener in &listeners {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
GroupChangeMessage::Leave(scope.to_owned(), group.clone(), actors.clone()),
));
}
notify_world_listeners(monitor, &scope, &group, &actors, false);
}
pub(crate) fn leave_all(actor: ActorId) {
let monitor = get_monitor();
let mut removal_events: Vec<(ScopeGroupKey, ActorCell, Vec<ActorCell>)> = vec![];
let mut empty_member_keys: Vec<ScopeGroupKey> = vec![];
for mut kv in monitor.map.iter_mut() {
let key = kv.key().clone();
let gs = kv.value_mut();
if let Some(actor_cell) = gs.members.remove(&actor) {
let listeners = gs.listeners.clone();
removal_events.push((key.clone(), actor_cell, listeners));
}
if gs.members.is_empty() {
empty_member_keys.push(key);
}
}
for key in empty_member_keys {
if let Occupied(entry) = monitor.map.entry(key.clone()) {
if entry.get().members.is_empty() {
if let Some(mut groups) = monitor.index.get_mut(&key.scope) {
groups.remove(&key.group);
if groups.is_empty() {
drop(groups);
monitor.index.remove(&key.scope);
}
}
if entry.get().listeners.is_empty() {
entry.remove();
}
}
}
}
for (scope_and_group, cell, per_group_listeners) in removal_events.iter() {
for listener in per_group_listeners {
let _ = listener.send_supervisor_evt(SupervisionEvent::ProcessGroupChanged(
GroupChangeMessage::Leave(
scope_and_group.scope.clone(),
scope_and_group.group.clone(),
vec![cell.clone()],
),
));
}
notify_world_listeners(
monitor,
&scope_and_group.scope,
&scope_and_group.group,
std::slice::from_ref(cell),
false,
);
}
}
pub fn get_local_members(group: &GroupName) -> Vec<ActorCell> {
get_scoped_local_members(&DEFAULT_SCOPE.to_owned(), group)
}
pub fn get_scoped_local_members(scope: &ScopeName, group: &GroupName) -> Vec<ActorCell> {
let key = ScopeGroupKey {
scope: scope.to_owned(),
group: group.to_owned(),
};
let monitor = get_monitor();
if let Some(gs) = monitor.map.get(&key) {
gs.value()
.members
.values()
.filter(|a| a.get_id().is_local())
.cloned()
.collect::<Vec<_>>()
} else {
vec![]
}
}
pub fn get_members(group_name: &GroupName) -> Vec<ActorCell> {
get_scoped_members(&DEFAULT_SCOPE.to_owned(), group_name)
}
pub fn get_scoped_members(scope: &ScopeName, group: &GroupName) -> Vec<ActorCell> {
let key = ScopeGroupKey {
scope: scope.to_owned(),
group: group.to_owned(),
};
let monitor = get_monitor();
if let Some(gs) = monitor.map.get(&key) {
gs.value().members.values().cloned().collect::<Vec<_>>()
} else {
vec![]
}
}
pub fn which_groups() -> Vec<GroupName> {
let monitor = get_monitor();
let mut groups = monitor
.map
.iter()
.filter(|kvp| !kvp.value().members.is_empty())
.map(|kvp| kvp.key().group.to_owned())
.collect::<Vec<_>>();
groups.sort_unstable();
groups.dedup();
groups
}
pub fn which_scoped_groups(scope: &ScopeName) -> Vec<GroupName> {
let monitor = get_monitor();
match monitor.index.get(scope) {
Some(groups) => groups.iter().cloned().collect(),
None => vec![],
}
}
pub fn which_scopes_and_groups() -> Vec<ScopeGroupKey> {
let monitor = get_monitor();
monitor
.map
.iter()
.filter(|kvp| !kvp.value().members.is_empty())
.map(|kvp| kvp.key().clone())
.collect::<Vec<_>>()
}
pub fn which_scopes() -> Vec<ScopeName> {
let monitor = get_monitor();
monitor
.map
.iter()
.filter(|kvp| !kvp.value().members.is_empty())
.map(|kvp| kvp.key().scope.to_owned())
.collect::<Vec<_>>()
}
pub fn monitor(group: GroupName, actor: ActorCell) {
let key = ScopeGroupKey {
scope: DEFAULT_SCOPE.to_owned(),
group,
};
let monitor = get_monitor();
monitor.map.entry(key).or_default().listeners.push(actor);
}
pub fn monitor_scope(scope: ScopeName, actor: ActorCell) {
let key = ScopeGroupKey {
scope: scope.to_owned(),
group: ALL_GROUPS_NOTIFICATION.to_owned(),
};
let monitor = get_monitor();
monitor.world_listeners.entry(key).or_default().push(actor);
}
pub fn demonitor(group_name: GroupName, actor: ActorId) {
let key = ScopeGroupKey {
scope: DEFAULT_SCOPE.to_owned(),
group: group_name,
};
let monitor = get_monitor();
if let Some(mut gs) = monitor.map.get_mut(&key) {
gs.listeners.retain(|a| a.get_id() != actor);
}
}
pub fn demonitor_scope(scope: ScopeName, actor: ActorId) {
let key = ScopeGroupKey {
scope: scope.to_owned(),
group: ALL_GROUPS_NOTIFICATION.to_owned(),
};
let monitor = get_monitor();
if let Occupied(mut entry) = monitor.world_listeners.entry(key) {
let listeners = entry.get_mut();
listeners.retain(|a| a.get_id() != actor);
if listeners.is_empty() {
entry.remove();
}
}
}
pub(crate) fn demonitor_all(actor: ActorId) {
let monitor = get_monitor();
let mut maybe_empty = vec![];
for mut kv in monitor.map.iter_mut() {
let gs = kv.value_mut();
gs.listeners.retain(|a| a.get_id() != actor);
if gs.members.is_empty() && gs.listeners.is_empty() {
maybe_empty.push(kv.key().clone());
}
}
for key in maybe_empty {
if let Occupied(entry) = monitor.map.entry(key.clone()) {
if entry.get().members.is_empty() && entry.get().listeners.is_empty() {
entry.remove();
if let Some(mut groups) = monitor.index.get_mut(&key.scope) {
groups.remove(&key.group);
if groups.is_empty() {
drop(groups);
monitor.index.remove(&key.scope);
}
}
}
}
}
let mut empty_world_keys = vec![];
for mut kv in monitor.world_listeners.iter_mut() {
kv.value_mut().retain(|a| a.get_id() != actor);
if kv.value().is_empty() {
empty_world_keys.push(kv.key().clone());
}
}
for key in empty_world_keys {
monitor.world_listeners.remove(&key);
}
}