beamr 0.6.4

A Rust runtime with the BEAM's execution model, targeting Gleam
Documentation
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use beamr::atom::{Atom, AtomTable};
use beamr::distribution::DistributionConfig;
use beamr::distribution::resolver::{NodeResolver, ResolveError, ResolveFuture};
use beamr::module::ModuleRegistry;
use beamr::native::BifRegistryImpl;
use beamr::scheduler::{Scheduler, SchedulerConfig};
use beamr::term::Term;
use beamr::term::boxed::write_external_pid;

#[derive(Default)]
struct DynamicResolver {
    nodes: Mutex<HashMap<String, SocketAddr>>,
}

impl DynamicResolver {
    fn insert(&self, name: &str, addr: SocketAddr) {
        self.nodes
            .lock()
            .unwrap_or_else(|error| error.into_inner())
            .insert(name.to_owned(), addr);
    }
}

impl NodeResolver for DynamicResolver {
    fn resolve<'a>(&'a self, name: &'a str) -> ResolveFuture<'a> {
        let result = self
            .nodes
            .lock()
            .unwrap_or_else(|error| error.into_inner())
            .get(name)
            .copied()
            .ok_or(ResolveError::NotFound);
        Box::pin(async move { result })
    }
}

fn scheduler(
    node_name: &str,
    resolver: Arc<DynamicResolver>,
    atom_table: Arc<AtomTable>,
) -> Scheduler {
    let bif_registry = Arc::new(BifRegistryImpl::new());
    let module_registry = Arc::new(ModuleRegistry::new());
    Scheduler::with_code_server_and_policy(
        SchedulerConfig {
            thread_count: Some(1),
            node_name: Some(node_name.to_owned()),
            distribution: Some(DistributionConfig { resolver }),
            ..SchedulerConfig::default()
        },
        module_registry,
        atom_table,
        bif_registry,
        Arc::new(beamr::native::AllCapabilitiesPolicy),
    )
    .expect("scheduler starts")
}

#[tokio::test]
async fn loopback_cross_node_pid_send_round_trip() {
    let resolver = Arc::new(DynamicResolver::default());
    let atom_table = Arc::new(AtomTable::with_common_atoms());
    let node_a = scheduler(
        "a@127.0.0.1",
        Arc::clone(&resolver),
        Arc::clone(&atom_table),
    );
    let node_b = scheduler(
        "b@127.0.0.1",
        Arc::clone(&resolver),
        Arc::clone(&atom_table),
    );
    let listen_a = node_a
        .distribution_connections()
        .listen("127.0.0.1:0".parse().expect("listen address parses"))
        .await
        .expect("node A listens");
    let listen_b = node_b
        .distribution_connections()
        .listen("127.0.0.1:0".parse().expect("listen address parses"))
        .await
        .expect("node B listens");
    resolver.insert("a@127.0.0.1", listen_a.local_addr());
    resolver.insert("b@127.0.0.1", listen_b.local_addr());

    let a_node_atom = node_a.local_node().name;
    let b_node_atom = node_b.local_node().name;
    node_a
        .distribution_connections()
        .register_inbound_identifier(move |_| Some(b_node_atom));
    node_b
        .distribution_connections()
        .register_inbound_identifier(move |_| Some(a_node_atom));

    let pid_a = node_a.spawn_test_process(false);
    let pid_b = node_b.spawn_test_process(false);
    let mut heap = [0_u64; 8];
    let remote_b = write_external_pid(&mut heap[..4], b_node_atom, pid_b, 0).expect("remote B pid");
    let remote_a = write_external_pid(&mut heap[4..], a_node_atom, pid_a, 0).expect("remote A pid");

    let frame_to_b = beamr::distribution::control::encode_send_frame(
        Term::atom(Atom::OK),
        remote_b,
        Term::atom(Atom::OK),
        &atom_table,
    )
    .expect("frame to B encodes");
    let conn_to_b = node_a
        .distribution_connections()
        .connect("b@127.0.0.1")
        .await
        .expect("A connects to B");
    conn_to_b
        .write_raw(&frame_to_b)
        .await
        .expect("A writes to B");

    for _ in 0..20 {
        if node_b.has_message(pid_b, Term::atom(Atom::OK)) == Some(true) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    assert_eq!(node_b.has_message(pid_b, Term::atom(Atom::OK)), Some(true));

    let frame_to_a = beamr::distribution::control::encode_send_frame(
        Term::atom(Atom::OK),
        remote_a,
        Term::atom(Atom::OK),
        &atom_table,
    )
    .expect("frame to A encodes");
    let conn_to_a = node_b
        .distribution_connections()
        .connect("a@127.0.0.1")
        .await
        .expect("B connects to A");
    conn_to_a
        .write_raw(&frame_to_a)
        .await
        .expect("B writes to A");

    for _ in 0..20 {
        if node_a.has_message(pid_a, Term::atom(Atom::OK)) == Some(true) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    assert_eq!(node_a.has_message(pid_a, Term::atom(Atom::OK)), Some(true));

    listen_a.shutdown();
    listen_b.shutdown();
    node_a.shutdown();
    node_b.shutdown();
}