// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ.
// Documentation
//! `SocketType` enum + compatibility matrix.

use crate::ZmqError;

use std::convert::TryFrom;
use std::fmt::{self, Display};
use std::str::FromStr;

// Peer-compatibility table for `SocketType`, stored as a flat, row-major
// 15 x 15 grid (15 * 15 = 225 cells). Rows and columns share one ordering,
// which is the discriminant order of `SocketType`:
// PAIR PUB SUB REQ REP DEALER ROUTER PULL PUSH XPUB XSUB STREAM SCATTER GATHER CHANNEL
//
// `SocketType::compatible` reads the cell at `row * 15 + col`, taking the row
// from `self` and the column from the peer; a non-zero cell means the two
// types are compatible. The table as written is symmetric, so the answer does
// not depend on which side is `self`.
//
// `rustfmt::skip` preserves the hand-aligned grid layout below.
#[rustfmt::skip]
const COMPATIBILITY_MATRIX: [u8; 225] = [
    // PAIR  PUB  SUB  REQ  REP  DEAL ROUT PULL PUSH XPUB XSUB STR  SCAT GATH CHAN
    1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, // PUB
    0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, // SUB
    0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, // REQ
    0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, // REP
    0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, // DEALER
    0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, // ROUTER
    0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, // PULL
    0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, // PUSH
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, // XPUB
    0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, // XSUB
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // STREAM
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SCATTER
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // GATHER
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, // CHANNEL
];

/// Identifies the ZMQ socket pattern. Used internally and in [`SocketEvent`](crate::SocketEvent)s.
///
/// Each variant's discriminant doubles as its row/column index into the
/// compatibility table consulted by [`SocketType::compatible`], so the
/// explicit values must stay contiguous (`0..15`) and in step with that table.
/// The "compatible peers" listed on each variant are read off that table.
// NOTE(review): the variants are all-caps and spelled exactly like the strings
// returned by `as_str`; presumably that is why the acronym lint is silenced.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(usize)]
pub enum SocketType {
    /// Compatible peers: `PAIR`.
    PAIR = 0,
    /// Compatible peers: `SUB`, `XSUB`.
    PUB = 1,
    /// Compatible peers: `PUB`, `XPUB`.
    SUB = 2,
    /// Compatible peers: `REP`, `ROUTER`.
    REQ = 3,
    /// Compatible peers: `REQ`, `DEALER`.
    REP = 4,
    /// Compatible peers: `REP`, `DEALER`, `ROUTER`.
    DEALER = 5,
    /// Compatible peers: `REQ`, `DEALER`, `ROUTER`.
    ROUTER = 6,
    /// Compatible peers: `PUSH`.
    PULL = 7,
    /// Compatible peers: `PULL`.
    PUSH = 8,
    /// Compatible peers: `SUB`, `XSUB`.
    XPUB = 9,
    /// Compatible peers: `PUB`, `XPUB`.
    XSUB = 10,
    /// Compatible peers: `STREAM`.
    STREAM = 11,
    /// Compatible peers: `GATHER`.
    SCATTER = 12,
    /// Compatible peers: `SCATTER`.
    GATHER = 13,
    /// Compatible peers: `CHANNEL`.
    CHANNEL = 14,
}

