use crate::SendError;
use crate::process_handle::ProcessHandle;
use dashmap::DashMap;
use starlang_core::{Pid, Term};
use std::sync::{Arc, OnceLock};
/// Signature of the callback used to deliver a raw message to a pid that is
/// not local to this runtime.
///
/// The hook receives the destination `pid` and the already-encoded payload and
/// reports the delivery outcome as a `Result<(), SendError>`.
pub type RemoteSendHook = fn(pid: Pid, data: Vec<u8>) -> Result<(), SendError>;
/// Process-wide slot holding the installed remote-send hook.
///
/// Backed by a `OnceLock`, so it can be populated at most once.
static REMOTE_SEND_HOOK: OnceLock<RemoteSendHook> = OnceLock::new();
/// Installs the global hook used by `ProcessRegistry::send_raw` for non-local pids.
///
/// # Errors
///
/// The hook can only be installed once. If a hook is already present, the
/// existing one is kept and the rejected `hook` is handed back in `Err`.
pub fn set_remote_send_hook(hook: RemoteSendHook) -> Result<(), RemoteSendHook> {
REMOTE_SEND_HOOK.set(hook)
}
/// Registry of process handles and their optional registered names.
///
/// Both maps live behind `Arc`, so cloning a registry yields another handle to
/// the same underlying tables rather than an independent copy.
#[derive(Clone)]
pub struct ProcessRegistry {
// Handle for every registered process, keyed by its pid.
processes: Arc<DashMap<Pid, ProcessHandle>>,
// Registered name -> pid bindings.
names: Arc<DashMap<String, Pid>>,
}
impl ProcessRegistry {
pub fn new() -> Self {
Self {
processes: Arc::new(DashMap::new()),
names: Arc::new(DashMap::new()),
}
}
pub fn register(&self, handle: ProcessHandle) {
self.processes.insert(handle.pid(), handle);
}
pub fn unregister(&self, pid: Pid) -> Option<ProcessHandle> {
self.names.retain(|_, &mut p| p != pid);
self.processes.remove(&pid).map(|(_, h)| h)
}
pub fn get(&self, pid: Pid) -> Option<ProcessHandle> {
self.processes.get(&pid).map(|r| r.value().clone())
}
pub fn contains(&self, pid: Pid) -> bool {
self.processes.contains_key(&pid)
}
pub fn len(&self) -> usize {
self.processes.len()
}
pub fn is_empty(&self) -> bool {
self.processes.is_empty()
}
pub fn send_raw(&self, pid: Pid, data: Vec<u8>) -> Result<(), SendError> {
if !pid.is_local() {
if let Some(hook) = REMOTE_SEND_HOOK.get() {
return hook(pid, data);
} else {
return Err(SendError::ProcessNotFound(pid));
}
}
match self.processes.get(&pid) {
Some(handle) => handle.send_raw(data),
None => Err(SendError::ProcessNotFound(pid)),
}
}
pub fn send<M: Term>(&self, pid: Pid, msg: &M) -> Result<(), SendError> {
self.send_raw(pid, msg.encode())
}
pub fn register_name(&self, name: String, pid: Pid) -> bool {
if self.names.contains_key(&name) {
return false;
}
self.names.insert(name, pid);
true
}
pub fn whereis(&self, name: &str) -> Option<Pid> {
self.names.get(name).map(|r| *r.value())
}
pub fn unregister_name(&self, name: &str) -> Option<Pid> {
self.names.remove(name).map(|(_, pid)| pid)
}
pub fn registered_names(&self) -> Vec<String> {
self.names.iter().map(|r| r.key().clone()).collect()
}
pub fn pids(&self) -> Vec<Pid> {
self.processes.iter().map(|r| *r.key()).collect()
}
pub fn for_each<F>(&self, f: F)
where
F: FnMut(ProcessHandle),
{
self.processes.iter().map(|r| r.value().clone()).for_each(f);
}
}
impl Default for ProcessRegistry {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for ProcessRegistry {
    /// Renders a compact summary (entry counts only) instead of dumping every
    /// handle and name held by the registry.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let process_count = self.processes.len();
        let name_count = self.names.len();

        let mut summary = f.debug_struct("ProcessRegistry");
        summary.field("process_count", &process_count);
        summary.field("name_count", &name_count);
        summary.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mailbox::Mailbox;
    use crate::process_handle::ProcessState;
    use std::sync::RwLock;

    /// Builds a stand-alone handle for `pid` backed by a fresh mailbox and
    /// fresh process state.
    fn create_test_handle(pid: Pid) -> ProcessHandle {
        let (_mailbox, sender) = Mailbox::new();
        let shared_state = Arc::new(RwLock::new(ProcessState::new(pid)));
        ProcessHandle::new(pid, sender, shared_state, None)
    }

    /// Returns a new registry that already holds exactly one process,
    /// together with that process's pid.
    fn registry_with_process() -> (ProcessRegistry, Pid) {
        let registry = ProcessRegistry::new();
        let pid = Pid::new();
        registry.register(create_test_handle(pid));
        (registry, pid)
    }

    /// A registered handle is visible through `contains`, `len` and `get`.
    #[test]
    fn test_register_and_get() {
        let (registry, pid) = registry_with_process();

        assert!(registry.contains(pid));
        assert_eq!(registry.len(), 1);

        let found = registry.get(pid).unwrap();
        assert_eq!(found.pid(), pid);
    }

    /// Unregistering returns the handle and leaves the registry empty.
    #[test]
    fn test_unregister() {
        let (registry, pid) = registry_with_process();
        assert!(registry.contains(pid));

        assert!(registry.unregister(pid).is_some());

        assert!(!registry.contains(pid));
        assert!(registry.is_empty());
    }

    /// A name can be bound once, looked up, and released again; a second
    /// registration under the same name is refused.
    #[test]
    fn test_name_registration() {
        let (registry, owner) = registry_with_process();

        assert!(registry.register_name("my_process".to_string(), owner));
        assert_eq!(registry.whereis("my_process"), Some(owner));

        let latecomer = Pid::new();
        assert!(!registry.register_name("my_process".to_string(), latecomer));

        assert_eq!(registry.unregister_name("my_process"), Some(owner));
        assert_eq!(registry.whereis("my_process"), None);
    }

    /// Removing a process also removes the names that pointed at it.
    #[test]
    fn test_unregister_removes_names() {
        let (registry, pid) = registry_with_process();
        registry.register_name("my_process".to_string(), pid);

        registry.unregister(pid);

        assert_eq!(registry.whereis("my_process"), None);
    }

    /// `pids` and `registered_names` list everything that was registered.
    #[test]
    fn test_pids_and_names() {
        let registry = ProcessRegistry::new();
        let first = Pid::new();
        let second = Pid::new();
        for pid in [first, second] {
            registry.register(create_test_handle(pid));
        }
        registry.register_name("proc1".to_string(), first);
        registry.register_name("proc2".to_string(), second);

        let pids = registry.pids();
        assert_eq!(pids.len(), 2);
        for expected in [first, second] {
            assert!(pids.contains(&expected));
        }

        let names = registry.registered_names();
        assert_eq!(names.len(), 2);
        for expected in ["proc1", "proc2"] {
            assert!(names.iter().any(|n| n.as_str() == expected));
        }
    }
}