mm1-node 0.7.23

An Erlang-style actor runtime for Rust.
Documentation
use core::fmt;
use std::pin::Pin;
use std::task::{Context, Poll, ready};

use futures::Stream;
use mm1_address::address::Address;
use mm1_common::types::AnyError;
use mm1_core::tracing::TraceId;
use mm1_proto_system::WatchRef;
use tokio::sync::{mpsc, oneshot};

pub(crate) fn create() -> (Tx, Rx) {
    let (tx, rx) = mpsc::channel(1);
    (Tx(tx), Rx(rx))
}

pub(crate) enum SysCall {
    Exit(Result<(), AnyError>),
    TrapExit(bool),
    Link {
        sender:   Address,
        receiver: Address,
    },
    Unlink {
        sender:   Address,
        receiver: Address,
    },
    ForkAdded(Address),
    Spawn(Pin<Box<dyn Future<Output = ()> + Send + 'static>>),
    Watch {
        sender:   Address,
        receiver: Address,
        reply_tx: oneshot::Sender<WatchRef>,
    },
    Unwatch {
        sender:    Address,
        watch_ref: WatchRef,
    },
}

pub(crate) struct Request {
    pub(crate) trace_id: TraceId,
    pub(crate) call:     SysCall,
    #[allow(dead_code)]
    pub(crate) ack_tx:   oneshot::Sender<std::convert::Infallible>,
}

#[derive(Debug, Clone)]
pub(crate) struct Tx(mpsc::Sender<Request>);

#[derive(Debug)]
#[pin_project::pin_project]
pub(crate) struct Rx(#[pin] mpsc::Receiver<Request>);

impl Tx {
    pub(crate) async fn invoke(&self, call: SysCall) {
        let (ack_tx, ack_rx) = oneshot::channel();
        self.0
            .send(Request {
                trace_id: TraceId::current(),
                call,
                ack_tx,
            })
            .await
            .expect("call: tx.send failed");
        let _ = ack_rx.await;
    }
}

impl Stream for Rx {
    type Item = (TraceId, SysCall);

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(
            ready!(self.project().0.poll_recv(cx))
                .map(|Request { trace_id, call, .. }| (trace_id, call)),
        )
    }
}

impl fmt::Display for SysCall {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SysCall::Exit(Ok(())) => write!(f, "exit::ok"),
            SysCall::Exit(Err(reason)) => write!(f, "exit::err [reason: {reason}]"),
            SysCall::Link { sender, receiver } => write!(f, "link [{sender} -> {receiver}]"),
            SysCall::Unlink { sender, receiver } => {
                write!(f, "unlink [{sender} -> {receiver}]")
            },
            SysCall::TrapExit(set_into) => write!(f, "trap_exit [set-into: {set_into}]"),
            SysCall::ForkAdded(address) => write!(f, "fork_addded [addr: {address}]"),
            SysCall::Spawn(_) => write!(f, "spawn"),
            SysCall::Watch {
                sender, receiver, ..
            } => write!(f, "watch [sender: {sender}, receiver: {receiver}]"),
            SysCall::Unwatch { sender, watch_ref } => {
                write!(f, "unwatch [sender: {sender}, ref: {watch_ref}]")
            },
        }
    }
}

impl fmt::Debug for SysCall {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt::Display::fmt(self, f)
    }
}