mm1-node 0.7.23

An Erlang-style actor runtime for Rust.
Documentation
#![allow(dead_code)]

use std::sync::{Arc, Weak};

use mm1_address::address::Address;
use mm1_address::address_range::AddressRange;
use mm1_address::pool::Lease;
use mm1_address::subnet::NetAddress;
use mm1_common::errors::chain::StdErrorDisplayChainExt;
use mm1_common::log;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc};

#[derive(derive_more::Debug)]
pub(crate) struct Registry<S, M> {
    #[debug(skip)]
    networks: scc::TreeIndex<AddressRange, Node<S, M>>,
}

pub(crate) struct Node<S, M> {
    subnet_lease:    Arc<Lease>,
    inbox_size:      usize,
    inbox_semaphore: Arc<Semaphore>,
    mailbox_tx:      SubnetMailboxTx<S, M>,
}

pub(crate) fn new_mailbox<S, M>() -> (SubnetMailboxTx<S, M>, SubnetMailboxRx<S, M>) {
    let subnet_notify = Arc::new(Notify::new());
    let (tx_system, rx_system) = mpsc::unbounded_channel();
    let (tx_priority, rx_priority) = kanal::unbounded();
    let (tx_regular, rx_regular) = kanal::unbounded();

    let tx_priority = tx_priority.into();
    let tx_regular = tx_regular.into();

    let tx = SubnetMailboxTx {
        tx_system,
        subnet_notify: subnet_notify.clone(),
        tx_priority,
        tx_regular,
    };
    let rx = SubnetMailboxRx {
        rx_system,
        subnet_notify,
        rx_priority,
        rx_regular,
    };

    (tx, rx)
}

pub(crate) struct SubnetMailboxTx<S, M> {
    pub(crate) tx_system: mpsc::UnboundedSender<S>,

    pub(crate) subnet_notify: Arc<Notify>,
    pub(crate) tx_priority:   Arc<kanal::Sender<MessageWithoutPermit<M>>>,
    pub(crate) tx_regular:    Arc<kanal::Sender<MessageWithPermit<M>>>,
}

pub(crate) struct WeakSubnetMailboxTx<S, M> {
    pub(crate) tx_system: mpsc::WeakUnboundedSender<S>,

    pub(crate) subnet_notify: Weak<Notify>,
    pub(crate) tx_priority:   Weak<kanal::Sender<MessageWithoutPermit<M>>>,
    pub(crate) tx_regular:    Weak<kanal::Sender<MessageWithPermit<M>>>,
}

pub(crate) struct SubnetMailboxRx<S, M> {
    pub(crate) rx_system:     mpsc::UnboundedReceiver<S>,
    pub(crate) subnet_notify: Arc<Notify>,
    pub(crate) rx_priority:   kanal::Receiver<MessageWithoutPermit<M>>,
    pub(crate) rx_regular:    kanal::Receiver<MessageWithPermit<M>>,
}

pub(crate) struct MessageWithoutPermit<M> {
    pub(crate) to:      Address,
    pub(crate) message: M,
}

pub(crate) struct MessageWithPermit<M> {
    pub(crate) to:      Address,
    pub(crate) message: M,
    permit:             OwnedSemaphorePermit,
}

impl<S, M> Registry<S, M>
where
    S: 'static,
    M: 'static,
{
    pub(crate) fn new() -> Self {
        Default::default()
    }

    pub(crate) fn register(
        &self,
        subnet_address: NetAddress,
        node: Node<S, M>,
    ) -> Result<(), Node<S, M>> {
        let address_range = AddressRange::from(subnet_address);
        self.networks
            .insert(address_range, node)
            .inspect_err(
                |(address_range, _node)| log::warn!(%address_range, "failed to bind address range"),
            )
            .map_err(|(_address_range, node)| node)?;
        log::trace!(%address_range, "register: registered");
        Ok(())
    }

    pub(crate) fn unregister(&self, subnet_address: NetAddress) -> bool {
        let guard = Default::default();
        let sought_range = AddressRange::from(subnet_address);
        let Some((found_range, _)) = self.networks.peek_entry(&sought_range, &guard) else {
            log::trace!(
                %sought_range,
                "unregister: sought-range not found"
            );
            return false
        };
        if *found_range != sought_range {
            log::error!(
                %sought_range,
                %found_range,
                "unregister: sought-range is not equal to the found range"
            );
            return false
        }
        let removed = self.networks.remove(&sought_range);
        log::trace!(%sought_range, %removed, "unregister: removing range");

        removed
    }

    pub(crate) fn lookup(&self, address: Address) -> Option<Node<S, M>> {
        self.networks
            .peek_with(&AddressRange::from(address), |_, node| node.clone())
    }
}

