rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! DEALER socket.
//!
//! `send` is round-robin across connected peers; `recv` pulls from the
//! shared inbound channel.

use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
    CaptureSocket, Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType,
    ZmqMessage, ZmqResult,
};

use flume::Receiver;

use std::sync::Arc;

/// Dealer socket (DEALER). Async request socket without the strict send/recv alternation of REQ.
///
/// Sends messages round-robin across connected peers. Use with [`RouterSocket`](crate::RouterSocket)
/// for async request/reply, or as a load-balancing client across multiple REP workers.
///
/// Call [`DealerSocket::split`] to obtain independent send and recv halves.
///
/// See [RFC 28](https://rfc.zeromq.org/spec/28/) for the DEALER/ROUTER
/// wire contract and [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html).
pub struct DealerSocket {
    // Socket core; every send/recv in this file goes through `core.common.backend`.
    core: SocketCore,
    // Receiving end of the backend's inbound channel (taken from `backend.inbound()`
    // at construction). Each item pairs the originating peer key with either a
    // decoded message or the codec error hit while decoding it.
    inbound: Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
}

impl HasCommon for DealerSocket {
    type Backend = GenericSocketBackend;

    /// Shared borrow of the common socket state stored inside this socket's core.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let core = &self.core;
        &core.common
    }

    /// Exclusive borrow of the common socket state stored inside this socket's core.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let core = &mut self.core;
        &mut core.common
    }
}

impl Socket for DealerSocket {
    type Backend = GenericSocketBackend;

    /// Creates a DEALER socket configured with `options` and grabs the
    /// backend's inbound receiver so `recv` can read from it later.
    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::DEALER, options);
        Self {
            // Evaluated before `core` is moved into the struct below.
            inbound: core.common.backend.inbound(),
            core,
        }
    }

    /// Converts `endpoint` into an [`Endpoint`](crate::endpoint::Endpoint) and
    /// connects the socket core to it.
    ///
    /// # Errors
    ///
    /// Returns the conversion error (turned into a `ZmqError`) when `endpoint`
    /// cannot be converted, or whatever error `connect_endpoint` reports.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let parsed: crate::endpoint::Endpoint = match endpoint.try_into() {
            Ok(parsed) => parsed,
            Err(err) => return Err(err.into()),
        };
        self.core.connect_endpoint(parsed).await
    }

    /// Delegates the linger drain to the socket core.
    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}

impl SocketRecv for DealerSocket {
    /// Receives the next message from the inbound channel, passing the
    /// socket's configured `receive_timeout` to the backend.
    ///
    /// The key of the peer that produced the message is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `recv_auto`.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let backend = &self.core.common.backend;
        let timeout = backend.socket_options().receive_timeout;
        let (_origin, message) = backend.recv_auto(&self.inbound, timeout).await?;
        Ok(message)
    }
}

impl SocketSend for DealerSocket {
    /// Sends `message` through the backend's round-robin path, passing the
    /// socket's configured `send_timeout`, then runs the core's `after_send` hook.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_round_robin_timed`; in that case
    /// `after_send` is not run.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let timeout = self.core.common.backend.socket_options().send_timeout;
        let outgoing: ZmqMessage = message.into();
        self.core
            .common
            .backend
            .send_round_robin_timed(outgoing, timeout)
            .await?;
        self.core.after_send().await;
        Ok(())
    }
}

// Empty impl: `DealerSocket` opts into `CaptureSocket` without overriding any trait items.
impl CaptureSocket for DealerSocket {}

