// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ
// Documentation
//! ROUTER socket.
//!
//! Recv prepends the source peer's identity as the first frame. Send
//! pops that identity frame, looks up the peer in the registry, and
//! pushes through the per-peer outbound channel.

use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::PeerIdentity;
use crate::{
    Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType, ZmqError, ZmqMessage,
    ZmqResult,
};

use flume::Receiver;

use std::convert::TryInto;
use std::sync::Arc;

/// Router socket (ROUTER). Async reply socket that tracks each peer's
/// identity so replies can be addressed by identity.
///
/// Use with [`DealerSocket`](crate::DealerSocket) to build async request/reply brokers.
///
/// See [RFC 28](https://rfc.zeromq.org/spec/28/) for the DEALER/ROUTER
/// wire contract and [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html).
///
/// # Example
///
/// Prefer the typed [`recv_envelope`](RouterSocket::recv_envelope) /
/// [`send_to`](RouterSocket::send_to) pair — `recv` / `send` exist too
/// (with the identity as the leading frame), but only because generic
/// broker code like [`proxy`](crate::proxy) needs that shape.
///
/// ```rust,no_run
/// use rustzmq2::prelude::*;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut router = rustzmq2::RouterSocket::new();
///     router.bind("tcp://127.0.0.1:5555").await?;
///
///     let RouterEnvelope { identity, message } = router.recv_envelope().await?;
///     println!("{} bytes from {identity}", message.iter().map(|f| f.len()).sum::<usize>());
///
///     router.send_to(&identity, ZmqMessage::from("pong")).await?;
///     Ok(())
/// }
/// ```
pub struct RouterSocket {
    /// Socket core. The backend (and through it the peer registry and the
    /// socket options) is reached via `core.common.backend`.
    core: SocketCore,
    /// Receiver obtained from `core.common.backend.inbound()` when the
    /// socket is constructed. Each item pairs the `PeerKey` of a peer with
    /// either a `Message` or the `CodecError` produced for that peer.
    inbound: Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
}

// Exposes the `SocketCommon` stored inside `core` through the `HasCommon`
// trait.
impl HasCommon for RouterSocket {
    type Backend = GenericSocketBackend;

    /// Shared borrow of the common state held by `core`.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let Self { core, .. } = self;
        &core.common
    }

    /// Exclusive borrow of the common state held by `core`.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let Self { core, .. } = self;
        &mut core.common
    }
}

impl Socket for RouterSocket {
    type Backend = GenericSocketBackend;

    /// Builds a ROUTER socket configured with `options`.
    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::ROUTER, options);
        // Field initialisers run in the order written, so the receiver is
        // taken from the backend before `core` is moved into the struct.
        Self {
            inbound: core.common.backend.inbound(),
            core,
        }
    }

    /// Converts `endpoint` into an `Endpoint` and connects the socket core
    /// to it.
    ///
    /// # Errors
    ///
    /// Returns the conversion error (turned into a `ZmqError`) when
    /// `endpoint` cannot be converted, or whatever
    /// `SocketCore::connect_endpoint` reports.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let target: crate::endpoint::Endpoint = match endpoint.try_into() {
            Ok(parsed) => parsed,
            Err(cause) => return Err(cause.into()),
        };
        self.core.connect_endpoint(target).await
    }

    /// Delegates to `SocketCore::linger_drain`.
    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}

impl SocketRecv for RouterSocket {
    /// Receives the next message and returns it with the source peer's
    /// identity pushed on as the first frame.
    ///
    /// The socket's `receive_timeout` option is passed to the backend's
    /// `recv_auto` call.
    ///
    /// # Errors
    ///
    /// Propagates any error from `recv_auto`. Returns [`ZmqError::Other`]
    /// when the registry has no identity for the peer key that delivered
    /// the message.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let backend = &self.core.common.backend;
        let receive_timeout = backend.socket_options().receive_timeout;
        let (key, mut message) = backend.recv_auto(&self.inbound, receive_timeout).await?;
        // ROUTER is the one socket that puts `PeerIdentity` on the wire
        // envelope, so we pay one Bytes clone per recv here — everywhere
        // else uses the PeerKey directly.
        //
        // `ok_or_else` keeps the error value from being constructed on the
        // (normal) path where the identity lookup succeeds.
        let peer_id = backend
            .registry()
            .id_for(key)
            .ok_or_else(|| ZmqError::Other("Peer disappeared before ROUTER envelope".into()))?;
        message.push_front(peer_id);
        Ok(message)
    }
}

