hydra/
process_registry.rs1use std::time::Duration;
2use std::time::Instant;
3
4use dashmap::DashMap;
5use dashmap::mapref::entry::Entry;
6
7use once_cell::sync::Lazy;
8
9use tokio::task::JoinHandle;
10
11use crate::ArgumentError;
12use crate::ExitReason;
13use crate::Pid;
14use crate::ProcessFlags;
15use crate::ProcessInfo;
16use crate::ProcessItem;
17use crate::ProcessRegistration;
18use crate::ProcessSend;
19use crate::Reference;
20use crate::SystemMessage;
21
22static PROCESS_REGISTRY: Lazy<DashMap<u64, ProcessRegistration>> = Lazy::new(DashMap::new);
24static PROCESS_NAMES: Lazy<DashMap<String, u64>> = Lazy::new(DashMap::new);
26static PROCESS_TIMERS: Lazy<DashMap<u64, (Instant, JoinHandle<()>)>> = Lazy::new(DashMap::new);
28
29pub fn process_exists_lock<C: FnOnce(bool) -> R, R>(pid: Pid, callback: C) -> R {
33 if PROCESS_REGISTRY.get(&pid.id()).is_some() {
34 callback(true)
35 } else {
36 callback(false)
37 }
38}
39
40pub fn process_drop(pid: Pid) -> Option<ProcessRegistration> {
42 PROCESS_REGISTRY
43 .remove(&pid.id())
44 .map(|(_, process)| process)
45}
46
47pub fn process_sender(pid: Pid) -> Option<ProcessSend> {
49 PROCESS_REGISTRY
50 .get(&pid.id())
51 .map(|process| process.sender.clone())
52}
53
54pub fn process_name_lookup(name: &str) -> Option<Pid> {
56 PROCESS_NAMES
57 .get(name)
58 .map(|process_id| Pid::local(*process_id))
59}
60
61pub fn process_insert(id: u64, registration: ProcessRegistration) {
63 PROCESS_REGISTRY.insert(id, registration);
64}
65
66pub fn process_name_remove(name: &str) {
68 PROCESS_NAMES.remove(name);
69}
70
71pub fn process_alive(pid: Pid) -> bool {
73 if pid.is_remote() {
74 panic!("Expected a local pid!");
75 }
76
77 PROCESS_REGISTRY
78 .get(&pid.id())
79 .map(|process| process.exit_reason.is_none())
80 .unwrap_or_default()
81}
82
83pub fn process_register(pid: Pid, name: String) -> Result<(), ArgumentError> {
85 if pid.is_remote() {
86 return Err(ArgumentError::from("Expected local pid for register!"));
87 }
88
89 let entry = PROCESS_NAMES.entry(name.clone());
90
91 let entry = match entry {
92 Entry::Occupied(entry) => {
93 if *entry.get() == pid.id() {
94 return Ok(());
95 } else {
96 return Err(ArgumentError::from(format!(
97 "Name {:?} registered to another process!",
98 name
99 )));
100 }
101 }
102 Entry::Vacant(entry) => entry,
103 };
104
105 let mut updated = false;
106 let mut found = false;
107
108 PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
109 found = true;
110
111 if process.name.is_none() {
112 process.name = Some(name);
113 updated = true;
114 }
115
116 process
117 });
118
119 if !found {
120 return Err(ArgumentError::from("Process does not exist!"));
121 }
122
123 if !updated {
124 return Err(ArgumentError::from(format!(
125 "Process {:?} was already registered!",
126 pid
127 )));
128 }
129
130 entry.insert(pid.id());
131
132 Ok(())
133}
134
135pub fn process_unregister(name: &str) {
137 let Some((_, pid)) = PROCESS_NAMES.remove(name) else {
138 panic!("Name {:?} was not registered!", name);
139 };
140
141 PROCESS_REGISTRY.alter(&pid, |_, mut process| {
142 process.name = None;
143 process
144 });
145}
146
147pub fn process_exit(pid: Pid, from: Pid, exit_reason: ExitReason) {
149 PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
150 let trapping_exits = process.flags.contains(ProcessFlags::TRAP_EXIT);
151
152 match exit_reason {
153 ExitReason::Normal | ExitReason::Ignore => {
154 if pid == from {
155 process.exit_reason = Some(exit_reason);
156 process.handle.abort();
157 } else if trapping_exits {
158 process
159 .sender
160 .send(ProcessItem::SystemMessage(SystemMessage::Exit(
161 from,
162 exit_reason,
163 )))
164 .unwrap();
165 }
166 }
167 ExitReason::Kill => {
168 process.exit_reason = Some(exit_reason);
169 process.handle.abort();
170 }
171 ExitReason::Custom(_) => {
172 if pid == from || !trapping_exits {
173 process.exit_reason = Some(exit_reason);
174 process.handle.abort();
175 } else {
176 process
177 .sender
178 .send(ProcessItem::SystemMessage(SystemMessage::Exit(
179 from,
180 exit_reason,
181 )))
182 .unwrap();
183 }
184 }
185 }
186
187 process
188 });
189}
190
191pub fn process_exit_signal_linked(pid: Pid, from: Pid, exit_reason: ExitReason) {
193 PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
194 if process.flags.contains(ProcessFlags::TRAP_EXIT) {
195 process
196 .sender
197 .send(ProcessItem::SystemMessage(SystemMessage::Exit(
198 from,
199 exit_reason.clone(),
200 )))
201 .unwrap();
202 } else if !exit_reason.is_normal() {
203 process.exit_reason = Some(exit_reason);
204 process.handle.abort();
205 }
206
207 process
208 });
209}
210
211pub fn process_flags(pid: Pid) -> Option<ProcessFlags> {
213 PROCESS_REGISTRY.get(&pid.id()).map(|process| process.flags)
214}
215
216pub fn process_set_flags(pid: Pid, flags: ProcessFlags) {
218 PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
219 process.flags = flags;
220 process
221 });
222}
223
224pub fn process_set_exit_reason(pid: Pid, exit_reason: ExitReason) {
226 PROCESS_REGISTRY.alter(&pid.id(), |_, mut process| {
227 process.exit_reason = Some(exit_reason);
228 process
229 });
230}
231
232pub fn process_list() -> Vec<Pid> {
234 PROCESS_REGISTRY
235 .iter()
236 .map(|process| Pid::local(*process.key()))
237 .collect()
238}
239
240pub fn process_name_list() -> Vec<String> {
242 PROCESS_NAMES
243 .iter()
244 .map(|value| value.key().to_owned())
245 .collect()
246}
247
248pub fn process_register_timer(timer: Reference, duration: Duration, handle: JoinHandle<()>) {
250 PROCESS_TIMERS.insert(timer.id(), (Instant::now() + duration, handle));
251}
252
253pub fn process_read_timer(timer: Reference) -> Option<Duration> {
255 let time = PROCESS_TIMERS.get(&timer.id())?;
256
257 let now = Instant::now();
258 let timer = time.value().0;
259
260 if timer <= now {
261 Some(Duration::from_secs(0))
262 } else {
263 Some(timer - now)
264 }
265}
266
267pub fn process_destroy_timer(timer: Reference) {
269 if let Some((_, (_, handle))) = PROCESS_TIMERS.remove(&timer.id()) {
270 handle.abort();
271 }
272}
273
274pub fn process_info(pid: Pid) -> Option<ProcessInfo> {
276 let process = PROCESS_REGISTRY.get(&pid.id())?;
277
278 let mut info = ProcessInfo::new();
279
280 info.registered_name.clone_from(&process.name);
281 info.message_queue_len = process.sender.len();
282 info.trap_exit = process.flags.contains(ProcessFlags::TRAP_EXIT);
283
284 Some(info)
285}
286
287#[cfg(feature = "console")]
289pub fn process_len() -> usize {
290 PROCESS_REGISTRY.len()
291}