use crate::{
atom::Atom,
process::ExitReason,
term::{Term, boxed::Tuple, pid_ref::PidRef},
};
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(i64)]
pub enum ControlOp {
Link = 1,
Send = 2,
Exit = 3,
Unlink = 4,
RegSend = 6,
Exit2 = 8,
MonitorP = 19,
DemonitorP = 20,
MonitorPExit = 21,
SpawnRequest = 29,
SpawnReply = 31,
}
impl ControlOp {
#[must_use]
pub const fn from_opcode(opcode: i64) -> Option<Self> {
match opcode {
1 => Some(Self::Link),
2 => Some(Self::Send),
3 => Some(Self::Exit),
4 => Some(Self::Unlink),
6 => Some(Self::RegSend),
8 => Some(Self::Exit2),
19 => Some(Self::MonitorP),
20 => Some(Self::DemonitorP),
21 => Some(Self::MonitorPExit),
29 => Some(Self::SpawnRequest),
31 => Some(Self::SpawnReply),
_ => None,
}
}
#[must_use]
pub const fn opcode(self) -> i64 {
self as i64
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct DistributedPid {
pub node: Option<Atom>,
pub pid_number: u64,
pub serial: u64,
}
impl DistributedPid {
pub fn from_term(term: Term) -> Option<Self> {
let pid = PidRef::new(term)?;
Some(Self {
node: pid.node(),
pid_number: pid.pid_number(),
serial: pid.serial(),
})
}
#[must_use]
pub const fn local(pid_number: u64) -> Self {
Self {
node: None,
pid_number,
serial: 0,
}
}
#[must_use]
pub const fn remote(node: Atom, pid_number: u64, serial: u64) -> Self {
Self {
node: Some(node),
pid_number,
serial,
}
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct OutboundControlMessage {
pub op: ControlOp,
pub from: Term,
pub to: Term,
pub reason: Option<ExitReason>,
}
impl OutboundControlMessage {
#[must_use]
pub const fn link(from: Term, to: Term) -> Self {
Self {
op: ControlOp::Link,
from,
to,
reason: None,
}
}
#[must_use]
pub const fn unlink(from: Term, to: Term) -> Self {
Self {
op: ControlOp::Unlink,
from,
to,
reason: None,
}
}
#[must_use]
pub const fn exit(from: Term, to: Term, reason: ExitReason) -> Self {
Self {
op: ControlOp::Exit,
from,
to,
reason: Some(reason),
}
}
#[must_use]
pub const fn exit2(from: Term, to: Term, reason: ExitReason) -> Self {
Self {
op: ControlOp::Exit2,
from,
to,
reason: Some(reason),
}
}
}
pub trait ControlMessageSink {
fn send_control(
&mut self,
node: Atom,
message: OutboundControlMessage,
) -> Result<(), LifecycleError>;
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum LifecycleError {
MalformedControl,
NotRemotePid,
SendFailed,
}
pub trait ControlDiagnostics {
fn unknown_opcode(&mut self, opcode: i64);
fn malformed_control(&mut self, op: Option<ControlOp>);
}
#[derive(Copy, Clone, Debug, Default)]
pub struct NoopDiagnostics;
impl ControlDiagnostics for NoopDiagnostics {
fn unknown_opcode(&mut self, _opcode: i64) {}
fn malformed_control(&mut self, _op: Option<ControlOp>) {}
}
pub trait ControlMessageHandler {
fn handle_send(&mut self, _tuple: Tuple) {}
fn handle_reg_send(&mut self, _tuple: Tuple) {}
fn handle_link(
&mut self,
_from: DistributedPid,
_from_term: Term,
_to: DistributedPid,
_to_term: Term,
) {
}
fn handle_unlink(&mut self, _from: DistributedPid, _to: DistributedPid) {}
fn handle_exit(&mut self, _from: DistributedPid, _to: DistributedPid, _r: ExitReason) {}
fn handle_exit2(&mut self, _from: DistributedPid, _to: DistributedPid, _r: ExitReason) {}
fn handle_monitor_p(&mut self, _tuple: Tuple) {}
fn handle_demonitor_p(&mut self, _tuple: Tuple) {}
fn handle_monitor_p_exit(&mut self, _tuple: Tuple) {}
fn handle_spawn_request(&mut self, _tuple: Tuple) {}
fn handle_spawn_reply(&mut self, _tuple: Tuple) {}
}
pub fn dispatch_control_message<H, D>(term: Term, handler: &mut H, diagnostics: &mut D)
where
H: ControlMessageHandler,
D: ControlDiagnostics,
{
let Some(tuple) = Tuple::new(term) else {
diagnostics.malformed_control(None);
return;
};
let Some(opcode) = tuple.get(0).and_then(Term::as_small_int) else {
diagnostics.malformed_control(None);
return;
};
let Some(op) = ControlOp::from_opcode(opcode) else {
diagnostics.unknown_opcode(opcode);
return;
};
if !dispatch_known(tuple, op, handler) {
diagnostics.malformed_control(Some(op));
}
}
fn dispatch_known<H: ControlMessageHandler>(tuple: Tuple, op: ControlOp, h: &mut H) -> bool {
match op {
ControlOp::Send => {
h.handle_send(tuple);
true
}
ControlOp::RegSend => {
h.handle_reg_send(tuple);
true
}
ControlOp::Link => dispatch_link(tuple, h),
ControlOp::Exit => dispatch_exit(tuple, h, false),
ControlOp::Unlink => dispatch_unlink(tuple, h),
ControlOp::MonitorP => {
h.handle_monitor_p(tuple);
true
}
ControlOp::DemonitorP => {
h.handle_demonitor_p(tuple);
true
}
ControlOp::MonitorPExit => {
h.handle_monitor_p_exit(tuple);
true
}
ControlOp::Exit2 => dispatch_exit(tuple, h, true),
ControlOp::SpawnRequest => {
h.handle_spawn_request(tuple);
true
}
ControlOp::SpawnReply => {
h.handle_spawn_reply(tuple);
true
}
}
}
fn dispatch_link<H: ControlMessageHandler>(tuple: Tuple, h: &mut H) -> bool {
let Some((from, ft, to, tt)) = pid_pair_terms(tuple) else {
return false;
};
h.handle_link(from, ft, to, tt);
true
}
fn dispatch_unlink<H: ControlMessageHandler>(tuple: Tuple, h: &mut H) -> bool {
let Some((from, to)) = pid_pair(tuple) else {
return false;
};
h.handle_unlink(from, to);
true
}
fn dispatch_exit<H: ControlMessageHandler>(tuple: Tuple, h: &mut H, explicit: bool) -> bool {
let Some((from, to)) = pid_pair(tuple) else {
return false;
};
let Some(reason) = tuple.get(3).and_then(exit_reason_from_term) else {
return false;
};
if explicit {
h.handle_exit2(from, to, reason);
} else {
h.handle_exit(from, to, reason);
}
true
}
fn pid_pair(tuple: Tuple) -> Option<(DistributedPid, DistributedPid)> {
let (from, _, to, _) = pid_pair_terms(tuple)?;
Some((from, to))
}
fn pid_pair_terms(tuple: Tuple) -> Option<(DistributedPid, Term, DistributedPid, Term)> {
if tuple.arity() < 3 {
return None;
}
let ft = tuple.get(1)?;
let tt = tuple.get(2)?;
Some((
DistributedPid::from_term(ft)?,
ft,
DistributedPid::from_term(tt)?,
tt,
))
}
#[must_use]
pub fn exit_reason_from_term(term: Term) -> Option<ExitReason> {
match term.as_atom()? {
Atom::NORMAL => Some(ExitReason::Normal),
Atom::KILL => Some(ExitReason::Kill),
Atom::KILLED => Some(ExitReason::Killed),
Atom::ERROR => Some(ExitReason::Error),
Atom::NOCONNECTION => Some(ExitReason::NoConnection),
_ => None,
}
}
#[derive(Clone, Debug, Default)]
pub struct ControlLifecycleState {
links: Vec<RemoteLink>,
delivered_exits: Vec<(DistributedPid, DistributedPid, ExitReason)>,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct RemoteLink {
pub left: DistributedPid,
pub left_term: Term,
pub right: DistributedPid,
pub right_term: Term,
}
impl ControlLifecycleState {
pub fn establish_link(&mut self, left: DistributedPid, right: DistributedPid) -> bool {
let lt = pid_term(left).unwrap_or(Term::NIL);
let rt = pid_term(right).unwrap_or(Term::NIL);
self.establish_link_terms(left, lt, right, rt)
}
pub fn establish_link_terms(
&mut self,
left: DistributedPid,
left_term: Term,
right: DistributedPid,
right_term: Term,
) -> bool {
if left == right || self.has_link(left, right) {
return false;
}
self.links.push(RemoteLink {
left,
left_term,
right,
right_term,
});
true
}
pub fn remove_link(&mut self, left: DistributedPid, right: DistributedPid) -> bool {
let before = self.links.len();
self.links
.retain(|lk| !same_link(lk.left, lk.right, left, right));
before != self.links.len()
}
#[must_use]
pub fn has_link(&self, left: DistributedPid, right: DistributedPid) -> bool {
self.links
.iter()
.any(|lk| same_link(lk.left, lk.right, left, right))
}
#[must_use]
pub fn links(&self) -> &[RemoteLink] {
&self.links
}
pub fn deliver_exit(&mut self, from: DistributedPid, to: DistributedPid, reason: ExitReason) {
self.delivered_exits.push((from, to, reason));
self.remove_link(from, to);
}
#[must_use]
pub fn delivered_exits(&self) -> &[(DistributedPid, DistributedPid, ExitReason)] {
&self.delivered_exits
}
}
impl ControlMessageHandler for ControlLifecycleState {
fn handle_link(&mut self, from: DistributedPid, ft: Term, to: DistributedPid, tt: Term) {
self.establish_link_terms(from, ft, to, tt);
}
fn handle_unlink(&mut self, from: DistributedPid, to: DistributedPid) {
self.remove_link(from, to);
}
fn handle_exit(&mut self, from: DistributedPid, to: DistributedPid, reason: ExitReason) {
self.deliver_exit(from, to, reason);
}
fn handle_exit2(&mut self, from: DistributedPid, to: DistributedPid, reason: ExitReason) {
self.deliver_exit(from, to, reason);
}
}
fn same_link(sl: DistributedPid, sr: DistributedPid, l: DistributedPid, r: DistributedPid) -> bool {
(sl == l && sr == r) || (sl == r && sr == l)
}
fn pid_term(pid: DistributedPid) -> Option<Term> {
if pid.node.is_none() {
Term::try_pid(pid.pid_number)
} else {
None
}
}
pub fn link_remote<S: ControlMessageSink>(
sink: &mut S,
state: &mut ControlLifecycleState,
local_pid: Term,
remote_pid: Term,
) -> Result<(), LifecycleError> {
let local = DistributedPid::from_term(local_pid).ok_or(LifecycleError::MalformedControl)?;
let remote = DistributedPid::from_term(remote_pid).ok_or(LifecycleError::MalformedControl)?;
let node = remote.node.ok_or(LifecycleError::NotRemotePid)?;
sink.send_control(node, OutboundControlMessage::link(local_pid, remote_pid))?;
state.establish_link_terms(local, local_pid, remote, remote_pid);
Ok(())
}
pub fn unlink_remote<S: ControlMessageSink>(
sink: &mut S,
state: &mut ControlLifecycleState,
local_pid: Term,
remote_pid: Term,
) -> Result<(), LifecycleError> {
let local = DistributedPid::from_term(local_pid).ok_or(LifecycleError::MalformedControl)?;
let remote = DistributedPid::from_term(remote_pid).ok_or(LifecycleError::MalformedControl)?;
let node = remote.node.ok_or(LifecycleError::NotRemotePid)?;
sink.send_control(node, OutboundControlMessage::unlink(local_pid, remote_pid))?;
state.remove_link(local, remote);
Ok(())
}
pub fn propagate_remote_exit<S: ControlMessageSink>(
sink: &mut S,
state: &mut ControlLifecycleState,
exiting_pid: Term,
reason: ExitReason,
) -> Result<(), LifecycleError> {
let source = DistributedPid::from_term(exiting_pid).ok_or(LifecycleError::MalformedControl)?;
let peers: Vec<(DistributedPid, Term)> = state
.links()
.iter()
.filter_map(|link| {
if link.left == source {
Some((link.right, link.right_term))
} else if link.right == source {
Some((link.left, link.left_term))
} else {
None
}
})
.collect();
for (peer, peer_term) in peers {
let Some(node) = peer.node else { continue };
if peer_term == Term::NIL {
return Err(LifecycleError::MalformedControl);
}
sink.send_control(
node,
OutboundControlMessage::exit(exiting_pid, peer_term, reason),
)?;
state.remove_link(source, peer);
}
Ok(())
}
pub fn exit2_remote<S: ControlMessageSink>(
sink: &mut S,
from_pid: Term,
remote_pid: Term,
reason: ExitReason,
) -> Result<(), LifecycleError> {
let remote = DistributedPid::from_term(remote_pid).ok_or(LifecycleError::MalformedControl)?;
let node = remote.node.ok_or(LifecycleError::NotRemotePid)?;
sink.send_control(
node,
OutboundControlMessage::exit2(from_pid, remote_pid, reason),
)
}
#[cfg(test)]
#[path = "control_lifecycle_tests.rs"]
mod tests;