impl SocketSend for RouterSocket {
    /// Sends a message whose first frame is the destination identity.
    ///
    /// The leading frame is removed and converted into a `PeerIdentity`;
    /// the remaining frames are handed to the backend's `send_to_timed`
    /// together with the socket's `send_timeout` option.
    ///
    /// # Errors
    ///
    /// Returns [`ZmqError::Socket`] when the message has fewer than two
    /// frames, and propagates errors from the identity conversion and from
    /// `send_to_timed`.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let mut frames = message.into();
        // An identity frame plus at least one payload frame is required.
        if frames.len() < 2 {
            return Err(ZmqError::Socket(
                "ROUTER send requires at least 2 frames: identity frame + message".into(),
            ));
        }
        // The length check above guarantees a leading frame exists.
        let identity_frame = frames.pop_front().unwrap();
        let peer_id: PeerIdentity = identity_frame.try_into()?;

        let backend = &self.core.common.backend;
        let send_timeout = backend.socket_options().send_timeout;
        backend.send_to_timed(&peer_id, frames, send_timeout).await?;
        self.core.after_send().await;
        Ok(())
    }
}

/// A message received from a [`RouterSocket`]: the originating peer's
/// identity plus its payload, with the envelope already split apart.
///
/// Returned by [`RouterSocket::recv_envelope`] /
/// [`RouterRecvHalf::recv_envelope`]. The same identity can be passed to
/// [`RouterSocket::send_to`] / [`RouterSendHalf::send_to`] to address a
/// reply back to that peer.
#[derive(Debug, Clone)]
pub struct RouterEnvelope {
    /// Identity the registry reported for the peer that delivered
    /// `message`.
    pub identity: PeerIdentity,
    /// The received message, without an identity frame prepended.
    pub message: ZmqMessage,
}

impl RouterSocket {
    /// Receive the next message and return it split into `(identity, payload)`.
    ///
    /// More ergonomic than [`SocketRecv::recv`], which returns the
    /// concatenated `[identity, payload...]` form for compatibility with
    /// generic broker code (e.g. [`proxy`](crate::proxy)).
    ///
    /// # Errors
    ///
    /// Propagates any error from the backend's `recv_auto`. Returns
    /// [`ZmqError::Other`] when the registry has no identity for the peer
    /// key that delivered the message.
    ///
    /// ```rust,no_run
    /// use rustzmq2::prelude::*;
    ///
    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /// let mut router = rustzmq2::RouterSocket::new();
    /// router.bind("tcp://127.0.0.1:5555").await?;
    ///
    /// let RouterEnvelope { identity, message } = router.recv_envelope().await?;
    /// router.send_to(&identity, ZmqMessage::from("pong")).await?;
    /// # Ok(()) }
    /// ```
    pub async fn recv_envelope(&mut self) -> ZmqResult<RouterEnvelope> {
        let backend = &self.core.common.backend;
        let receive_timeout = backend.socket_options().receive_timeout;
        let (key, message) = backend.recv_auto(&self.inbound, receive_timeout).await?;
        // `ok_or_else` keeps the error value from being constructed on the
        // (normal) path where the identity lookup succeeds.
        let identity = backend
            .registry()
            .id_for(key)
            .ok_or_else(|| ZmqError::Other("Peer disappeared before ROUTER envelope".into()))?;
        Ok(RouterEnvelope { identity, message })
    }

    /// Send `message` to the peer identified by `identity`. The message
    /// must not include an identity frame — pass the payload only.
    ///
    /// The socket's `send_timeout` option is passed to the backend's
    /// `send_to_timed` call.
    ///
    /// Errors with [`ZmqError::ReturnToSender`] if the peer is no longer
    /// connected.
    pub async fn send_to(
        &mut self,
        identity: &PeerIdentity,
        message: impl Into<ZmqMessage> + Send,
    ) -> ZmqResult<()> {
        let backend = &self.core.common.backend;
        let send_timeout = backend.socket_options().send_timeout;
        backend
            .send_to_timed(identity, message.into(), send_timeout)
            .await?;
        self.core.after_send().await;
        Ok(())
    }

    /// Splits the socket into separate send and recv halves, allowing
    /// concurrent sending and receiving from independent async tasks.
    ///
    /// Both halves share the original socket core through an `Arc`; the
    /// recv half additionally takes ownership of the inbound receiver.
    pub fn split(mut self) -> (RouterSendHalf, RouterRecvHalf) {
        // A throwaway, default-configured ROUTER core is swapped into `self`
        // so the real `core` and `inbound` can be moved out. `self`, now
        // holding only the placeholder, is dropped when this function
        // returns.
        // NOTE(review): presumably the swap is needed because the fields
        // cannot be moved out of `self` directly (e.g. a `Drop` impl) — confirm.
        let dummy = SocketCore::new(SocketType::ROUTER, SocketOptions::default());
        // Take the placeholder's receiver before `dummy` itself is moved
        // into `self.core` on the next line.
        let inbound = std::mem::replace(&mut self.inbound, dummy.common.backend.inbound());
        let core = std::mem::replace(&mut self.core, dummy);

        let inner = Arc::new(RouterSocketInner { core });

        (
            RouterSendHalf {
                inner: inner.clone(),
            },
            RouterRecvHalf { inner, inbound },
        )
    }
}

