hydra/
process.rs

1use std::cell::RefCell;
2use std::collections::BTreeMap;
3use std::collections::BTreeSet;
4use std::future::Future;
5use std::panic::AssertUnwindSafe;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8use std::time::Duration;
9
10use flume::Receiver;
11use flume::Sender;
12
13use crate::ArgumentError;
14use crate::AsyncCatchUnwind;
15use crate::Dest;
16use crate::Dests;
17use crate::ExitReason;
18use crate::Message;
19use crate::Pid;
20use crate::ProcessFlags;
21use crate::ProcessInfo;
22use crate::ProcessItem;
23use crate::ProcessMonitor;
24use crate::ProcessReceiver;
25use crate::ProcessRegistration;
26use crate::Receivable;
27use crate::Reference;
28use crate::Timeout;
29use crate::alias_create;
30use crate::alias_destroy;
31use crate::alias_destroy_all;
32use crate::link_create;
33use crate::link_destroy;
34use crate::link_fill_info;
35use crate::link_install;
36use crate::link_process_down;
37use crate::monitor_create;
38use crate::monitor_destroy;
39use crate::monitor_destroy_all;
40use crate::monitor_fill_info;
41use crate::monitor_install;
42use crate::monitor_process_down;
43use crate::node_process_send_exit;
44use crate::process_alive;
45use crate::process_destroy_timer;
46use crate::process_drop;
47use crate::process_exit;
48use crate::process_flags;
49use crate::process_info;
50use crate::process_insert;
51use crate::process_list;
52use crate::process_name_list;
53use crate::process_name_lookup;
54use crate::process_name_remove;
55use crate::process_read_timer;
56use crate::process_register;
57use crate::process_register_timer;
58use crate::process_send;
59use crate::process_set_exit_reason;
60use crate::process_set_flags;
61use crate::process_unregister;
62
63/// The send type for a process.
64pub(crate) type ProcessSend = Sender<ProcessItem>;
65/// The receive type for a process.
66pub(crate) type ProcessReceive = Receiver<ProcessItem>;
67
68/// A light weight task that can send and receive messages.
69pub struct Process {
70    /// The unique id of this process.
71    pub(crate) pid: Pid,
72    /// The sender for this process.
73    pub(crate) sender: ProcessSend,
74    /// The receiver for this process.
75    pub(crate) receiver: ProcessReceive,
76    /// The messages already popped from the receiver for this process.
77    pub(crate) items: RefCell<Vec<ProcessItem>>,
78    /// A collection of process aliases.
79    pub(crate) aliases: RefCell<BTreeSet<u64>>,
80    /// A collection of process monitor references.
81    pub(crate) monitors: RefCell<BTreeMap<Reference, ProcessMonitor>>,
82}
83
84tokio::task_local! {
85    /// Current process information.
86    pub(crate) static PROCESS: Process;
87}
88
89impl Process {
90    /// Constructs a new [Process] from the given [Pid] and channels.
91    pub(crate) fn new(pid: Pid, sender: ProcessSend, receiver: ProcessReceive) -> Self {
92        Self {
93            pid,
94            sender,
95            receiver,
96            items: RefCell::new(Vec::new()),
97            aliases: RefCell::new(BTreeSet::new()),
98            monitors: RefCell::new(BTreeMap::new()),
99        }
100    }
101
102    /// Creates a process alias. If reply is `true` the alias deactivates when the first message is received.
103    ///
104    /// Otherwise, you need to call `unalias(reference)` to deactivate the alias.
105    pub fn alias(reply: bool) -> Reference {
106        let reference = Reference::new();
107
108        let sender = PROCESS.with(|process| {
109            process.aliases.borrow_mut().insert(reference.id());
110            process.sender.clone()
111        });
112
113        alias_create(sender, reference, reply);
114
115        reference
116    }
117
118    /// Explicitly deactivates a process alias.
119    ///
120    /// Returns `true` if the alias was currently-active for the current process, or `false` otherwise.
121    pub fn unalias(alias: Reference) -> bool {
122        PROCESS.with(|process| process.aliases.borrow_mut().remove(&alias.id()));
123
124        alias_destroy(alias)
125    }
126
127    /// Returns the current [Pid].
128    #[must_use]
129    pub fn current() -> Pid {
130        PROCESS.with(|process| process.pid)
131    }
132
133    /// Returns the [Pid] registered under `name` or [None] if the name is not registered.
134    #[must_use]
135    pub fn whereis<S: AsRef<str>>(name: S) -> Option<Pid> {
136        process_name_lookup(name.as_ref())
137    }
138
139    /// Sends a message to `dests`.
140    ///
141    /// ## Example:
142    /// This method allows sending to multiple targets with certain trade offs:
143    /// ```ignore
144    /// let pid1 = Process::spawn(async { /* */ });
145    /// let pid2 = Process::spawn(async { /* */ });
146    ///
147    /// // faster when all processes are local.
148    /// for pid in &[pid1, pid2] {
149    ///     Process::send(pid, "hello world!");
150    /// }
151    ///
152    /// // faster when processes are mostly remote.
153    /// Process::send(&[pid1, pid2], "hello world!");
154    /// ```
155    pub fn send<D: Into<Dests>, M: Receivable>(dests: D, message: M) {
156        process_send(dests.into(), message);
157    }
158
159    /// Sends a message to `dests` after the given `duration`.
160    ///
161    /// See [Process::send] for performance trade-offs.
162    ///
163    /// ## Example:
164    /// Sends a message after 5 seconds to `pid`:
165    /// ```ignore
166    /// Process::send_after(pid, "hello world!", Duration::from_secs(5));
167    /// ```
168    pub fn send_after<D: Into<Dests>, M: Receivable>(
169        dest: D,
170        message: M,
171        duration: Duration,
172    ) -> Reference {
173        let dest = dest.into();
174
175        let reference = Reference::new();
176
177        let handle = tokio::spawn(async move {
178            Process::sleep(duration).await;
179            Process::send(dest, message);
180
181            process_destroy_timer(reference);
182        });
183
184        process_register_timer(reference, duration, handle);
185
186        reference
187    }
188
189    /// Cancels a timer created by `send_after`.
190    pub fn cancel_timer(timer: Reference) {
191        process_destroy_timer(timer);
192    }
193
194    /// Reads a timer created by `send_after`.
195    ///
196    /// It returns the time remaining until the timer expires.
197    pub fn read_timer(timer: Reference) -> Option<Duration> {
198        process_read_timer(timer)
199    }
200
201    /// Creates a receiver with advanced filtering options from the current processes mailbox.
202    #[must_use]
203    pub fn receiver() -> ProcessReceiver<()> {
204        ProcessReceiver::new()
205    }
206
207    /// Creates a receiver for a single message that matches the given type from the current processes mailbox.
208    ///
209    /// This will panic if a message is received that doesn't match the given type.
210    #[must_use]
211    pub async fn receive<T: Receivable>() -> Message<T> {
212        ProcessReceiver::new()
213            .strict_type_checking()
214            .receive()
215            .await
216    }
217
218    /// Spawns the given `function` as a process and returns it's [Pid].
219    pub fn spawn<T>(function: T) -> Pid
220    where
221        T: Future<Output = ()> + Send + 'static,
222        T::Output: Send + 'static,
223    {
224        match spawn_internal(function, false, false) {
225            SpawnResult::Pid(pid) => pid,
226            SpawnResult::PidMonitor(_, _) => unreachable!(),
227        }
228    }
229
230    /// Spawns the given `function` as a process, creates a link between the calling process, and returns the new [Pid].
231    pub fn spawn_link<T>(function: T) -> Pid
232    where
233        T: Future<Output = ()> + Send + 'static,
234        T::Output: Send + 'static,
235    {
236        match spawn_internal(function, true, false) {
237            SpawnResult::Pid(pid) => pid,
238            SpawnResult::PidMonitor(_, _) => unreachable!(),
239        }
240    }
241
242    /// Spawns the given `function` as a process, creates a monitor for the calling process, and returns the new [Pid].
243    pub fn spawn_monitor<T>(function: T) -> (Pid, Reference)
244    where
245        T: Future<Output = ()> + Send + 'static,
246        T::Output: Send + 'static,
247    {
248        match spawn_internal(function, false, true) {
249            SpawnResult::Pid(_) => unreachable!(),
250            SpawnResult::PidMonitor(pid, monitor) => (pid, monitor),
251        }
252    }
253
254    /// Returns true if the given [Pid] is alive on the local node.
255    #[must_use]
256    pub fn alive(pid: Pid) -> bool {
257        process_alive(pid)
258    }
259
260    /// Sleeps the current process for the given duration.
261    pub async fn sleep(duration: Duration) {
262        tokio::time::sleep(duration).await
263    }
264
265    /// Waits for the given future to complete until the given duration is up.
266    pub async fn timeout<F>(duration: Duration, future: F) -> Result<<F as Future>::Output, Timeout>
267    where
268        F: Future,
269    {
270        pingora_timeout::timeout(duration, future)
271            .await
272            .map_err(|_| Timeout)
273    }
274
275    /// Registers the given [Pid] under `name` if the process is local, active, and the name is not already registered.
276    pub fn register<S: Into<String>>(pid: Pid, name: S) -> Result<(), ArgumentError> {
277        process_register(pid, name.into())
278    }
279
280    /// Removes the registered `name`, associated with a [Pid].
281    pub fn unregister<S: AsRef<str>>(name: S) {
282        process_unregister(name.as_ref());
283    }
284
285    /// Returns a [Vec] of registered process names.
286    #[must_use]
287    pub fn registered() -> Vec<String> {
288        process_name_list()
289    }
290
291    /// Returns a [Vec] of [Pid]'s on the local node.
292    #[must_use]
293    pub fn list() -> Vec<Pid> {
294        process_list()
295    }
296
297    /// Creates a bi-directional link between the current process and the given process.
298    pub fn link(pid: Pid) {
299        let current = Self::current();
300
301        if pid == current {
302            return;
303        }
304
305        link_install(pid, current);
306    }
307
308    /// Removes the link between the calling process and the given process.
309    pub fn unlink(pid: Pid) {
310        let current = Self::current();
311
312        if pid == current {
313            return;
314        }
315
316        link_destroy(pid, current);
317    }
318
319    /// Starts monitoring the given process from the calling process. If the process is already dead a message is sent immediately.
320    pub fn monitor<T: Into<Dest>>(process: T) -> Reference {
321        let current = Self::current();
322        let process = process.into();
323
324        let reference = Reference::new();
325
326        monitor_install(process, reference, current);
327
328        reference
329    }
330
331    /// Starts monitoring the given process from the calling process. If the process is already dead a message is sent immediately.
332    ///
333    /// Creates an alias for the calling process that's tied to the process monitor.
334    ///
335    /// The alias will be deactivated if:
336    /// - The monitor sends a down message.
337    /// - The user explicitly calls `unalias`. (The monitor will remain active)
338    /// - `reply` is `true` and a message is sent over the alias.
339    pub fn monitor_alias<T: Into<Dest>>(process: T, reply: bool) -> Reference {
340        let current = Self::current();
341        let process = process.into();
342        let sender = PROCESS.with(|process| process.sender.clone());
343
344        let reference = Reference::new();
345
346        alias_create(sender, reference, reply);
347
348        monitor_install(process, reference, current);
349
350        reference
351    }
352
353    /// Demonitors the monitor identified by the given reference.
354    ///
355    /// If a monitor message was sent to the process already but was not received, it will be discarded automatically.
356    pub fn demonitor(monitor: Reference) {
357        let Some(process_monitor) =
358            PROCESS.with(|process| process.monitors.borrow_mut().remove(&monitor))
359        else {
360            return;
361        };
362
363        let ProcessMonitor::ForProcess(pid) = process_monitor else {
364            panic!("Invalid process monitor reference!");
365        };
366
367        let Some(pid) = pid else {
368            return;
369        };
370
371        monitor_destroy(pid, monitor);
372
373        alias_destroy(monitor);
374    }
375
376    /// Returns the current process flags.
377    #[must_use]
378    pub fn flags() -> ProcessFlags {
379        process_flags(Self::current()).unwrap()
380    }
381
382    /// Sets one or more process flags.
383    pub fn set_flags(flags: ProcessFlags) {
384        process_set_flags(Self::current(), flags)
385    }
386
387    /// Sends an exit signal with the given reason to [Pid].
388    pub fn exit<E: Into<ExitReason>>(pid: Pid, exit_reason: E) {
389        let exit_reason = exit_reason.into();
390
391        if pid.is_local() {
392            process_exit(pid, Self::current(), exit_reason);
393        } else {
394            node_process_send_exit(pid, Self::current(), exit_reason);
395        }
396    }
397
398    /// Fetches debug information for a given local process.
399    #[must_use]
400    pub fn info(pid: Pid) -> Option<ProcessInfo> {
401        if pid.is_remote() {
402            panic!("Can't query information on a remote process!");
403        }
404
405        let info = process_info(pid);
406
407        info.map(|mut info| {
408            link_fill_info(pid, &mut info);
409
410            monitor_fill_info(pid, &mut info);
411
412            info
413        })
414    }
415}
416
417impl Drop for Process {
418    fn drop(&mut self) {
419        let process = process_drop(self.pid).unwrap();
420
421        if let Some(name) = process.name {
422            process_name_remove(&name);
423        }
424
425        let exit_reason = process.exit_reason.unwrap_or_default();
426
427        link_process_down(self.pid, exit_reason.clone());
428
429        monitor_process_down(self.pid, exit_reason);
430        monitor_destroy_all(self.monitors.borrow().iter());
431
432        alias_destroy_all(self.aliases.borrow().iter());
433    }
434}
435
436/// Internal spawn result.
437enum SpawnResult {
438    Pid(Pid),
439    PidMonitor(Pid, Reference),
440}
441
/// Counter handing out local process ids; holds the id the next spawned process will receive.
static ID: AtomicU64 = AtomicU64::new(1);
444
445/// Internal spawn utility.
446fn spawn_internal<T>(function: T, link: bool, monitor: bool) -> SpawnResult
447where
448    T: Future<Output = ()> + Send + 'static,
449    T::Output: Send + 'static,
450{
451    let next_id = ID.fetch_add(1, Ordering::Relaxed);
452
453    let (tx, rx) = flume::unbounded();
454
455    let pid = Pid::local(next_id);
456    let process = Process::new(pid, tx.clone(), rx);
457
458    let mut result = SpawnResult::Pid(pid);
459
460    // If a link was requested, insert it before spawning the process.
461    if link {
462        let current = Process::current();
463
464        link_create(pid, current, true);
465        link_create(current, pid, true);
466    }
467
468    // If a monitor was requested, insert it before spawning the process.
469    if monitor {
470        let monitor = Reference::new();
471
472        PROCESS.with(|process| {
473            process
474                .monitors
475                .borrow_mut()
476                .insert(monitor, ProcessMonitor::ForProcess(Some(pid)))
477        });
478
479        monitor_create(pid, monitor, Process::current(), Some(pid.into()));
480
481        result = SpawnResult::PidMonitor(pid, monitor);
482    }
483
484    // Spawn the process with the newly created process object in scope.
485    let handle = tokio::spawn(PROCESS.scope(process, async move {
486        if let Err(e) = AsyncCatchUnwind::new(AssertUnwindSafe(function)).await {
487            process_set_exit_reason(Process::current(), e.into());
488        }
489    }));
490
491    // Register the process under it's new id.
492    process_insert(next_id, ProcessRegistration::new(handle, tx));
493
494    result
495}