impl DealerSocket {
    /// Consumes the socket and returns a send half and a recv half that can
    /// be driven from separate tasks.
    ///
    /// Both halves share the original socket core through an `Arc`.
    /// [`DealerSendHalf`] is `Clone`, so several producer tasks can each hold
    /// one; [`DealerRecvHalf`] is not `Clone` and owns the inbound receiver.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use rustzmq2::prelude::*;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let mut dealer = rustzmq2::DealerSocket::new();
    ///     dealer.connect("tcp://127.0.0.1:5555").await?;
    ///     let (mut tx, mut rx) = dealer.split();
    ///
    ///     // Recv loop in its own task.
    ///     let recv_task = tokio::spawn(async move {
    ///         while let Ok(reply) = rx.recv().await {
    ///             println!("reply: {:?}", reply);
    ///         }
    ///     });
    ///
    ///     for i in 0..10 {
    ///         tx.send(format!("req-{i}")).await?;
    ///     }
    ///     drop(tx); // release this send handle; recv_task ends once `recv` returns an error
    ///     recv_task.await.ok();
    ///     Ok(())
    /// }
    /// ```
    pub fn split(mut self) -> (DealerSendHalf, DealerRecvHalf) {
        // The live `core` and `inbound` are swapped out for those of a throwaway
        // default DEALER core rather than moved out of `self` directly; the
        // placeholder is what gets dropped with `self` when this function returns.
        // NOTE(review): presumably needed because `self` cannot be destructured
        // by move here — confirm.
        let placeholder = SocketCore::new(SocketType::DEALER, SocketOptions::default());
        let placeholder_rx = placeholder.common.backend.inbound();
        let inbound = std::mem::replace(&mut self.inbound, placeholder_rx);
        let core = std::mem::replace(&mut self.core, placeholder);

        let shared = Arc::new(DealerSocketInner { core });
        let send_half = DealerSendHalf {
            inner: Arc::clone(&shared),
        };
        let recv_half = DealerRecvHalf {
            inner: shared,
            inbound,
        };

        (send_half, recv_half)
    }
}

/// State shared by the two halves returned from [`DealerSocket::split`];
/// each half holds it behind an `Arc`.
struct DealerSocketInner {
    // The socket core taken out of the original `DealerSocket` by `split`.
    core: SocketCore,
}

/// Send half of a [`DealerSocket`] produced by [`DealerSocket::split`].
///
/// Cloning yields another handle onto the same shared socket core.
#[derive(Clone)]
pub struct DealerSendHalf {
    // Socket core shared with the recv half and with every clone of this half.
    inner: Arc<DealerSocketInner>,
}

/// Recv half of a [`DealerSocket`] produced by [`DealerSocket::split`].
///
/// Not `Clone`: it owns the inbound receiver moved out of the original socket.
pub struct DealerRecvHalf {
    // Socket core shared with the send half(s); used for options and `recv_auto`.
    inner: Arc<DealerSocketInner>,
    // Inbound receiver taken from the original socket. Each item pairs the
    // originating peer key with a decoded message or the codec error hit instead.
    inbound: Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
}

impl SocketSend for DealerSendHalf {
    /// Sends `message` through the backend's round-robin path, then runs the
    /// shared core's `after_send` hook.
    ///
    /// The socket's configured `send_timeout` is passed to the backend, so a
    /// split send half behaves like [`DealerSocket`]'s own `send` (and mirrors
    /// how [`DealerRecvHalf`] applies `receive_timeout`) instead of silently
    /// ignoring the option.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_round_robin_timed`; in that case
    /// `after_send` is not run.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let message = message.into();
        let backend = &self.inner.core.common.backend;
        let send_timeout = backend.socket_options().send_timeout;
        backend
            .send_round_robin_timed(message, send_timeout)
            .await?;
        self.inner.core.after_send().await;
        Ok(())
    }
}

// Empty impl: `DealerSendHalf` opts into `CaptureSocket` without overriding any trait items.
impl CaptureSocket for DealerSendHalf {}

impl SocketRecv for DealerRecvHalf {
    /// Receives the next message from this half's inbound receiver, passing
    /// the shared socket's configured `receive_timeout` to the backend.
    ///
    /// The key of the peer that produced the message is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `recv_auto`.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let backend = &self.inner.core.common.backend;
        let timeout = backend.socket_options().receive_timeout;
        let (_origin, message) = backend.recv_auto(&self.inbound, timeout).await?;
        Ok(message)
    }
}