beamr 0.6.3

A Rust runtime with the BEAM's execution model, targeting Gleam
Documentation
use super::*;
use crate::term::boxed::{write_external_pid, write_tuple};

#[derive(Default)]
struct RecordingDiagnostics {
    unknown: Vec<i64>,
    malformed: Vec<Option<ControlOp>>,
}

impl ControlDiagnostics for RecordingDiagnostics {
    fn unknown_opcode(&mut self, opcode: i64) {
        self.unknown.push(opcode);
    }
    fn malformed_control(&mut self, op: Option<ControlOp>) {
        self.malformed.push(op);
    }
}

#[derive(Default)]
struct RecordingHandler {
    calls: Vec<ControlOp>,
}

impl ControlMessageHandler for RecordingHandler {
    fn handle_send(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::Send);
    }
    fn handle_reg_send(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::RegSend);
    }
    fn handle_link(&mut self, _from: DistributedPid, _ft: Term, _to: DistributedPid, _tt: Term) {
        self.calls.push(ControlOp::Link);
    }
    fn handle_unlink(&mut self, _from: DistributedPid, _to: DistributedPid) {
        self.calls.push(ControlOp::Unlink);
    }
    fn handle_exit(&mut self, _from: DistributedPid, _to: DistributedPid, _r: ExitReason) {
        self.calls.push(ControlOp::Exit);
    }
    fn handle_exit2(&mut self, _from: DistributedPid, _to: DistributedPid, _r: ExitReason) {
        self.calls.push(ControlOp::Exit2);
    }
    fn handle_monitor_p(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::MonitorP);
    }
    fn handle_demonitor_p(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::DemonitorP);
    }
    fn handle_monitor_p_exit(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::MonitorPExit);
    }
    fn handle_spawn_request(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::SpawnRequest);
    }
    fn handle_spawn_reply(&mut self, _tuple: Tuple) {
        self.calls.push(ControlOp::SpawnReply);
    }
}

#[derive(Default)]
struct RecordingSink {
    sent: Vec<(Atom, OutboundControlMessage)>,
}

impl ControlMessageSink for RecordingSink {
    fn send_control(
        &mut self,
        node: Atom,
        message: OutboundControlMessage,
    ) -> Result<(), LifecycleError> {
        self.sent.push((node, message));
        Ok(())
    }
}

fn local_pid(pid: u64) -> Term {
    Term::pid(pid)
}

fn remote_pid(heap: &mut [u64], node: Atom, pid: u64) -> Term {
    write_external_pid(heap, node, pid, 0).expect("external pid fits")
}

fn tuple(heap: &mut [u64], elements: &[Term]) -> Term {
    write_tuple(heap, elements).expect("tuple fits")
}

fn control_tuple(heap: &mut [u64], op: ControlOp) -> Term {
    let elements = match op {
        ControlOp::Send => [
            Term::small_int(op.opcode()),
            Term::atom(Atom::NIL),
            local_pid(2),
            Term::NIL,
            Term::NIL,
        ],
        ControlOp::RegSend => [
            Term::small_int(op.opcode()),
            local_pid(1),
            Term::atom(Atom::NIL),
            Term::atom(Atom::OK),
            Term::NIL,
        ],
        ControlOp::Link | ControlOp::Unlink => [
            Term::small_int(op.opcode()),
            local_pid(1),
            local_pid(2),
            Term::NIL,
            Term::NIL,
        ],
        ControlOp::Exit | ControlOp::Exit2 => [
            Term::small_int(op.opcode()),
            local_pid(1),
            local_pid(2),
            Term::atom(Atom::ERROR),
            Term::NIL,
        ],
        ControlOp::MonitorP | ControlOp::DemonitorP => [
            Term::small_int(op.opcode()),
            local_pid(1),
            local_pid(2),
            Term::small_int(123),
            Term::NIL,
        ],
        ControlOp::MonitorPExit => [
            Term::small_int(op.opcode()),
            local_pid(1),
            local_pid(2),
            Term::small_int(123),
            Term::atom(Atom::ERROR),
        ],
        ControlOp::SpawnRequest | ControlOp::SpawnReply => [
            Term::small_int(op.opcode()),
            local_pid(1),
            local_pid(2),
            Term::NIL,
            Term::NIL,
        ],
    };
    tuple(heap, &elements)
}

