rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
    Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType, ZmqError, ZmqMessage,
    ZmqResult,
};

use flume::Receiver;

/// Channel socket (CHANNEL). Bidirectional point-to-point channel.
///
/// Can both send and receive freely, with no ordering constraint. Use [`PairSocket`](crate::PairSocket)
/// for exclusive one-to-one connections; CHANNEL allows multiple peers.
///
/// See [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html) for
/// libzmq's CHANNEL option set.
pub struct ChannelSocket {
    core: SocketCore,
    inbound: Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
}

impl HasCommon for ChannelSocket {
    type Backend = GenericSocketBackend;
    fn common(&self) -> &SocketCommon<Self::Backend> {
        &self.core.common
    }
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        &mut self.core.common
    }
}

impl Socket for ChannelSocket {
    type Backend = GenericSocketBackend;

    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::CHANNEL, options);
        let inbound = core.common.backend.inbound();
        Self { core, inbound }
    }

    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let endpoint = endpoint.try_into().map_err(Into::into)?;
        self.core.connect_endpoint(endpoint).await
    }

    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}

impl SocketSend for ChannelSocket {
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let message = message.into();
        if message.len() > 1 {
            return Err(ZmqError::Socket(
                "CHANNEL does not support multipart messages".into(),
            ));
        }
        let send_timeout = self.core.common.backend.socket_options().send_timeout;
        self.core
            .common
            .backend
            .send_round_robin_timed(message, send_timeout)
            .await?;
        self.core.after_send().await;
        Ok(())
    }
}

impl SocketRecv for ChannelSocket {
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let receive_timeout = self.core.common.backend.socket_options().receive_timeout;
        loop {
            let (_peer_id, message) = self
                .core
                .common
                .backend
                .recv_auto(&self.inbound, receive_timeout)
                .await?;
            if message.len() == 1 {
                return Ok(message);
            }
            // silently drop multipart messages, matching libzmq channel.cpp behaviour
        }
    }
}