rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{Socket, SocketBackend, SocketOptions, SocketRecv, SocketType, ZmqMessage, ZmqResult};

use flume::Receiver;

/// Gather socket (GATHER). Receives messages fair-queued from connected SCATTER peers.
///
/// Counterpart to [`ScatterSocket`](crate::ScatterSocket).
///
/// See [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html) —
/// SCATTER/GATHER are the single-part analogues of PUSH/PULL.
pub struct GatherSocket {
    core: SocketCore,
    inbound: Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
}

impl HasCommon for GatherSocket {
    type Backend = GenericSocketBackend;
    fn common(&self) -> &SocketCommon<Self::Backend> {
        &self.core.common
    }
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        &mut self.core.common
    }
}

impl Socket for GatherSocket {
    type Backend = GenericSocketBackend;

    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::GATHER, options);
        let inbound = core.common.backend.inbound();
        Self { core, inbound }
    }

    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let endpoint = endpoint.try_into().map_err(Into::into)?;
        self.core.connect_endpoint(endpoint).await
    }

    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}

impl SocketRecv for GatherSocket {
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let receive_timeout = self.core.common.backend.socket_options().receive_timeout;
        loop {
            let (_peer_id, message) = self
                .core
                .common
                .backend
                .recv_auto(&self.inbound, receive_timeout)
                .await?;
            if message.len() == 1 {
                return Ok(message);
            }
            // silently drop multipart messages, matching libzmq gather.cpp behaviour
        }
    }
}