#[test]
fn control_op_mapping_is_exact() {
    assert_eq!(ControlOp::from_opcode(1), Some(ControlOp::Link));
    assert_eq!(ControlOp::from_opcode(2), Some(ControlOp::Send));
    assert_eq!(ControlOp::from_opcode(3), Some(ControlOp::Exit));
    assert_eq!(ControlOp::from_opcode(4), Some(ControlOp::Unlink));
    assert_eq!(ControlOp::from_opcode(6), Some(ControlOp::RegSend));
    assert_eq!(ControlOp::from_opcode(8), Some(ControlOp::Exit2));
    assert_eq!(ControlOp::from_opcode(19), Some(ControlOp::MonitorP));
    assert_eq!(ControlOp::from_opcode(20), Some(ControlOp::DemonitorP));
    assert_eq!(ControlOp::from_opcode(21), Some(ControlOp::MonitorPExit));
    assert_eq!(ControlOp::from_opcode(29), Some(ControlOp::SpawnRequest));
    assert_eq!(ControlOp::from_opcode(31), Some(ControlOp::SpawnReply));
    assert_eq!(ControlOp::from_opcode(255), None);
}

#[test]
fn dispatches_each_known_opcode_to_correct_handler() {
    let ops = [
        ControlOp::Send,
        ControlOp::RegSend,
        ControlOp::Link,
        ControlOp::Exit,
        ControlOp::Unlink,
        ControlOp::MonitorP,
        ControlOp::DemonitorP,
        ControlOp::MonitorPExit,
        ControlOp::Exit2,
        ControlOp::SpawnRequest,
        ControlOp::SpawnReply,
    ];
    for op in ops {
        let mut heap = [0_u64; 8];
        let term = control_tuple(&mut heap, op);
        let mut handler = RecordingHandler::default();
        let mut diagnostics = RecordingDiagnostics::default();
        dispatch_control_message(term, &mut handler, &mut diagnostics);
        assert_eq!(handler.calls, vec![op]);
        assert_eq!(diagnostics.unknown, Vec::<i64>::new());
        assert_eq!(diagnostics.malformed, Vec::<Option<ControlOp>>::new());
    }
}

#[test]
fn unknown_opcode_is_logged_and_ignored() {
    let mut heap = [0_u64; 3];
    let term = tuple(&mut heap, &[Term::small_int(999), local_pid(1)]);
    let mut handler = RecordingHandler::default();
    let mut diagnostics = RecordingDiagnostics::default();
    dispatch_control_message(term, &mut handler, &mut diagnostics);
    assert_eq!(handler.calls, Vec::<ControlOp>::new());
    assert_eq!(diagnostics.unknown, vec![999]);
    assert_eq!(diagnostics.malformed, Vec::<Option<ControlOp>>::new());
}

#[test]
fn inbound_link_and_unlink_mutate_collision_safe_link_state() {
    let mut remote_heap = [0_u64; 4];
    let remote = remote_pid(&mut remote_heap, Atom::OK, 1);
    let local = local_pid(1);
    let mut tuple_heap = [0_u64; 4];
    let link_term = tuple(
        &mut tuple_heap,
        &[Term::small_int(ControlOp::Link.opcode()), remote, local],
    );
    let mut state = ControlLifecycleState::default();
    let mut diagnostics = NoopDiagnostics;
    dispatch_control_message(link_term, &mut state, &mut diagnostics);

    let remote_id = DistributedPid::remote(Atom::OK, 1, 0);
    let local_id = DistributedPid::local(1);
    assert!(state.has_link(remote_id, local_id));
    assert!(state.has_link(local_id, remote_id));
    assert_eq!(state.links()[0].left_term, remote);
    assert_eq!(state.links()[0].right_term, local);

    let mut unlink_heap = [0_u64; 4];
    let unlink_term = tuple(
        &mut unlink_heap,
        &[Term::small_int(ControlOp::Unlink.opcode()), remote, local],
    );
    dispatch_control_message(unlink_term, &mut state, &mut diagnostics);
    assert!(!state.has_link(remote_id, local_id));
}

