1use std::collections::BTreeMap;
2
3use dashmap::DashMap;
4
5use once_cell::sync::Lazy;
6
7use crate::frame::Monitor;
8use crate::frame::MonitorDown;
9
10use crate::Dest;
11use crate::ExitReason;
12use crate::Node;
13use crate::PROCESS;
14use crate::Pid;
15use crate::ProcessInfo;
16use crate::ProcessItem;
17use crate::ProcessMonitor;
18use crate::Reference;
19use crate::alias_destroy;
20use crate::node_lookup_remote;
21use crate::node_monitor_destroy;
22use crate::node_process_monitor_create;
23use crate::node_process_monitor_destroy;
24use crate::node_process_monitor_destroy_all;
25use crate::node_register;
26use crate::node_send_frame;
27use crate::process_exists_lock;
28use crate::process_name_lookup;
29use crate::process_sender;
30
31#[allow(clippy::type_complexity)]
33static MONITORS: Lazy<DashMap<u64, BTreeMap<Reference, (Pid, Option<Dest>)>>> =
34 Lazy::new(DashMap::new);
35
36pub fn monitor_create(process: Pid, reference: Reference, from: Pid, dest: Option<Dest>) {
38 MONITORS
39 .entry(process.id())
40 .or_default()
41 .insert(reference, (from, dest));
42}
43
44pub fn monitor_destroy(process: Pid, reference: Reference) {
46 if process.is_local() {
47 MONITORS.alter(&process.id(), |_, mut value| {
48 value.remove(&reference);
49 value
50 });
51 } else {
52 let monitor = Monitor::new(false, Some(process.id()), None, None, reference.id());
53
54 node_send_frame(monitor.into(), reference.node());
55
56 if let Some((name, address)) = node_lookup_remote(reference.node()) {
57 node_process_monitor_destroy(Node::from((name, address)), reference);
58 }
59 }
60}
61
62pub fn monitor_destroy_all<'a, M: IntoIterator<Item = (&'a Reference, &'a ProcessMonitor)>>(
64 monitors: M,
65) {
66 for (reference, monitor) in monitors {
67 match monitor {
68 ProcessMonitor::ForProcess(pid) => {
69 let Some(pid) = pid else {
70 continue;
71 };
72
73 if pid.is_local() {
74 MONITORS.alter(&pid.id(), |_, mut value| {
75 value.remove(reference);
76 value
77 });
78 } else {
79 let monitor = Monitor::new(false, Some(pid.id()), None, None, reference.id());
80
81 node_send_frame(monitor.into(), reference.node());
82 }
83 }
84 ProcessMonitor::ForNode(node) => {
85 node_monitor_destroy(node.clone(), *reference);
86 }
87 }
88
89 if reference.is_local() {
90 alias_destroy(*reference);
91 }
92 }
93}
94
95pub fn monitor_install(process: Dest, reference: Reference, from: Pid) {
97 let dest = process.clone();
98
99 let send_process_down = |dest: Dest, exit_reason: ExitReason| {
100 PROCESS.with(|process| {
101 process
102 .sender
103 .send(ProcessItem::MonitorProcessDown(
104 dest,
105 reference,
106 exit_reason,
107 ))
108 .unwrap()
109 });
110
111 alias_destroy(reference);
112 };
113
114 match process {
115 Dest::Pid(pid) => {
116 if pid == from {
117 panic!("Can not monitor yourself!");
118 }
119
120 PROCESS.with(|process| {
121 process
122 .monitors
123 .borrow_mut()
124 .insert(reference, ProcessMonitor::ForProcess(Some(pid)))
125 });
126
127 if pid.is_local() {
128 process_exists_lock(pid, |exists| {
129 if exists {
130 monitor_create(pid, reference, from, Some(process));
131 } else {
132 send_process_down(dest, ExitReason::from("noproc"));
133 }
134 });
135 } else {
136 match node_lookup_remote(pid.node()) {
137 Some((name, address)) => {
138 let node = Node::from((name, address));
139
140 node_process_monitor_create(node.clone(), reference, dest, from);
141
142 let node = node_register(node, true);
143 let monitor = Monitor::new(
144 true,
145 Some(pid.id()),
146 None,
147 Some(from.id()),
148 reference.id(),
149 );
150
151 node_send_frame(monitor.into(), node);
152 }
153 None => {
154 send_process_down(dest, ExitReason::from("noconnection"));
155 }
156 }
157 }
158 }
159 Dest::Named(name, node) => {
160 if node.is_local() {
161 let Some(pid) = process_name_lookup(name.as_ref()) else {
162 PROCESS.with(|process| {
163 process
164 .monitors
165 .borrow_mut()
166 .insert(reference, ProcessMonitor::ForProcess(None))
167 });
168 return send_process_down(dest, ExitReason::from("noproc"));
169 };
170
171 PROCESS.with(|process| {
172 process
173 .monitors
174 .borrow_mut()
175 .insert(reference, ProcessMonitor::ForProcess(Some(pid)))
176 });
177
178 process_exists_lock(pid, |exists| {
179 if exists {
180 monitor_create(pid, reference, from, Some(dest));
181 } else {
182 send_process_down(dest, ExitReason::from("noproc"));
183 }
184 });
185 } else {
186 PROCESS.with(|process| {
187 process
188 .monitors
189 .borrow_mut()
190 .insert(reference, ProcessMonitor::ForProcess(None))
191 });
192
193 node_process_monitor_create(node.clone(), reference, dest, from);
194
195 let node = node_register(node.clone(), true);
196 let monitor = Monitor::new(
197 true,
198 None,
199 Some(name.into_owned()),
200 Some(from.id()),
201 reference.id(),
202 );
203
204 node_send_frame(monitor.into(), node);
205 }
206 }
207 Dest::Alias(_) => panic!("Can not monitor an alias!"),
208 }
209}
210
211pub fn monitor_process_down(from: Pid, exit_reason: ExitReason) {
213 let Some(references) = MONITORS
214 .remove(&from.id())
215 .map(|(_, references)| references)
216 else {
217 return;
218 };
219
220 let mut remote_monitors: BTreeMap<u64, (MonitorDown, Vec<Reference>)> = BTreeMap::new();
221
222 for (reference, (pid, dest)) in references {
223 if pid.is_local() {
224 process_sender(pid).map(|sender| {
225 sender.send(ProcessItem::MonitorProcessDown(
226 dest.unwrap(),
227 reference,
228 exit_reason.clone(),
229 ))
230 });
231 } else {
232 let remote = remote_monitors
233 .entry(pid.node())
234 .or_insert((MonitorDown::new(exit_reason.clone()), Vec::new()));
235
236 remote.0.monitors.push(reference.id());
237 remote.1.push(reference);
238 }
239
240 if reference.is_local() {
241 alias_destroy(reference);
242 }
243 }
244
245 for (node, (monitor_down, references)) in remote_monitors {
246 if let Some((name, address)) = node_lookup_remote(node) {
247 node_process_monitor_destroy_all(Node::from((name, address)), references);
248 }
249
250 node_send_frame(monitor_down.into(), node);
251 }
252}
253
254pub fn monitor_fill_info(pid: Pid, info: &mut ProcessInfo) {
256 let Some(monitors) = MONITORS.get(&pid.id()) else {
257 return;
258 };
259
260 info.monitored_by = monitors.value().values().map(|entry| entry.0).collect();
261}