1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::collections::BTreeSet;
4use std::future::Future;
5use std::panic::AssertUnwindSafe;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8use std::time::Duration;
9
10use flume::Receiver;
11use flume::Sender;
12
13use crate::ArgumentError;
14use crate::AsyncCatchUnwind;
15use crate::Dest;
16use crate::Dests;
17use crate::ExitReason;
18use crate::Message;
19use crate::Pid;
20use crate::ProcessFlags;
21use crate::ProcessInfo;
22use crate::ProcessItem;
23use crate::ProcessMonitor;
24use crate::ProcessReceiver;
25use crate::ProcessRegistration;
26use crate::Receivable;
27use crate::Reference;
28use crate::Timeout;
29use crate::alias_create;
30use crate::alias_destroy;
31use crate::alias_destroy_all;
32use crate::link_create;
33use crate::link_destroy;
34use crate::link_fill_info;
35use crate::link_install;
36use crate::link_process_down;
37use crate::monitor_create;
38use crate::monitor_destroy;
39use crate::monitor_destroy_all;
40use crate::monitor_fill_info;
41use crate::monitor_install;
42use crate::monitor_process_down;
43use crate::node_process_send_exit;
44use crate::process_alive;
45use crate::process_destroy_timer;
46use crate::process_drop;
47use crate::process_exit;
48use crate::process_flags;
49use crate::process_info;
50use crate::process_insert;
51use crate::process_list;
52use crate::process_name_list;
53use crate::process_name_lookup;
54use crate::process_name_remove;
55use crate::process_read_timer;
56use crate::process_register;
57use crate::process_register_timer;
58use crate::process_send;
59use crate::process_set_exit_reason;
60use crate::process_set_flags;
61use crate::process_unregister;
62
/// Sending half of a process's item channel, carrying [`ProcessItem`] values.
pub(crate) type ProcessSend = Sender<ProcessItem>;
/// Receiving half of a process's item channel, carrying [`ProcessItem`] values.
pub(crate) type ProcessReceive = Receiver<ProcessItem>;
67
/// Per-process state that is stored in the `PROCESS` task-local while a spawned process runs.
pub struct Process {
    /// Identifier of this process; this is the value returned by `Process::current`.
    pub(crate) pid: Pid,
    /// Sending half of this process's item channel; cloned whenever an alias is created.
    pub(crate) sender: ProcessSend,
    /// Receiving half of this process's item channel.
    pub(crate) receiver: ProcessReceive,
    /// Buffered items; starts out empty.
    /// NOTE(review): presumably items received but not yet consumed; confirm against
    /// `ProcessReceiver`, which is not visible here.
    pub(crate) items: RefCell<Vec<ProcessItem>>,
    /// Ids of the aliases created through `Process::alias`; destroyed when the process is dropped.
    pub(crate) aliases: RefCell<BTreeSet<u64>>,
    /// Monitors owned by this process, keyed by reference; destroyed when the process is dropped.
    pub(crate) monitors: RefCell<BTreeMap<Reference, ProcessMonitor>>,
}
83
// Task-local slot holding the `Process` state of the currently running process.
// It is populated by `spawn_internal` through `PROCESS.scope(..)` and read with `PROCESS.with(..)`.
tokio::task_local! {
    pub(crate) static PROCESS: Process;
}
88
89impl Process {
90 pub(crate) fn new(pid: Pid, sender: ProcessSend, receiver: ProcessReceive) -> Self {
92 Self {
93 pid,
94 sender,
95 receiver,
96 items: RefCell::new(Vec::new()),
97 aliases: RefCell::new(BTreeSet::new()),
98 monitors: RefCell::new(BTreeMap::new()),
99 }
100 }
101
102 pub fn alias(reply: bool) -> Reference {
106 let reference = Reference::new();
107
108 let sender = PROCESS.with(|process| {
109 process.aliases.borrow_mut().insert(reference.id());
110 process.sender.clone()
111 });
112
113 alias_create(sender, reference, reply);
114
115 reference
116 }
117
118 pub fn unalias(alias: Reference) -> bool {
122 PROCESS.with(|process| process.aliases.borrow_mut().remove(&alias.id()));
123
124 alias_destroy(alias)
125 }
126
127 #[must_use]
129 pub fn current() -> Pid {
130 PROCESS.with(|process| process.pid)
131 }
132
133 #[must_use]
135 pub fn whereis<S: AsRef<str>>(name: S) -> Option<Pid> {
136 process_name_lookup(name.as_ref())
137 }
138
139 pub fn send<D: Into<Dests>, M: Receivable>(dests: D, message: M) {
156 process_send(dests.into(), message);
157 }
158
159 pub fn send_after<D: Into<Dests>, M: Receivable>(
169 dest: D,
170 message: M,
171 duration: Duration,
172 ) -> Reference {
173 let dest = dest.into();
174
175 let reference = Reference::new();
176
177 let handle = tokio::spawn(async move {
178 Process::sleep(duration).await;
179 Process::send(dest, message);
180
181 process_destroy_timer(reference);
182 });
183
184 process_register_timer(reference, duration, handle);
185
186 reference
187 }
188
189 pub fn cancel_timer(timer: Reference) {
191 process_destroy_timer(timer);
192 }
193
194 pub fn read_timer(timer: Reference) -> Option<Duration> {
198 process_read_timer(timer)
199 }
200
201 #[must_use]
203 pub fn receiver() -> ProcessReceiver<()> {
204 ProcessReceiver::new()
205 }
206
207 #[must_use]
211 pub async fn receive<T: Receivable>() -> Message<T> {
212 ProcessReceiver::new()
213 .strict_type_checking()
214 .receive()
215 .await
216 }
217
218 pub fn spawn<T>(function: T) -> Pid
220 where
221 T: Future<Output = ()> + Send + 'static,
222 T::Output: Send + 'static,
223 {
224 match spawn_internal(function, false, false) {
225 SpawnResult::Pid(pid) => pid,
226 SpawnResult::PidMonitor(_, _) => unreachable!(),
227 }
228 }
229
230 pub fn spawn_link<T>(function: T) -> Pid
232 where
233 T: Future<Output = ()> + Send + 'static,
234 T::Output: Send + 'static,
235 {
236 match spawn_internal(function, true, false) {
237 SpawnResult::Pid(pid) => pid,
238 SpawnResult::PidMonitor(_, _) => unreachable!(),
239 }
240 }
241
242 pub fn spawn_monitor<T>(function: T) -> (Pid, Reference)
244 where
245 T: Future<Output = ()> + Send + 'static,
246 T::Output: Send + 'static,
247 {
248 match spawn_internal(function, false, true) {
249 SpawnResult::Pid(_) => unreachable!(),
250 SpawnResult::PidMonitor(pid, monitor) => (pid, monitor),
251 }
252 }
253
254 #[must_use]
256 pub fn alive(pid: Pid) -> bool {
257 process_alive(pid)
258 }
259
260 pub async fn sleep(duration: Duration) {
262 tokio::time::sleep(duration).await
263 }
264
265 pub async fn timeout<F>(duration: Duration, future: F) -> Result<<F as Future>::Output, Timeout>
267 where
268 F: Future,
269 {
270 pingora_timeout::timeout(duration, future)
271 .await
272 .map_err(|_| Timeout)
273 }
274
275 pub fn register<S: Into<String>>(pid: Pid, name: S) -> Result<(), ArgumentError> {
277 process_register(pid, name.into())
278 }
279
280 pub fn unregister<S: AsRef<str>>(name: S) {
282 process_unregister(name.as_ref());
283 }
284
285 #[must_use]
287 pub fn registered() -> Vec<String> {
288 process_name_list()
289 }
290
291 #[must_use]
293 pub fn list() -> Vec<Pid> {
294 process_list()
295 }
296
297 pub fn link(pid: Pid) {
299 let current = Self::current();
300
301 if pid == current {
302 return;
303 }
304
305 link_install(pid, current);
306 }
307
308 pub fn unlink(pid: Pid) {
310 let current = Self::current();
311
312 if pid == current {
313 return;
314 }
315
316 link_destroy(pid, current);
317 }
318
319 pub fn monitor<T: Into<Dest>>(process: T) -> Reference {
321 let current = Self::current();
322 let process = process.into();
323
324 let reference = Reference::new();
325
326 monitor_install(process, reference, current);
327
328 reference
329 }
330
331 pub fn monitor_alias<T: Into<Dest>>(process: T, reply: bool) -> Reference {
340 let current = Self::current();
341 let process = process.into();
342 let sender = PROCESS.with(|process| process.sender.clone());
343
344 let reference = Reference::new();
345
346 alias_create(sender, reference, reply);
347
348 monitor_install(process, reference, current);
349
350 reference
351 }
352
353 pub fn demonitor(monitor: Reference) {
357 let Some(process_monitor) =
358 PROCESS.with(|process| process.monitors.borrow_mut().remove(&monitor))
359 else {
360 return;
361 };
362
363 let ProcessMonitor::ForProcess(pid) = process_monitor else {
364 panic!("Invalid process monitor reference!");
365 };
366
367 let Some(pid) = pid else {
368 return;
369 };
370
371 monitor_destroy(pid, monitor);
372
373 alias_destroy(monitor);
374 }
375
376 #[must_use]
378 pub fn flags() -> ProcessFlags {
379 process_flags(Self::current()).unwrap()
380 }
381
382 pub fn set_flags(flags: ProcessFlags) {
384 process_set_flags(Self::current(), flags)
385 }
386
387 pub fn exit<E: Into<ExitReason>>(pid: Pid, exit_reason: E) {
389 let exit_reason = exit_reason.into();
390
391 if pid.is_local() {
392 process_exit(pid, Self::current(), exit_reason);
393 } else {
394 node_process_send_exit(pid, Self::current(), exit_reason);
395 }
396 }
397
398 #[must_use]
400 pub fn info(pid: Pid) -> Option<ProcessInfo> {
401 if pid.is_remote() {
402 panic!("Can't query information on a remote process!");
403 }
404
405 let info = process_info(pid);
406
407 info.map(|mut info| {
408 link_fill_info(pid, &mut info);
409
410 monitor_fill_info(pid, &mut info);
411
412 info
413 })
414 }
415}
416
417impl Drop for Process {
418 fn drop(&mut self) {
419 let process = process_drop(self.pid).unwrap();
420
421 if let Some(name) = process.name {
422 process_name_remove(&name);
423 }
424
425 let exit_reason = process.exit_reason.unwrap_or_default();
426
427 link_process_down(self.pid, exit_reason.clone());
428
429 monitor_process_down(self.pid, exit_reason);
430 monitor_destroy_all(self.monitors.borrow().iter());
431
432 alias_destroy_all(self.aliases.borrow().iter());
433 }
434}
435
/// Outcome of `spawn_internal`: the new pid, plus the monitor reference when one was requested.
enum SpawnResult {
    /// The process was spawned without a monitor.
    Pid(Pid),
    /// The process was spawned with a monitor; carries the pid and the monitor reference.
    PidMonitor(Pid, Reference),
}
441
/// Source of local process ids: starts at 1 and is advanced once per `spawn_internal` call.
static ID: AtomicU64 = AtomicU64::new(1);
444
445fn spawn_internal<T>(function: T, link: bool, monitor: bool) -> SpawnResult
447where
448 T: Future<Output = ()> + Send + 'static,
449 T::Output: Send + 'static,
450{
451 let next_id = ID.fetch_add(1, Ordering::Relaxed);
452
453 let (tx, rx) = flume::unbounded();
454
455 let pid = Pid::local(next_id);
456 let process = Process::new(pid, tx.clone(), rx);
457
458 let mut result = SpawnResult::Pid(pid);
459
460 if link {
462 let current = Process::current();
463
464 link_create(pid, current, true);
465 link_create(current, pid, true);
466 }
467
468 if monitor {
470 let monitor = Reference::new();
471
472 PROCESS.with(|process| {
473 process
474 .monitors
475 .borrow_mut()
476 .insert(monitor, ProcessMonitor::ForProcess(Some(pid)))
477 });
478
479 monitor_create(pid, monitor, Process::current(), Some(pid.into()));
480
481 result = SpawnResult::PidMonitor(pid, monitor);
482 }
483
484 let handle = tokio::spawn(PROCESS.scope(process, async move {
486 if let Err(e) = AsyncCatchUnwind::new(AssertUnwindSafe(function)).await {
487 process_set_exit_reason(Process::current(), e.into());
488 }
489 }));
490
491 process_insert(next_id, ProcessRegistration::new(handle, tx));
493
494 result
495}