Skip to main content

hydra/
process_registry.rs

1use std::time::Duration;
2use std::time::Instant;
3
4use dashmap::DashMap;
5use dashmap::mapref::entry::Entry;
6
7use once_cell::sync::Lazy;
8
9use tokio::task::JoinHandle;
10
11use crate::ArgumentError;
12use crate::ExitReason;
13use crate::Pid;
14use crate::ProcessFlags;
15use crate::ProcessInfo;
16use crate::ProcessItem;
17use crate::ProcessRegistration;
18use crate::ProcessSend;
19use crate::Reference;
20use crate::SystemMessage;
21
22/// A collection of process id -> process registration.
23static PROCESS_REGISTRY: Lazy<DashMap<u64, ProcessRegistration>> = Lazy::new(DashMap::new);
24/// A collection of registered named processes.
25static PROCESS_NAMES: Lazy<DashMap<String, u64>> = Lazy::new(DashMap::new);
26/// A collection of process timers.
27static PROCESS_TIMERS: Lazy<DashMap<u64, (Instant, JoinHandle<()>)>> = Lazy::new(DashMap::new);
28
29/// Checks for the given `pid` and calls the given `callback` with the result if it exists or not.
30///
31/// The process will exist for the duration of `callback`.
32pub fn process_exists_lock<C: FnOnce(bool) -> R, R>(pid: Pid, callback: C) -> R {
33    if PROCESS_REGISTRY.get(&pid.id()).is_some() {
34        callback(true)
35    } else {
36        callback(false)
37    }
38}
39
40/// Drops a process from the registry.
41pub fn process_drop(pid: Pid) -> Option<ProcessRegistration> {
42    PROCESS_REGISTRY
43        .remove(&pid.id())
44        .map(|(_, process)| process)
45}
46
47/// Gets the sender for this process.
48pub fn process_sender(pid: Pid) -> Option<ProcessSend> {
49    PROCESS_REGISTRY
50        .get(&pid.id())
51        .map(|process| process.sender.clone())
52}
53
54/// Looks up a process by the given name.
55pub fn process_name_lookup(name: &str) -> Option<Pid> {
56    PROCESS_NAMES
57        .get(name)
58        .map(|process_id| Pid::local(*process_id))
59}
60
61/// Inserts a new process registration.
62pub fn process_insert(id: u64, registration: ProcessRegistration) {
63    PROCESS_REGISTRY.insert(id, registration);
64}
65
66/// Removes a registered name.
67pub fn process_name_remove(name: &str) {
68    PROCESS_NAMES.remove(name);
69}
70
71/// Checks if the process is alive.
72pub fn process_alive(pid: Pid) -> bool {
73    if pid.is_remote() {
74        panic!("Expected a local pid!");
75    }
76
77    PROCESS_REGISTRY
78        .get(&pid.id())
79        .map(|process| process.exit_reason.is_none())
80        .unwrap_or_default()
81}
82
83/// Registers a process under the given name.
84pub fn process_register(pid: Pid, name: String) -> Result<(), ArgumentError> {
85    if pid.is_remote() {
86        return Err(ArgumentError::from("Expected local pid for register!"));
87    }
88
89    let entry = PROCESS_NAMES.entry(name.clone());
90
91    let entry = match entry {
92        Entry::Occupied(entry) => {
93            if *entry.get() == pid.id() {
94                return Ok(());
95            } else {
96                return Err(ArgumentError::from(format!(
97                    "Name {:?} registered to another process!",
98                    name
99                )));
100            }
101        }
102        Entry::Vacant(entry) => entry,
103    };
104
105    let mut updated = false;
106    let mut found = false;
107
108    PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
109        found = true;
110
111        if process.name.is_none() {
112            process.name = Some(name);
113            updated = true;
114        }
115
116        process
117    });
118
119    if !found {
120        return Err(ArgumentError::from("Process does not exist!"));
121    }
122
123    if !updated {
124        return Err(ArgumentError::from(format!(
125            "Process {:?} was already registered!",
126            pid
127        )));
128    }
129
130    entry.insert(pid.id());
131
132    Ok(())
133}
134
135/// Unregisters a process with the given name.
136pub fn process_unregister(name: &str) {
137    let Some((_, pid)) = PROCESS_NAMES.remove(name) else {
138        panic!("Name {:?} was not registered!", name);
139    };
140
141    PROCESS_REGISTRY.alter(&pid, |_, mut process| {
142        process.name = None;
143        process
144    });
145}
146
147/// Processes an exit signal for the given [Pid] with the `exit_reason`.
148pub fn process_exit(pid: Pid, from: Pid, exit_reason: ExitReason) {
149    PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
150        let trapping_exits = process.flags.contains(ProcessFlags::TRAP_EXIT);
151
152        match exit_reason {
153            ExitReason::Normal | ExitReason::Ignore => {
154                if pid == from {
155                    process.exit_reason = Some(exit_reason);
156                    process.handle.abort();
157                } else if trapping_exits {
158                    process
159                        .sender
160                        .send(ProcessItem::SystemMessage(SystemMessage::Exit(
161                            from,
162                            exit_reason,
163                        )))
164                        .unwrap();
165                }
166            }
167            ExitReason::Kill => {
168                process.exit_reason = Some(exit_reason);
169                process.handle.abort();
170            }
171            ExitReason::Custom(_) => {
172                if pid == from || !trapping_exits {
173                    process.exit_reason = Some(exit_reason);
174                    process.handle.abort();
175                } else {
176                    process
177                        .sender
178                        .send(ProcessItem::SystemMessage(SystemMessage::Exit(
179                            from,
180                            exit_reason,
181                        )))
182                        .unwrap();
183                }
184            }
185        }
186
187        process
188    });
189}
190
191/// Forwards an exit signal to the linked [Pid] from the given [Pid] with the `exit_reason`.
192pub fn process_exit_signal_linked(pid: Pid, from: Pid, exit_reason: ExitReason) {
193    PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
194        if process.flags.contains(ProcessFlags::TRAP_EXIT) {
195            process
196                .sender
197                .send(ProcessItem::SystemMessage(SystemMessage::Exit(
198                    from,
199                    exit_reason.clone(),
200                )))
201                .unwrap();
202        } else if !exit_reason.is_normal() {
203            process.exit_reason = Some(exit_reason);
204            process.handle.abort();
205        }
206
207        process
208    });
209}
210
211/// Returns the process flags for the given process.
212pub fn process_flags(pid: Pid) -> Option<ProcessFlags> {
213    PROCESS_REGISTRY.get(&pid.id()).map(|process| process.flags)
214}
215
216/// Sets the process flags.
217pub fn process_set_flags(pid: Pid, flags: ProcessFlags) {
218    PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
219        process.flags = flags;
220        process
221    });
222}
223
224/// Sets the process exit reason.
225pub fn process_set_exit_reason(pid: Pid, exit_reason: ExitReason) {
226    PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
227        process.exit_reason = Some(exit_reason);
228        process
229    });
230}
231
232/// Returns a list of processes.
233pub fn process_list() -> Vec<Pid> {
234    PROCESS_REGISTRY
235        .iter()
236        .map(|process| Pid::local(*process.key()))
237        .collect()
238}
239
240/// Returns a list of registered process names.
241pub fn process_name_list() -> Vec<String> {
242    PROCESS_NAMES
243        .iter()
244        .map(|value| value.key().to_owned())
245        .collect()
246}
247
248/// Registers a timer.
249pub fn process_register_timer(timer: Reference, duration: Duration, handle: JoinHandle<()>) {
250    PROCESS_TIMERS.insert(timer.id(), (Instant::now() + duration, handle));
251}
252
253/// Reads a timer.
254pub fn process_read_timer(timer: Reference) -> Option<Duration> {
255    let time = PROCESS_TIMERS.get(&timer.id())?;
256
257    let now = Instant::now();
258    let timer = time.value().0;
259
260    if timer <= now {
261        Some(Duration::from_secs(0))
262    } else {
263        Some(timer - now)
264    }
265}
266
267/// Unregisters and kills a timer.
268pub fn process_destroy_timer(timer: Reference) {
269    if let Some((_, (_, handle))) = PROCESS_TIMERS.remove(&timer.id()) {
270        handle.abort();
271    }
272}
273
274/// Gets process registry info.
275pub fn process_info(pid: Pid) -> Option<ProcessInfo> {
276    let process = PROCESS_REGISTRY.get(&pid.id())?;
277
278    let mut info = ProcessInfo::new();
279
280    info.registered_name.clone_from(&process.name);
281    info.message_queue_len = process.sender.len();
282    info.trap_exit = process.flags.contains(ProcessFlags::TRAP_EXIT);
283
284    Some(info)
285}
286
/// Returns the number of entries currently in the process registry.
#[cfg(feature = "console")]
pub fn process_len() -> usize {
    let registry: &DashMap<u64, ProcessRegistration> = &PROCESS_REGISTRY;

    registry.len()
}