// State shared (behind an `Arc`) by the two halves produced by
// `RouterSocket::split`.
struct RouterSocketInner {
    // The socket core moved out of the original `RouterSocket`.
    core: SocketCore,
}

/// The send half of a [`RouterSocket`] produced by [`RouterSocket::split`].
///
/// Cloning yields another handle to the same shared socket core.
#[derive(Clone)]
pub struct RouterSendHalf {
    /// Socket core shared with the recv half and with any clones.
    inner: Arc<RouterSocketInner>,
}

/// The recv half of a [`RouterSocket`] produced by [`RouterSocket::split`].
pub struct RouterRecvHalf {
    /// Socket core shared with the send half.
    inner: Arc<RouterSocketInner>,
    /// Inbound receiver moved out of the original socket by `split`. Each
    /// item pairs a `PeerKey` with either a `Message` or a `CodecError`.
    inbound: Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
}

impl SocketSend for RouterSendHalf {
    /// Sends a message whose first frame is the destination identity.
    ///
    /// The leading frame is removed and converted into a `PeerIdentity`;
    /// the remaining frames are handed to the shared backend's
    /// `send_to_timed` together with the `send_timeout` socket option.
    ///
    /// # Errors
    ///
    /// Returns [`ZmqError::Socket`] when the message has fewer than two
    /// frames, and propagates errors from the identity conversion and from
    /// `send_to_timed`.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let mut frames = message.into();
        // An identity frame plus at least one payload frame is required.
        if frames.len() < 2 {
            return Err(ZmqError::Socket(
                "ROUTER send requires at least 2 frames: identity frame + message".into(),
            ));
        }
        // The length check above guarantees a leading frame exists.
        let identity_frame = frames.pop_front().unwrap();
        let peer_id: PeerIdentity = identity_frame.try_into()?;

        let core = &self.inner.core;
        let send_timeout = core.common.backend.socket_options().send_timeout;
        core.common
            .backend
            .send_to_timed(&peer_id, frames, send_timeout)
            .await?;
        core.after_send().await;
        Ok(())
    }
}

impl SocketRecv for RouterRecvHalf {
    /// Receives the next message and returns it with the source peer's
    /// identity pushed on as the first frame.
    ///
    /// The `receive_timeout` socket option is passed to the shared
    /// backend's `recv_auto` call.
    ///
    /// # Errors
    ///
    /// Propagates any error from `recv_auto`. Returns [`ZmqError::Other`]
    /// when the registry has no identity for the peer key that delivered
    /// the message.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let backend = &self.inner.core.common.backend;
        let receive_timeout = backend.socket_options().receive_timeout;
        let (key, mut message) = backend.recv_auto(&self.inbound, receive_timeout).await?;
        // `ok_or_else` keeps the error value from being constructed on the
        // (normal) path where the identity lookup succeeds.
        let peer_id = backend
            .registry()
            .id_for(key)
            .ok_or_else(|| ZmqError::Other("Peer disappeared before ROUTER envelope".into()))?;
        message.push_front(peer_id);
        Ok(message)
    }
}

impl RouterRecvHalf {
    /// Receive the next message and return it split into `(identity, payload)`.
    /// See [`RouterSocket::recv_envelope`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the shared backend's `recv_auto`. Returns
    /// [`ZmqError::Other`] when the registry has no identity for the peer
    /// key that delivered the message.
    pub async fn recv_envelope(&mut self) -> ZmqResult<RouterEnvelope> {
        let backend = &self.inner.core.common.backend;
        let receive_timeout = backend.socket_options().receive_timeout;
        let (key, message) = backend.recv_auto(&self.inbound, receive_timeout).await?;
        // `ok_or_else` keeps the error value from being constructed on the
        // (normal) path where the identity lookup succeeds.
        let identity = backend
            .registry()
            .id_for(key)
            .ok_or_else(|| ZmqError::Other("Peer disappeared before ROUTER envelope".into()))?;
        Ok(RouterEnvelope { identity, message })
    }
}

impl RouterSendHalf {
    /// Deliver `message` (payload only, no identity frame) to the peer
    /// named by `identity`, using the `send_timeout` socket option.
    /// Mirrors [`RouterSocket::send_to`].
    pub async fn send_to(
        &mut self,
        identity: &PeerIdentity,
        message: impl Into<ZmqMessage> + Send,
    ) -> ZmqResult<()> {
        let core = &self.inner.core;
        let backend = &core.common.backend;
        let send_timeout = backend.socket_options().send_timeout;
        let payload = message.into();
        backend
            .send_to_timed(identity, payload, send_timeout)
            .await?;
        core.after_send().await;
        Ok(())
    }
}