use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use crate::atom::Atom;
use crate::process::ExitReason;
use crate::term::Term;
/// First reference value handed out for outbound remote monitors; a fresh
/// `RemoteMonitorState` starts its `next_reference` counter here.
const REMOTE_MONITOR_REFERENCE_START: u64 = 1 << 56;
/// Largest reference value that may be allocated. `ControlPlane::monitor_remote`
/// returns `ControlSendError::ReferenceExhausted` once the counter is above it.
// NOTE(review): presumably chosen so references fit in a small-integer `Term` — confirm.
const REMOTE_MONITOR_REFERENCE_MAX: u64 = Term::SMALL_INT_MAX as u64;
/// Identifies a process by node plus pid number and serial.
///
/// Despite the name it is also used for local processes: `ControlPlane` builds
/// one with its own node and serial `0` when it describes a local pid.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct RemotePid {
/// Node the process belongs to; control messages about the process are routed by this value.
pub node: Atom,
/// Numeric process identifier.
pub pid_number: u64,
/// Serial component of the pid (always `0` for pids built by `ControlPlane` in this file).
pub serial: u64,
}
impl RemotePid {
#[must_use]
pub const fn new(node: Atom, pid_number: u64, serial: u64) -> Self {
Self {
node,
pid_number,
serial,
}
}
}
/// Monitor-related control messages exchanged between nodes.
///
/// NOTE(review): the variant names and their opcodes (19, 20, 21 — see `opcode`)
/// look like the Erlang distribution protocol's `MONITOR_P`, `DEMONITOR_P` and
/// `MONITOR_P_EXIT`; confirm against the wire codec.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum MonitorControlMessage {
/// Request to monitor `target` on behalf of `watcher`.
/// `ControlPlane::monitor_remote` sends it to `target.node`.
MonitorP {
/// Reference identifying this monitor.
reference: u64,
/// Process that wants to be notified.
watcher: RemotePid,
/// Process being monitored.
target: RemotePid,
},
/// Request to cancel a monitor previously set up with the same `reference`.
/// `ControlPlane::demonitor_remote` sends it to the target's node.
DemonitorP {
/// Reference of the monitor being cancelled.
reference: u64,
/// Process that owned the monitor.
watcher: RemotePid,
/// Process that was being monitored.
target: RemotePid,
},
/// Notification that a monitored process exited.
/// `ControlPlane::send_monitor_exit` sends it to the watcher's node.
MonitorPExit {
/// Reference of the monitor that fired.
reference: u64,
/// Process that exited.
target: RemotePid,
/// Why the process exited.
reason: ExitReason,
},
}
impl MonitorControlMessage {
#[must_use]
pub const fn opcode(self) -> u8 {
match self {
Self::MonitorP { .. } => 19,
Self::DemonitorP { .. } => 20,
Self::MonitorPExit { .. } => 21,
}
}
}
/// A control message together with the node it was addressed to.
///
/// This is the record type stored by `RecordingMonitorSender`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct OutboundMonitorControl {
/// Destination node passed to `send_control`.
pub node: Atom,
/// The control message itself.
pub message: MonitorControlMessage,
}
/// Reasons a monitor control operation can fail.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ControlSendError {
/// Displayed as "no distribution connection".
// NOTE(review): presumably reported by senders that have no link to the destination node — confirm.
NoConnection,
/// The reference counter has moved past `REMOTE_MONITOR_REFERENCE_MAX`, so
/// `ControlPlane::monitor_remote` cannot allocate another reference.
ReferenceExhausted,
}
impl fmt::Display for ControlSendError {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NoConnection => formatter.write_str("no distribution connection"),
Self::ReferenceExhausted => {
formatter.write_str("remote monitor reference space exhausted")
}
}
}
}
// Empty impl: every `std::error::Error` method keeps its default, so the error reports no `source`.
impl std::error::Error for ControlSendError {}
/// Transport abstraction through which monitor control messages leave this node.
///
/// Implementations must be `Send + Sync`; `ControlPlane` keeps one behind an
/// `Arc<dyn MonitorControlSender>`.
pub trait MonitorControlSender: Send + Sync {
/// Delivers `message` to `node`.
///
/// # Errors
///
/// Returns a `ControlSendError` when the message could not be handed off.
/// Which variant is used for which condition is up to the implementation.
fn send_control(
&self,
node: Atom,
message: MonitorControlMessage,
) -> Result<(), ControlSendError>;
}
/// `MonitorControlSender` that stores every message in memory instead of
/// transmitting it.
// NOTE(review): looks like a test double — confirm before relying on it elsewhere.
#[derive(Default)]
pub struct RecordingMonitorSender {
/// Messages recorded so far, in the order `send_control` was called; emptied by `drain`.
sent: Mutex<Vec<OutboundMonitorControl>>,
}
impl RecordingMonitorSender {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn drain(&self) -> Vec<OutboundMonitorControl> {
let mut sent = self.sent.lock().unwrap_or_else(|error| error.into_inner());
sent.drain(..).collect()
}
}
impl MonitorControlSender for RecordingMonitorSender {
    /// Appends `(node, message)` to the in-memory record and reports success.
    ///
    /// This implementation never returns an error; a poisoned lock is
    /// recovered instead of causing a panic.
    fn send_control(
        &self,
        node: Atom,
        message: MonitorControlMessage,
    ) -> Result<(), ControlSendError> {
        let record = OutboundMonitorControl { node, message };
        let mut log = match self.sent.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        log.push(record);
        Ok(())
    }
}
/// Bookkeeping record for a monitor that a local process placed on a remote process.
///
/// Created by `ControlPlane::monitor_remote` and stored keyed by `reference`.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct OutboundRemoteMonitor {
/// Pid number of the local process that requested the monitor.
pub watcher_pid: u64,
/// Reference allocated for this monitor.
pub reference: u64,
/// The remote process being monitored.
pub target: RemotePid,
}
/// Bookkeeping record for a monitor that a process on another node placed on a
/// local process.
///
/// Stored by `ControlPlane::register_inbound_monitor`; two records are the same
/// monitor only when all three fields are equal.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct InboundRemoteMonitor {
/// The watching process; exit notifications are sent to `watcher.node`.
pub watcher: RemotePid,
/// Reference supplied when the monitor was registered.
pub reference: u64,
/// Pid number of the local process being monitored.
pub target_pid: u64,
}
/// Mutable monitor tables owned by `ControlPlane` and guarded by its mutex.
struct RemoteMonitorState {
/// Next reference to hand out; only ever increases (saturating).
next_reference: u64,
/// Outbound monitors keyed by their reference.
outbound_by_ref: HashMap<u64, OutboundRemoteMonitor>,
/// Inbound monitors in registration order.
inbound: Vec<InboundRemoteMonitor>,
}
impl Default for RemoteMonitorState {
fn default() -> Self {
Self {
next_reference: REMOTE_MONITOR_REFERENCE_START,
outbound_by_ref: HashMap::new(),
inbound: Vec::new(),
}
}
}
/// Tracks remote monitors in both directions and emits the matching control
/// messages through a `MonitorControlSender`.
pub struct ControlPlane {
/// Node used when building `RemotePid`s for local processes.
local_node: Atom,
/// Monitor tables and the reference counter, guarded by a mutex.
state: Mutex<RemoteMonitorState>,
/// Transport used for every outgoing control message.
sender: Arc<dyn MonitorControlSender>,
}
impl ControlPlane {
#[must_use]
pub fn new(local_node: Atom, sender: Arc<dyn MonitorControlSender>) -> Self {
Self {
local_node,
state: Mutex::new(RemoteMonitorState::default()),
sender,
}
}
pub fn monitor_remote(
&self,
watcher_pid: u64,
target: RemotePid,
) -> Result<u64, ControlSendError> {
let reference = {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
if state.next_reference > REMOTE_MONITOR_REFERENCE_MAX {
return Err(ControlSendError::ReferenceExhausted);
}
let reference = state.next_reference;
state.next_reference = state.next_reference.saturating_add(1);
state.outbound_by_ref.insert(
reference,
OutboundRemoteMonitor {
watcher_pid,
reference,
target,
},
);
reference
};
let watcher = RemotePid::new(self.local_node, watcher_pid, 0);
let message = MonitorControlMessage::MonitorP {
reference,
watcher,
target,
};
if let Err(error) = self.sender.send_control(target.node, message) {
let mut state = self
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
state.outbound_by_ref.remove(&reference);
return Err(error);
}
Ok(reference)
}
pub fn demonitor_remote(
&self,
watcher_pid: u64,
reference: u64,
) -> Result<bool, ControlSendError> {
let monitor = {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
match state.outbound_by_ref.get(&reference).copied() {
Some(monitor) if monitor.watcher_pid == watcher_pid => {
state.outbound_by_ref.remove(&reference);
Some(monitor)
}
_ => None,
}
};
let Some(monitor) = monitor else {
return Ok(false);
};
let watcher = RemotePid::new(self.local_node, watcher_pid, 0);
self.sender.send_control(
monitor.target.node,
MonitorControlMessage::DemonitorP {
reference,
watcher,
target: monitor.target,
},
)?;
Ok(true)
}
pub fn register_inbound_monitor(&self, reference: u64, watcher: RemotePid, target_pid: u64) {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
state.inbound.retain(|monitor| {
!(monitor.reference == reference
&& monitor.watcher == watcher
&& monitor.target_pid == target_pid)
});
state.inbound.push(InboundRemoteMonitor {
watcher,
reference,
target_pid,
});
}
pub fn remove_inbound_monitor(&self, reference: u64, watcher: RemotePid, target_pid: u64) {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
state.inbound.retain(|monitor| {
!(monitor.reference == reference
&& monitor.watcher == watcher
&& monitor.target_pid == target_pid)
});
}
pub fn collect_inbound_for_target(&self, target_pid: u64) -> Vec<InboundRemoteMonitor> {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
let mut drained = Vec::new();
state.inbound.retain(|monitor| {
if monitor.target_pid == target_pid {
drained.push(*monitor);
false
} else {
true
}
});
drained
}
pub fn take_outbound_for_exit(&self, reference: u64) -> Option<OutboundRemoteMonitor> {
self.state
.lock()
.unwrap_or_else(|error| error.into_inner())
.outbound_by_ref
.remove(&reference)
}
pub fn collect_outbound_for_node(&self, node: Atom) -> Vec<OutboundRemoteMonitor> {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
let references: Vec<u64> = state
.outbound_by_ref
.iter()
.filter_map(|(reference, monitor)| (monitor.target.node == node).then_some(*reference))
.collect();
references
.into_iter()
.filter_map(|reference| state.outbound_by_ref.remove(&reference))
.collect()
}
pub fn remove_inbound_for_watcher_node(&self, node: Atom) {
let mut state = self.state.lock().unwrap_or_else(|error| error.into_inner());
state.inbound.retain(|monitor| monitor.watcher.node != node);
}
pub fn send_monitor_exit(
&self,
monitor: InboundRemoteMonitor,
reason: ExitReason,
) -> Result<(), ControlSendError> {
let target = RemotePid::new(self.local_node, monitor.target_pid, 0);
self.sender.send_control(
monitor.watcher.node,
MonitorControlMessage::MonitorPExit {
reference: monitor.reference,
target,
reason,
},
)
}
#[must_use]
pub fn sender(&self) -> Arc<dyn MonitorControlSender> {
Arc::clone(&self.sender)
}
}
/// Abstraction over the two outbound remote-monitor operations.
///
/// `ControlPlane` implements it by forwarding to its inherent methods of the
/// same names.
pub trait DistributionMonitorFacility: Send + Sync {
/// Starts monitoring `target` on behalf of local process `watcher_pid` and
/// returns the monitor reference on success.
fn monitor_remote(&self, watcher_pid: u64, target: RemotePid) -> Result<u64, ControlSendError>;
/// Cancels the monitor `reference` for local process `watcher_pid`.
/// In the `ControlPlane` implementation, `Ok(false)` means no matching monitor existed.
fn demonitor_remote(&self, watcher_pid: u64, reference: u64) -> Result<bool, ControlSendError>;
}
impl DistributionMonitorFacility for ControlPlane {
fn monitor_remote(&self, watcher_pid: u64, target: RemotePid) -> Result<u64, ControlSendError> {
self.monitor_remote(watcher_pid, target)
}
fn demonitor_remote(&self, watcher_pid: u64, reference: u64) -> Result<bool, ControlSendError> {
self.demonitor_remote(watcher_pid, reference)
}
}
// Test module: compiled only for test builds, with its body loaded from the
// file named by the `#[path]` attribute.
#[cfg(test)]
#[path = "control_monitor_tests.rs"]
mod tests;