1use std::collections::BTreeMap;
2use std::collections::BTreeSet;
3use std::net::SocketAddr;
4use std::sync::Mutex;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7
8use dashmap::DashMap;
9use dashmap::mapref::entry::Entry;
10
11use once_cell::sync::Lazy;
12
13use crate::frame::Frame;
14
15use crate::Dest;
16use crate::ExitReason;
17use crate::Local;
18use crate::Node;
19use crate::NodeOptions;
20use crate::NodeRegistration;
21use crate::NodeRemoteSenderMessage;
22use crate::NodeState;
23use crate::Pid;
24use crate::Process;
25use crate::ProcessItem;
26use crate::Reference;
27use crate::alias_destroy;
28use crate::link_destroy;
29use crate::monitor_destroy;
30use crate::node_local_supervisor;
31use crate::node_remote_connector;
32use crate::process_exit_signal_linked;
33use crate::process_sender;
34
/// Identifier under which the local node's registration is stored in `NODE_REGISTRATIONS`.
pub const LOCAL_NODE_ID: u64 = 0;

/// The largest `u64`, reserved as a node identifier that is never handed out by this file.
///
/// NOTE(review): presumably used by callers as a "no such node" sentinel; it is not referenced
/// in this file, so confirm against the call sites.
pub const INVALID_NODE_ID: u64 = u64::MAX;
40
/// A monitor entry stored per node in `NODE_MONITORS`, keyed by its reference.
#[derive(Debug)]
enum NodeMonitor {
    /// A monitor on the node itself. Holds the local id of the process that is sent
    /// `ProcessItem::MonitorNodeDown` when the node's supervisor goes down.
    Node(u64),
    /// A monitor on a destination on the node. Holds the local id of the process that is sent
    /// `ProcessItem::MonitorProcessDown`, and the monitored destination.
    ProcessMonitor(u64, Dest),
    /// A cleanup entry. Holds the local id of the process for which `monitor_destroy` is called
    /// when the node's supervisor goes down.
    ProcessMonitorCleanup(u64),
}
51
/// Registration details (supervisor, sender, receiver, state, name, address) by node id.
static NODE_REGISTRATIONS: Lazy<DashMap<u64, NodeRegistration>> = Lazy::new(DashMap::new);

/// Maps a node to the id of its entry in `NODE_REGISTRATIONS`.
static NODE_MAP: Lazy<DashMap<Node, u64>> = Lazy::new(DashMap::new);

/// Monitors installed against each node, keyed by monitor reference.
static NODE_MONITORS: Lazy<DashMap<Node, BTreeMap<Reference, NodeMonitor>>> =
    Lazy::new(DashMap::new);

/// Links installed against each node, stored as `(linked pid, local process id)` pairs.
static NODE_LINKS: Lazy<DashMap<Node, BTreeSet<(Pid, u64)>>> = Lazy::new(DashMap::new);

/// Frames queued for a node that does not have a sender process registered yet.
static NODE_PENDING_MESSAGES: Lazy<DashMap<Node, Vec<Frame>>> = Lazy::new(DashMap::new);

/// The cookie read and written by `node_get_cookie` / `node_set_cookie`; `None` until set.
static NODE_COOKIE: Mutex<Option<String>> = Mutex::new(None);

