rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use crate::codec::{CodecError, Message};
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
    Socket, SocketBackend, SocketOptions, SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult,
};

use flume::Receiver;

/// One item delivered on a PAIR socket's inbound channel: a peer key paired
/// with either a codec [`Message`] or the [`CodecError`] reported in its place.
type InboundItem = (
    crate::engine::registry::PeerKey,
    Result<Message, CodecError>,
);

/// PAIR socket: an exclusive, bidirectional, one-to-one channel.
///
/// At most one peer may be connected at any time, which makes this socket a
/// good fit for tightly-coupled pairs of threads or processes. Patterns that
/// involve several peers should use DEALER/ROUTER instead.
///
/// The EXPAIR wire contract is specified in
/// [RFC 31](https://rfc.zeromq.org/spec/31/); see also
/// [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html).
pub struct PairSocket {
    /// Socket core; owns the common state (and through it the backend).
    core: SocketCore,
    /// Receiver obtained from the backend at construction time; `recv` reads
    /// from it.
    inbound: Receiver<InboundItem>,
}

/// Exposes the `SocketCommon` state stored inside the socket's core.
impl HasCommon for PairSocket {
    type Backend = GenericSocketBackend;

    /// Shared access to the common socket state held by the core.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let Self { core, .. } = self;
        &core.common
    }

    /// Exclusive access to the common socket state held by the core.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let Self { core, .. } = self;
        &mut core.common
    }
}

impl Socket for PairSocket {
    type Backend = GenericSocketBackend;

    /// Builds a PAIR socket from `options`.
    ///
    /// A fresh core is created with `SocketType::PAIR`, and the backend's
    /// inbound receiver is captured so that `recv` can read from it later.
    fn with_options(options: SocketOptions) -> Self {
        let core = SocketCore::new(SocketType::PAIR, options);
        Self {
            // Evaluated before `core` is moved into the struct below.
            inbound: core.common.backend.inbound(),
            core,
        }
    }

    /// Connects the socket to `endpoint`.
    ///
    /// # Errors
    ///
    /// Returns the conversion error if `endpoint` cannot be turned into an
    /// `Endpoint`; otherwise returns whatever the core's `connect_endpoint`
    /// reports.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let target: crate::endpoint::Endpoint = endpoint.try_into().map_err(Into::into)?;
        let core = &mut self.core;
        core.connect_endpoint(target).await
    }

    /// Delegates to the core's `linger_drain` and waits for it to finish.
    async fn linger_drain(&mut self) {
        let core = &mut self.core;
        core.linger_drain().await;
    }
}

impl SocketSend for PairSocket {
    /// Sends `message` through the backend.
    ///
    /// The value is first converted into a `ZmqMessage`; the send itself uses
    /// the `send_timeout` currently stored in the socket options. Once the
    /// backend send succeeds, the core's `after_send` step is awaited.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's
    /// `send_round_robin_timed`; in that case `after_send` is not run.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let outgoing: ZmqMessage = message.into();

        let backend = &mut self.core.common.backend;
        let timeout = backend.socket_options().send_timeout;
        backend.send_round_robin_timed(outgoing, timeout).await?;

        self.core.after_send().await;
        Ok(())
    }
}

impl SocketRecv for PairSocket {
    /// Receives the next message from the inbound channel.
    ///
    /// The wait uses the `receive_timeout` currently stored in the socket
    /// options. The backend's `recv_auto` yields a pair; only its message
    /// half is returned to the caller.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `recv_auto`.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let timeout = self.core.common.backend.socket_options().receive_timeout;

        let backend = &mut self.core.common.backend;
        let (_sender, payload) = backend.recv_auto(&self.inbound, timeout).await?;

        Ok(payload)
    }
}