Skip to main content

hydra/
monitor.rs

1use std::collections::BTreeMap;
2
3use dashmap::DashMap;
4
5use once_cell::sync::Lazy;
6
7use crate::frame::Monitor;
8use crate::frame::MonitorDown;
9
10use crate::Dest;
11use crate::ExitReason;
12use crate::Node;
13use crate::PROCESS;
14use crate::Pid;
15use crate::ProcessInfo;
16use crate::ProcessItem;
17use crate::ProcessMonitor;
18use crate::Reference;
19use crate::alias_destroy;
20use crate::node_lookup_remote;
21use crate::node_monitor_destroy;
22use crate::node_process_monitor_create;
23use crate::node_process_monitor_destroy;
24use crate::node_process_monitor_destroy_all;
25use crate::node_register;
26use crate::node_send_frame;
27use crate::process_exists_lock;
28use crate::process_name_lookup;
29use crate::process_sender;
30
31/// A collection of the local processes being monitored, and the references that require the message.
32#[allow(clippy::type_complexity)]
33static MONITORS: Lazy<DashMap<u64, BTreeMap<Reference, (Pid, Option<Dest>)>>> =
34    Lazy::new(DashMap::new);
35
36/// Creates a monitor for the given local process and reference from the given process.
37pub fn monitor_create(process: Pid, reference: Reference, from: Pid, dest: Option<Dest>) {
38    MONITORS
39        .entry(process.id())
40        .or_default()
41        .insert(reference, (from, dest));
42}
43
44/// Destroys a monitor for the given local process and reference.
45pub fn monitor_destroy(process: Pid, reference: Reference) {
46    if process.is_local() {
47        MONITORS.alter(&process.id(), |_, mut value| {
48            value.remove(&reference);
49            value
50        });
51    } else {
52        let monitor = Monitor::new(false, Some(process.id()), None, None, reference.id());
53
54        node_send_frame(monitor.into(), reference.node());
55
56        if let Some((name, address)) = node_lookup_remote(reference.node()) {
57            node_process_monitor_destroy(Node::from((name, address)), reference);
58        }
59    }
60}
61
62/// Destroys all monitors registered for the given reference, monitor combination.
63pub fn monitor_destroy_all<'a, M: IntoIterator<Item = (&'a Reference, &'a ProcessMonitor)>>(
64    monitors: M,
65) {
66    for (reference, monitor) in monitors {
67        match monitor {
68            ProcessMonitor::ForProcess(pid) => {
69                let Some(pid) = pid else {
70                    continue;
71                };
72
73                if pid.is_local() {
74                    MONITORS.alter(&pid.id(), |_, mut value| {
75                        value.remove(reference);
76                        value
77                    });
78                } else {
79                    let monitor = Monitor::new(false, Some(pid.id()), None, None, reference.id());
80
81                    node_send_frame(monitor.into(), reference.node());
82                }
83            }
84            ProcessMonitor::ForNode(node) => {
85                node_monitor_destroy(node.clone(), *reference);
86            }
87        }
88
89        if reference.is_local() {
90            alias_destroy(*reference);
91        }
92    }
93}
94
95/// Installs a monitor for the given process.
96pub fn monitor_install(process: Dest, reference: Reference, from: Pid) {
97    let dest = process.clone();
98
99    let send_process_down = |dest: Dest, exit_reason: ExitReason| {
100        PROCESS.with(|process| {
101            process
102                .sender
103                .send(ProcessItem::MonitorProcessDown(
104                    dest,
105                    reference,
106                    exit_reason,
107                ))
108                .unwrap()
109        });
110
111        alias_destroy(reference);
112    };
113
114    match process {
115        Dest::Pid(pid) => {
116            if pid == from {
117                panic!("Can not monitor yourself!");
118            }
119
120            PROCESS.with(|process| {
121                process
122                    .monitors
123                    .borrow_mut()
124                    .insert(reference, ProcessMonitor::ForProcess(Some(pid)))
125            });
126
127            if pid.is_local() {
128                process_exists_lock(pid, |exists| {
129                    if exists {
130                        monitor_create(pid, reference, from, Some(process));
131                    } else {
132                        send_process_down(dest, ExitReason::from("noproc"));
133                    }
134                });
135            } else {
136                match node_lookup_remote(pid.node()) {
137                    Some((name, address)) => {
138                        let node = Node::from((name, address));
139
140                        node_process_monitor_create(node.clone(), reference, dest, from);
141
142                        let node = node_register(node, true);
143                        let monitor = Monitor::new(
144                            true,
145                            Some(pid.id()),
146                            None,
147                            Some(from.id()),
148                            reference.id(),
149                        );
150
151                        node_send_frame(monitor.into(), node);
152                    }
153                    None => {
154                        send_process_down(dest, ExitReason::from("noconnection"));
155                    }
156                }
157            }
158        }
159        Dest::Named(name, node) => {
160            if node.is_local() {
161                let Some(pid) = process_name_lookup(name.as_ref()) else {
162                    PROCESS.with(|process| {
163                        process
164                            .monitors
165                            .borrow_mut()
166                            .insert(reference, ProcessMonitor::ForProcess(None))
167                    });
168                    return send_process_down(dest, ExitReason::from("noproc"));
169                };
170
171                PROCESS.with(|process| {
172                    process
173                        .monitors
174                        .borrow_mut()
175                        .insert(reference, ProcessMonitor::ForProcess(Some(pid)))
176                });
177
178                process_exists_lock(pid, |exists| {
179                    if exists {
180                        monitor_create(pid, reference, from, Some(dest));
181                    } else {
182                        send_process_down(dest, ExitReason::from("noproc"));
183                    }
184                });
185            } else {
186                PROCESS.with(|process| {
187                    process
188                        .monitors
189                        .borrow_mut()
190                        .insert(reference, ProcessMonitor::ForProcess(None))
191                });
192
193                node_process_monitor_create(node.clone(), reference, dest, from);
194
195                let node = node_register(node.clone(), true);
196                let monitor = Monitor::new(
197                    true,
198                    None,
199                    Some(name.into_owned()),
200                    Some(from.id()),
201                    reference.id(),
202                );
203
204                node_send_frame(monitor.into(), node);
205            }
206        }
207        Dest::Alias(_) => panic!("Can not monitor an alias!"),
208    }
209}
210
211/// Sends monitor messages about the given process going down for the given reason.
212pub fn monitor_process_down(from: Pid, exit_reason: ExitReason) {
213    let Some(references) = MONITORS
214        .remove(&from.id())
215        .map(|(_, references)| references)
216    else {
217        return;
218    };
219
220    let mut remote_monitors: BTreeMap<u64, (MonitorDown, Vec<Reference>)> = BTreeMap::new();
221
222    for (reference, (pid, dest)) in references {
223        if pid.is_local() {
224            process_sender(pid).map(|sender| {
225                sender.send(ProcessItem::MonitorProcessDown(
226                    dest.unwrap(),
227                    reference,
228                    exit_reason.clone(),
229                ))
230            });
231        } else {
232            let remote = remote_monitors
233                .entry(pid.node())
234                .or_insert((MonitorDown::new(exit_reason.clone()), Vec::new()));
235
236            remote.0.monitors.push(reference.id());
237            remote.1.push(reference);
238        }
239
240        if reference.is_local() {
241            alias_destroy(reference);
242        }
243    }
244
245    for (node, (monitor_down, references)) in remote_monitors {
246        if let Some((name, address)) = node_lookup_remote(node) {
247            node_process_monitor_destroy_all(Node::from((name, address)), references);
248        }
249
250        node_send_frame(monitor_down.into(), node);
251    }
252}
253
254/// Fills in monitor information for the process.
255pub fn monitor_fill_info(pid: Pid, info: &mut ProcessInfo) {
256    let Some(monitors) = MONITORS.get(&pid.id()) else {
257        return;
258    };
259
260    info.monitored_by = monitors.value().values().map(|entry| entry.0).collect();
261}