// ockam_node 0.76.0: Ockam Node implementation crate

use crate::async_drop::AsyncDrop;
use crate::channel_types::{message_channel, small_channel, SmallReceiver, SmallSender};
use crate::debugger;
use crate::tokio::{self, runtime::Handle, time::timeout};
use crate::{
    error::*, parser, relay::CtrlSignal, router::SenderPair, Cancel, NodeMessage, ProcessorBuilder,
    ShutdownType, WorkerBuilder,
};
use core::{
    sync::atomic::{AtomicUsize, Ordering},
    time::Duration,
};
use ockam_core::compat::{boxed::Box, string::String, sync::Arc, vec::Vec};
use ockam_core::{
    errcode::{Kind, Origin},
    Address, AllowAll, AllowOnwardAddress, AsyncTryClone, DenyAll, Error, IncomingAccessControl,
    LocalMessage, Mailboxes, Message, OutgoingAccessControl, Processor, RelayMessage, Result,
    Route, TransportMessage, TransportType, Worker,
};
use ockam_core::{LocalInfo, Mailbox};

/// A default timeout in seconds
pub const DEFAULT_TIMEOUT: u64 = 30;

enum AddressType {
    Worker,
    Processor,
}

impl AddressType {
    fn str(&self) -> &'static str {
        match self {
            AddressType::Worker => "worker",
            AddressType::Processor => "processor",
        }
    }
}

/// A special sender type that connects a detached context to its `AsyncDrop` handler
pub type AsyncDropSender = crate::tokio::sync::oneshot::Sender<Address>;

/// A special type of `Context` that has no worker relay and inherits
/// the parent `Context`'s access control
pub type DetachedContext = Context;

/// A special type of `Context` that has no worker relay and a custom
/// access control which is not inherited from its parent `Context`.
pub type RepeaterContext = Context;

/// Context contains Node state and references to the runtime.
pub struct Context {
    mailboxes: Mailboxes,
    sender: SmallSender<NodeMessage>,
    rt: Handle,
    receiver: SmallReceiver<RelayMessage>,
    async_drop_sender: Option<AsyncDropSender>,
    mailbox_count: Arc<AtomicUsize>,
}

impl Drop for Context {
    fn drop(&mut self) {
        if let Some(sender) = self.async_drop_sender.take() {
            trace!("De-allocated detached context {}", self.address());
            if let Err(e) = sender.send(self.address()) {
                warn!("Encountered error while dropping detached context: {}", e);
            }
        }
    }
}

#[ockam_core::async_trait]
impl AsyncTryClone for Context {
    async fn async_try_clone(&self) -> Result<Self> {
        // TODO: @ac ignores parent Access Control. Should be documented somewhere
        self.new_detached(
            Address::random_tagged("Context.async_try_clone.detached"),
            DenyAll,
            DenyAll,
        )
        .await
    }
}

impl Context {
    /// Return a reference to the runtime handle
    pub fn runtime(&self) -> &Handle {
        &self.rt
    }

    /// Return a clone of the mailbox count
    pub(crate) fn mailbox_count(&self) -> Arc<AtomicUsize> {
        self.mailbox_count.clone()
    }

    /// Return a reference to the node message sender
    pub(crate) fn sender(&self) -> &SmallSender<NodeMessage> {
        &self.sender
    }

    /// Wait for the next message from the mailbox
    pub(crate) async fn receiver_next(&mut self) -> Result<Option<RelayMessage>> {
        loop {
            let relay_msg = if let Some(msg) = self.receiver.recv().await.map(|msg| {
                trace!("{}: received new message!", self.address());

                // First we update the mailbox fill metrics
                self.mailbox_count.fetch_sub(1, Ordering::Acquire);

                msg
            }) {
                msg
            } else {
                // no more messages
                return Ok(None);
            };

            debugger::log_incoming_message(self, &relay_msg);

            if !self.mailboxes.is_incoming_authorized(&relay_msg).await? {
                warn!(
                    "Message received from {} for {} did not pass incoming access control",
                    relay_msg.return_route(),
                    relay_msg.destination()
                );
                continue;
            }

            return Ok(Some(relay_msg));
        }
    }
}

impl Context {
    /// Create a new context
    ///
    /// This function returns a new instance of Context, the relay
    /// sender pair, and relay control signal receiver.
    ///
    /// `async_drop_sender` must be provided when creating a detached
    /// Context type (i.e. not backed by a worker relay).
    pub(crate) fn new(
        rt: Handle,
        sender: SmallSender<NodeMessage>,
        mailboxes: Mailboxes,
        async_drop_sender: Option<AsyncDropSender>,
    ) -> (Self, SenderPair, SmallReceiver<CtrlSignal>) {
        let (mailbox_tx, receiver) = message_channel();
        let (ctrl_tx, ctrl_rx) = small_channel();
        (
            Self {
                rt,
                sender,
                mailboxes,
                receiver,
                async_drop_sender,
                mailbox_count: Arc::new(0.into()),
            },
            SenderPair {
                msgs: mailbox_tx,
                ctrl: ctrl_tx,
            },
            ctrl_rx,
        )
    }

    /// Return the primary address of the current worker
    pub fn address(&self) -> Address {
        self.mailboxes.main_address()
    }

    /// Return the alias addresses of the current worker
    pub fn aliases(&self) -> Vec<Address> {
        self.mailboxes.aliases()
    }

    /// Return a reference to the mailboxes of this context
    pub fn mailboxes(&self) -> &Mailboxes {
        &self.mailboxes
    }

    /// Utility function allowing tasks in other crates to sleep for a given duration
    #[doc(hidden)]
    pub async fn sleep(&self, dur: Duration) {
        tokio::time::sleep(dur).await;
    }

    /// Create a new detached `Context` with the given `Mailboxes`
    ///
    /// TODO: basically we can just rename `Self::new_detached_impl()`
    pub async fn new_detached_with_mailboxes(
        &self,
        mailboxes: Mailboxes,
    ) -> Result<DetachedContext> {
        let ctx = self.new_detached_impl(mailboxes).await?;

        debugger::log_inherit_context("DETACHED_WITH_MB", self, &ctx);

        Ok(ctx)
    }

    /// Create a new detached `Context` without spawning a full worker
    ///
    /// Note: this function is very low-level.  For most users
    /// [`start_worker()`](Self::start_worker) is the recommended way
    /// to create a new worker context.
    ///
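    /// A minimal sketch (the address tag and the all-denying access
    /// controls below are illustrative, not prescribed):
    ///
    /// ```rust
    /// # use ockam_core::{Address, DenyAll, Result};
    /// # use ockam_node::Context;
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // This child context can neither receive nor send messages,
    /// // because both access controls deny everything.
    /// let _child = ctx
    ///     .new_detached(Address::random_tagged("example.detached"), DenyAll, DenyAll)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```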
    pub async fn new_detached(
        &self,
        address: impl Into<Address>,
        incoming: impl IncomingAccessControl,
        outgoing: impl OutgoingAccessControl,
    ) -> Result<DetachedContext> {
        let mailboxes = Mailboxes::main(address.into(), Arc::new(incoming), Arc::new(outgoing));
        let ctx = self.new_detached_impl(mailboxes).await?;

        debugger::log_inherit_context("DETACHED", self, &ctx);

        Ok(ctx)
    }

    async fn new_detached_impl(&self, mailboxes: Mailboxes) -> Result<DetachedContext> {
        // A detached Context exists without a worker relay, which
        // requires special shutdown handling.  To allow the Drop
        // handler to interact with the Node runtime, we use an
        // AsyncDrop handler.
        //
        // This handler is spawned and listens for an event from the
        // Drop handler, and then forwards a message to the Node
        // router.
        let (async_drop, drop_sender) = AsyncDrop::new(self.sender.clone());
        self.rt.spawn(async_drop.run());

        // Create a new context and get access to the mailbox senders
        let addresses = mailboxes.addresses();
        let (ctx, sender, _) = Self::new(
            self.rt.clone(),
            self.sender.clone(),
            mailboxes,
            Some(drop_sender),
        );

        // Create a "detached relay" and register it with the router
        let (msg, mut rx) =
            NodeMessage::start_worker(addresses, sender, true, Arc::clone(&self.mailbox_count));
        self.sender
            .send(msg)
            .await
            .map_err(|e| Error::new(Origin::Node, Kind::Invalid, e))?;
        rx.recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;

        Ok(ctx)
    }

    /// Start a new worker instance at the given address
    ///
    /// A worker is an asynchronous piece of code that can send and
    /// receive messages of a specific type.  This type is encoded via
    /// the [`Worker`](ockam_core::Worker) trait.  If your code relies
    /// on a manual run-loop you may want to use
    /// [`start_processor()`](Self::start_processor) instead!
    ///
    /// Each address in the set must be unique and unused on the
    /// current node.  Workers must implement the Worker trait and be
    /// thread-safe.  Workers run asynchronously and will be scheduled
    /// independently of each other.  To wait for the initialisation
    /// of your worker to complete you can use
    /// [`wait_for()`](Self::wait_for).
    ///
    /// ```rust
    /// use ockam_core::{AllowAll, Result, Worker, worker};
    /// use ockam_node::Context;
    ///
    /// struct MyWorker;
    ///
    /// #[worker]
    /// impl Worker for MyWorker {
    ///     type Context = Context;
    ///     type Message = String;
    /// }
    ///
    /// async fn start_my_worker(ctx: &mut Context) -> Result<()> {
    ///     ctx.start_worker("my-worker-address", MyWorker, AllowAll, AllowAll).await
    /// }
    /// ```
    pub async fn start_worker<NM, NW>(
        &self,
        address: impl Into<Address>,
        worker: NW,
        incoming: impl IncomingAccessControl,
        outgoing: impl OutgoingAccessControl,
    ) -> Result<()>
    where
        NM: Message + Send + 'static,
        NW: Worker<Context = Context, Message = NM>,
    {
        WorkerBuilder::with_mailboxes(
            Mailboxes::main(address, Arc::new(incoming), Arc::new(outgoing)),
            worker,
        )
        .start(self)
        .await?;

        Ok(())
    }

    /// Start a new processor instance at the given address
    ///
    /// A processor is an asynchronous piece of code that runs a
    /// custom run loop, with access to a worker context to send and
    /// receive messages.  If your code is built around responding to
    /// message events, consider using
    /// [`start_worker()`](Self::start_worker) instead!
    ///
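    /// A minimal sketch (illustrative only; it assumes the
    /// [`Processor`](ockam_core::Processor) `process` hook returns
    /// `Ok(true)` to keep running and `Ok(false)` to stop):
    ///
    /// ```rust
    /// use ockam_core::{async_trait, AllowAll, Processor, Result};
    /// use ockam_node::Context;
    ///
    /// struct Counter(usize);
    ///
    /// #[async_trait]
    /// impl Processor for Counter {
    ///     type Context = Context;
    ///
    ///     // Called repeatedly by the node until it returns Ok(false) or errors.
    ///     async fn process(&mut self, _ctx: &mut Context) -> Result<bool> {
    ///         self.0 += 1;
    ///         Ok(self.0 < 5)
    ///     }
    /// }
    ///
    /// async fn start_my_processor(ctx: &mut Context) -> Result<()> {
    ///     ctx.start_processor("my-processor-address", Counter(0), AllowAll, AllowAll).await
    /// }
    /// ```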
    pub async fn start_processor<P>(
        &self,
        address: impl Into<Address>,
        processor: P,
        incoming: impl IncomingAccessControl,
        outgoing: impl OutgoingAccessControl,
    ) -> Result<()>
    where
        P: Processor<Context = Context>,
    {
        ProcessorBuilder::with_mailboxes(
            Mailboxes::main(address.into(), Arc::new(incoming), Arc::new(outgoing)),
            processor,
        )
        .start(self)
        .await?;

        Ok(())
    }

    /// Shut down a local worker by its primary address
    pub async fn stop_worker<A: Into<Address>>(&self, addr: A) -> Result<()> {
        self.stop_address(addr.into(), AddressType::Worker).await
    }

    /// Shut down a local processor by its address
    pub async fn stop_processor<A: Into<Address>>(&self, addr: A) -> Result<()> {
        self.stop_address(addr.into(), AddressType::Processor).await
    }

    async fn stop_address(&self, addr: Address, t: AddressType) -> Result<()> {
        debug!("Shutting down {} {}", t.str(), addr);

        // Send the stop request
        let (req, mut rx) = match t {
            AddressType::Worker => NodeMessage::stop_worker(addr, false),
            AddressType::Processor => NodeMessage::stop_processor(addr),
        };
        self.sender
            .send(req)
            .await
            .map_err(NodeError::from_send_err)?;

        // Then check that address was properly shut down
        rx.recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
        Ok(())
    }

    /// Signal to the local runtime to shut down immediately
    ///
    /// **WARNING**: calling this function may result in data loss.
    /// It is recommended to use the much safer
    /// [`Context::stop`](Context::stop) function instead!
    pub async fn stop_now(&mut self) -> Result<()> {
        let tx = self.sender.clone();
        info!("Immediately shutting down all workers");
        let (msg, _) = NodeMessage::stop_node(ShutdownType::Immediate);

        match tx.send(msg).await {
            Ok(()) => Ok(()),
            Err(e) => Err(Error::new(Origin::Node, Kind::Invalid, e)),
        }
    }

    /// Signal to the local runtime to shut down
    ///
    /// This call will block until a safe shutdown has been completed.
    /// The default timeout for a safe shutdown is 1 second.  You can
    /// change this behaviour by calling
    /// [`Context::stop_timeout`](Context::stop_timeout) directly.
    pub async fn stop(&mut self) -> Result<()> {
        self.stop_timeout(1).await
    }

    /// Signal to the local runtime to shut down
    ///
    /// This call will block until a safe shutdown has been completed
    /// or the desired timeout has been reached.
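    ///
    /// A minimal sketch (the 10 second budget is illustrative):
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn example(ctx: &mut Context) -> Result<()> {
    /// // Give workers up to 10 seconds to shut down cleanly.
    /// ctx.stop_timeout(10).await
    /// # }
    /// ```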
    pub async fn stop_timeout(&mut self, seconds: u8) -> Result<()> {
        let (req, mut rx) = NodeMessage::stop_node(ShutdownType::Graceful(seconds));
        self.sender
            .send(req)
            .await
            .map_err(NodeError::from_send_err)?;

        // Wait until we get the all-clear
        rx.recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
        Ok(())
    }

    /// Using a temporary new context, send a message and then receive a message
    ///
    /// This helper function uses [`new_detached`], [`send`], and
    /// [`receive`] internally. See their documentation for more
    /// details.
    ///
    /// [`new_detached`]: Self::new_detached
    /// [`send`]: Self::send
    /// [`receive`]: Self::receive
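    ///
    /// A minimal sketch (the `"echoer"` address is illustrative and
    /// assumes a worker at that address replies with a `String`):
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// let reply: String = ctx.send_and_receive("echoer", "Hello".to_string()).await?;
    /// # let _ = reply;
    /// # Ok(())
    /// # }
    /// ```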
    pub async fn send_and_receive<R, M, N>(&self, route: R, msg: M) -> Result<N>
    where
        R: Into<Route>,
        M: Message + Send + 'static,
        N: Message,
    {
        self.send_and_receive_with_timeout(route, msg, Duration::from_secs(DEFAULT_TIMEOUT))
            .await
    }

    /// Using a temporary new context, send a message and then receive a message with custom timeout
    ///
    /// This helper function uses [`new_detached`], [`send`], and
    /// [`receive`] internally. See their documentation for more
    /// details.
    ///
    /// [`new_detached`]: Self::new_detached
    /// [`send`]: Self::send
    /// [`receive`]: Self::receive
    pub async fn send_and_receive_with_timeout<R, M, N>(
        &self,
        route: R,
        msg: M,
        timeout: Duration,
    ) -> Result<N>
    where
        R: Into<Route>,
        M: Message + Send + 'static,
        N: Message,
    {
        let route: Route = route.into();

        let next = route.next()?.clone();
        let mailboxes = Mailboxes::new(
            Mailbox::new(
                Address::random_tagged("Context.send_and_receive.detached"),
                Arc::new(AllowAll), // FIXME: @ac there is no way to ensure that we're receiving response from the worker we sent request to
                Arc::new(AllowOnwardAddress(next)),
            ),
            vec![],
        );
        let mut child_ctx = self.new_detached_with_mailboxes(mailboxes).await?;

        child_ctx.send(route, msg).await?;
        Ok(child_ctx
            .receive_duration_timeout::<N>(timeout)
            .await?
            .take()
            .body())
    }

    /// Send a message to another address associated with this worker
    ///
    /// This function is a simple wrapper around `Self::send()` which
    /// validates the address given to it and will reject invalid
    /// addresses.
    pub async fn send_to_self<A, M>(&self, from: A, addr: A, msg: M) -> Result<()>
    where
        A: Into<Address>,
        M: Message + Send + 'static,
    {
        let addr = addr.into();
        if self.mailboxes.contains(&addr) {
            self.send_from_address(addr, msg, from.into()).await
        } else {
            Err(NodeError::NodeState(NodeReason::Unknown).internal())
        }
    }

    /// Send a message to an address or via a fully-qualified route
    ///
    /// Routes can be constructed from a set of [`Address`]es, or via
    /// the [`RouteBuilder`] type.  Routes can contain middleware
    /// router addresses, which will re-address messages that need to
    /// be handled by specific domain workers.
    ///
    /// [`Address`]: ockam_core::Address
    /// [`RouteBuilder`]: ockam_core::RouteBuilder
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn test(ctx: &mut Context) -> Result<()> {
    /// use ockam_core::Message;
    /// use serde::{Serialize, Deserialize};
    ///
    /// #[derive(Message, Serialize, Deserialize)]
    /// struct MyMessage(String);
    ///
    /// impl MyMessage {
    ///     fn new(s: &str) -> Self {
    ///         Self(s.into())
    ///     }
    /// }
    ///
    /// ctx.send("my-test-worker", MyMessage::new("Hello you there :)")).await?;
    /// Ok(())
    /// # }
    /// ```
    pub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>
    where
        R: Into<Route>,
        M: Message + Send + 'static,
    {
        self.send_from_address(route.into(), msg, self.address())
            .await
    }

    /// Send a message to an address or via a fully-qualified route
    /// after attaching the given [`LocalInfo`] to the message.
    pub async fn send_with_local_info<R, M>(
        &self,
        route: R,
        msg: M,
        local_info: Vec<LocalInfo>,
    ) -> Result<()>
    where
        R: Into<Route>,
        M: Message + Send + 'static,
    {
        self.send_from_address_impl(route.into(), msg, self.address(), local_info)
            .await
    }

    /// Send a message to an address or via a fully-qualified route
    ///
    /// Routes can be constructed from a set of [`Address`]es, or via
    /// the [`RouteBuilder`] type.  Routes can contain middleware
    /// router addresses, which will re-address messages that need to
    /// be handled by specific domain workers.
    ///
    /// [`Address`]: ockam_core::Address
    /// [`RouteBuilder`]: ockam_core::RouteBuilder
    ///
    /// This function additionally takes the sending address
    /// parameter, to specify which of a worker's (or processor's)
    /// addresses should be used.
    pub async fn send_from_address<R, M>(
        &self,
        route: R,
        msg: M,
        sending_address: Address,
    ) -> Result<()>
    where
        R: Into<Route>,
        M: Message + Send + 'static,
    {
        self.send_from_address_impl(route.into(), msg, sending_address, Vec::new())
            .await
    }

    async fn send_from_address_impl<M>(
        &self,
        route: Route,
        msg: M,
        sending_address: Address,
        local_info: Vec<LocalInfo>,
    ) -> Result<()>
    where
        M: Message + Send + 'static,
    {
        // Check if the sender address exists
        if !self.mailboxes.contains(&sending_address) {
            return Err(Error::new_without_cause(Origin::Node, Kind::Invalid));
        }

        // First resolve the next hop in the route
        let (reply_tx, mut reply_rx) = small_channel();
        let next = match route.next() {
            Ok(next) => next,
            Err(_) => {
                // TODO: communicate bad routes to calling function
                tracing::error!("Invalid route for message sent from {}", sending_address);
                panic!("invalid destination route");
            }
        };

        let req = NodeMessage::SenderReq(next.clone(), reply_tx);
        self.sender
            .send(req)
            .await
            .map_err(NodeError::from_send_err)?;
        let (addr, sender) = reply_rx
            .recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
            .take_sender()?;

        // Pack the payload into a TransportMessage
        let payload = msg.encode().unwrap();
        let mut transport_msg = TransportMessage::v1(route, Route::new(), payload);
        transport_msg
            .return_route
            .modify()
            .append(sending_address.clone());

        // Pack transport message into a LocalMessage wrapper
        let local_msg = LocalMessage::new(transport_msg, local_info);

        // Pack local message into a RelayMessage wrapper
        let relay_msg = RelayMessage::new(sending_address.clone(), addr, local_msg);

        debugger::log_outgoing_message(self, &relay_msg);

        if !self.mailboxes.is_outgoing_authorized(&relay_msg).await? {
            warn!(
                "Message sent from {} to {} did not pass outgoing access control",
                relay_msg.source(),
                relay_msg.destination()
            );
            return Ok(());
        }

        // Send the packed user message with associated route
        sender
            .send(relay_msg)
            .await
            .map_err(NodeError::from_send_err)?;

        Ok(())
    }

    /// Forward a transport message to its next routing destination
    ///
    /// Similar to [`Context::send`], but taking a
    /// [`TransportMessage`], which contains the full destination
    /// route, and calculated return route for this hop.
    ///
    /// **Note:** you most likely want to use
    /// [`Context::send`] instead, unless you are writing an
    /// external router implementation for ockam node.
    ///
    /// [`Context::send`]: crate::Context::send
    /// [`TransportMessage`]: ockam_core::TransportMessage
    pub async fn forward(&self, local_msg: LocalMessage) -> Result<()> {
        self.forward_from_address(local_msg, self.address()).await
    }

    /// Forward a transport message to its next routing destination
    ///
    /// Similar to [`Context::send`], but taking a
    /// [`TransportMessage`], which contains the full destination
    /// route, and calculated return route for this hop.
    ///
    /// **Note:** you most likely want to use
    /// [`Context::send`] instead, unless you are writing an
    /// external router implementation for ockam node.
    ///
    /// [`Context::send`]: crate::Context::send
    /// [`TransportMessage`]: ockam_core::TransportMessage
    pub async fn forward_from_address(
        &self,
        local_msg: LocalMessage,
        sending_address: Address,
    ) -> Result<()> {
        // Check if the sender address exists
        if !self.mailboxes.contains(&sending_address) {
            return Err(Error::new_without_cause(Origin::Node, Kind::Invalid));
        }

        // First resolve the next hop in the route
        let (reply_tx, mut reply_rx) = small_channel();
        let next = match local_msg.transport().onward_route.next() {
            Ok(next) => next,
            Err(_) => {
                // TODO: communicate bad routes to calling function
                tracing::error!(
                    "Invalid onward route for message forwarded from {}",
                    local_msg.transport().return_route
                );
                panic!("invalid destination route");
            }
        };
        let req = NodeMessage::SenderReq(next.clone(), reply_tx);
        self.sender
            .send(req)
            .await
            .map_err(NodeError::from_send_err)?;
        let (addr, sender) = reply_rx
            .recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
            .take_sender()?;

        // Pack the transport message into a RelayMessage wrapper
        let relay_msg = RelayMessage::new(sending_address, addr, local_msg);

        debugger::log_outgoing_message(self, &relay_msg);

        // TODO check if this context is allowed to forward the message
        //      to the next hop in the route
        if !self.mailboxes.is_outgoing_authorized(&relay_msg).await? {
            warn!(
                "Message forwarded from {} to {} did not pass outgoing access control",
                relay_msg.source(),
                relay_msg.destination(),
            );
            return Ok(());
        }

        // Forward the message
        sender
            .send(relay_msg)
            .await
            .map_err(NodeError::from_send_err)?;

        Ok(())
    }

    /// Block the current worker to wait for a typed message
    ///
    /// **Warning** this function can block until its running ockam
    /// node is shut down.  Safer variants of this function are
    /// [`receive`](Self::receive) and
    /// [`receive_timeout`](Self::receive_timeout).
    pub async fn receive_block<M: Message>(&mut self) -> Result<Cancel<'_, M>> {
        let (msg, data, addr) = self.next_from_mailbox().await?;
        Ok(Cancel::new(msg, data, addr, self))
    }

    /// Block the current worker to wait for a typed message
    ///
    /// This function may return an `Err(FailedLoadData)` if the
    /// underlying worker was shut down, or `Err(Timeout)` if the call
    /// was waiting for longer than the `default timeout`.  Use
    /// [`receive_timeout`](Context::receive_timeout) to adjust the
    /// timeout period.
    ///
    /// Will return an error if the corresponding worker has been
    /// stopped, or the underlying node has shut down.
    pub async fn receive<M: Message>(&mut self) -> Result<Cancel<'_, M>> {
        self.receive_timeout(DEFAULT_TIMEOUT).await
    }

    /// Wait to receive a message up to a specified timeout
    ///
    /// See [`receive`](Self::receive) for more details.
    pub async fn receive_duration_timeout<M: Message>(
        &mut self,
        timeout_duration: Duration,
    ) -> Result<Cancel<'_, M>> {
        let (msg, data, addr) = timeout(timeout_duration, async { self.next_from_mailbox().await })
            .await
            .map_err(|e| NodeError::Data.with_elapsed(e))??;
        Ok(Cancel::new(msg, data, addr, self))
    }

    /// Wait to receive a message up to a specified timeout
    ///
    /// See [`receive`](Self::receive) for more details.
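    ///
    /// A minimal sketch (the 5 second timeout and `String` payload are
    /// illustrative):
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn example(ctx: &mut Context) -> Result<()> {
    /// let msg = ctx.receive_timeout::<String>(5).await?;
    /// # let _ = msg;
    /// # Ok(())
    /// # }
    /// ```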
    pub async fn receive_timeout<M: Message>(
        &mut self,
        timeout_secs: u64,
    ) -> Result<Cancel<'_, M>> {
        self.receive_duration_timeout(Duration::from_secs(timeout_secs))
            .await
    }

    /// Block the current worker to wait for a message satisfying a conditional
    ///
    /// Will return `Err` if the corresponding worker has been
    /// stopped, or the underlying node has shut down.  This operation
    /// has a [default timeout](DEFAULT_TIMEOUT).
    ///
    /// Internally this function uses [`receive`](Self::receive), so
    /// is subject to the same timeout.
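    ///
    /// A minimal sketch (the predicate and `String` payload are
    /// illustrative):
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn example(ctx: &mut Context) -> Result<()> {
    /// // Wait for the first String message that starts with "ok".
    /// let msg = ctx.receive_match(|m: &String| m.starts_with("ok")).await?;
    /// # let _ = msg;
    /// # Ok(())
    /// # }
    /// ```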
    pub async fn receive_match<M, F>(&mut self, check: F) -> Result<Cancel<'_, M>>
    where
        M: Message,
        F: Fn(&M) -> bool,
    {
        let (m, data, addr) = timeout(Duration::from_secs(DEFAULT_TIMEOUT), async {
            loop {
                match self.next_from_mailbox().await {
                    Ok((m, data, addr)) if check(&m) => break Ok((m, data, addr)),
                    Ok((_, data, _)) => {
                        // Requeue
                        self.forward(data).await?;
                    }
                    e => break e,
                }
            }
        })
        .await
        .map_err(|e| NodeError::Data.with_elapsed(e))??;

        Ok(Cancel::new(m, data, addr, self))
    }

    /// Assign the current worker to a cluster
    ///
    /// A cluster is a set of workers that should be stopped together
    /// when the node is stopped or parts of the system are reloaded.
    /// **This is not to be confused with supervisors!**
    ///
    /// By adding your worker to a cluster you signal to the runtime
    /// that your worker may be depended on by other workers that
    /// should be stopped first.
    ///
    /// **Your cluster name MUST NOT start with `_internals.` or
    /// `ockam.`!**
    ///
    /// Clusters are de-allocated in reverse order of their
    /// initialisation when the node is stopped.
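    ///
    /// A minimal sketch (the cluster label is illustrative):
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn example(ctx: &Context) -> Result<()> {
    /// // Group this worker with others that must be stopped together.
    /// ctx.set_cluster("my-transport-cluster").await
    /// # }
    /// ```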
    pub async fn set_cluster<S: Into<String>>(&self, label: S) -> Result<()> {
        let (msg, mut rx) = NodeMessage::set_cluster(self.address(), label.into());
        self.sender
            .send(msg)
            .await
            .map_err(NodeError::from_send_err)?;
        rx.recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
            .is_ok()
    }

    /// Return a list of all available worker addresses on a node
    pub async fn list_workers(&self) -> Result<Vec<Address>> {
        let (msg, mut reply_rx) = NodeMessage::list_workers();

        self.sender
            .send(msg)
            .await
            .map_err(NodeError::from_send_err)?;

        reply_rx
            .recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
            .take_workers()
    }

    /// Register a router for a specific address type
    pub async fn register<A: Into<Address>>(&self, type_: TransportType, addr: A) -> Result<()> {
        self.register_impl(type_, addr.into()).await
    }

    /// Send a shutdown acknowledgement to the router
    pub(crate) async fn send_stop_ack(&self) -> Result<()> {
        self.sender
            .send(NodeMessage::StopAck(self.address()))
            .await
            .map_err(NodeError::from_send_err)?;
        Ok(())
    }

    async fn register_impl(&self, type_: TransportType, addr: Address) -> Result<()> {
        let (tx, mut rx) = small_channel();
        self.sender
            .send(NodeMessage::Router(type_, addr, tx))
            .await
            .map_err(NodeError::from_send_err)?;

        rx.recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
        Ok(())
    }

    /// A convenience function to get a data 3-tuple from the mailbox
    ///
    /// The reason this function doesn't construct a `Cancel<_, M>` is
    /// to avoid the lifetime collision between the mutation on `self`
    /// and the ref to `Context` passed to `Cancel::new(..)`
    ///
    /// This function will block and re-queue messages into the
    /// mailbox until it can receive the correct message payload.
    ///
    /// WARNING: this will temporarily create a busy-loop; this
    /// mechanism should be replaced with a waker system that prevents
    /// the mailbox from yielding another message until the relay
    /// worker has woken it.
    async fn next_from_mailbox<M: Message>(&mut self) -> Result<(M, LocalMessage, Address)> {
        loop {
            let msg = self
                .receiver_next()
                .await?
                .ok_or_else(|| NodeError::Data.not_found())?;
            let addr = msg.destination().clone();
            let local_msg = msg.into_local_message();

            // FIXME: make message parsing idempotent to avoid cloning
            match parser::message(&local_msg.transport().payload).ok() {
                Some(msg) => break Ok((msg, local_msg, addr)),
                None => {
                    // Requeue
                    self.forward(local_msg).await?;
                }
            }
        }
    }

    /// This function is called by the relay to indicate that a worker is initialised
    pub(crate) async fn set_ready(&mut self) -> Result<()> {
        self.sender
            .send(NodeMessage::set_ready(self.address()))
            .await
            .map_err(NodeError::from_send_err)?;
        Ok(())
    }

    /// Wait for a particular address to become "ready"
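    ///
    /// A minimal sketch (assumes a worker was previously started at
    /// `"my-worker-address"`):
    ///
    /// ```rust
    /// # use {ockam_node::Context, ockam_core::Result};
    /// # async fn example(ctx: &mut Context) -> Result<()> {
    /// ctx.wait_for("my-worker-address").await
    /// # }
    /// ```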
    pub async fn wait_for<A: Into<Address>>(&mut self, addr: A) -> Result<()> {
        let (msg, mut reply) = NodeMessage::get_ready(addr.into());
        self.sender
            .send(msg)
            .await
            .map_err(NodeError::from_send_err)?;

        // This call blocks until the address has become ready or is
        // dropped by the router
        reply
            .recv()
            .await
            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
        Ok(())
    }
}