edp_node/
node.rs

1// Copyright (C) 2025-2026 Michael S. Klishin and Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::errors::{Error, Result};
16use crate::mailbox::{Mailbox, Message};
17use crate::process::{Process, spawn_process};
18use crate::registry::ProcessRegistry;
19use dashmap::DashMap;
20use edp_client::control::ControlMessage;
21use edp_client::epmd_client::{EpmdClient, NodeType};
22use edp_client::{Connection, ConnectionConfig, PidAllocator};
23use erltf::OwnedTerm;
24use erltf::types::{Atom, ExternalPid, ExternalReference};
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
27use std::time::Duration;
28use tokio::sync::{Mutex, oneshot};
29
/// Timeout applied by `Node::rpc_call` and `Node::rpc_call_raw` when the caller does not
/// supply one explicitly (ten seconds).
pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_millis(10_000);
31
32pub struct Node {
33    name: Atom,
34    cookie: String,
35    creation: Arc<AtomicU32>,
36    pid_allocator: Arc<PidAllocator>,
37    reference_counter: Arc<AtomicU32>,
38    registry: Arc<ProcessRegistry>,
39    connections: Arc<DashMap<String, Arc<Mutex<Connection>>>>,
40    pending_rpcs: Arc<DashMap<String, oneshot::Sender<OwnedTerm>>>,
41    started: Arc<AtomicBool>,
42    listen_port: Option<u16>,
43    hidden: bool,
44}
45
46impl Node {
47    pub fn new(name: impl Into<String>, cookie: impl Into<String>) -> Self {
48        Self::with_hidden(name, cookie, false)
49    }
50
51    pub fn new_hidden(name: impl Into<String>, cookie: impl Into<String>) -> Self {
52        Self::with_hidden(name, cookie, true)
53    }
54
55    pub async fn connect_to(
56        name: impl Into<String>,
57        cookie: impl Into<String>,
58        remote_node: impl Into<String>,
59    ) -> Result<Self> {
60        Self::connect_to_with_hidden(name, cookie, remote_node, false).await
61    }
62
63    pub async fn connect_to_hidden(
64        name: impl Into<String>,
65        cookie: impl Into<String>,
66        remote_node: impl Into<String>,
67    ) -> Result<Self> {
68        Self::connect_to_with_hidden(name, cookie, remote_node, true).await
69    }
70
71    async fn connect_to_with_hidden(
72        name: impl Into<String>,
73        cookie: impl Into<String>,
74        remote_node: impl Into<String>,
75        hidden: bool,
76    ) -> Result<Self> {
77        let mut node = Self::with_hidden(name, cookie, hidden);
78        node.start(0).await?;
79        node.connect(remote_node).await?;
80        Ok(node)
81    }
82
83    fn with_hidden(name: impl Into<String>, cookie: impl Into<String>, hidden: bool) -> Self {
84        let name_atom = Atom::new(name.into());
85        let creation = 1;
86        let pid_allocator = Arc::new(PidAllocator::new(name_atom.clone(), creation));
87        let creation = Arc::new(AtomicU32::new(creation));
88
89        Self {
90            name: name_atom,
91            cookie: cookie.into(),
92            creation,
93            pid_allocator,
94            reference_counter: Arc::new(AtomicU32::new(0)),
95            registry: Arc::new(ProcessRegistry::new()),
96            connections: Arc::new(DashMap::new()),
97            pending_rpcs: Arc::new(DashMap::new()),
98            started: Arc::new(AtomicBool::new(false)),
99            listen_port: None,
100            hidden,
101        }
102    }
103
104    pub fn registry(&self) -> Arc<ProcessRegistry> {
105        self.registry.clone()
106    }
107
108    pub async fn start(&mut self, port: u16) -> Result<()> {
109        if self.started.swap(true, Ordering::SeqCst) {
110            return Err(Error::NodeAlreadyStarted);
111        }
112
113        let (node_name, _host) =
114            self.name.as_str().split_once('@').ok_or_else(|| {
115                Error::EpmdRegistration(format!("Invalid node name: {}", self.name))
116            })?;
117
118        let epmd = EpmdClient::new("localhost");
119        let creation = epmd
120            .register_node(port, node_name, NodeType::Normal, 6, 6, &[])
121            .await
122            .map_err(|e| Error::EpmdRegistration(e.to_string()))?;
123
124        self.creation.store(creation, Ordering::SeqCst);
125        self.pid_allocator.set_creation(creation);
126        self.listen_port = Some(port);
127
128        tracing::debug!(
129            "Node {} started on port {} with creation {}",
130            self.name,
131            port,
132            creation
133        );
134        Ok(())
135    }
136
137    pub async fn connect(&self, remote_node: impl Into<String>) -> Result<()> {
138        let remote_node = remote_node.into();
139
140        if self.connections.contains_key(&remote_node) {
141            return Ok(());
142        }
143
144        let config = if self.hidden {
145            ConnectionConfig::new_hidden(self.name.as_str(), &remote_node, &self.cookie)
146        } else {
147            ConnectionConfig::new(self.name.as_str(), &remote_node, &self.cookie)
148        };
149
150        let mut conn = Connection::new(config);
151        conn.connect().await?;
152
153        let read_half = conn.take_read_half().ok_or_else(|| {
154            edp_client::Error::InvalidStateMessage(
155                "Failed to take read half from connection".to_string(),
156            )
157        })?;
158
159        let timeout = conn.timeout();
160
161        self.connections
162            .insert(remote_node.clone(), Arc::new(Mutex::new(conn)));
163
164        self.spawn_receiver_task(remote_node.clone(), read_half, timeout);
165
166        tracing::debug!("Connected to {}", remote_node);
167        Ok(())
168    }
169
170    fn spawn_receiver_task(
171        &self,
172        remote_node: String,
173        mut read_half: edp_client::OwnedReadHalf,
174        timeout: std::time::Duration,
175    ) {
176        let registry = self.registry.clone();
177        let pending_rpcs = self.pending_rpcs.clone();
178        let connections = self.connections.clone();
179        let remote_node_clone = remote_node.clone();
180
181        tokio::spawn(async move {
182            loop {
183                let result =
184                    edp_client::Connection::receive_message_from_read_half(&mut read_half, timeout)
185                        .await;
186
187                match result {
188                    Ok((control_msg, payload)) => {
189                        let payload_len = payload.as_ref().map(|p| p.len()).unwrap_or(0);
190                        tracing::debug!(
191                            "Received control message from {}, payload size: {} bytes",
192                            remote_node,
193                            payload_len
194                        );
195                        tracing::debug!(
196                            "Control message details: {:?}, payload: {:?}",
197                            control_msg,
198                            payload
199                        );
200                        if let Err(e) =
201                            Self::route_message(&registry, &pending_rpcs, control_msg, payload)
202                                .await
203                        {
204                            tracing::error!("Failed to route message: {}", e);
205                        }
206                    }
207                    Err(e) => {
208                        if e.to_string().contains("Decode error") {
209                            tracing::warn!(
210                                "Failed to decode message from {} (likely unsupported message type): {}",
211                                remote_node,
212                                e
213                            );
214                            continue;
215                        }
216                        tracing::error!("Error receiving message from {}: {}", remote_node, e);
217                        break;
218                    }
219                }
220            }
221
222            connections.remove(&remote_node_clone);
223            tracing::debug!(
224                "Receiver task for {} terminated, connection removed",
225                remote_node
226            );
227        });
228    }
229
230    async fn route_message(
231        registry: &ProcessRegistry,
232        pending_rpcs: &DashMap<String, oneshot::Sender<OwnedTerm>>,
233        control_msg: ControlMessage,
234        payload: Option<OwnedTerm>,
235    ) -> Result<()> {
236        match control_msg {
237            ControlMessage::Send { to_pid, .. } => {
238                if let Some(body) = payload
239                    && let OwnedTerm::Pid(pid) = to_pid
240                {
241                    if let Some(handle) = registry.get(&pid).await {
242                        handle.send(Message::Regular { from: None, body }).await?;
243                    } else {
244                        let pid_str = format!("{}.{}.{}", pid.id, pid.serial, pid.creation);
245                        if let Some((_key, sender)) = pending_rpcs.remove(&pid_str) {
246                            let _ = sender.send(body);
247                        }
248                    }
249                }
250            }
251            ControlMessage::RegSend { to_name, .. } => {
252                if let Some(body) = payload
253                    && let OwnedTerm::Atom(name) = to_name
254                    && let Some(pid) = registry.whereis(&name).await
255                    && let Some(handle) = registry.get(&pid).await
256                {
257                    handle.send(Message::Regular { from: None, body }).await?;
258                }
259            }
260            ControlMessage::Exit {
261                from_pid,
262                to_pid,
263                reason,
264            } => {
265                if let OwnedTerm::Pid(from) = from_pid
266                    && let OwnedTerm::Pid(to) = to_pid
267                    && let Some(handle) = registry.get(&to).await
268                {
269                    handle.send(Message::Exit { from, reason }).await?;
270                }
271            }
272            ControlMessage::MonitorPExit {
273                from_proc,
274                to_pid,
275                reference,
276                reason,
277            } => {
278                if let OwnedTerm::Pid(from) = from_proc
279                    && let OwnedTerm::Pid(to) = to_pid
280                    && let OwnedTerm::Reference(ref_val) = reference
281                    && let Some(handle) = registry.get(&to).await
282                {
283                    handle
284                        .send(Message::MonitorExit {
285                            monitored: from,
286                            reference: ref_val,
287                            reason,
288                        })
289                        .await?;
290                }
291            }
292            _ => {}
293        }
294
295        Ok(())
296    }
297
298    pub async fn spawn<P: Process>(&self, process: P) -> Result<ExternalPid> {
299        if !self.started.load(Ordering::SeqCst) {
300            return Err(Error::NodeNotStarted);
301        }
302
303        let mailbox = Mailbox::new();
304        let pid = self
305            .pid_allocator
306            .allocate()
307            .expect("PID allocator lock poisoned");
308
309        let handle = spawn_process(process, mailbox, self.registry.clone(), pid.clone()).await;
310
311        self.registry.insert(pid.clone(), handle).await;
312
313        tracing::debug!("Spawned process: {:?}", pid);
314        Ok(pid)
315    }
316
317    pub async fn register(&self, name: Atom, pid: ExternalPid) -> Result<()> {
318        self.registry.register(name, pid).await
319    }
320
321    pub async fn unregister(&self, name: &Atom) -> Result<()> {
322        self.registry.unregister(name).await
323    }
324
325    pub async fn whereis(&self, name: &Atom) -> Option<ExternalPid> {
326        self.registry.whereis(name).await
327    }
328
329    pub async fn registered(&self) -> Vec<Atom> {
330        self.registry.registered().await
331    }
332
333    pub async fn send(&self, to: &ExternalPid, message: OwnedTerm) -> Result<()> {
334        if to.node == self.name {
335            self.send_local(to, message).await
336        } else {
337            self.send_remote(to, message).await
338        }
339    }
340
341    pub async fn send_to_name(&self, to: &Atom, message: OwnedTerm) -> Result<()> {
342        let pid = self
343            .whereis(to)
344            .await
345            .ok_or_else(|| Error::NameNotRegistered(to.clone()))?;
346        self.send(&pid, message).await
347    }
348
349    async fn send_local(&self, to: &ExternalPid, message: OwnedTerm) -> Result<()> {
350        if let Some(handle) = self.registry.get(to).await {
351            handle
352                .send(Message::Regular {
353                    from: None,
354                    body: message,
355                })
356                .await?;
357            Ok(())
358        } else {
359            Err(Error::ProcessNotFound(to.clone()))
360        }
361    }
362
363    async fn send_remote(&self, to: &ExternalPid, message: OwnedTerm) -> Result<()> {
364        let node_name = to.node.as_str();
365
366        if let Some(conn) = self.connections.get(node_name) {
367            let from = self
368                .pid_allocator
369                .allocate()
370                .expect("PID allocator lock poisoned");
371            let mut conn_guard = conn.lock().await;
372            conn_guard.send_message(from, to.clone(), message).await?;
373            Ok(())
374        } else {
375            Err(Error::NodeNotConnected(node_name.to_string()))
376        }
377    }
378
379    pub async fn link(&self, from: &ExternalPid, to: &ExternalPid) -> Result<()> {
380        if let Some(from_handle) = self.registry.get(from).await {
381            from_handle.add_link(to.clone()).await;
382        }
383
384        if to.node == self.name {
385            if let Some(to_handle) = self.registry.get(to).await {
386                to_handle.add_link(from.clone()).await;
387            }
388            Ok(())
389        } else {
390            let node_name = to.node.as_str();
391
392            if let Some(conn) = self.connections.get(node_name) {
393                let mut conn_guard = conn.lock().await;
394                conn_guard.link(from, to).await?;
395                Ok(())
396            } else {
397                Err(Error::NodeNotConnected(node_name.to_string()))
398            }
399        }
400    }
401
402    pub async fn unlink(&self, from: &ExternalPid, to: &ExternalPid) -> Result<()> {
403        if let Some(from_handle) = self.registry.get(from).await {
404            from_handle.remove_link(to).await;
405        }
406
407        if to.node == self.name {
408            if let Some(to_handle) = self.registry.get(to).await {
409                to_handle.remove_link(from).await;
410            }
411            Ok(())
412        } else {
413            let node_name = to.node.as_str();
414
415            if let Some(conn) = self.connections.get(node_name) {
416                let unlink_id = self.reference_counter.fetch_add(1, Ordering::SeqCst) as u64;
417                let mut conn_guard = conn.lock().await;
418                conn_guard.unlink(from, to, unlink_id).await?;
419                Ok(())
420            } else {
421                Err(Error::NodeNotConnected(node_name.to_string()))
422            }
423        }
424    }
425
426    pub fn make_reference(&self) -> ExternalReference {
427        let id0 = self.reference_counter.fetch_add(1, Ordering::SeqCst);
428        let id1 = self.reference_counter.fetch_add(1, Ordering::SeqCst);
429        let id2 = self.reference_counter.fetch_add(1, Ordering::SeqCst);
430        ExternalReference::new(
431            self.name.clone(),
432            self.creation.load(Ordering::SeqCst),
433            vec![id0, id1, id2],
434        )
435    }
436
437    pub async fn monitor(&self, from: &ExternalPid, to: &ExternalPid) -> Result<ExternalReference> {
438        let reference = self.make_reference();
439
440        if to.node == self.name {
441            if let Some(to_handle) = self.registry.get(to).await {
442                to_handle.add_monitor(from.clone(), reference.clone()).await;
443            }
444            Ok(reference)
445        } else {
446            let node_name = to.node.as_str();
447
448            if let Some(conn) = self.connections.get(node_name) {
449                let mut conn_guard = conn.lock().await;
450                conn_guard.monitor(from, to, &reference).await?;
451                Ok(reference)
452            } else {
453                Err(Error::NodeNotConnected(node_name.to_string()))
454            }
455        }
456    }
457
458    pub async fn demonitor(
459        &self,
460        from: &ExternalPid,
461        to: &ExternalPid,
462        reference: &ExternalReference,
463    ) -> Result<()> {
464        if to.node == self.name {
465            if let Some(to_handle) = self.registry.get(to).await {
466                to_handle.remove_monitor(reference).await;
467            }
468            Ok(())
469        } else {
470            let node_name = to.node.as_str();
471
472            if let Some(conn) = self.connections.get(node_name) {
473                let mut conn_guard = conn.lock().await;
474                conn_guard.demonitor(from, to, reference).await?;
475                Ok(())
476            } else {
477                Err(Error::NodeNotConnected(node_name.to_string()))
478            }
479        }
480    }
481
482    pub fn name(&self) -> &Atom {
483        &self.name
484    }
485
486    pub fn creation(&self) -> u32 {
487        self.creation.load(Ordering::SeqCst)
488    }
489
490    pub async fn process_count(&self) -> usize {
491        self.registry.count().await
492    }
493
494    pub fn connections(&self) -> Arc<DashMap<String, Arc<Mutex<Connection>>>> {
495        self.connections.clone()
496    }
497
498    pub fn cookie(&self) -> &str {
499        &self.cookie
500    }
501
502    pub async fn rpc_call(
503        &self,
504        remote_node: &str,
505        module: &str,
506        function: &str,
507        args: Vec<OwnedTerm>,
508    ) -> Result<OwnedTerm> {
509        self.rpc_call_with_timeout(remote_node, module, function, args, DEFAULT_RPC_TIMEOUT)
510            .await
511    }
512
513    pub async fn rpc_call_with_timeout(
514        &self,
515        remote_node: &str,
516        module: &str,
517        function: &str,
518        args: Vec<OwnedTerm>,
519        timeout: Duration,
520    ) -> Result<OwnedTerm> {
521        let response = self
522            .rpc_call_raw_with_timeout(remote_node, module, function, args, timeout)
523            .await?;
524        response.into_rex_response().map_err(Error::from)
525    }
526
527    pub async fn rpc_call_raw(
528        &self,
529        remote_node: &str,
530        module: &str,
531        function: &str,
532        args: Vec<OwnedTerm>,
533    ) -> Result<OwnedTerm> {
534        self.rpc_call_raw_with_timeout(remote_node, module, function, args, DEFAULT_RPC_TIMEOUT)
535            .await
536    }
537
538    pub async fn rpc_call_raw_with_timeout(
539        &self,
540        remote_node: &str,
541        module: &str,
542        function: &str,
543        args: Vec<OwnedTerm>,
544        timeout: Duration,
545    ) -> Result<OwnedTerm> {
546        let reply_to_pid = self
547            .pid_allocator
548            .allocate()
549            .expect("PID allocator lock poisoned");
550
551        let call_request = OwnedTerm::Tuple(vec![
552            OwnedTerm::Pid(reply_to_pid.clone()),
553            OwnedTerm::Tuple(vec![
554                OwnedTerm::Atom(Atom::new("call")),
555                OwnedTerm::Atom(Atom::new(module)),
556                OwnedTerm::Atom(Atom::new(function)),
557                OwnedTerm::List(args),
558                OwnedTerm::Atom(Atom::new("user")),
559            ]),
560        ]);
561
562        let (tx, rx) = oneshot::channel();
563        let pid_str = format!(
564            "{}.{}.{}",
565            reply_to_pid.id, reply_to_pid.serial, reply_to_pid.creation
566        );
567        self.pending_rpcs.insert(pid_str.clone(), tx);
568
569        tracing::debug!("RPC call_request: {:?}", call_request);
570        tracing::debug!("RPC reply_to_pid: {:?}", reply_to_pid);
571
572        tracing::trace!("Looking up connection for node: {}", remote_node);
573        if let Some(conn) = self.connections.get(remote_node) {
574            tracing::trace!("Found connection, sending to rex");
575            let mut conn_guard = conn.lock().await;
576            conn_guard
577                .send_to_name(reply_to_pid, Atom::new("rex"), call_request)
578                .await?;
579            tracing::trace!("Message sent to rex");
580        } else {
581            tracing::error!("No connection found for node: {}", remote_node);
582            self.pending_rpcs.remove(&pid_str);
583            return Err(Error::NodeNotConnected(remote_node.to_string()));
584        }
585
586        let response = tokio::time::timeout(timeout, rx).await;
587
588        if response.is_err() {
589            self.pending_rpcs.remove(&pid_str);
590        }
591
592        let response = response
593            .map_err(|_| Error::RpcTimeout(timeout))?
594            .map_err(|_| Error::RpcCancelled)?;
595
596        Ok(response)
597    }
598}