impl SocketType {
    /// Canonical ZMTP name of this socket type as a static string
    /// (for example `"REQ"` or `"PUB"`).
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::PAIR => "PAIR",
            Self::PUB => "PUB",
            Self::SUB => "SUB",
            Self::REQ => "REQ",
            Self::REP => "REP",
            Self::DEALER => "DEALER",
            Self::ROUTER => "ROUTER",
            Self::PULL => "PULL",
            Self::PUSH => "PUSH",
            Self::XPUB => "XPUB",
            Self::XSUB => "XSUB",
            Self::STREAM => "STREAM",
            Self::SCATTER => "SCATTER",
            Self::GATHER => "GATHER",
            Self::CHANNEL => "CHANNEL",
        }
    }

    /// Reports whether a socket of this type is compatible with a peer of
    /// type `other`, as recorded in the compatibility matrix.
    ///
    /// ```
    /// use rustzmq2::SocketType;
    /// assert!(SocketType::PUB.compatible(SocketType::SUB));
    /// assert!(SocketType::REQ.compatible(SocketType::REP));
    /// assert!(SocketType::DEALER.compatible(SocketType::ROUTER));
    /// assert!(!SocketType::PUB.compatible(SocketType::REP));
    /// ```
    pub fn compatible(&self, other: SocketType) -> bool {
        // The matrix is stored flat: one row of `ROW_WIDTH` cells per socket
        // type, with `self` selecting the row and `other` the column.
        const ROW_WIDTH: usize = 15;
        let cell = COMPATIBILITY_MATRIX[ROW_WIDTH * (*self as usize) + other as usize];
        cell != 0
    }

    /// Tells whether the remote peer's routing identity should be used as
    /// the `PeerIdentity` the peer is registered under.
    ///
    /// Mirrors libzmq's `options.recv_routing_id`, which is `true` only for
    /// ROUTER (`libzmq/src/router.cpp:29`). On the wire the identity arrives
    /// through the `Identity` ZMTP property; for inproc it is exchanged in
    /// the handshake payload.
    #[cfg(feature = "inproc")]
    pub(crate) fn wants_remote_routing_id(&self) -> bool {
        *self == Self::ROUTER
    }

    /// Fallback value for [`SocketOptions::inline_write_max`] when the user
    /// has not chosen one.
    ///
    /// Protocols whose queue depth is ≤ 1 by construction (REQ/REP lockstep,
    /// PAIR exclusive 1:1) get `Some(0)`, i.e. uncapped inline writes. All
    /// other types get `None` (disabled), since inline writes would bypass
    /// `drain_batch` coalescing on throughput-shaped workloads.
    ///
    /// [`SocketOptions::inline_write_max`]: crate::SocketOptionsBuilder::inline_write_max
    pub(crate) fn default_inline_write_max(&self) -> Option<usize> {
        let depth_at_most_one = matches!(self, Self::REQ | Self::REP | Self::PAIR);
        if depth_at_most_one {
            Some(0)
        } else {
            None
        }
    }

    /// Fallback value for [`SocketOptions::out_batch_msgs`] when the user
    /// has not chosen one.
    ///
    /// Fanout sockets (PUB / XPUB) get `Some(32)`: a low per-peer drain cap
    /// keeps one slow peer's pending queue from overflowing HWM and causing
    /// drops. Every other type gets `Some(256)`: point-to-point sockets have
    /// one peer in flight per send and gain from the larger per-syscall
    /// amortization.
    ///
    /// [`SocketOptions::out_batch_msgs`]: crate::SocketOptionsBuilder::out_batch_msgs
    pub(crate) fn default_out_batch_msgs(&self) -> Option<usize> {
        let is_fanout = matches!(self, Self::PUB | Self::XPUB);
        Some(if is_fanout { 32 } else { 256 })
    }
}

impl FromStr for SocketType {
    type Err = ZmqError;

    /// Parses a socket type from its textual name.
    ///
    /// The string's bytes are handed to the `TryFrom<&[u8]>` conversion, so
    /// both entry points accept exactly the same spellings and fail with the
    /// same error.
    #[inline]
    fn from_str(s: &str) -> Result<Self, ZmqError> {
        let raw_name = s.as_bytes();
        <Self as TryFrom<&[u8]>>::try_from(raw_name)
    }
}

impl TryFrom<&[u8]> for SocketType {
    type Error = ZmqError;

    /// Maps a raw socket-type name to its variant.
    ///
    /// Only the exact upper-case byte spellings listed below are recognised;
    /// any other input (different case, surrounding whitespace, empty slice)
    /// yields `ZmqError::Other`.
    fn try_from(s: &[u8]) -> Result<Self, ZmqError> {
        match s {
            b"PAIR" => Ok(Self::PAIR),
            b"PUB" => Ok(Self::PUB),
            b"SUB" => Ok(Self::SUB),
            b"REQ" => Ok(Self::REQ),
            b"REP" => Ok(Self::REP),
            b"DEALER" => Ok(Self::DEALER),
            b"ROUTER" => Ok(Self::ROUTER),
            b"PULL" => Ok(Self::PULL),
            b"PUSH" => Ok(Self::PUSH),
            b"XPUB" => Ok(Self::XPUB),
            b"XSUB" => Ok(Self::XSUB),
            b"STREAM" => Ok(Self::STREAM),
            b"SCATTER" => Ok(Self::SCATTER),
            b"GATHER" => Ok(Self::GATHER),
            b"CHANNEL" => Ok(Self::CHANNEL),
            _ => Err(ZmqError::Other("Unknown socket type".into())),
        }
    }
}

impl Display for SocketType {
    /// Formats the socket type as its canonical name (the same text that
    /// `as_str` returns).
    ///
    /// The name is emitted through `Formatter::pad`, so width, fill,
    /// alignment and precision given in the format spec (e.g. `{:>8}`) are
    /// honoured. With a plain `{}` the output is exactly the canonical name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `write_str` would bypass the formatter's padding/precision flags;
        // `pad` applies them and is a straight write when none are set.
        f.pad(self.as_str())
    }
}

impl AsRef<str> for SocketType {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}