1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use dashmap::DashMap;
8
9use once_cell::sync::Lazy;
10
11use serde::Deserialize;
12use serde::Serialize;
13
14use crate::CallError;
15use crate::ChildSpec;
16use crate::ChildType;
17use crate::Dest;
18use crate::ExitReason;
19use crate::From;
20use crate::GenServer;
21use crate::Message;
22use crate::Node;
23use crate::Pid;
24use crate::Process;
25use crate::ProcessFlags;
26use crate::Reference;
27use crate::RegistryOptions;
28use crate::Shutdown;
29use crate::SystemMessage;
30use crate::shutdown_infinity;
31use crate::shutdown_timeout;
32
33static REGISTRY: Lazy<DashMap<String, DashMap<RegistryKey, Pid>>> = Lazy::new(DashMap::new);
35
36#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
38pub enum RegistryKey {
39 Integer32(i32),
41 Integer64(i64),
43 Integer128(i128),
45 UInteger32(u32),
47 UInteger64(u64),
49 UInteger128(u128),
51 String(String),
53}
54
55#[doc(hidden)]
57#[derive(Serialize, Deserialize)]
58pub enum RegistryMessage {
59 Lookup(RegistryKey),
60 LookupSuccess(Option<Pid>),
61 LookupOrStart(RegistryKey),
62 LookupOrStartSuccess(Pid),
63 LookupOrStartError(RegistryError),
64 Start(RegistryKey),
65 StartSuccess(Pid),
66 StartError(RegistryError),
67 Stop(RegistryKey),
68 StopSuccess,
69 StopError(RegistryError),
70 Count,
71 CountSuccess(usize),
72 List,
73 ListSuccess(Vec<(RegistryKey, Pid)>),
74 Remove(RegistryKey),
75 RemoveSuccess(Option<Pid>),
76 RemoveLookup(Pid),
77}
78
79#[derive(Debug, Serialize, Deserialize)]
81pub enum RegistryError {
82 CallError(CallError),
84 StartError(ExitReason),
86 StartNotSupported,
88 AlreadyStarted(Pid),
90 NotFound,
92}
93
94#[derive(Clone)]
98pub struct Registry {
99 name: String,
100 #[allow(clippy::type_complexity)]
101 start: Option<
102 Arc<
103 dyn Fn(RegistryKey) -> Box<dyn Future<Output = Result<Pid, ExitReason>> + Send + Sync>
104 + Send
105 + Sync,
106 >,
107 >,
108 shutdown: Shutdown,
109 lookup: BTreeMap<Pid, RegistryKey>,
110}
111
112impl Registry {
113 #[must_use]
117 pub fn new<T: Into<String>>(name: T) -> Self {
118 Self {
119 name: name.into(),
120 start: None,
121 shutdown: Shutdown::BrutalKill,
122 lookup: BTreeMap::new(),
123 }
124 }
125
126 #[must_use]
130 pub fn with_start<T, F>(mut self, start: T) -> Self
131 where
132 T: Fn(RegistryKey) -> F + Send + Sync + 'static,
133 F: Future<Output = Result<Pid, ExitReason>> + Send + Sync + 'static,
134 {
135 self.start = Some(Arc::new(move |key| Box::new(start(key))));
136 self
137 }
138
139 pub fn with_shutdown(mut self, shutdown: Shutdown) -> Self {
141 self.shutdown = shutdown;
142 self
143 }
144
145 pub async fn lookup<T: Into<Dest>, N: Into<RegistryKey>>(
149 registry: T,
150 key: N,
151 timeout: Option<Duration>,
152 ) -> Result<Option<Pid>, RegistryError> {
153 use RegistryMessage::*;
154
155 let registry = registry.into();
156 let key = key.into();
157
158 if let Dest::Named(registry, Node::Local) = ®istry {
159 return Ok(lookup_process(registry, &key));
160 }
161
162 match Registry::call(registry, Lookup(key), timeout).await? {
163 LookupSuccess(pid) => Ok(pid),
164 _ => unreachable!(),
165 }
166 }
167
168 pub async fn lookup_or_start<T: Into<Dest>, N: Into<RegistryKey>>(
172 registry: T,
173 key: N,
174 timeout: Option<Duration>,
175 ) -> Result<Pid, RegistryError> {
176 use RegistryMessage::*;
177
178 let registry = registry.into();
179 let key = key.into();
180
181 if let Dest::Named(registry, Node::Local) = ®istry
182 && let Some(result) = lookup_process(registry, &key)
183 {
184 return Ok(result);
185 }
186
187 match Registry::call(registry, LookupOrStart(key), timeout).await? {
188 LookupOrStartSuccess(pid) => Ok(pid),
189 LookupOrStartError(error) => Err(error),
190 _ => unreachable!(),
191 }
192 }
193
194 pub async fn start_process<T: Into<Dest>, N: Into<RegistryKey>>(
198 registry: T,
199 key: N,
200 timeout: Option<Duration>,
201 ) -> Result<Pid, RegistryError> {
202 use RegistryMessage::*;
203
204 let registry = registry.into();
205 let key = key.into();
206
207 if let Dest::Named(registry, Node::Local) = ®istry
208 && let Some(pid) = lookup_process(registry, &key)
209 {
210 return Err(RegistryError::AlreadyStarted(pid));
211 }
212
213 match Registry::call(registry, Start(key), timeout).await? {
214 StartSuccess(pid) => Ok(pid),
215 StartError(error) => Err(error),
216 _ => unreachable!(),
217 }
218 }
219
220 pub async fn stop_process<T: Into<Dest>, N: Into<RegistryKey>>(
226 registry: T,
227 key: N,
228 timeout: Option<Duration>,
229 ) -> Result<(), RegistryError> {
230 use RegistryMessage::*;
231
232 let registry = registry.into();
233 let key = key.into();
234
235 if let Dest::Named(registry, Node::Local) = ®istry
236 && lookup_process(registry, &key).is_none()
237 {
238 return Err(RegistryError::NotFound);
239 }
240
241 match Registry::call(registry, Stop(key), timeout).await? {
242 StopSuccess => Ok(()),
243 StopError(error) => Err(error),
244 _ => unreachable!(),
245 }
246 }
247
248 pub async fn count<T: Into<Dest>>(
250 registry: T,
251 timeout: Option<Duration>,
252 ) -> Result<usize, RegistryError> {
253 use RegistryMessage::*;
254
255 let registry = registry.into();
256
257 if let Dest::Named(registry, Node::Local) = ®istry {
258 return Ok(count_processes(registry));
259 }
260
261 match Registry::call(registry, Count, timeout).await? {
262 CountSuccess(count) => Ok(count),
263 _ => unreachable!(),
264 }
265 }
266
267 pub async fn list<T: Into<Dest>>(
271 registry: T,
272 timeout: Option<Duration>,
273 ) -> Result<Vec<(RegistryKey, Pid)>, RegistryError> {
274 use RegistryMessage::*;
275
276 let registry = registry.into();
277
278 if let Dest::Named(registry, Node::Local) = ®istry {
279 return Ok(list_processes(registry));
280 }
281
282 match Registry::call(registry, List, timeout).await? {
283 ListSuccess(list) => Ok(list),
284 _ => unreachable!(),
285 }
286 }
287
288 pub async fn remove<T: Into<Dest>, N: Into<RegistryKey>>(
292 registry: T,
293 key: N,
294 timeout: Option<Duration>,
295 ) -> Result<Option<Pid>, RegistryError> {
296 use RegistryMessage::*;
297
298 let registry = registry.into();
299 let key = key.into();
300
301 if let Dest::Named(registry, Node::Local) = ®istry {
302 let Some(process) = remove_process(registry, &key) else {
303 return Ok(None);
304 };
305
306 Registry::cast(registry.to_string(), RemoveLookup(process));
307
308 return Ok(Some(process));
309 }
310
311 match Registry::call(registry, Remove(key), timeout).await? {
312 RemoveSuccess(pid) => Ok(pid),
313 _ => unreachable!(),
314 }
315 }
316
317 pub async fn start(self, mut options: RegistryOptions) -> Result<Pid, ExitReason> {
319 if options.name.is_none() {
320 options = options.name(self.name.clone());
321 }
322
323 GenServer::start(self, options.into()).await
324 }
325
326 pub async fn start_link(self, mut options: RegistryOptions) -> Result<Pid, ExitReason> {
330 if options.name.is_none() {
331 options = options.name(self.name.clone());
332 }
333
334 GenServer::start_link(self, options.into()).await
335 }
336
337 pub fn child_spec(self, mut options: RegistryOptions) -> ChildSpec {
339 if options.name.is_none() {
340 options = options.name(self.name.clone());
341 }
342
343 ChildSpec::new("Registry")
344 .start(move || self.clone().start_link(options.clone()))
345 .child_type(ChildType::Supervisor)
346 }
347
348 async fn lookup_or_start_by_key(&mut self, key: RegistryKey) -> Result<Pid, RegistryError> {
350 if let Some(result) = lookup_process(&self.name, &key) {
351 return Ok(result);
352 }
353
354 self.start_by_key(key).await
355 }
356
357 async fn start_by_key(&mut self, key: RegistryKey) -> Result<Pid, RegistryError> {
359 if let Some(process) = lookup_process(&self.name, &key) {
360 return Err(RegistryError::AlreadyStarted(process));
361 }
362
363 let start_child = Pin::from(self.start.as_ref().unwrap()(key.clone())).await;
364
365 match start_child {
366 Ok(pid) => {
367 #[cfg(feature = "tracing")]
368 tracing::info!(child_key = ?key, child_pid = ?pid, "Started registered process");
369
370 self.lookup.insert(pid, key.clone());
371
372 register_process(&self.name, key, pid);
373
374 Ok(pid)
375 }
376 Err(reason) => {
377 #[cfg(feature = "tracing")]
378 tracing::error!(reason = ?reason, child_key = ?key, "Start registered process error");
379
380 Err(RegistryError::StartError(reason))
381 }
382 }
383 }
384
385 async fn terminate_children(&mut self) {
387 match self.shutdown {
388 Shutdown::BrutalKill => {
389 }
391 _ => {
392 let Some((_, registry)) = REGISTRY.remove(&self.name) else {
393 return;
394 };
395
396 let mut monitors: Vec<(Pid, Reference)> = Vec::with_capacity(registry.len());
397
398 for (process, key) in &self.lookup {
399 if registry.contains_key(key) {
400 monitors.push((*process, Process::monitor(*process)));
401
402 Process::exit(*process, ExitReason::from("shutdown"));
403 }
404 }
405
406 for (process, monitor) in monitors {
407 if let Shutdown::Duration(timeout) = self.shutdown {
408 let _ = shutdown_timeout(process, monitor, timeout).await;
409 } else if let Shutdown::Infinity = self.shutdown {
410 let _ = shutdown_infinity(process, monitor).await;
411 }
412 }
413
414 self.lookup.clear();
415 }
416 }
417 }
418
419 fn stop_by_key(&mut self, key: RegistryKey) -> Result<(), RegistryError> {
421 let Some(process) = lookup_process(&self.name, &key) else {
422 return Err(RegistryError::NotFound);
423 };
424
425 Process::unlink(process);
426 Process::exit(process, ExitReason::from("shutdown"));
427
428 self.lookup.remove(&process);
429
430 remove_process(&self.name, &key);
431
432 Ok(())
433 }
434
435 fn lookup_by_key(&mut self, key: RegistryKey) -> Option<Pid> {
437 lookup_process(&self.name, &key)
438 }
439
440 fn remove_by_key(&mut self, key: RegistryKey) -> Option<Pid> {
442 let process = remove_process(&self.name, &key)?;
443
444 Process::unlink(process);
445
446 self.lookup.remove(&process);
447
448 Some(process)
449 }
450
451 fn remove_process(&mut self, pid: Pid, reason: ExitReason) {
453 let Some(key) = self.lookup.remove(&pid) else {
454 return;
455 };
456
457 #[cfg(feature = "tracing")]
458 tracing::info!(reason = ?reason, child_key = ?key, child_pid = ?pid, "Removed registered process");
459
460 #[cfg(not(feature = "tracing"))]
461 let _ = reason;
462
463 REGISTRY.alter(&self.name, |_, value| {
464 value.remove_if(&key, |_, value| *value == pid);
465 value
466 });
467 }
468}
469
470impl Drop for Registry {
471 fn drop(&mut self) {
472 REGISTRY.remove(&self.name);
473
474 for process in self.lookup.keys() {
475 Process::unlink(*process);
476 Process::exit(*process, ExitReason::Kill);
477 }
478 }
479}
480
481impl GenServer for Registry {
482 type Message = RegistryMessage;
483
484 async fn init(&mut self) -> Result<(), ExitReason> {
485 Process::set_flags(ProcessFlags::TRAP_EXIT);
486
487 Ok(())
488 }
489
490 async fn terminate(&mut self, _reason: ExitReason) {
491 self.terminate_children().await;
492 }
493
494 async fn handle_cast(&mut self, message: Self::Message) -> Result<(), ExitReason> {
495 use RegistryMessage::*;
496
497 match message {
498 RemoveLookup(process) => {
499 Process::unlink(process);
500
501 self.lookup.remove(&process);
502 Ok(())
503 }
504 _ => unreachable!(),
505 }
506 }
507
508 async fn handle_call(
509 &mut self,
510 message: Self::Message,
511 _from: From,
512 ) -> Result<Option<Self::Message>, ExitReason> {
513 use RegistryMessage::*;
514
515 match message {
516 Lookup(key) => {
517 let result = self.lookup_by_key(key);
518
519 Ok(Some(LookupSuccess(result)))
520 }
521 LookupOrStart(key) => match self.lookup_or_start_by_key(key).await {
522 Ok(pid) => Ok(Some(LookupOrStartSuccess(pid))),
523 Err(error) => Ok(Some(LookupOrStartError(error))),
524 },
525 Start(key) => match self.start_by_key(key).await {
526 Ok(pid) => Ok(Some(StartSuccess(pid))),
527 Err(error) => Ok(Some(StartError(error))),
528 },
529 Stop(key) => match self.stop_by_key(key) {
530 Ok(()) => Ok(Some(StopSuccess)),
531 Err(error) => Ok(Some(StopError(error))),
532 },
533 Count => {
534 let count = count_processes(&self.name);
535
536 Ok(Some(CountSuccess(count)))
537 }
538 List => {
539 let list = list_processes(&self.name);
540
541 Ok(Some(ListSuccess(list)))
542 }
543 Remove(key) => {
544 let removed = self.remove_by_key(key);
545
546 Ok(Some(RemoveSuccess(removed)))
547 }
548 _ => unreachable!(),
549 }
550 }
551
552 async fn handle_info(&mut self, info: Message<Self::Message>) -> Result<(), ExitReason> {
553 match info {
554 Message::System(SystemMessage::Exit(pid, reason)) => {
555 self.remove_process(pid, reason);
556 Ok(())
557 }
558 _ => Ok(()),
559 }
560 }
561}
562
563impl std::convert::From<i32> for RegistryKey {
564 fn from(value: i32) -> Self {
565 Self::Integer32(value)
566 }
567}
568
569impl std::convert::From<i64> for RegistryKey {
570 fn from(value: i64) -> Self {
571 Self::Integer64(value)
572 }
573}
574
575impl std::convert::From<i128> for RegistryKey {
576 fn from(value: i128) -> Self {
577 Self::Integer128(value)
578 }
579}
580
581impl std::convert::From<u32> for RegistryKey {
582 fn from(value: u32) -> Self {
583 Self::UInteger32(value)
584 }
585}
586
587impl std::convert::From<u64> for RegistryKey {
588 fn from(value: u64) -> Self {
589 Self::UInteger64(value)
590 }
591}
592
593impl std::convert::From<u128> for RegistryKey {
594 fn from(value: u128) -> Self {
595 Self::UInteger128(value)
596 }
597}
598
599impl std::convert::From<String> for RegistryKey {
600 fn from(value: String) -> Self {
601 Self::String(value)
602 }
603}
604
605impl std::convert::From<&str> for RegistryKey {
606 fn from(value: &str) -> Self {
607 Self::String(value.to_owned())
608 }
609}
610
611impl std::convert::From<CallError> for RegistryError {
612 fn from(value: CallError) -> Self {
613 Self::CallError(value)
614 }
615}
616
617fn lookup_process<T: AsRef<str>>(registry: T, key: &RegistryKey) -> Option<Pid> {
619 REGISTRY
620 .get(registry.as_ref())
621 .and_then(|registry| registry.get(key).map(|entry| *entry.value()))
622}
623
624fn remove_process<T: AsRef<str>>(registry: T, key: &RegistryKey) -> Option<Pid> {
626 REGISTRY
627 .get_mut(registry.as_ref())
628 .and_then(|registry| registry.remove(key).map(|entry| entry.1))
629}
630
631fn count_processes<T: AsRef<str>>(registry: T) -> usize {
633 REGISTRY
634 .get(registry.as_ref())
635 .map(|registry| registry.len())
636 .unwrap_or_default()
637}
638
639fn list_processes<T: AsRef<str>>(registry: T) -> Vec<(RegistryKey, Pid)> {
641 REGISTRY
642 .get(registry.as_ref())
643 .map(|registry| {
644 registry
645 .iter()
646 .map(|entry| (entry.key().clone(), *entry.value()))
647 .collect()
648 })
649 .unwrap_or_default()
650}
651
652fn register_process<T: Into<String>>(registry: T, key: RegistryKey, process: Pid) {
654 REGISTRY
655 .entry(registry.into())
656 .or_default()
657 .insert(key, process);
658}