Skip to main content

hydra/
node_registry.rs

1use std::collections::BTreeMap;
2use std::collections::BTreeSet;
3use std::net::SocketAddr;
4use std::sync::Mutex;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7
8use dashmap::DashMap;
9use dashmap::mapref::entry::Entry;
10
11use once_cell::sync::Lazy;
12
13use crate::frame::Frame;
14
15use crate::Dest;
16use crate::ExitReason;
17use crate::Local;
18use crate::Node;
19use crate::NodeOptions;
20use crate::NodeRegistration;
21use crate::NodeRemoteSenderMessage;
22use crate::NodeState;
23use crate::Pid;
24use crate::Process;
25use crate::ProcessItem;
26use crate::Reference;
27use crate::alias_destroy;
28use crate::link_destroy;
29use crate::monitor_destroy;
30use crate::node_local_supervisor;
31use crate::node_remote_connector;
32use crate::process_exit_signal_linked;
33use crate::process_sender;
34
35/// Represents the node id always used for the local node.
36pub const LOCAL_NODE_ID: u64 = 0;
37
38/// Represents a node id that will never be allocated.
39pub const INVALID_NODE_ID: u64 = u64::MAX;
40
/// The type of node monitor that was installed.
///
/// Every variant stores a raw `u64` process id as its first field. Within this file that id
/// is produced with `Pid::id()` when the monitor is created and turned back into a pid with
/// `Pid::local` when the monitor fires or is cleaned up.
#[derive(Debug)]
enum NodeMonitor {
    /// The monitor is explicitly for the node itself; holds the id of the monitoring process.
    Node(u64),
    /// The monitor is installed on behalf of a remote process monitor; holds the id of the
    /// monitoring process and the monitored destination.
    ProcessMonitor(u64, Dest),
    /// The monitor is installed on behalf of a remote process monitor for cleanup; holds the
    /// id of the process whose monitor is destroyed when the node goes down.
    ProcessMonitorCleanup(u64),
}
51
// When a pid is serialized over the wire, we need to look up its node@ip:port combination.
// If the node is already in the registry, we use its existing node id; otherwise
// the node has to be registered first so that an id is allocated for it.
55static NODE_REGISTRATIONS: Lazy<DashMap<u64, NodeRegistration>> = Lazy::new(DashMap::new);
56
57/// A collection of node:id into the node registrations.
58static NODE_MAP: Lazy<DashMap<Node, u64>> = Lazy::new(DashMap::new);
59
60/// A collection of node monitors installed.
61static NODE_MONITORS: Lazy<DashMap<Node, BTreeMap<Reference, NodeMonitor>>> =
62    Lazy::new(DashMap::new);
63
64/// A collection of node links installed.
65static NODE_LINKS: Lazy<DashMap<Node, BTreeSet<(Pid, u64)>>> = Lazy::new(DashMap::new);
66
67/// A collection of node:vec<msg> pending messages for a node.
68static NODE_PENDING_MESSAGES: Lazy<DashMap<Node, Vec<Frame>>> = Lazy::new(DashMap::new);
69
70/// A secret value that secures the connection between nodes.
71static NODE_COOKIE: Mutex<Option<String>> = Mutex::new(None);
72
73/// The next id for this node.
74static NODE_ID: AtomicU64 = AtomicU64::new(1);
75
76/// Returns `true` if the local node is alive.
77pub fn node_alive() -> bool {
78    NODE_MAP.contains_key(&Node::Local)
79}
80
81/// Starts the local node.
82pub fn node_local_start(name: String, options: NodeOptions) -> Pid {
83    let Entry::Vacant(entry) = NODE_MAP.entry(Node::Local) else {
84        panic!("Local node already started!");
85    };
86
87    let supervisor = Process::spawn(node_local_supervisor(name.clone(), options));
88
89    NODE_REGISTRATIONS.insert(
90        LOCAL_NODE_ID,
91        NodeRegistration::new(
92            Some(supervisor),
93            NodeState::Current,
94            name,
95            options.broadcast_address,
96        ),
97    );
98
99    entry.insert(LOCAL_NODE_ID);
100
101    supervisor
102}
103
104/// Stops the local node, forgetting all nodes.
105pub fn node_local_stop() {
106    let Some((_, _)) = NODE_MAP.remove(&Node::Local) else {
107        panic!("Local node not started!");
108    };
109
110    NODE_MAP.clear();
111
112    if let Some(entry) = NODE_REGISTRATIONS.get(&LOCAL_NODE_ID)
113        && let Some(supervisor) = entry.supervisor
114    {
115        Process::exit(supervisor, ExitReason::Kill);
116    }
117
118    NODE_REGISTRATIONS.clear();
119    NODE_PENDING_MESSAGES.clear();
120}
121
/// Cleans up distribution information when the local node goes down unexpectedly.
///
/// This only empties the node map, the registrations and the pending message queues. It
/// never panics when the local node is absent and it does not signal any supervisor process.
pub fn node_local_panic() {
    NODE_MAP.clear();
    NODE_REGISTRATIONS.clear();
    NODE_PENDING_MESSAGES.clear();
}
128
129/// Returns the process responsible for the local node.
130pub fn node_local_process() -> Option<Pid> {
131    NODE_REGISTRATIONS
132        .get(&LOCAL_NODE_ID)
133        .and_then(|process| process.supervisor)
134}
135
136/// Sets available worker processes for a node, then, drains any pending messages to them.
137pub fn node_register_workers(node: Node, sender: Pid, receiver: Pid) {
138    let Some(entry) = NODE_MAP.get(&node) else {
139        return;
140    };
141
142    NODE_REGISTRATIONS.alter(&entry, |_, mut value| {
143        value.sender = Some(sender);
144        value.receiver = Some(receiver);
145
146        let frames = NODE_PENDING_MESSAGES
147            .remove(&node)
148            .map(|pending| pending.1)
149            .unwrap_or_default();
150
151        // We need to pop the pending messages and send them to the sender
152        // This way, the order for sent messages is maintained and all future messages go direct to the sender.
153        Process::send(
154            sender,
155            NodeRemoteSenderMessage::SendFrames(Local::new(frames)),
156        );
157
158        value
159    });
160}
161
162/// Gets the send process for a given node.
163pub fn node_send_frame(frame: Frame, id: u64) {
164    let Some(registration) = NODE_REGISTRATIONS.get(&id) else {
165        return;
166    };
167
168    if let Some(sender) = registration.sender {
169        Process::send(
170            sender,
171            NodeRemoteSenderMessage::SendFrame(Local::new(frame)),
172        );
173    } else if !matches!(registration.state, NodeState::Known) {
174        NODE_PENDING_MESSAGES
175            .entry(Node::from((
176                registration.name.clone(),
177                registration.broadcast_address,
178            )))
179            .or_default()
180            .push(frame);
181    }
182}
183
184/// Accepts a remote node's connection if one doesn't exist, returns `true` if accepted.
185pub fn node_accept(node: Node, supervisor: Pid) -> bool {
186    let Node::Remote(name, address) = node else {
187        panic!("Can't accept a local node!");
188    };
189
190    let entry = NODE_MAP.entry(Node::from((name.clone(), address)));
191
192    match entry {
193        Entry::Vacant(entry) => {
194            let next_id = NODE_ID.fetch_add(1, Ordering::Relaxed);
195
196            NODE_REGISTRATIONS.insert(
197                next_id,
198                NodeRegistration::new(Some(supervisor), NodeState::Connected, name, address),
199            );
200
201            entry.insert(next_id);
202
203            true
204        }
205        Entry::Occupied(entry) => {
206            let mut accepted = false;
207
208            NODE_REGISTRATIONS.alter(entry.get(), |_, mut value| {
209                if matches!(value.state, NodeState::Pending)
210                    && let Some(current_supervisor) = value.supervisor.take()
211                    && supervisor != current_supervisor
212                {
213                    Process::exit(current_supervisor, ExitReason::Kill);
214                }
215
216                if value.supervisor.is_none() {
217                    accepted = true;
218
219                    value.supervisor = Some(supervisor);
220                    value.state = NodeState::Connected;
221                }
222
223                value
224            });
225
226            accepted
227        }
228    }
229}
230
231/// Registers a remote node's information, or returns an existing one.
232pub fn node_register(node: Node, connect: bool) -> u64 {
233    let Node::Remote(name, address) = node else {
234        panic!("Can't register a local node!");
235    };
236
237    let node = Node::from((name.clone(), address));
238
239    let entry = match NODE_MAP.entry(node.clone()) {
240        Entry::Vacant(entry) => entry,
241        Entry::Occupied(entry) => {
242            let id = *entry.get();
243
244            if connect {
245                NODE_REGISTRATIONS.alter(&id, |_, mut value| {
246                    if value.supervisor.is_none() {
247                        value.supervisor = Some(Process::spawn(node_remote_connector(node)));
248                        value.state = NodeState::Pending;
249                    }
250
251                    value
252                });
253            }
254
255            return id;
256        }
257    };
258
259    let next_id = NODE_ID.fetch_add(1, Ordering::Relaxed);
260
261    if connect {
262        let supervisor = Process::spawn(node_remote_connector(node));
263
264        NODE_REGISTRATIONS.insert(
265            next_id,
266            NodeRegistration::new(Some(supervisor), NodeState::Pending, name, address),
267        );
268    } else {
269        NODE_REGISTRATIONS.insert(
270            next_id,
271            NodeRegistration::new(None, NodeState::Known, name, address),
272        );
273    }
274
275    entry.insert(next_id);
276
277    next_id
278}
279
280/// Triggered when a remote node supervisor goes down unexpectedly.
281pub fn node_remote_supervisor_down(node: Node, process: Pid) {
282    let Some(id) = NODE_MAP.get(&node) else {
283        return;
284    };
285
286    NODE_REGISTRATIONS.alter(&id, |_, mut value| {
287        if value
288            .supervisor
289            .is_some_and(|supervisor| supervisor != process)
290        {
291            return value;
292        }
293
294        value.supervisor = None;
295        value.sender = None;
296        value.receiver = None;
297        value.state = NodeState::Known;
298
299        if let Some((_, links)) = NODE_LINKS.remove(&node) {
300            for (from, process_id) in links {
301                let process = Pid::local(process_id);
302
303                link_destroy(process, from);
304
305                process_exit_signal_linked(process, from, ExitReason::from("noconnection"));
306            }
307        }
308
309        if let Some((_, monitors)) = NODE_MONITORS.remove(&node) {
310            for (reference, monitor) in monitors {
311                match monitor {
312                    NodeMonitor::Node(id) => {
313                        process_sender(Pid::local(id)).map(|sender| {
314                            sender.send(ProcessItem::MonitorNodeDown(node.clone(), reference))
315                        });
316                    }
317                    NodeMonitor::ProcessMonitor(id, dest) => {
318                        process_sender(Pid::local(id)).map(|sender| {
319                            sender.send(ProcessItem::MonitorProcessDown(
320                                dest,
321                                reference,
322                                ExitReason::from("noconnection"),
323                            ))
324                        });
325                    }
326                    NodeMonitor::ProcessMonitorCleanup(id) => {
327                        monitor_destroy(Pid::local(id), reference);
328                    }
329                }
330
331                if reference.is_local() {
332                    alias_destroy(reference);
333                }
334            }
335        }
336
337        NODE_PENDING_MESSAGES.remove(&node);
338
339        value
340    });
341}
342
343/// Returns the node list excluding the local node.
344pub fn node_list() -> Vec<Node> {
345    NODE_MAP
346        .iter()
347        .filter_map(|entry| {
348            if matches!(entry.key(), Node::Local) {
349                None
350            } else {
351                Some(entry.key().clone())
352            }
353        })
354        .collect()
355}
356
357/// Returns the node list filtered to the given node state.
358pub fn node_list_filtered(state: NodeState) -> Vec<Node> {
359    NODE_REGISTRATIONS
360        .iter()
361        .filter_map(|entry| {
362            if entry.state == state {
363                Some(Node::from((entry.name.clone(), entry.broadcast_address)))
364            } else {
365                None
366            }
367        })
368        .collect()
369}
370
371/// Disconnects a connected node, leaving it as a known node.
372pub fn node_disconnect(node: Node) {
373    let Some(id) = NODE_MAP.get(&node) else {
374        return;
375    };
376
377    NODE_REGISTRATIONS.alter(&id, |_, mut value| {
378        NODE_PENDING_MESSAGES.remove(&node);
379
380        if let Some(supervisor) = value.supervisor.take() {
381            Process::exit(supervisor, ExitReason::Kill);
382        }
383
384        value.state = NodeState::Known;
385        value
386    });
387}
388
389/// Disconnects and forgets a node completely.
390pub fn node_forget(node: Node) {
391    let Some((_, id)) = NODE_MAP.remove(&node) else {
392        return;
393    };
394
395    let Some((_, registration)) = NODE_REGISTRATIONS.remove(&id) else {
396        return;
397    };
398
399    NODE_PENDING_MESSAGES.remove(&node);
400
401    if let Some(supervisor) = registration.supervisor {
402        Process::exit(supervisor, ExitReason::Kill);
403    }
404}
405
406/// Looks up the node information for the local node.
407pub fn node_lookup_local() -> Option<(String, SocketAddr)> {
408    NODE_REGISTRATIONS
409        .get(&LOCAL_NODE_ID)
410        .map(|registration| (registration.name.clone(), registration.broadcast_address))
411}
412
413/// Looks up the node information for a remote node id.
414pub fn node_lookup_remote(id: u64) -> Option<(String, SocketAddr)> {
415    NODE_REGISTRATIONS
416        .get(&id)
417        .map(|registration| (registration.name.clone(), registration.broadcast_address))
418}
419
420/// Creates a monitor for the given node and reference from the given process.
421pub fn node_monitor_create(node: Node, reference: Reference, from: Pid) {
422    NODE_MONITORS
423        .entry(node)
424        .or_default()
425        .insert(reference, NodeMonitor::Node(from.id()));
426}
427
428/// Creates a monitor for the given node and reference from the given process for dest.
429pub fn node_process_monitor_create(node: Node, reference: Reference, dest: Dest, from: Pid) {
430    NODE_MONITORS
431        .entry(node)
432        .or_default()
433        .insert(reference, NodeMonitor::ProcessMonitor(from.id(), dest));
434}
435
436/// Creates a monitor cleanup for the given node and reference from the given process.
437pub fn node_process_monitor_cleanup(node: Node, reference: Reference, process: Pid) {
438    NODE_MONITORS
439        .entry(node)
440        .or_default()
441        .insert(reference, NodeMonitor::ProcessMonitorCleanup(process.id()));
442}
443
444/// Destroys a node process monitor for the given node and reference.
445pub fn node_process_monitor_destroy(node: Node, reference: Reference) {
446    NODE_MONITORS.alter(&node, |_, mut value| {
447        value.remove(&reference);
448        value
449    });
450}
451
452/// Destroys all process monitors for the given node by their references.
453pub fn node_process_monitor_destroy_all(node: Node, references: Vec<Reference>) {
454    NODE_MONITORS.alter(&node, |_, mut value| {
455        for reference in references {
456            value.remove(&reference);
457        }
458        value
459    });
460}
461
462/// Destroys a node process link for the given node by the link process.
463pub fn node_process_link_destroy(node: Node, link: Pid, from: Pid) {
464    NODE_LINKS.alter(&node, |_, mut value| {
465        value.remove(&(link, from.id()));
466        value
467    });
468}
469
470/// Destroys all process links for the given node by their link processes.
471pub fn node_process_link_destroy_all(node: Node, links: Vec<Pid>, from: Pid) {
472    NODE_LINKS.alter(&node, |_, mut value| {
473        for link in links {
474            value.remove(&(link, from.id()));
475        }
476        value
477    });
478}
479
480/// Creates a monitor for the given node and process from the given linked process.
481pub fn node_process_link_create(node: Node, process: Pid, from: Pid) {
482    NODE_LINKS
483        .entry(node)
484        .or_default()
485        .insert((process, from.id()));
486}
487
488/// Removes a monitor for the given node and reference.
489pub fn node_monitor_destroy(node: Node, reference: Reference) {
490    NODE_MONITORS.alter(&node, |_, mut value| {
491        value.remove(&reference);
492        value
493    });
494}
495
496/// Removes a link for the given node and process.
497pub fn node_link_destroy(node: Node, process: Pid, from: Pid) {
498    NODE_LINKS.alter(&node, |_, mut value| {
499        value.remove(&(process, from.id()));
500        value
501    });
502}
503
504/// Fires when a remote process has notified the local node that it went down for a monitor.
505pub fn node_process_monitor_down(node: Node, reference: Reference, exit_reason: ExitReason) {
506    let mut monitor: Option<NodeMonitor> = None;
507
508    NODE_MONITORS.alter(&node, |_, mut value| {
509        monitor = value.remove(&reference);
510
511        value
512    });
513
514    alias_destroy(reference);
515
516    if let Some(NodeMonitor::ProcessMonitor(id, dest)) = monitor {
517        process_sender(Pid::local(id)).map(|sender| {
518            sender.send(ProcessItem::MonitorProcessDown(
519                dest,
520                reference,
521                exit_reason,
522            ))
523        });
524    }
525}
526
527/// Fires when a remote process has notified the local node that it went down for a link.
528pub fn node_process_link_down(node: Node, process: Pid, from: Pid, exit_reason: ExitReason) {
529    let mut found = false;
530
531    NODE_LINKS.alter(&node, |_, mut value| {
532        found = value.remove(&(from, process.id()));
533        value
534    });
535
536    if found {
537        process_exit_signal_linked(process, from, exit_reason);
538    }
539}
540
541/// Gets the cookie secret value.
542pub fn node_get_cookie() -> Option<String> {
543    NODE_COOKIE.lock().unwrap().clone()
544}
545
546/// Sets or clears the cookie secret value.
547pub fn node_set_cookie(cookie: Option<String>) {
548    *NODE_COOKIE.lock().unwrap() = cookie;
549}