/// Counter for the next remote node id. Starts at 1 because 0 is `LOCAL_NODE_ID`.
static NODE_ID: AtomicU64 = AtomicU64::new(1);
75
76pub fn node_alive() -> bool {
78 NODE_MAP.contains_key(&Node::Local)
79}
80
81pub fn node_local_start(name: String, options: NodeOptions) -> Pid {
83 let Entry::Vacant(entry) = NODE_MAP.entry(Node::Local) else {
84 panic!("Local node already started!");
85 };
86
87 let supervisor = Process::spawn(node_local_supervisor(name.clone(), options));
88
89 NODE_REGISTRATIONS.insert(
90 LOCAL_NODE_ID,
91 NodeRegistration::new(
92 Some(supervisor),
93 NodeState::Current,
94 name,
95 options.broadcast_address,
96 ),
97 );
98
99 entry.insert(LOCAL_NODE_ID);
100
101 supervisor
102}
103
104pub fn node_local_stop() {
106 let Some((_, _)) = NODE_MAP.remove(&Node::Local) else {
107 panic!("Local node not started!");
108 };
109
110 NODE_MAP.clear();
111
112 if let Some(entry) = NODE_REGISTRATIONS.get(&LOCAL_NODE_ID)
113 && let Some(supervisor) = entry.supervisor
114 {
115 Process::exit(supervisor, ExitReason::Kill);
116 }
117
118 NODE_REGISTRATIONS.clear();
119 NODE_PENDING_MESSAGES.clear();
120}
121
122pub fn node_local_panic() {
124 NODE_MAP.clear();
125 NODE_REGISTRATIONS.clear();
126 NODE_PENDING_MESSAGES.clear();
127}
128
129pub fn node_local_process() -> Option<Pid> {
131 NODE_REGISTRATIONS
132 .get(&LOCAL_NODE_ID)
133 .and_then(|process| process.supervisor)
134}
135
136pub fn node_register_workers(node: Node, sender: Pid, receiver: Pid) {
138 let Some(entry) = NODE_MAP.get(&node) else {
139 return;
140 };
141
142 NODE_REGISTRATIONS.alter(&entry, |_, mut value| {
143 value.sender = Some(sender);
144 value.receiver = Some(receiver);
145
146 let frames = NODE_PENDING_MESSAGES
147 .remove(&node)
148 .map(|pending| pending.1)
149 .unwrap_or_default();
150
151 Process::send(
154 sender,
155 NodeRemoteSenderMessage::SendFrames(Local::new(frames)),
156 );
157
158 value
159 });
160}
161
162pub fn node_send_frame(frame: Frame, id: u64) {
164 let Some(registration) = NODE_REGISTRATIONS.get(&id) else {
165 return;
166 };
167
168 if let Some(sender) = registration.sender {
169 Process::send(
170 sender,
171 NodeRemoteSenderMessage::SendFrame(Local::new(frame)),
172 );
173 } else if !matches!(registration.state, NodeState::Known) {
174 NODE_PENDING_MESSAGES
175 .entry(Node::from((
176 registration.name.clone(),
177 registration.broadcast_address,
178 )))
179 .or_default()
180 .push(frame);
181 }
182}
183
184pub fn node_accept(node: Node, supervisor: Pid) -> bool {
186 let Node::Remote(name, address) = node else {
187 panic!("Can't accept a local node!");
188 };
189
190 let entry = NODE_MAP.entry(Node::from((name.clone(), address)));
191
192 match entry {
193 Entry::Vacant(entry) => {
194 let next_id = NODE_ID.fetch_add(1, Ordering::Relaxed);
195
196 NODE_REGISTRATIONS.insert(
197 next_id,
198 NodeRegistration::new(Some(supervisor), NodeState::Connected, name, address),
199 );
200
201 entry.insert(next_id);
202
203 true
204 }
205 Entry::Occupied(entry) => {
206 let mut accepted = false;
207
208 NODE_REGISTRATIONS.alter(entry.get(), |_, mut value| {
209 if matches!(value.state, NodeState::Pending)
210 && let Some(current_supervisor) = value.supervisor.take()
211 && supervisor != current_supervisor
212 {
213 Process::exit(current_supervisor, ExitReason::Kill);
214 }
215
216 if value.supervisor.is_none() {
217 accepted = true;
218
219 value.supervisor = Some(supervisor);
220 value.state = NodeState::Connected;
221 }
222
223 value
224 });
225
226 accepted
227 }
228 }
229}
230
231pub fn node_register(node: Node, connect: bool) -> u64 {
233 let Node::Remote(name, address) = node else {
234 panic!("Can't register a local node!");
235 };
236
237 let node = Node::from((name.clone(), address));
238
239 let entry = match NODE_MAP.entry(node.clone()) {
240 Entry::Vacant(entry) => entry,
241 Entry::Occupied(entry) => {
242 let id = *entry.get();
243
244 if connect {
245 NODE_REGISTRATIONS.alter(&id, |_, mut value| {
246 if value.supervisor.is_none() {
247 value.supervisor = Some(Process::spawn(node_remote_connector(node)));
248 value.state = NodeState::Pending;
249 }
250
251 value
252 });
253 }
254
255 return id;
256 }
257 };
258
259 let next_id = NODE_ID.fetch_add(1, Ordering::Relaxed);
260
261 if connect {
262 let supervisor = Process::spawn(node_remote_connector(node));
263
264 NODE_REGISTRATIONS.insert(
265 next_id,
266 NodeRegistration::new(Some(supervisor), NodeState::Pending, name, address),
267 );
268 } else {
269 NODE_REGISTRATIONS.insert(
270 next_id,
271 NodeRegistration::new(None, NodeState::Known, name, address),
272 );
273 }
274
275 entry.insert(next_id);
276
277 next_id
278}
279
280pub fn node_remote_supervisor_down(node: Node, process: Pid) {
282 let Some(id) = NODE_MAP.get(&node) else {
283 return;
284 };
285
286 NODE_REGISTRATIONS.alter(&id, |_, mut value| {
287 if value
288 .supervisor
289 .is_some_and(|supervisor| supervisor != process)
290 {
291 return value;
292 }
293
294 value.supervisor = None;
295 value.sender = None;
296 value.receiver = None;
297 value.state = NodeState::Known;
298
299 if let Some((_, links)) = NODE_LINKS.remove(&node) {
300 for (from, process_id) in links {
301 let process = Pid::local(process_id);
302
303 link_destroy(process, from);
304
305 process_exit_signal_linked(process, from, ExitReason::from("noconnection"));
306 }
307 }
308
309 if let Some((_, monitors)) = NODE_MONITORS.remove(&node) {
310 for (reference, monitor) in monitors {
311 match monitor {
312 NodeMonitor::Node(id) => {
313 process_sender(Pid::local(id)).map(|sender| {
314 sender.send(ProcessItem::MonitorNodeDown(node.clone(), reference))
315 });
316 }
317 NodeMonitor::ProcessMonitor(id, dest) => {
318 process_sender(Pid::local(id)).map(|sender| {
319 sender.send(ProcessItem::MonitorProcessDown(
320 dest,
321 reference,
322 ExitReason::from("noconnection"),
323 ))
324 });
325 }
326 NodeMonitor::ProcessMonitorCleanup(id) => {
327 monitor_destroy(Pid::local(id), reference);
328 }
329 }
330
331 if reference.is_local() {
332 alias_destroy(reference);
333 }
334 }
335 }
336
337 NODE_PENDING_MESSAGES.remove(&node);
338
339 value
340 });
341}
342
343pub fn node_list() -> Vec<Node> {
345 NODE_MAP
346 .iter()
347 .filter_map(|entry| {
348 if matches!(entry.key(), Node::Local) {
349 None
350 } else {
351 Some(entry.key().clone())
352 }
353 })
354 .collect()
355}
356
357pub fn node_list_filtered(state: NodeState) -> Vec<Node> {
359 NODE_REGISTRATIONS
360 .iter()
361 .filter_map(|entry| {
362 if entry.state == state {
363 Some(Node::from((entry.name.clone(), entry.broadcast_address)))
364 } else {
365 None
366 }
367 })
368 .collect()
369}
370
371pub fn node_disconnect(node: Node) {
373 let Some(id) = NODE_MAP.get(&node) else {
374 return;
375 };
376
377 NODE_REGISTRATIONS.alter(&id, |_, mut value| {
378 NODE_PENDING_MESSAGES.remove(&node);
379
380 if let Some(supervisor) = value.supervisor.take() {
381 Process::exit(supervisor, ExitReason::Kill);
382 }
383
384 value.state = NodeState::Known;
385 value
386 });
387}
388
389pub fn node_forget(node: Node) {
391 let Some((_, id)) = NODE_MAP.remove(&node) else {
392 return;
393 };
394
395 let Some((_, registration)) = NODE_REGISTRATIONS.remove(&id) else {
396 return;
397 };
398
399 NODE_PENDING_MESSAGES.remove(&node);
400
401 if let Some(supervisor) = registration.supervisor {
402 Process::exit(supervisor, ExitReason::Kill);
403 }
404}
405
406pub fn node_lookup_local() -> Option<(String, SocketAddr)> {
408 NODE_REGISTRATIONS
409 .get(&LOCAL_NODE_ID)
410 .map(|registration| (registration.name.clone(), registration.broadcast_address))
411}
412
413pub fn node_lookup_remote(id: u64) -> Option<(String, SocketAddr)> {
415 NODE_REGISTRATIONS
416 .get(&id)
417 .map(|registration| (registration.name.clone(), registration.broadcast_address))
418}
419
420pub fn node_monitor_create(node: Node, reference: Reference, from: Pid) {
422 NODE_MONITORS
423 .entry(node)
424 .or_default()
425 .insert(reference, NodeMonitor::Node(from.id()));
426}
427
428pub fn node_process_monitor_create(node: Node, reference: Reference, dest: Dest, from: Pid) {
430 NODE_MONITORS
431 .entry(node)
432 .or_default()
433 .insert(reference, NodeMonitor::ProcessMonitor(from.id(), dest));
434}
435
436pub fn node_process_monitor_cleanup(node: Node, reference: Reference, process: Pid) {
438 NODE_MONITORS
439 .entry(node)
440 .or_default()
441 .insert(reference, NodeMonitor::ProcessMonitorCleanup(process.id()));
442}
443
444pub fn node_process_monitor_destroy(node: Node, reference: Reference) {
446 NODE_MONITORS.alter(&node, |_, mut value| {
447 value.remove(&reference);
448 value
449 });
450}
451
452pub fn node_process_monitor_destroy_all(node: Node, references: Vec<Reference>) {
454 NODE_MONITORS.alter(&node, |_, mut value| {
455 for reference in references {
456 value.remove(&reference);
457 }
458 value
459 });
460}
461
462pub fn node_process_link_destroy(node: Node, link: Pid, from: Pid) {
464 NODE_LINKS.alter(&node, |_, mut value| {
465 value.remove(&(link, from.id()));
466 value
467 });
468}
469
470pub fn node_process_link_destroy_all(node: Node, links: Vec<Pid>, from: Pid) {
472 NODE_LINKS.alter(&node, |_, mut value| {
473 for link in links {
474 value.remove(&(link, from.id()));
475 }
476 value
477 });
478}
479
480pub fn node_process_link_create(node: Node, process: Pid, from: Pid) {
482 NODE_LINKS
483 .entry(node)
484 .or_default()
485 .insert((process, from.id()));
486}
487
488pub fn node_monitor_destroy(node: Node, reference: Reference) {
490 NODE_MONITORS.alter(&node, |_, mut value| {
491 value.remove(&reference);
492 value
493 });
494}
495
496pub fn node_link_destroy(node: Node, process: Pid, from: Pid) {
498 NODE_LINKS.alter(&node, |_, mut value| {
499 value.remove(&(process, from.id()));
500 value
501 });
502}
503
504pub fn node_process_monitor_down(node: Node, reference: Reference, exit_reason: ExitReason) {
506 let mut monitor: Option<NodeMonitor> = None;
507
508 NODE_MONITORS.alter(&node, |_, mut value| {
509 monitor = value.remove(&reference);
510
511 value
512 });
513
514 alias_destroy(reference);
515
516 if let Some(NodeMonitor::ProcessMonitor(id, dest)) = monitor {
517 process_sender(Pid::local(id)).map(|sender| {
518 sender.send(ProcessItem::MonitorProcessDown(
519 dest,
520 reference,
521 exit_reason,
522 ))
523 });
524 }
525}
526
527pub fn node_process_link_down(node: Node, process: Pid, from: Pid, exit_reason: ExitReason) {
529 let mut found = false;
530
531 NODE_LINKS.alter(&node, |_, mut value| {
532 found = value.remove(&(from, process.id()));
533 value
534 });
535
536 if found {
537 process_exit_signal_linked(process, from, exit_reason);
538 }
539}
540
541pub fn node_get_cookie() -> Option<String> {
543 NODE_COOKIE.lock().unwrap().clone()
544}
545
546pub fn node_set_cookie(cookie: Option<String>) {
548 *NODE_COOKIE.lock().unwrap() = cookie;
549}