hydra/
registry.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use dashmap::DashMap;
8
9use once_cell::sync::Lazy;
10
11use serde::Deserialize;
12use serde::Serialize;
13
14use crate::CallError;
15use crate::ChildSpec;
16use crate::ChildType;
17use crate::Dest;
18use crate::ExitReason;
19use crate::From;
20use crate::GenServer;
21use crate::Message;
22use crate::Node;
23use crate::Pid;
24use crate::Process;
25use crate::ProcessFlags;
26use crate::Reference;
27use crate::RegistryOptions;
28use crate::Shutdown;
29use crate::SystemMessage;
30use crate::shutdown_infinity;
31use crate::shutdown_timeout;
32
33/// A local collection of active process registries.
34static REGISTRY: Lazy<DashMap<String, DashMap<RegistryKey, Pid>>> = Lazy::new(DashMap::new);
35
36/// A registry key.
37#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
38pub enum RegistryKey {
39    /// A 32bit signed integer key.
40    Integer32(i32),
41    /// A 64bit signed integer key.
42    Integer64(i64),
43    /// A 128bit signed integer key.
44    Integer128(i128),
45    /// A 32bit unsigned integer key.
46    UInteger32(u32),
47    /// A 64bit unsigned integer key.
48    UInteger64(u64),
49    /// A 128bit unsigned integer key.
50    UInteger128(u128),
51    /// A string key.
52    String(String),
53}
54
55/// A registry message.
56#[doc(hidden)]
57#[derive(Serialize, Deserialize)]
58pub enum RegistryMessage {
59    Lookup(RegistryKey),
60    LookupSuccess(Option<Pid>),
61    LookupOrStart(RegistryKey),
62    LookupOrStartSuccess(Pid),
63    LookupOrStartError(RegistryError),
64    Start(RegistryKey),
65    StartSuccess(Pid),
66    StartError(RegistryError),
67    Stop(RegistryKey),
68    StopSuccess,
69    StopError(RegistryError),
70    Count,
71    CountSuccess(usize),
72    List,
73    ListSuccess(Vec<(RegistryKey, Pid)>),
74    Remove(RegistryKey),
75    RemoveSuccess(Option<Pid>),
76    RemoveLookup(Pid),
77}
78
79/// Errors for [Registry] calls.
80#[derive(Debug, Serialize, Deserialize)]
81pub enum RegistryError {
82    /// A call to the [Registry] server has failed.
83    CallError(CallError),
84    /// The process failed to start.
85    StartError(ExitReason),
86    /// The registry wasn't configured with a start routine.
87    StartNotSupported,
88    /// The process is already registered and running.
89    AlreadyStarted(Pid),
90    /// The process was not found for the given key.
91    NotFound,
92}
93
94/// Provides a centralized 'registry' of processes using any value as a key.
95///
96/// A registry is designed to only support a single type of [Process] or [GenServer], you should use multiple if necessary.
97#[derive(Clone)]
98pub struct Registry {
99    name: String,
100    #[allow(clippy::type_complexity)]
101    start: Option<
102        Arc<
103            dyn Fn(RegistryKey) -> Box<dyn Future<Output = Result<Pid, ExitReason>> + Send + Sync>
104                + Send
105                + Sync,
106        >,
107    >,
108    shutdown: Shutdown,
109    lookup: BTreeMap<Pid, RegistryKey>,
110}
111
112impl Registry {
113    /// Constructs a new local [Registry] with the given name.
114    ///
115    /// Names must be unique on a per-node basis.
116    #[must_use]
117    pub fn new<T: Into<String>>(name: T) -> Self {
118        Self {
119            name: name.into(),
120            start: None,
121            shutdown: Shutdown::BrutalKill,
122            lookup: BTreeMap::new(),
123        }
124    }
125
126    /// Assigns a start routine for the registry to be able to dynamically spawn processes.
127    ///
128    /// Must return a future that resolves to [Result<Pid, ExitReason>].
129    #[must_use]
130    pub fn with_start<T, F>(mut self, start: T) -> Self
131    where
132        T: Fn(RegistryKey) -> F + Send + Sync + 'static,
133        F: Future<Output = Result<Pid, ExitReason>> + Send + Sync + 'static,
134    {
135        self.start = Some(Arc::new(move |key| Box::new(start(key))));
136        self
137    }
138
139    /// Sets the method used to shutdown any registered processes once when the [Registry] is terminated.
140    pub fn with_shutdown(mut self, shutdown: Shutdown) -> Self {
141        self.shutdown = shutdown;
142        self
143    }
144
145    /// Looks up a running process.
146    ///
147    /// If the registry is local, this will just query the table without hitting the registry process.
148    pub async fn lookup<T: Into<Dest>, N: Into<RegistryKey>>(
149        registry: T,
150        key: N,
151        timeout: Option<Duration>,
152    ) -> Result<Option<Pid>, RegistryError> {
153        use RegistryMessage::*;
154
155        let registry = registry.into();
156        let key = key.into();
157
158        if let Dest::Named(registry, Node::Local) = &registry {
159            return Ok(lookup_process(registry, &key));
160        }
161
162        match Registry::call(registry, Lookup(key), timeout).await? {
163            LookupSuccess(pid) => Ok(pid),
164            _ => unreachable!(),
165        }
166    }
167
168    /// Attempts to lookup a running process.
169    ///
170    /// If the process isn't currently running, it is spawned and passed the key.
171    pub async fn lookup_or_start<T: Into<Dest>, N: Into<RegistryKey>>(
172        registry: T,
173        key: N,
174        timeout: Option<Duration>,
175    ) -> Result<Pid, RegistryError> {
176        use RegistryMessage::*;
177
178        let registry = registry.into();
179        let key = key.into();
180
181        if let Dest::Named(registry, Node::Local) = &registry
182            && let Some(result) = lookup_process(registry, &key)
183        {
184            return Ok(result);
185        }
186
187        match Registry::call(registry, LookupOrStart(key), timeout).await? {
188            LookupOrStartSuccess(pid) => Ok(pid),
189            LookupOrStartError(error) => Err(error),
190            _ => unreachable!(),
191        }
192    }
193
194    /// Attempts to start a process.
195    ///
196    /// If the process is already running, an error is returned.
197    pub async fn start_process<T: Into<Dest>, N: Into<RegistryKey>>(
198        registry: T,
199        key: N,
200        timeout: Option<Duration>,
201    ) -> Result<Pid, RegistryError> {
202        use RegistryMessage::*;
203
204        let registry = registry.into();
205        let key = key.into();
206
207        if let Dest::Named(registry, Node::Local) = &registry
208            && let Some(pid) = lookup_process(registry, &key)
209        {
210            return Err(RegistryError::AlreadyStarted(pid));
211        }
212
213        match Registry::call(registry, Start(key), timeout).await? {
214            StartSuccess(pid) => Ok(pid),
215            StartError(error) => Err(error),
216            _ => unreachable!(),
217        }
218    }
219
220    /// Stops a process registered in the given `registry` with the given `key`.
221    ///
222    /// If the process is trapping exits, it will still run, but be unregistered from this registry.
223    ///
224    /// If the process is not registered an error is returned.
225    pub async fn stop_process<T: Into<Dest>, N: Into<RegistryKey>>(
226        registry: T,
227        key: N,
228        timeout: Option<Duration>,
229    ) -> Result<(), RegistryError> {
230        use RegistryMessage::*;
231
232        let registry = registry.into();
233        let key = key.into();
234
235        if let Dest::Named(registry, Node::Local) = &registry
236            && lookup_process(registry, &key).is_none()
237        {
238            return Err(RegistryError::NotFound);
239        }
240
241        match Registry::call(registry, Stop(key), timeout).await? {
242            StopSuccess => Ok(()),
243            StopError(error) => Err(error),
244            _ => unreachable!(),
245        }
246    }
247
248    /// Returns the number of registered processes for the given `registry`.
249    pub async fn count<T: Into<Dest>>(
250        registry: T,
251        timeout: Option<Duration>,
252    ) -> Result<usize, RegistryError> {
253        use RegistryMessage::*;
254
255        let registry = registry.into();
256
257        if let Dest::Named(registry, Node::Local) = &registry {
258            return Ok(count_processes(registry));
259        }
260
261        match Registry::call(registry, Count, timeout).await? {
262            CountSuccess(count) => Ok(count),
263            _ => unreachable!(),
264        }
265    }
266
267    /// Returns a list of every process registered to the given `registry`.
268    ///
269    /// There is no ordering guarantee.
270    pub async fn list<T: Into<Dest>>(
271        registry: T,
272        timeout: Option<Duration>,
273    ) -> Result<Vec<(RegistryKey, Pid)>, RegistryError> {
274        use RegistryMessage::*;
275
276        let registry = registry.into();
277
278        if let Dest::Named(registry, Node::Local) = &registry {
279            return Ok(list_processes(registry));
280        }
281
282        match Registry::call(registry, List, timeout).await? {
283            ListSuccess(list) => Ok(list),
284            _ => unreachable!(),
285        }
286    }
287
288    /// Removes a process registered in the given `registry` with the given `key`.
289    ///
290    /// The process will no longer be registered, but it will remain running if it was found.
291    pub async fn remove<T: Into<Dest>, N: Into<RegistryKey>>(
292        registry: T,
293        key: N,
294        timeout: Option<Duration>,
295    ) -> Result<Option<Pid>, RegistryError> {
296        use RegistryMessage::*;
297
298        let registry = registry.into();
299        let key = key.into();
300
301        if let Dest::Named(registry, Node::Local) = &registry {
302            let Some(process) = remove_process(registry, &key) else {
303                return Ok(None);
304            };
305
306            Registry::cast(registry.to_string(), RemoveLookup(process));
307
308            return Ok(Some(process));
309        }
310
311        match Registry::call(registry, Remove(key), timeout).await? {
312            RemoveSuccess(pid) => Ok(pid),
313            _ => unreachable!(),
314        }
315    }
316
317    /// Create a registry proces not linked to a supervision tree.
318    pub async fn start(self, mut options: RegistryOptions) -> Result<Pid, ExitReason> {
319        if options.name.is_none() {
320            options = options.name(self.name.clone());
321        }
322
323        GenServer::start(self, options.into()).await
324    }
325
326    /// Creates a registry process as part of a supervision tree.
327    ///
328    /// For example, this function ensures that the registry is linked to the calling process (its supervisor).
329    pub async fn start_link(self, mut options: RegistryOptions) -> Result<Pid, ExitReason> {
330        if options.name.is_none() {
331            options = options.name(self.name.clone());
332        }
333
334        GenServer::start_link(self, options.into()).await
335    }
336
337    /// Builds a child specification for this [Registry] process.
338    pub fn child_spec(self, mut options: RegistryOptions) -> ChildSpec {
339        if options.name.is_none() {
340            options = options.name(self.name.clone());
341        }
342
343        ChildSpec::new("Registry")
344            .start(move || self.clone().start_link(options.clone()))
345            .child_type(ChildType::Supervisor)
346    }
347
348    /// Looks up, or starts a process by the given key.
349    async fn lookup_or_start_by_key(&mut self, key: RegistryKey) -> Result<Pid, RegistryError> {
350        if let Some(result) = lookup_process(&self.name, &key) {
351            return Ok(result);
352        }
353
354        self.start_by_key(key).await
355    }
356
357    /// Starts a process if one doesn't exist.
358    async fn start_by_key(&mut self, key: RegistryKey) -> Result<Pid, RegistryError> {
359        if let Some(process) = lookup_process(&self.name, &key) {
360            return Err(RegistryError::AlreadyStarted(process));
361        }
362
363        let start_child = Pin::from(self.start.as_ref().unwrap()(key.clone())).await;
364
365        match start_child {
366            Ok(pid) => {
367                #[cfg(feature = "tracing")]
368                tracing::info!(child_key = ?key, child_pid = ?pid, "Started registered process");
369
370                self.lookup.insert(pid, key.clone());
371
372                register_process(&self.name, key, pid);
373
374                Ok(pid)
375            }
376            Err(reason) => {
377                #[cfg(feature = "tracing")]
378                tracing::error!(reason = ?reason, child_key = ?key, "Start registered process error");
379
380                Err(RegistryError::StartError(reason))
381            }
382        }
383    }
384
385    /// Terminates all registered children of this registry.
386    async fn terminate_children(&mut self) {
387        match self.shutdown {
388            Shutdown::BrutalKill => {
389                // Do nothing, the drop will automatically kill each process.
390            }
391            _ => {
392                let Some((_, registry)) = REGISTRY.remove(&self.name) else {
393                    return;
394                };
395
396                let mut monitors: Vec<(Pid, Reference)> = Vec::with_capacity(registry.len());
397
398                for (process, key) in &self.lookup {
399                    if registry.contains_key(key) {
400                        monitors.push((*process, Process::monitor(*process)));
401
402                        Process::exit(*process, ExitReason::from("shutdown"));
403                    }
404                }
405
406                for (process, monitor) in monitors {
407                    if let Shutdown::Duration(timeout) = self.shutdown {
408                        let _ = shutdown_timeout(process, monitor, timeout).await;
409                    } else if let Shutdown::Infinity = self.shutdown {
410                        let _ = shutdown_infinity(process, monitor).await;
411                    }
412                }
413
414                self.lookup.clear();
415            }
416        }
417    }
418
419    /// Stops a process by the given key.
420    fn stop_by_key(&mut self, key: RegistryKey) -> Result<(), RegistryError> {
421        let Some(process) = lookup_process(&self.name, &key) else {
422            return Err(RegistryError::NotFound);
423        };
424
425        Process::unlink(process);
426        Process::exit(process, ExitReason::from("shutdown"));
427
428        self.lookup.remove(&process);
429
430        remove_process(&self.name, &key);
431
432        Ok(())
433    }
434
435    /// Looks up a process by the given key.
436    fn lookup_by_key(&mut self, key: RegistryKey) -> Option<Pid> {
437        lookup_process(&self.name, &key)
438    }
439
440    /// Removes a process by the given key.
441    fn remove_by_key(&mut self, key: RegistryKey) -> Option<Pid> {
442        let process = remove_process(&self.name, &key)?;
443
444        Process::unlink(process);
445
446        self.lookup.remove(&process);
447
448        Some(process)
449    }
450
451    /// Removes the process from the registry.
452    fn remove_process(&mut self, pid: Pid, reason: ExitReason) {
453        let Some(key) = self.lookup.remove(&pid) else {
454            return;
455        };
456
457        #[cfg(feature = "tracing")]
458        tracing::info!(reason = ?reason, child_key = ?key, child_pid = ?pid, "Removed registered process");
459
460        #[cfg(not(feature = "tracing"))]
461        let _ = reason;
462
463        REGISTRY.alter(&self.name, |_, value| {
464            value.remove_if(&key, |_, value| *value == pid);
465            value
466        });
467    }
468}
469
470impl Drop for Registry {
471    fn drop(&mut self) {
472        REGISTRY.remove(&self.name);
473
474        for process in self.lookup.keys() {
475            Process::unlink(*process);
476            Process::exit(*process, ExitReason::Kill);
477        }
478    }
479}
480
481impl GenServer for Registry {
482    type Message = RegistryMessage;
483
484    async fn init(&mut self) -> Result<(), ExitReason> {
485        Process::set_flags(ProcessFlags::TRAP_EXIT);
486
487        Ok(())
488    }
489
490    async fn terminate(&mut self, _reason: ExitReason) {
491        self.terminate_children().await;
492    }
493
494    async fn handle_cast(&mut self, message: Self::Message) -> Result<(), ExitReason> {
495        use RegistryMessage::*;
496
497        match message {
498            RemoveLookup(process) => {
499                Process::unlink(process);
500
501                self.lookup.remove(&process);
502                Ok(())
503            }
504            _ => unreachable!(),
505        }
506    }
507
508    async fn handle_call(
509        &mut self,
510        message: Self::Message,
511        _from: From,
512    ) -> Result<Option<Self::Message>, ExitReason> {
513        use RegistryMessage::*;
514
515        match message {
516            Lookup(key) => {
517                let result = self.lookup_by_key(key);
518
519                Ok(Some(LookupSuccess(result)))
520            }
521            LookupOrStart(key) => match self.lookup_or_start_by_key(key).await {
522                Ok(pid) => Ok(Some(LookupOrStartSuccess(pid))),
523                Err(error) => Ok(Some(LookupOrStartError(error))),
524            },
525            Start(key) => match self.start_by_key(key).await {
526                Ok(pid) => Ok(Some(StartSuccess(pid))),
527                Err(error) => Ok(Some(StartError(error))),
528            },
529            Stop(key) => match self.stop_by_key(key) {
530                Ok(()) => Ok(Some(StopSuccess)),
531                Err(error) => Ok(Some(StopError(error))),
532            },
533            Count => {
534                let count = count_processes(&self.name);
535
536                Ok(Some(CountSuccess(count)))
537            }
538            List => {
539                let list = list_processes(&self.name);
540
541                Ok(Some(ListSuccess(list)))
542            }
543            Remove(key) => {
544                let removed = self.remove_by_key(key);
545
546                Ok(Some(RemoveSuccess(removed)))
547            }
548            _ => unreachable!(),
549        }
550    }
551
552    async fn handle_info(&mut self, info: Message<Self::Message>) -> Result<(), ExitReason> {
553        match info {
554            Message::System(SystemMessage::Exit(pid, reason)) => {
555                self.remove_process(pid, reason);
556                Ok(())
557            }
558            _ => Ok(()),
559        }
560    }
561}
562
563impl std::convert::From<i32> for RegistryKey {
564    fn from(value: i32) -> Self {
565        Self::Integer32(value)
566    }
567}
568
569impl std::convert::From<i64> for RegistryKey {
570    fn from(value: i64) -> Self {
571        Self::Integer64(value)
572    }
573}
574
575impl std::convert::From<i128> for RegistryKey {
576    fn from(value: i128) -> Self {
577        Self::Integer128(value)
578    }
579}
580
581impl std::convert::From<u32> for RegistryKey {
582    fn from(value: u32) -> Self {
583        Self::UInteger32(value)
584    }
585}
586
587impl std::convert::From<u64> for RegistryKey {
588    fn from(value: u64) -> Self {
589        Self::UInteger64(value)
590    }
591}
592
593impl std::convert::From<u128> for RegistryKey {
594    fn from(value: u128) -> Self {
595        Self::UInteger128(value)
596    }
597}
598
599impl std::convert::From<String> for RegistryKey {
600    fn from(value: String) -> Self {
601        Self::String(value)
602    }
603}
604
605impl std::convert::From<&str> for RegistryKey {
606    fn from(value: &str) -> Self {
607        Self::String(value.to_owned())
608    }
609}
610
611impl std::convert::From<CallError> for RegistryError {
612    fn from(value: CallError) -> Self {
613        Self::CallError(value)
614    }
615}
616
617/// Looks up a process in the given registry assigned to the given key.
618fn lookup_process<T: AsRef<str>>(registry: T, key: &RegistryKey) -> Option<Pid> {
619    REGISTRY
620        .get(registry.as_ref())
621        .and_then(|registry| registry.get(key).map(|entry| *entry.value()))
622}
623
624/// Removes a process in the given registry assigned to the given key.
625fn remove_process<T: AsRef<str>>(registry: T, key: &RegistryKey) -> Option<Pid> {
626    REGISTRY
627        .get_mut(registry.as_ref())
628        .and_then(|registry| registry.remove(key).map(|entry| entry.1))
629}
630
631/// Counts the number of processes in a registry.
632fn count_processes<T: AsRef<str>>(registry: T) -> usize {
633    REGISTRY
634        .get(registry.as_ref())
635        .map(|registry| registry.len())
636        .unwrap_or_default()
637}
638
639/// Lists all of the processes in a registry.
640fn list_processes<T: AsRef<str>>(registry: T) -> Vec<(RegistryKey, Pid)> {
641    REGISTRY
642        .get(registry.as_ref())
643        .map(|registry| {
644            registry
645                .iter()
646                .map(|entry| (entry.key().clone(), *entry.value()))
647                .collect()
648        })
649        .unwrap_or_default()
650}
651
652/// Registers the given process in the local registry with the given key.
653fn register_process<T: Into<String>>(registry: T, key: RegistryKey, process: Pid) {
654    REGISTRY
655        .entry(registry.into())
656        .or_default()
657        .insert(key, process);
658}