use std::collections::HashMap;
use crate::{
atom::Atom,
process::{ExitReason, Monitor, Process},
term::{Term, boxed},
};
pub type Reference = u64;
#[derive(Debug, Default)]
pub struct MonitorSet {
next_reference: Reference,
monitors: HashMap<Reference, Monitor>,
by_target: HashMap<u64, Vec<Reference>>,
dead: HashMap<u64, ExitReason>,
}
impl MonitorSet {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn record_dead(&mut self, pid: u64, reason: ExitReason) {
self.dead.insert(pid, reason);
}
pub fn monitor(&mut self, watcher: &mut Process, target: &mut Process) -> Reference {
let reference = self.allocate_reference();
let monitor = Monitor::new(reference, watcher.pid(), target.pid());
watcher.add_monitor(monitor);
target.add_monitor(monitor);
self.monitors.insert(reference, monitor);
self.by_target
.entry(target.pid())
.or_default()
.push(reference);
reference
}
pub fn monitor_pid(&mut self, watcher: &mut Process, target_pid: u64) -> Reference {
let reference = self.allocate_reference();
if let Some(reason) = self.dead.get(&target_pid).copied() {
if enqueue_down_message(watcher, reference, target_pid, reason).is_err() {
watcher.terminate(ExitReason::Error);
}
return reference;
}
let monitor = Monitor::new(reference, watcher.pid(), target_pid);
watcher.add_monitor(monitor);
self.monitors.insert(reference, monitor);
self.by_target
.entry(target_pid)
.or_default()
.push(reference);
reference
}
pub fn demonitor(
&mut self,
reference: Reference,
processes: &mut [&mut Process],
) -> Option<Monitor> {
let monitor = self.monitors.remove(&reference)?;
if let Some(references) = self.by_target.get_mut(&monitor.target()) {
references.retain(|seen| *seen != reference);
if references.is_empty() {
self.by_target.remove(&monitor.target());
}
}
for process in processes.iter_mut() {
process.remove_monitor(reference);
}
Some(monitor)
}
pub fn process_exited(
&mut self,
target_pid: u64,
reason: ExitReason,
processes: &mut [&mut Process],
) {
self.record_dead(target_pid, reason);
let references = self.by_target.remove(&target_pid).unwrap_or_default();
for reference in references {
if let Some(monitor) = self.monitors.remove(&reference) {
if let Some(index) = process_index_by_pid(processes, monitor.watcher()) {
let watcher = &mut processes[index];
watcher.remove_monitor(reference);
if enqueue_down_message(watcher, reference, target_pid, reason).is_err() {
watcher.terminate(ExitReason::Error);
}
}
if let Some(index) = process_index_by_pid(processes, target_pid) {
processes[index].remove_monitor(reference);
}
}
}
}
pub fn collect_watchers_and_remove(
&mut self,
target_pid: u64,
reason: ExitReason,
) -> Vec<(u64, Reference)> {
self.record_dead(target_pid, reason);
let references = self.by_target.remove(&target_pid).unwrap_or_default();
let mut watchers = Vec::with_capacity(references.len());
for reference in references {
if let Some(monitor) = self.monitors.remove(&reference) {
watchers.push((monitor.watcher(), reference));
}
}
watchers
}
pub fn allocate_reference_pub(&mut self) -> Reference {
self.allocate_reference()
}
#[must_use]
pub fn get_monitor(&self, reference: Reference) -> Option<crate::process::Monitor> {
self.monitors.get(&reference).copied()
}
pub fn register_monitor(
&mut self,
reference: Reference,
monitor: crate::process::Monitor,
target_pid: u64,
) {
self.monitors.insert(reference, monitor);
self.by_target
.entry(target_pid)
.or_default()
.push(reference);
}
pub fn remove_monitor(&mut self, reference: Reference) {
if let Some(monitor) = self.monitors.remove(&reference)
&& let Some(references) = self.by_target.get_mut(&monitor.target())
{
references.retain(|r| *r != reference);
if references.is_empty() {
self.by_target.remove(&monitor.target());
}
}
}
fn allocate_reference(&mut self) -> Reference {
let reference = self.next_reference;
self.next_reference = self.next_reference.saturating_add(1);
reference
}
}
pub fn enqueue_down_message_pub(
watcher: &mut Process,
reference: Reference,
target_pid: u64,
reason: ExitReason,
) {
if enqueue_down_message(watcher, reference, target_pid, reason).is_err() {
watcher.terminate(ExitReason::Error);
}
}
fn enqueue_down_message(
watcher: &mut Process,
reference: Reference,
target_pid: u64,
reason: ExitReason,
) -> Result<(), ()> {
const DOWN_MESSAGE_WORDS: usize = 7;
crate::gc::ensure_space(watcher, DOWN_MESSAGE_WORDS, 256).map_err(|_| ())?;
let reference_words = watcher.heap_mut().alloc(2).map_err(|_| ())?;
let reference_words = unsafe { std::slice::from_raw_parts_mut(reference_words, 2) };
let reference_term = boxed::write_reference(reference_words, reference).ok_or(())?;
let elements = [
Term::atom(Atom::DOWN),
reference_term,
Term::atom(Atom::PROCESS),
Term::pid(target_pid),
reason.as_term(),
];
let words = watcher.heap_mut().alloc(1 + elements.len()).map_err(|_| ())?;
let words = unsafe { std::slice::from_raw_parts_mut(words, 1 + elements.len()) };
let message = boxed::write_tuple(words, &elements).ok_or(())?;
watcher.mailbox_mut().push_owned(message);
Ok(())
}
fn process_index_by_pid(processes: &[&mut Process], pid: u64) -> Option<usize> {
processes.iter().position(|process| process.pid() == pid)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
process::{Monitor, ProcessStatus},
term::boxed::{Reference as RefTerm, Tuple},
};
fn running(pid: u64) -> Process {
let mut process = Process::new(pid, 64);
process
.transition_to(ProcessStatus::Running)
.unwrap_or_else(|error| panic!("process starts: {error}"));
process
}
fn down_tuple(process: &mut Process) -> Tuple {
process.mailbox_mut().drain_arrival();
Tuple::new(
process
.mailbox()
.front_for_test()
.unwrap_or_else(|| panic!("down message exists")),
)
.unwrap_or_else(|| panic!("down message is tuple"))
}
#[test]
fn monitor_returns_unique_references_and_down_message_without_killing_watcher() {
let mut monitors = MonitorSet::new();
let mut watcher = running(1);
let mut target = running(2);
let first = monitors.monitor(&mut watcher, &mut target);
let second = monitors.monitor(&mut watcher, &mut target);
assert_ne!(first, second);
monitors.process_exited(
target.pid(),
ExitReason::Error,
&mut [&mut watcher, &mut target],
);
assert_eq!(watcher.status(), ProcessStatus::Running);
let tuple = down_tuple(&mut watcher);
assert_eq!(tuple.arity(), 5);
assert_eq!(tuple.get(0), Some(Term::atom(Atom::DOWN)));
assert_eq!(
tuple.get(1).and_then(RefTerm::new).map(RefTerm::id),
Some(first)
);
assert_eq!(tuple.get(2), Some(Term::atom(Atom::PROCESS)));
assert_eq!(tuple.get(3).and_then(Term::as_pid), Some(2));
assert_eq!(tuple.get(4), Some(Term::atom(Atom::ERROR)));
}
#[test]
fn demonitor_prevents_down_delivery() {
let mut monitors = MonitorSet::new();
let mut watcher = running(1);
let mut target = running(2);
let reference = monitors.monitor(&mut watcher, &mut target);
assert_eq!(
monitors.demonitor(reference, &mut [&mut watcher, &mut target]),
Some(Monitor::new(reference, 1, 2))
);
monitors.process_exited(
target.pid(),
ExitReason::Error,
&mut [&mut watcher, &mut target],
);
assert_eq!(watcher.status(), ProcessStatus::Running);
assert!(watcher.mailbox().is_empty());
}
#[test]
fn monitor_dead_process_delivers_immediate_down() {
let mut monitors = MonitorSet::new();
let mut watcher = running(1);
monitors.record_dead(2, ExitReason::Normal);
let reference = monitors.monitor_pid(&mut watcher, 2);
let tuple = down_tuple(&mut watcher);
assert_eq!(tuple.get(0), Some(Term::atom(Atom::DOWN)));
assert_eq!(
tuple.get(1).and_then(RefTerm::new).map(RefTerm::id),
Some(reference)
);
assert_eq!(tuple.get(3).and_then(Term::as_pid), Some(2));
assert_eq!(tuple.get(4), Some(Term::atom(Atom::NORMAL)));
}
fn alloc_young_tuple(process: &mut Process, elements: &[Term]) -> Term {
let ptr = process
.heap_mut()
.alloc(1 + elements.len())
.unwrap_or_else(|_| panic!("young tuple fits"));
let words = unsafe { std::slice::from_raw_parts_mut(ptr, 1 + elements.len()) };
boxed::write_tuple(words, elements).unwrap_or_else(|| panic!("tuple writes"))
}
#[test]
fn down_on_near_full_heap_preserves_live_young_terms() {
let mut monitors = MonitorSet::new();
let mut watcher = Process::new(1, 16);
watcher
.transition_to(ProcessStatus::Running)
.unwrap_or_else(|error| panic!("process starts: {error}"));
monitors.record_dead(2, ExitReason::Error);
let live = alloc_young_tuple(&mut watcher, &[Term::small_int(7), Term::small_int(8)]);
watcher.set_x_reg(0, live);
while watcher.heap().available() >= 7 {
let _ = watcher.heap_mut().alloc(1);
}
assert!(watcher.heap().available() < 7);
let reference = monitors.monitor_pid(&mut watcher, 2);
let recovered =
Tuple::new(watcher.x_reg(0)).unwrap_or_else(|| panic!("X0 is still a tuple"));
assert_eq!(recovered.arity(), 2);
assert_eq!(recovered.get(0), Some(Term::small_int(7)));
assert_eq!(recovered.get(1), Some(Term::small_int(8)));
assert_eq!(watcher.status(), ProcessStatus::Running);
let tuple = down_tuple(&mut watcher);
assert_eq!(tuple.get(0), Some(Term::atom(Atom::DOWN)));
assert_eq!(
tuple.get(1).and_then(RefTerm::new).map(RefTerm::id),
Some(reference)
);
}
fn exhausted_watcher(pid: u64) -> Process {
let mut watcher = Process::new(pid, 4);
watcher
.transition_to(ProcessStatus::Running)
.unwrap_or_else(|error| panic!("process starts: {error}"));
watcher.heap_mut().set_max_capacity(4);
watcher
}
#[test]
fn down_delivery_without_growth_room_terminates_instead_of_panicking() {
let mut watcher = exhausted_watcher(1);
enqueue_down_message_pub(&mut watcher, 0, 2, ExitReason::Error);
assert_eq!(watcher.status(), ProcessStatus::Exited(ExitReason::Error));
let mut victim = exhausted_watcher(3);
assert!(enqueue_down_message(&mut victim, 0, 2, ExitReason::Error).is_err());
}
#[test]
fn down_delivery_grows_mailbox_heap_instead_of_dropping_signal() {
let mut monitors = MonitorSet::new();
let mut watcher = Process::new(1, 1);
watcher
.transition_to(ProcessStatus::Running)
.unwrap_or_else(|error| panic!("process starts: {error}"));
monitors.record_dead(2, ExitReason::Error);
let reference = monitors.monitor_pid(&mut watcher, 2);
assert_eq!(watcher.status(), ProcessStatus::Running);
assert!(watcher.heap().capacity() >= 7);
let tuple = down_tuple(&mut watcher);
assert_eq!(tuple.get(0), Some(Term::atom(Atom::DOWN)));
assert_eq!(
tuple.get(1).and_then(RefTerm::new).map(RefTerm::id),
Some(reference)
);
assert_eq!(tuple.get(4), Some(Term::atom(Atom::ERROR)));
}
}