impl<S, M> Node<S, M> {
    pub(crate) fn new(
        subnet_lease: Lease,
        inbox_size: usize,
        mailbox_tx: SubnetMailboxTx<S, M>,
    ) -> Self {
        let inbox_semaphore = Arc::new(Semaphore::new(inbox_size));
        let subnet_lease = Arc::new(subnet_lease);
        Self {
            subnet_lease,
            inbox_size,
            inbox_semaphore,
            mailbox_tx,
        }
    }
}

impl<S, M> Node<S, M> {
    pub(crate) fn send(&self, to: Address, priority: bool, message: M) -> Result<(), ()> {
        let Self {
            inbox_semaphore,
            mailbox_tx,
            ..
        } = self;
        let SubnetMailboxTx {
            tx_priority,
            tx_regular,
            subnet_notify,
            ..
        } = mailbox_tx;

        let sent = if priority {
            let message_without_permit = MessageWithoutPermit { to, message };
            tx_priority
                .try_send(message_without_permit)
                .inspect_err(|e| log::warn!(reason = %e.as_display_chain(), "could not send via tx-priority"))
                .map_err(|_e| ())?
        } else {
            let Ok(permit) = inbox_semaphore.clone().try_acquire_owned() else {
                return Err(())
            };
            let message_with_permit = MessageWithPermit {
                to,
                message,
                permit,
            };
            tx_regular
                .try_send(message_with_permit)
                .inspect_err(
                    |e| log::warn!(reason = %e.as_display_chain(), "could not send via tx-regular"),
                )
                .map_err(|_e| ())?
        };
        if sent {
            log::trace!(subnet = %self.subnet_lease.net_address(), "notifying subnet");
            subnet_notify.notify_one();
            Ok(())
        } else {
            Err(())
        }
    }

    pub(crate) fn sys_send(&self, sys_msg: S) -> Result<(), S> {
        let Self { mailbox_tx, .. } = self;
        let SubnetMailboxTx { tx_system, .. } = mailbox_tx;
        tx_system.send(sys_msg).map_err(|e| e.0)
    }
}

impl<S, M> Clone for Node<S, M> {
    fn clone(&self) -> Self {
        let Self {
            subnet_lease,
            inbox_size,
            inbox_semaphore,
            mailbox_tx,
        } = self;
        Self {
            subnet_lease:    subnet_lease.clone(),
            inbox_size:      *inbox_size,
            inbox_semaphore: inbox_semaphore.clone(),
            mailbox_tx:      mailbox_tx.clone(),
        }
    }
}

impl<S, M> SubnetMailboxTx<S, M> {
    pub(crate) fn downgrade(&self) -> WeakSubnetMailboxTx<S, M> {
        let Self {
            tx_system,
            subnet_notify,
            tx_priority,
            tx_regular,
        } = self;
        let tx_system = tx_system.downgrade();
        let subnet_notify = Arc::downgrade(subnet_notify);
        let tx_priority = Arc::downgrade(tx_priority);
        let tx_regular = Arc::downgrade(tx_regular);

        WeakSubnetMailboxTx {
            tx_system,
            subnet_notify,
            tx_priority,
            tx_regular,
        }
    }
}
impl<S, M> WeakSubnetMailboxTx<S, M> {
    pub(crate) fn upgrade(&self) -> Option<SubnetMailboxTx<S, M>> {
        let Self {
            tx_system,
            subnet_notify,
            tx_priority,
            tx_regular,
        } = self;
        let tx_system = tx_system.upgrade()?;
        let subnet_notify = subnet_notify.upgrade()?;
        let tx_priority = tx_priority.upgrade()?;
        let tx_regular = tx_regular.upgrade()?;

        Some(SubnetMailboxTx {
            tx_system,
            subnet_notify,
            tx_priority,
            tx_regular,
        })
    }
}

impl<S, M> Clone for SubnetMailboxTx<S, M> {
    fn clone(&self) -> Self {
        let Self {
            tx_system,
            subnet_notify,
            tx_priority,
            tx_regular,
        } = self;
        Self {
            tx_system:     tx_system.clone(),
            subnet_notify: subnet_notify.clone(),
            tx_priority:   tx_priority.clone(),
            tx_regular:    tx_regular.clone(),
        }
    }
}
impl<S, M> Clone for WeakSubnetMailboxTx<S, M> {
    fn clone(&self) -> Self {
        let Self {
            tx_system,
            subnet_notify,
            tx_priority,
            tx_regular,
        } = self;
        Self {
            tx_system:     tx_system.clone(),
            subnet_notify: subnet_notify.clone(),
            tx_priority:   tx_priority.clone(),
            tx_regular:    tx_regular.clone(),
        }
    }
}

impl<S, M> Default for Registry<S, M> {
    fn default() -> Self {
        Self {
            networks: Default::default(),
        }
    }
}