use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::net::SocketAddr;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use once_cell::sync::Lazy;
use crate::frame::Frame;
use crate::Dest;
use crate::ExitReason;
use crate::Local;
use crate::Node;
use crate::NodeOptions;
use crate::NodeRegistration;
use crate::NodeRemoteSenderMessage;
use crate::NodeState;
use crate::Pid;
use crate::Process;
use crate::ProcessItem;
use crate::Reference;
use crate::alias_destroy;
use crate::link_destroy;
use crate::monitor_destroy;
use crate::node_local_supervisor;
use crate::node_remote_connector;
use crate::process_exit_signal_linked;
use crate::process_sender;
/// Identifier under which the local node's registration is stored in the registration table.
pub const LOCAL_NODE_ID: u64 = 0;
/// Sentinel node identifier. Not referenced in this part of the file; presumably marks a node
/// that could not be resolved (TODO confirm against callers).
pub const INVALID_NODE_ID: u64 = u64::MAX;
/// A monitor recorded against a remote node, resolved when that node's supervisor goes down
/// or when the remote side reports the monitored process as down.
#[derive(Debug)]
enum NodeMonitor {
// Id of the local process monitoring the node itself; it is sent `ProcessItem::MonitorNodeDown`.
Node(u64),
// Id of the local process monitoring the given destination; it is sent
// `ProcessItem::MonitorProcessDown` carrying that destination.
ProcessMonitor(u64, Dest),
// Id of a local process whose monitor for the reference is removed with `monitor_destroy`
// when the node goes down.
ProcessMonitorCleanup(u64),
}
// Registration record for every known node, keyed by node id (`LOCAL_NODE_ID` for the local node).
static NODE_REGISTRATIONS: Lazy<DashMap<u64, NodeRegistration>> = Lazy::new(DashMap::new);
// Maps a node to the id of its entry in `NODE_REGISTRATIONS`.
static NODE_MAP: Lazy<DashMap<Node, u64>> = Lazy::new(DashMap::new);
// Monitors recorded against each node, keyed by the monitor reference.
static NODE_MONITORS: Lazy<DashMap<Node, BTreeMap<Reference, NodeMonitor>>> =
Lazy::new(DashMap::new);
// Links recorded against each node, stored as (linked pid, id of the local process).
static NODE_LINKS: Lazy<DashMap<Node, BTreeSet<(Pid, u64)>>> = Lazy::new(DashMap::new);
// Frames queued for a node that has no sender worker registered yet.
static NODE_PENDING_MESSAGES: Lazy<DashMap<Node, Vec<Frame>>> = Lazy::new(DashMap::new);
// Optional cookie string, read and written through `node_get_cookie` / `node_set_cookie`.
static NODE_COOKIE: Mutex<Option<String>> = Mutex::new(None);
// Next id to hand out to a newly registered node; starts at 1 because 0 is `LOCAL_NODE_ID`.
static NODE_ID: AtomicU64 = AtomicU64::new(1);
/// Returns `true` while the local node has an entry in the node map.
pub fn node_alive() -> bool {
    let local = Node::Local;

    NODE_MAP.contains_key(&local)
}
/// Starts the local node: spawns its supervisor, stores its registration under
/// `LOCAL_NODE_ID`, and records `Node::Local` in the node map.
///
/// Returns the pid of the spawned supervisor.
///
/// # Panics
///
/// Panics if the local node already has an entry in the node map.
pub fn node_local_start(name: String, options: NodeOptions) -> Pid {
    // Hold the map slot for the whole setup so a concurrent start cannot slip in.
    let slot = match NODE_MAP.entry(Node::Local) {
        Entry::Vacant(slot) => slot,
        Entry::Occupied(_) => panic!("Local node already started!"),
    };

    let broadcast_address = options.broadcast_address;
    let supervisor = Process::spawn(node_local_supervisor(name.clone(), options));

    let registration = NodeRegistration::new(
        Some(supervisor),
        NodeState::Current,
        name,
        broadcast_address,
    );

    NODE_REGISTRATIONS.insert(LOCAL_NODE_ID, registration);
    slot.insert(LOCAL_NODE_ID);

    supervisor
}
/// Stops the local node: empties the node map, kills the local supervisor (if one is
/// registered), then clears all registrations and pending frames.
///
/// # Panics
///
/// Panics if the local node has no entry in the node map.
pub fn node_local_stop() {
    if NODE_MAP.remove(&Node::Local).is_none() {
        panic!("Local node not started!");
    }

    NODE_MAP.clear();

    // The registration guard is released at the end of this block, before the table is cleared.
    if let Some(registration) = NODE_REGISTRATIONS.get(&LOCAL_NODE_ID) {
        if let Some(supervisor) = registration.supervisor {
            Process::exit(supervisor, ExitReason::Kill);
        }
    }

    NODE_REGISTRATIONS.clear();
    NODE_PENDING_MESSAGES.clear();
}
/// Clears the node map, all registrations and all pending frames without sending an exit
/// signal to any supervisor.
///
/// NOTE(review): presumably called once the local node supervisor has already crashed —
/// confirm against callers.
pub fn node_local_panic() {
NODE_MAP.clear();
NODE_REGISTRATIONS.clear();
NODE_PENDING_MESSAGES.clear();
}
/// Returns the supervisor pid stored in the local node's registration, or `None` when the
/// local node is not registered or has no supervisor.
pub fn node_local_process() -> Option<Pid> {
    let local = NODE_REGISTRATIONS.get(&LOCAL_NODE_ID)?;

    local.supervisor
}
/// Records the sender and receiver workers for `node` and hands every frame that was queued
/// for the node to the sender.
///
/// Does nothing when the node is unknown or has no registration. The sender is always sent a
/// `SendFrames` message, even when no frames were pending.
pub fn node_register_workers(node: Node, sender: Pid, receiver: Pid) {
    let Some(id) = NODE_MAP.get(&node) else {
        return;
    };

    // The registration stays locked until the queued frames have been handed over, so
    // `node_send_frame` cannot queue a frame in between.
    let Some(mut registration) = NODE_REGISTRATIONS.get_mut(&*id) else {
        return;
    };

    registration.sender = Some(sender);
    registration.receiver = Some(receiver);

    let frames = match NODE_PENDING_MESSAGES.remove(&node) {
        Some((_, frames)) => frames,
        None => Vec::new(),
    };

    Process::send(
        sender,
        NodeRemoteSenderMessage::SendFrames(Local::new(frames)),
    );
}
/// Delivers `frame` to the node registered under `id`.
///
/// When the node has a sender worker, the frame is sent to it directly. Otherwise the frame is
/// queued, unless the node's state is `NodeState::Known`, in which case it is dropped. Frames
/// for an unregistered id are dropped as well.
pub fn node_send_frame(frame: Frame, id: u64) {
    // This guard is held for the whole function so the sender cannot be registered between
    // the check and the queueing below.
    let Some(registration) = NODE_REGISTRATIONS.get(&id) else {
        return;
    };

    if let Some(sender) = registration.sender {
        Process::send(
            sender,
            NodeRemoteSenderMessage::SendFrame(Local::new(frame)),
        );
        return;
    }

    if matches!(registration.state, NodeState::Known) {
        return;
    }

    let node = Node::from((
        registration.name.clone(),
        registration.broadcast_address,
    ));

    NODE_PENDING_MESSAGES.entry(node).or_default().push(frame);
}
/// Accepts a connection from the remote `node`, to be supervised by `supervisor`.
///
/// For an unknown node a new registration is created in the `Connected` state. For a known
/// node, a registration in the `Pending` state gives up its current supervisor (which is
/// killed if it differs from `supervisor`); the connection is then accepted only if the
/// registration has no supervisor left.
///
/// Returns `true` when `supervisor` was recorded as the node's supervisor.
///
/// # Panics
///
/// Panics if `node` is not a `Node::Remote`.
pub fn node_accept(node: Node, supervisor: Pid) -> bool {
    let Node::Remote(name, address) = node else {
        panic!("Can't accept a local node!");
    };

    match NODE_MAP.entry(Node::from((name.clone(), address))) {
        Entry::Vacant(slot) => {
            let id = NODE_ID.fetch_add(1, Ordering::Relaxed);
            let registration =
                NodeRegistration::new(Some(supervisor), NodeState::Connected, name, address);

            NODE_REGISTRATIONS.insert(id, registration);
            slot.insert(id);

            true
        }
        Entry::Occupied(slot) => {
            let Some(mut registration) = NODE_REGISTRATIONS.get_mut(slot.get()) else {
                return false;
            };

            // A pending outbound attempt yields to the incoming connection.
            if matches!(registration.state, NodeState::Pending) {
                if let Some(previous) = registration.supervisor.take() {
                    if supervisor != previous {
                        Process::exit(previous, ExitReason::Kill);
                    }
                }
            }

            if registration.supervisor.is_some() {
                return false;
            }

            registration.supervisor = Some(supervisor);
            registration.state = NodeState::Connected;

            true
        }
    }
}
/// Registers the remote `node` and returns its id, optionally starting a connection to it.
///
/// If the node is already known its existing id is returned; with `connect` set and no
/// supervisor recorded, a connector is spawned and the state becomes `Pending`. A new node is
/// registered as `Pending` with a freshly spawned connector when `connect` is set, and as
/// `Known` without a supervisor otherwise.
///
/// # Panics
///
/// Panics if `node` is not a `Node::Remote`.
pub fn node_register(node: Node, connect: bool) -> u64 {
    let Node::Remote(name, address) = node else {
        panic!("Can't register a local node!");
    };

    let node = Node::from((name.clone(), address));

    let slot = match NODE_MAP.entry(node.clone()) {
        Entry::Vacant(slot) => slot,
        Entry::Occupied(existing) => {
            let id = *existing.get();

            if connect {
                if let Some(mut registration) = NODE_REGISTRATIONS.get_mut(&id) {
                    if registration.supervisor.is_none() {
                        let connector = Process::spawn(node_remote_connector(node));

                        registration.supervisor = Some(connector);
                        registration.state = NodeState::Pending;
                    }
                }
            }

            return id;
        }
    };

    let id = NODE_ID.fetch_add(1, Ordering::Relaxed);

    let (supervisor, state) = if connect {
        let connector = Process::spawn(node_remote_connector(node));

        (Some(connector), NodeState::Pending)
    } else {
        (None, NodeState::Known)
    };

    NODE_REGISTRATIONS.insert(id, NodeRegistration::new(supervisor, state, name, address));
    slot.insert(id);

    id
}
/// Handles the termination of `process`, a supervisor of the remote `node`.
///
/// Does nothing when the node is unknown, has no registration, or its registration records a
/// supervisor other than `process`. Otherwise the registration is reset to `NodeState::Known`
/// with no supervisor, sender or receiver; every link and monitor recorded against the node is
/// removed and resolved with a "noconnection" exit reason; and frames queued for the node are
/// dropped.
pub fn node_remote_supervisor_down(node: Node, process: Pid) {
let Some(id) = NODE_MAP.get(&node) else {
return;
};
NODE_REGISTRATIONS.alter(&id, |_, mut value| {
// A different supervisor has taken over this node; leave its registration untouched.
if value
.supervisor
.is_some_and(|supervisor| supervisor != process)
{
return value;
}
value.supervisor = None;
value.sender = None;
value.receiver = None;
value.state = NodeState::Known;
// Break every link recorded against the node and signal the local side of each one.
if let Some((_, links)) = NODE_LINKS.remove(&node) {
for (from, process_id) in links {
// Shadows the `process` parameter: this is the local end of the link.
let process = Pid::local(process_id);
link_destroy(process, from);
process_exit_signal_linked(process, from, ExitReason::from("noconnection"));
}
}
// Resolve every monitor recorded against the node.
if let Some((_, monitors)) = NODE_MONITORS.remove(&node) {
for (reference, monitor) in monitors {
match monitor {
NodeMonitor::Node(id) => {
// Nothing is sent when the monitoring process has no sender.
process_sender(Pid::local(id)).map(|sender| {
sender.send(ProcessItem::MonitorNodeDown(node.clone(), reference))
});
}
NodeMonitor::ProcessMonitor(id, dest) => {
process_sender(Pid::local(id)).map(|sender| {
sender.send(ProcessItem::MonitorProcessDown(
dest,
reference,
ExitReason::from("noconnection"),
))
});
}
NodeMonitor::ProcessMonitorCleanup(id) => {
monitor_destroy(Pid::local(id), reference);
}
}
// Only references that report themselves as local have their alias destroyed here.
if reference.is_local() {
alias_destroy(reference);
}
}
}
// Frames still queued for the node are discarded.
NODE_PENDING_MESSAGES.remove(&node);
value
});
}
/// Returns every node in the node map except `Node::Local`.
pub fn node_list() -> Vec<Node> {
    NODE_MAP
        .iter()
        .filter(|entry| !matches!(entry.key(), Node::Local))
        .map(|entry| entry.key().clone())
        .collect()
}
/// Returns a node, built from its name and broadcast address, for every registration whose
/// state equals `state`.
pub fn node_list_filtered(state: NodeState) -> Vec<Node> {
    NODE_REGISTRATIONS
        .iter()
        .filter(|registration| registration.state == state)
        .map(|registration| {
            Node::from((
                registration.name.clone(),
                registration.broadcast_address,
            ))
        })
        .collect()
}
/// Disconnects from `node` while keeping it registered.
///
/// Frames queued for the node are dropped, its supervisor (if any) is killed, and its state
/// becomes `NodeState::Known`. Does nothing when the node is unknown or has no registration.
pub fn node_disconnect(node: Node) {
    let Some(id) = NODE_MAP.get(&node) else {
        return;
    };

    if let Some(mut registration) = NODE_REGISTRATIONS.get_mut(&*id) {
        NODE_PENDING_MESSAGES.remove(&node);

        if let Some(supervisor) = registration.supervisor.take() {
            Process::exit(supervisor, ExitReason::Kill);
        }

        registration.state = NodeState::Known;
    }
}
pub fn node_forget(node: Node) {
let Some((_, id)) = NODE_MAP.remove(&node) else {
return;
};
let Some((_, registration)) = NODE_REGISTRATIONS.remove(&id) else {
return;
};
NODE_PENDING_MESSAGES.remove(&node);
if let Some(supervisor) = registration.supervisor {
Process::exit(supervisor, ExitReason::Kill);
}
}
/// Returns the name and broadcast address of the local node, or `None` when no registration
/// exists under `LOCAL_NODE_ID`.
pub fn node_lookup_local() -> Option<(String, SocketAddr)> {
    let local = NODE_REGISTRATIONS.get(&LOCAL_NODE_ID)?;

    Some((local.name.clone(), local.broadcast_address))
}
/// Returns the name and broadcast address of the node registered under `id`, or `None` when
/// no such registration exists.
pub fn node_lookup_remote(id: u64) -> Option<(String, SocketAddr)> {
    let registration = NODE_REGISTRATIONS.get(&id)?;

    Some((registration.name.clone(), registration.broadcast_address))
}
/// Records that the process `from` monitors `node` under `reference`, replacing any monitor
/// already stored for that reference.
pub fn node_monitor_create(node: Node, reference: Reference, from: Pid) {
    let mut monitors = NODE_MONITORS.entry(node).or_default();

    monitors.insert(reference, NodeMonitor::Node(from.id()));
}
/// Records that the process `from` monitors `dest` on `node` under `reference`, replacing any
/// monitor already stored for that reference.
pub fn node_process_monitor_create(node: Node, reference: Reference, dest: Dest, from: Pid) {
    let mut monitors = NODE_MONITORS.entry(node).or_default();

    monitors.insert(reference, NodeMonitor::ProcessMonitor(from.id(), dest));
}
/// Records a cleanup entry for `reference` against `node`, so that the monitor held by
/// `process` is destroyed when the node goes down.
pub fn node_process_monitor_cleanup(node: Node, reference: Reference, process: Pid) {
    let mut monitors = NODE_MONITORS.entry(node).or_default();

    monitors.insert(reference, NodeMonitor::ProcessMonitorCleanup(process.id()));
}
/// Removes the monitor stored under `reference` for `node`, if the node has any monitors.
pub fn node_process_monitor_destroy(node: Node, reference: Reference) {
    if let Some(mut monitors) = NODE_MONITORS.get_mut(&node) {
        monitors.remove(&reference);
    }
}
/// Removes the monitors stored under each of `references` for `node`, if the node has any
/// monitors.
pub fn node_process_monitor_destroy_all(node: Node, references: Vec<Reference>) {
    if let Some(mut monitors) = NODE_MONITORS.get_mut(&node) {
        for reference in &references {
            monitors.remove(reference);
        }
    }
}
/// Removes the link between `link` and the process `from` recorded against `node`, if the
/// node has any links.
pub fn node_process_link_destroy(node: Node, link: Pid, from: Pid) {
    if let Some(mut links) = NODE_LINKS.get_mut(&node) {
        links.remove(&(link, from.id()));
    }
}
/// Removes the links between each pid in `links` and the process `from` recorded against
/// `node`, if the node has any links.
pub fn node_process_link_destroy_all(node: Node, links: Vec<Pid>, from: Pid) {
    if let Some(mut recorded) = NODE_LINKS.get_mut(&node) {
        for link in links {
            recorded.remove(&(link, from.id()));
        }
    }
}
/// Records a link between `process` and the process `from` against `node`.
pub fn node_process_link_create(node: Node, process: Pid, from: Pid) {
    let mut links = NODE_LINKS.entry(node).or_default();

    links.insert((process, from.id()));
}
/// Removes the monitor stored under `reference` for `node`, if the node has any monitors.
pub fn node_monitor_destroy(node: Node, reference: Reference) {
    let Some(mut monitors) = NODE_MONITORS.get_mut(&node) else {
        return;
    };

    monitors.remove(&reference);
}
/// Removes the link between `process` and the process `from` recorded against `node`, if the
/// node has any links.
pub fn node_link_destroy(node: Node, process: Pid, from: Pid) {
    let Some(mut links) = NODE_LINKS.get_mut(&node) else {
        return;
    };

    links.remove(&(process, from.id()));
}
/// Resolves the monitor stored under `reference` for `node` with `exit_reason`.
///
/// The monitor is removed and the alias for `reference` is destroyed in every case. If the
/// removed monitor was a process monitor, the monitoring process is sent
/// `ProcessItem::MonitorProcessDown`.
pub fn node_process_monitor_down(node: Node, reference: Reference, exit_reason: ExitReason) {
    // The map guard is released at the end of this statement, before anything is notified.
    let removed = NODE_MONITORS
        .get_mut(&node)
        .and_then(|mut monitors| monitors.remove(&reference));

    alias_destroy(reference);

    let Some(NodeMonitor::ProcessMonitor(id, dest)) = removed else {
        return;
    };

    process_sender(Pid::local(id)).map(|sender| {
        sender.send(ProcessItem::MonitorProcessDown(
            dest,
            reference,
            exit_reason,
        ))
    });
}
/// Resolves the link between `from` and the process `process` recorded against `node`.
///
/// The exit signal carrying `exit_reason` is delivered to `process` only if the link was
/// actually recorded; the link is removed in the same step.
pub fn node_process_link_down(node: Node, process: Pid, from: Pid, exit_reason: ExitReason) {
    let was_linked = NODE_LINKS
        .get_mut(&node)
        .is_some_and(|mut links| links.remove(&(from, process.id())));

    if was_linked {
        process_exit_signal_linked(process, from, exit_reason);
    }
}
/// Returns a copy of the currently configured cookie, if one is set.
///
/// # Panics
///
/// Panics if the cookie mutex is poisoned.
pub fn node_get_cookie() -> Option<String> {
    let cookie = NODE_COOKIE.lock().unwrap();

    cookie.clone()
}
/// Replaces the configured cookie with `cookie`; `None` clears it.
///
/// # Panics
///
/// Panics if the cookie mutex is poisoned.
pub fn node_set_cookie(cookie: Option<String>) {
    let mut current = NODE_COOKIE.lock().unwrap();

    *current = cookie;
}