1use crate::errors::{Error, Result};
16use crate::mailbox::{Mailbox, Message};
17use crate::process::{Process, spawn_process};
18use crate::registry::ProcessRegistry;
19use dashmap::DashMap;
20use edp_client::control::ControlMessage;
21use edp_client::epmd_client::{EpmdClient, NodeType};
22use edp_client::{Connection, ConnectionConfig, PidAllocator};
23use erltf::OwnedTerm;
24use erltf::types::{Atom, ExternalPid, ExternalReference};
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
27use std::time::Duration;
28use tokio::sync::{Mutex, oneshot};
29
/// Timeout used by [`Node::rpc_call`] and [`Node::rpc_call_raw`], which do not
/// take an explicit timeout argument: ten seconds.
pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(10);
31
32pub struct Node {
33 name: Atom,
34 cookie: String,
35 creation: Arc<AtomicU32>,
36 pid_allocator: Arc<PidAllocator>,
37 reference_counter: Arc<AtomicU32>,
38 registry: Arc<ProcessRegistry>,
39 connections: Arc<DashMap<String, Arc<Mutex<Connection>>>>,
40 pending_rpcs: Arc<DashMap<String, oneshot::Sender<OwnedTerm>>>,
41 started: Arc<AtomicBool>,
42 listen_port: Option<u16>,
43 hidden: bool,
44}
45
46impl Node {
47 pub fn new(name: impl Into<String>, cookie: impl Into<String>) -> Self {
48 Self::with_hidden(name, cookie, false)
49 }
50
51 pub fn new_hidden(name: impl Into<String>, cookie: impl Into<String>) -> Self {
52 Self::with_hidden(name, cookie, true)
53 }
54
55 pub async fn connect_to(
56 name: impl Into<String>,
57 cookie: impl Into<String>,
58 remote_node: impl Into<String>,
59 ) -> Result<Self> {
60 Self::connect_to_with_hidden(name, cookie, remote_node, false).await
61 }
62
63 pub async fn connect_to_hidden(
64 name: impl Into<String>,
65 cookie: impl Into<String>,
66 remote_node: impl Into<String>,
67 ) -> Result<Self> {
68 Self::connect_to_with_hidden(name, cookie, remote_node, true).await
69 }
70
71 async fn connect_to_with_hidden(
72 name: impl Into<String>,
73 cookie: impl Into<String>,
74 remote_node: impl Into<String>,
75 hidden: bool,
76 ) -> Result<Self> {
77 let mut node = Self::with_hidden(name, cookie, hidden);
78 node.start(0).await?;
79 node.connect(remote_node).await?;
80 Ok(node)
81 }
82
83 fn with_hidden(name: impl Into<String>, cookie: impl Into<String>, hidden: bool) -> Self {
84 let name_atom = Atom::new(name.into());
85 let creation = 1;
86 let pid_allocator = Arc::new(PidAllocator::new(name_atom.clone(), creation));
87 let creation = Arc::new(AtomicU32::new(creation));
88
89 Self {
90 name: name_atom,
91 cookie: cookie.into(),
92 creation,
93 pid_allocator,
94 reference_counter: Arc::new(AtomicU32::new(0)),
95 registry: Arc::new(ProcessRegistry::new()),
96 connections: Arc::new(DashMap::new()),
97 pending_rpcs: Arc::new(DashMap::new()),
98 started: Arc::new(AtomicBool::new(false)),
99 listen_port: None,
100 hidden,
101 }
102 }
103
104 pub fn registry(&self) -> Arc<ProcessRegistry> {
105 self.registry.clone()
106 }
107
108 pub async fn start(&mut self, port: u16) -> Result<()> {
109 if self.started.swap(true, Ordering::SeqCst) {
110 return Err(Error::NodeAlreadyStarted);
111 }
112
113 let (node_name, _host) =
114 self.name.as_str().split_once('@').ok_or_else(|| {
115 Error::EpmdRegistration(format!("Invalid node name: {}", self.name))
116 })?;
117
118 let epmd = EpmdClient::new("localhost");
119 let creation = epmd
120 .register_node(port, node_name, NodeType::Normal, 6, 6, &[])
121 .await
122 .map_err(|e| Error::EpmdRegistration(e.to_string()))?;
123
124 self.creation.store(creation, Ordering::SeqCst);
125 self.pid_allocator.set_creation(creation);
126 self.listen_port = Some(port);
127
128 tracing::debug!(
129 "Node {} started on port {} with creation {}",
130 self.name,
131 port,
132 creation
133 );
134 Ok(())
135 }
136
137 pub async fn connect(&self, remote_node: impl Into<String>) -> Result<()> {
138 let remote_node = remote_node.into();
139
140 if self.connections.contains_key(&remote_node) {
141 return Ok(());
142 }
143
144 let config = if self.hidden {
145 ConnectionConfig::new_hidden(self.name.as_str(), &remote_node, &self.cookie)
146 } else {
147 ConnectionConfig::new(self.name.as_str(), &remote_node, &self.cookie)
148 };
149
150 let mut conn = Connection::new(config);
151 conn.connect().await?;
152
153 let read_half = conn.take_read_half().ok_or_else(|| {
154 edp_client::Error::InvalidStateMessage(
155 "Failed to take read half from connection".to_string(),
156 )
157 })?;
158
159 let timeout = conn.timeout();
160
161 self.connections
162 .insert(remote_node.clone(), Arc::new(Mutex::new(conn)));
163
164 self.spawn_receiver_task(remote_node.clone(), read_half, timeout);
165
166 tracing::debug!("Connected to {}", remote_node);
167 Ok(())
168 }
169
170 fn spawn_receiver_task(
171 &self,
172 remote_node: String,
173 mut read_half: edp_client::OwnedReadHalf,
174 timeout: std::time::Duration,
175 ) {
176 let registry = self.registry.clone();
177 let pending_rpcs = self.pending_rpcs.clone();
178 let connections = self.connections.clone();
179 let remote_node_clone = remote_node.clone();
180
181 tokio::spawn(async move {
182 loop {
183 let result =
184 edp_client::Connection::receive_message_from_read_half(&mut read_half, timeout)
185 .await;
186
187 match result {
188 Ok((control_msg, payload)) => {
189 let payload_len = payload.as_ref().map(|p| p.len()).unwrap_or(0);
190 tracing::debug!(
191 "Received control message from {}, payload size: {} bytes",
192 remote_node,
193 payload_len
194 );
195 tracing::debug!(
196 "Control message details: {:?}, payload: {:?}",
197 control_msg,
198 payload
199 );
200 if let Err(e) =
201 Self::route_message(®istry, &pending_rpcs, control_msg, payload)
202 .await
203 {
204 tracing::error!("Failed to route message: {}", e);
205 }
206 }
207 Err(e) => {
208 if e.to_string().contains("Decode error") {
209 tracing::warn!(
210 "Failed to decode message from {} (likely unsupported message type): {}",
211 remote_node,
212 e
213 );
214 continue;
215 }
216 tracing::error!("Error receiving message from {}: {}", remote_node, e);
217 break;
218 }
219 }
220 }
221
222 connections.remove(&remote_node_clone);
223 tracing::debug!(
224 "Receiver task for {} terminated, connection removed",
225 remote_node
226 );
227 });
228 }
229
230 async fn route_message(
231 registry: &ProcessRegistry,
232 pending_rpcs: &DashMap<String, oneshot::Sender<OwnedTerm>>,
233 control_msg: ControlMessage,
234 payload: Option<OwnedTerm>,
235 ) -> Result<()> {
236 match control_msg {
237 ControlMessage::Send { to_pid, .. } => {
238 if let Some(body) = payload
239 && let OwnedTerm::Pid(pid) = to_pid
240 {
241 if let Some(handle) = registry.get(&pid).await {
242 handle.send(Message::Regular { from: None, body }).await?;
243 } else {
244 let pid_str = format!("{}.{}.{}", pid.id, pid.serial, pid.creation);
245 if let Some((_key, sender)) = pending_rpcs.remove(&pid_str) {
246 let _ = sender.send(body);
247 }
248 }
249 }
250 }
251 ControlMessage::RegSend { to_name, .. } => {
252 if let Some(body) = payload
253 && let OwnedTerm::Atom(name) = to_name
254 && let Some(pid) = registry.whereis(&name).await
255 && let Some(handle) = registry.get(&pid).await
256 {
257 handle.send(Message::Regular { from: None, body }).await?;
258 }
259 }
260 ControlMessage::Exit {
261 from_pid,
262 to_pid,
263 reason,
264 } => {
265 if let OwnedTerm::Pid(from) = from_pid
266 && let OwnedTerm::Pid(to) = to_pid
267 && let Some(handle) = registry.get(&to).await
268 {
269 handle.send(Message::Exit { from, reason }).await?;
270 }
271 }
272 ControlMessage::MonitorPExit {
273 from_proc,
274 to_pid,
275 reference,
276 reason,
277 } => {
278 if let OwnedTerm::Pid(from) = from_proc
279 && let OwnedTerm::Pid(to) = to_pid
280 && let OwnedTerm::Reference(ref_val) = reference
281 && let Some(handle) = registry.get(&to).await
282 {
283 handle
284 .send(Message::MonitorExit {
285 monitored: from,
286 reference: ref_val,
287 reason,
288 })
289 .await?;
290 }
291 }
292 _ => {}
293 }
294
295 Ok(())
296 }
297
298 pub async fn spawn<P: Process>(&self, process: P) -> Result<ExternalPid> {
299 if !self.started.load(Ordering::SeqCst) {
300 return Err(Error::NodeNotStarted);
301 }
302
303 let mailbox = Mailbox::new();
304 let pid = self
305 .pid_allocator
306 .allocate()
307 .expect("PID allocator lock poisoned");
308
309 let handle = spawn_process(process, mailbox, self.registry.clone(), pid.clone()).await;
310
311 self.registry.insert(pid.clone(), handle).await;
312
313 tracing::debug!("Spawned process: {:?}", pid);
314 Ok(pid)
315 }
316
317 pub async fn register(&self, name: Atom, pid: ExternalPid) -> Result<()> {
318 self.registry.register(name, pid).await
319 }
320
321 pub async fn unregister(&self, name: &Atom) -> Result<()> {
322 self.registry.unregister(name).await
323 }
324
325 pub async fn whereis(&self, name: &Atom) -> Option<ExternalPid> {
326 self.registry.whereis(name).await
327 }
328
329 pub async fn registered(&self) -> Vec<Atom> {
330 self.registry.registered().await
331 }
332
333 pub async fn send(&self, to: &ExternalPid, message: OwnedTerm) -> Result<()> {
334 if to.node == self.name {
335 self.send_local(to, message).await
336 } else {
337 self.send_remote(to, message).await
338 }
339 }
340
341 pub async fn send_to_name(&self, to: &Atom, message: OwnedTerm) -> Result<()> {
342 let pid = self
343 .whereis(to)
344 .await
345 .ok_or_else(|| Error::NameNotRegistered(to.clone()))?;
346 self.send(&pid, message).await
347 }
348
349 async fn send_local(&self, to: &ExternalPid, message: OwnedTerm) -> Result<()> {
350 if let Some(handle) = self.registry.get(to).await {
351 handle
352 .send(Message::Regular {
353 from: None,
354 body: message,
355 })
356 .await?;
357 Ok(())
358 } else {
359 Err(Error::ProcessNotFound(to.clone()))
360 }
361 }
362
363 async fn send_remote(&self, to: &ExternalPid, message: OwnedTerm) -> Result<()> {
364 let node_name = to.node.as_str();
365
366 if let Some(conn) = self.connections.get(node_name) {
367 let from = self
368 .pid_allocator
369 .allocate()
370 .expect("PID allocator lock poisoned");
371 let mut conn_guard = conn.lock().await;
372 conn_guard.send_message(from, to.clone(), message).await?;
373 Ok(())
374 } else {
375 Err(Error::NodeNotConnected(node_name.to_string()))
376 }
377 }
378
379 pub async fn link(&self, from: &ExternalPid, to: &ExternalPid) -> Result<()> {
380 if let Some(from_handle) = self.registry.get(from).await {
381 from_handle.add_link(to.clone()).await;
382 }
383
384 if to.node == self.name {
385 if let Some(to_handle) = self.registry.get(to).await {
386 to_handle.add_link(from.clone()).await;
387 }
388 Ok(())
389 } else {
390 let node_name = to.node.as_str();
391
392 if let Some(conn) = self.connections.get(node_name) {
393 let mut conn_guard = conn.lock().await;
394 conn_guard.link(from, to).await?;
395 Ok(())
396 } else {
397 Err(Error::NodeNotConnected(node_name.to_string()))
398 }
399 }
400 }
401
402 pub async fn unlink(&self, from: &ExternalPid, to: &ExternalPid) -> Result<()> {
403 if let Some(from_handle) = self.registry.get(from).await {
404 from_handle.remove_link(to).await;
405 }
406
407 if to.node == self.name {
408 if let Some(to_handle) = self.registry.get(to).await {
409 to_handle.remove_link(from).await;
410 }
411 Ok(())
412 } else {
413 let node_name = to.node.as_str();
414
415 if let Some(conn) = self.connections.get(node_name) {
416 let unlink_id = self.reference_counter.fetch_add(1, Ordering::SeqCst) as u64;
417 let mut conn_guard = conn.lock().await;
418 conn_guard.unlink(from, to, unlink_id).await?;
419 Ok(())
420 } else {
421 Err(Error::NodeNotConnected(node_name.to_string()))
422 }
423 }
424 }
425
426 pub fn make_reference(&self) -> ExternalReference {
427 let id0 = self.reference_counter.fetch_add(1, Ordering::SeqCst);
428 let id1 = self.reference_counter.fetch_add(1, Ordering::SeqCst);
429 let id2 = self.reference_counter.fetch_add(1, Ordering::SeqCst);
430 ExternalReference::new(
431 self.name.clone(),
432 self.creation.load(Ordering::SeqCst),
433 vec![id0, id1, id2],
434 )
435 }
436
437 pub async fn monitor(&self, from: &ExternalPid, to: &ExternalPid) -> Result<ExternalReference> {
438 let reference = self.make_reference();
439
440 if to.node == self.name {
441 if let Some(to_handle) = self.registry.get(to).await {
442 to_handle.add_monitor(from.clone(), reference.clone()).await;
443 }
444 Ok(reference)
445 } else {
446 let node_name = to.node.as_str();
447
448 if let Some(conn) = self.connections.get(node_name) {
449 let mut conn_guard = conn.lock().await;
450 conn_guard.monitor(from, to, &reference).await?;
451 Ok(reference)
452 } else {
453 Err(Error::NodeNotConnected(node_name.to_string()))
454 }
455 }
456 }
457
458 pub async fn demonitor(
459 &self,
460 from: &ExternalPid,
461 to: &ExternalPid,
462 reference: &ExternalReference,
463 ) -> Result<()> {
464 if to.node == self.name {
465 if let Some(to_handle) = self.registry.get(to).await {
466 to_handle.remove_monitor(reference).await;
467 }
468 Ok(())
469 } else {
470 let node_name = to.node.as_str();
471
472 if let Some(conn) = self.connections.get(node_name) {
473 let mut conn_guard = conn.lock().await;
474 conn_guard.demonitor(from, to, reference).await?;
475 Ok(())
476 } else {
477 Err(Error::NodeNotConnected(node_name.to_string()))
478 }
479 }
480 }
481
482 pub fn name(&self) -> &Atom {
483 &self.name
484 }
485
486 pub fn creation(&self) -> u32 {
487 self.creation.load(Ordering::SeqCst)
488 }
489
490 pub async fn process_count(&self) -> usize {
491 self.registry.count().await
492 }
493
494 pub fn connections(&self) -> Arc<DashMap<String, Arc<Mutex<Connection>>>> {
495 self.connections.clone()
496 }
497
498 pub fn cookie(&self) -> &str {
499 &self.cookie
500 }
501
502 pub async fn rpc_call(
503 &self,
504 remote_node: &str,
505 module: &str,
506 function: &str,
507 args: Vec<OwnedTerm>,
508 ) -> Result<OwnedTerm> {
509 self.rpc_call_with_timeout(remote_node, module, function, args, DEFAULT_RPC_TIMEOUT)
510 .await
511 }
512
513 pub async fn rpc_call_with_timeout(
514 &self,
515 remote_node: &str,
516 module: &str,
517 function: &str,
518 args: Vec<OwnedTerm>,
519 timeout: Duration,
520 ) -> Result<OwnedTerm> {
521 let response = self
522 .rpc_call_raw_with_timeout(remote_node, module, function, args, timeout)
523 .await?;
524 response.into_rex_response().map_err(Error::from)
525 }
526
527 pub async fn rpc_call_raw(
528 &self,
529 remote_node: &str,
530 module: &str,
531 function: &str,
532 args: Vec<OwnedTerm>,
533 ) -> Result<OwnedTerm> {
534 self.rpc_call_raw_with_timeout(remote_node, module, function, args, DEFAULT_RPC_TIMEOUT)
535 .await
536 }
537
538 pub async fn rpc_call_raw_with_timeout(
539 &self,
540 remote_node: &str,
541 module: &str,
542 function: &str,
543 args: Vec<OwnedTerm>,
544 timeout: Duration,
545 ) -> Result<OwnedTerm> {
546 let reply_to_pid = self
547 .pid_allocator
548 .allocate()
549 .expect("PID allocator lock poisoned");
550
551 let call_request = OwnedTerm::Tuple(vec![
552 OwnedTerm::Pid(reply_to_pid.clone()),
553 OwnedTerm::Tuple(vec![
554 OwnedTerm::Atom(Atom::new("call")),
555 OwnedTerm::Atom(Atom::new(module)),
556 OwnedTerm::Atom(Atom::new(function)),
557 OwnedTerm::List(args),
558 OwnedTerm::Atom(Atom::new("user")),
559 ]),
560 ]);
561
562 let (tx, rx) = oneshot::channel();
563 let pid_str = format!(
564 "{}.{}.{}",
565 reply_to_pid.id, reply_to_pid.serial, reply_to_pid.creation
566 );
567 self.pending_rpcs.insert(pid_str.clone(), tx);
568
569 tracing::debug!("RPC call_request: {:?}", call_request);
570 tracing::debug!("RPC reply_to_pid: {:?}", reply_to_pid);
571
572 tracing::trace!("Looking up connection for node: {}", remote_node);
573 if let Some(conn) = self.connections.get(remote_node) {
574 tracing::trace!("Found connection, sending to rex");
575 let mut conn_guard = conn.lock().await;
576 conn_guard
577 .send_to_name(reply_to_pid, Atom::new("rex"), call_request)
578 .await?;
579 tracing::trace!("Message sent to rex");
580 } else {
581 tracing::error!("No connection found for node: {}", remote_node);
582 self.pending_rpcs.remove(&pid_str);
583 return Err(Error::NodeNotConnected(remote_node.to_string()));
584 }
585
586 let response = tokio::time::timeout(timeout, rx).await;
587
588 if response.is_err() {
589 self.pending_rpcs.remove(&pid_str);
590 }
591
592 let response = response
593 .map_err(|_| Error::RpcTimeout(timeout))?
594 .map_err(|_| Error::RpcCancelled)?;
595
596 Ok(response)
597 }
598}