edp_node/
registry.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::errors::{Error, Result};
16use crate::process::ProcessHandle;
17use erltf::types::{Atom, ExternalPid};
18use std::collections::HashMap;
19use std::collections::hash_map::Entry;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23#[derive(Clone)]
24pub struct ProcessRegistry {
25    by_pid: Arc<RwLock<HashMap<ExternalPid, ProcessHandle>>>,
26    by_name: Arc<RwLock<HashMap<Atom, ExternalPid>>>,
27}
28
29impl ProcessRegistry {
30    pub fn new() -> Self {
31        Self {
32            by_pid: Arc::new(RwLock::new(HashMap::new())),
33            by_name: Arc::new(RwLock::new(HashMap::new())),
34        }
35    }
36
37    pub async fn insert(&self, pid: ExternalPid, handle: ProcessHandle) {
38        self.by_pid.write().await.insert(pid, handle);
39    }
40
41    pub async fn remove(&self, pid: &ExternalPid) -> Option<ProcessHandle> {
42        self.by_pid.write().await.remove(pid)
43    }
44
45    pub async fn get(&self, pid: &ExternalPid) -> Option<ProcessHandle> {
46        self.by_pid.read().await.get(pid).cloned()
47    }
48
49    pub async fn register(&self, name: Atom, pid: ExternalPid) -> Result<()> {
50        let mut names = self.by_name.write().await;
51        match names.entry(name.clone()) {
52            Entry::Occupied(_) => Err(Error::NameAlreadyRegistered(name)),
53            Entry::Vacant(e) => {
54                e.insert(pid);
55                Ok(())
56            }
57        }
58    }
59
60    pub async fn unregister(&self, name: &Atom) -> Result<()> {
61        self.by_name
62            .write()
63            .await
64            .remove(name)
65            .map(|_| ())
66            .ok_or_else(|| Error::NameNotRegistered(name.clone()))
67    }
68
69    pub async fn whereis(&self, name: &Atom) -> Option<ExternalPid> {
70        self.by_name.read().await.get(name).cloned()
71    }
72
73    pub async fn registered(&self) -> Vec<Atom> {
74        self.by_name.read().await.keys().cloned().collect()
75    }
76
77    pub async fn count(&self) -> usize {
78        self.by_pid.read().await.len()
79    }
80}
81
82impl Default for ProcessRegistry {
83    fn default() -> Self {
84        Self::new()
85    }
86}