// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ.
// Documentation
use crate::engine::backend::GenericSocketBackend;
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::core::SocketCore;
use crate::{
    Socket, SocketBackend, SocketOptions, SocketSend, SocketType, ZmqError, ZmqMessage, ZmqResult,
};

/// Scatter socket (SCATTER). Distributes messages round-robin to connected GATHER peers.
///
/// Counterpart to [`GatherSocket`](crate::GatherSocket). Similar to PUSH/PULL, but
/// intended for symmetric scatter/gather pipelines.
///
/// Messages are single-part only: the [`SocketSend`](crate::SocketSend) implementation
/// rejects any message whose `len()` is greater than 1 with a `ZmqError::Socket` error
/// before anything is handed to the backend.
///
/// See [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html) —
/// SCATTER/GATHER are the single-part analogues of PUSH/PULL.
pub struct ScatterSocket {
    // Socket core created with `SocketType::SCATTER`; the trait impls below
    // delegate connecting, draining, and sending to it.
    core: SocketCore,
}

impl HasCommon for ScatterSocket {
    type Backend = GenericSocketBackend;

    /// Borrows the [`SocketCommon`] state held inside this socket's core.
    fn common(&self) -> &SocketCommon<Self::Backend> {
        let Self { core } = self;
        &core.common
    }

    /// Mutably borrows the [`SocketCommon`] state held inside this socket's core.
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        let Self { core } = self;
        &mut core.common
    }
}

impl Socket for ScatterSocket {
    type Backend = GenericSocketBackend;

    fn with_options(options: SocketOptions) -> Self {
        Self {
            core: SocketCore::new(SocketType::SCATTER, options),
        }
    }

    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let endpoint = endpoint.try_into().map_err(Into::into)?;
        self.core.connect_endpoint(endpoint).await
    }

    async fn linger_drain(&mut self) {
        self.core.linger_drain().await;
    }
}

impl SocketSend for ScatterSocket {
    /// Sends one single-part message through the backend's round-robin send path.
    ///
    /// The `send_timeout` socket option is read from the backend and passed along
    /// with the message. After a successful send the core's `after_send` hook runs.
    ///
    /// # Errors
    ///
    /// Returns `ZmqError::Socket` when the converted message reports a `len()`
    /// greater than 1, and propagates any error from `send_round_robin_timed`.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let outgoing: ZmqMessage = message.into();

        // SCATTER is single-part only: refuse multipart input before touching the backend.
        let part_count = outgoing.len();
        if part_count > 1 {
            return Err(ZmqError::Socket(
                "SCATTER does not support multipart messages".into(),
            ));
        }

        let timeout = self.core.common.backend.socket_options().send_timeout;
        let delivery = self
            .core
            .common
            .backend
            .send_round_robin_timed(outgoing, timeout);
        delivery.await?;

        // Only reached when the send succeeded; a failed send returns above via `?`.
        self.core.after_send().await;
        Ok(())
    }
}