#[test]
fn outbound_remote_link_writes_link_control_and_records_state() {
    let mut remote_heap = [0_u64; 4];
    let remote = remote_pid(&mut remote_heap, Atom::OK, 42);
    let local = local_pid(7);
    let mut sink = RecordingSink::default();
    let mut state = ControlLifecycleState::default();
    assert_eq!(link_remote(&mut sink, &mut state, local, remote), Ok(()));
    assert_eq!(
        sink.sent,
        vec![(Atom::OK, OutboundControlMessage::link(local, remote))]
    );
    assert!(state.has_link(
        DistributedPid::local(7),
        DistributedPid::remote(Atom::OK, 42, 0)
    ));
}

#[test]
fn linked_remote_exit_produces_exit_control() {
    let mut remote_heap = [0_u64; 4];
    let remote = remote_pid(&mut remote_heap, Atom::OK, 42);
    let local = local_pid(7);
    let mut sink = RecordingSink::default();
    let mut state = ControlLifecycleState::default();
    assert_eq!(link_remote(&mut sink, &mut state, local, remote), Ok(()));
    sink.sent.clear();
    assert_eq!(
        propagate_remote_exit(&mut sink, &mut state, local, ExitReason::Error),
        Ok(())
    );
    assert_eq!(sink.sent.len(), 1);
    assert_eq!(sink.sent[0].0, Atom::OK);
    assert_eq!(sink.sent[0].1.op, ControlOp::Exit);
    assert_eq!(sink.sent[0].1.from, local);
    assert_eq!(sink.sent[0].1.to, remote);
    assert_eq!(sink.sent[0].1.reason, Some(ExitReason::Error));
    assert!(!state.has_link(
        DistributedPid::local(7),
        DistributedPid::remote(Atom::OK, 42, 0)
    ));
}

#[test]
fn inbound_exit2_delegates_to_exit_delivery_state() {
    let mut remote_heap = [0_u64; 4];
    let remote = remote_pid(&mut remote_heap, Atom::OK, 42);
    let local = local_pid(7);
    let mut state = ControlLifecycleState::default();
    state.establish_link(
        DistributedPid::remote(Atom::OK, 42, 0),
        DistributedPid::local(7),
    );
    let mut tuple_heap = [0_u64; 5];
    let exit2_term = tuple(
        &mut tuple_heap,
        &[
            Term::small_int(ControlOp::Exit2.opcode()),
            remote,
            local,
            Term::atom(Atom::ERROR),
        ],
    );
    let mut diagnostics = NoopDiagnostics;
    dispatch_control_message(exit2_term, &mut state, &mut diagnostics);
    assert_eq!(
        state.delivered_exits(),
        &[(
            DistributedPid::remote(Atom::OK, 42, 0),
            DistributedPid::local(7),
            ExitReason::Error,
        )]
    );
    assert!(!state.has_link(
        DistributedPid::remote(Atom::OK, 42, 0),
        DistributedPid::local(7)
    ));
}

#[test]
fn explicit_exit2_to_remote_pid_writes_exit2_control() {
    let mut remote_heap = [0_u64; 4];
    let remote = remote_pid(&mut remote_heap, Atom::OK, 42);
    let local = local_pid(7);
    let mut sink = RecordingSink::default();
    assert_eq!(
        exit2_remote(&mut sink, local, remote, ExitReason::Kill),
        Ok(())
    );
    assert_eq!(
        sink.sent,
        vec![(
            Atom::OK,
            OutboundControlMessage::exit2(local, remote, ExitReason::Kill)
        )]
    );
}