1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
//! `SocketType` enum + compatibility matrix.
use crate::ZmqError;
use std::convert::TryFrom;
use std::fmt::{self, Display};
use std::str::FromStr;
// Socket-type compatibility table, stored row-major as a flat 15 x 15 grid
// (15 * 15 = 225 entries). Rows and columns both follow the `SocketType`
// discriminant order:
// PAIR PUB SUB REQ REP DEALER ROUTER PULL PUSH XPUB XSUB STREAM SCATTER GATHER CHANNEL
//
// `SocketType::compatible` reads entry `row * 15 + col`, where the row is the
// socket the method is called on and the column is the peer it is checked
// against. A non-zero entry (1) marks a compatible pairing, 0 an incompatible
// one. The table is symmetric, so the answer does not depend on which side
// asks.
//
// When a socket type is added, the array length, every row, and the stride
// used by `compatible` must all be updated together.
// `rustfmt::skip` keeps the layout at one matrix row per source line.
#[rustfmt::skip]
const COMPATIBILITY_MATRIX: [u8; 225] = [
// Column headers (abbreviated):
// PAIR PUB SUB REQ REP DEAL ROUT PULL PUSH XPUB XSUB STR SCAT GATH CHAN
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, // PUB
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, // SUB
0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, // REQ
0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, // REP
0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, // DEALER
0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, // ROUTER
0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, // PULL
0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, // PUSH
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, // XPUB
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, // XSUB
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // STREAM
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SCATTER
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // GATHER
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, // CHANNEL
];
/// The ZMQ socket pattern a socket implements.
///
/// Used internally and in [`SocketEvent`](crate::SocketEvent)s.
///
/// The explicit discriminants double as the row/column indices of the
/// socket compatibility matrix consulted by `SocketType::compatible`, so
/// their values and order must stay in step with that table.
// Variant names deliberately mirror the upper-case ZMTP socket-type names.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(usize)]
pub enum SocketType {
    /// Compatible peers: `PAIR`.
    PAIR = 0,
    /// Compatible peers: `SUB`, `XSUB`.
    PUB = 1,
    /// Compatible peers: `PUB`, `XPUB`.
    SUB = 2,
    /// Compatible peers: `REP`, `ROUTER`.
    REQ = 3,
    /// Compatible peers: `REQ`, `DEALER`.
    REP = 4,
    /// Compatible peers: `REP`, `DEALER`, `ROUTER`.
    DEALER = 5,
    /// Compatible peers: `REQ`, `DEALER`, `ROUTER`.
    ROUTER = 6,
    /// Compatible peers: `PUSH`.
    PULL = 7,
    /// Compatible peers: `PULL`.
    PUSH = 8,
    /// Compatible peers: `SUB`, `XSUB`.
    XPUB = 9,
    /// Compatible peers: `PUB`, `XPUB`.
    XSUB = 10,
    /// Compatible peers: `STREAM`.
    STREAM = 11,
    /// Compatible peers: `GATHER`.
    SCATTER = 12,
    /// Compatible peers: `SCATTER`.
    GATHER = 13,
    /// Compatible peers: `CHANNEL`.
    CHANNEL = 14,
}
impl SocketType {
    /// Canonical upper-case ZMTP name of this socket type (e.g. `"REQ"`, `"PUB"`).
    ///
    /// The spelling is the same one accepted by the `FromStr` /
    /// `TryFrom<&[u8]>` parsers, so a name produced here parses back to
    /// the same variant.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::PAIR => "PAIR",
            Self::PUB => "PUB",
            Self::SUB => "SUB",
            Self::REQ => "REQ",
            Self::REP => "REP",
            Self::DEALER => "DEALER",
            Self::ROUTER => "ROUTER",
            Self::PULL => "PULL",
            Self::PUSH => "PUSH",
            Self::XPUB => "XPUB",
            Self::XSUB => "XSUB",
            Self::STREAM => "STREAM",
            Self::SCATTER => "SCATTER",
            Self::GATHER => "GATHER",
            Self::CHANNEL => "CHANNEL",
        }
    }

    /// Reports whether this socket type is compatible with a peer of type
    /// `other`.
    ///
    /// The answer is looked up in the static compatibility matrix, with
    /// `self` selecting the row and `other` selecting the column.
    ///
    /// ```
    /// use rustzmq2::SocketType;
    /// assert!(SocketType::PUB.compatible(SocketType::SUB));
    /// assert!(SocketType::REQ.compatible(SocketType::REP));
    /// assert!(SocketType::DEALER.compatible(SocketType::ROUTER));
    /// assert!(!SocketType::PUB.compatible(SocketType::REP));
    /// ```
    pub fn compatible(&self, other: SocketType) -> bool {
        // Number of socket types, i.e. the width of one row of the flat,
        // row-major matrix.
        const MATRIX_SIDE: usize = 15;
        let cell = (*self as usize) * MATRIX_SIDE + (other as usize);
        COMPATIBILITY_MATRIX[cell] != 0
    }

    /// Whether the remote peer's routing identity is used as the
    /// `PeerIdentity` under which that peer is registered.
    ///
    /// Mirrors libzmq's `options.recv_routing_id`, which is only set to
    /// `true` for ROUTER (`libzmq/src/router.cpp:29`). Over the wire the
    /// identity is signaled via the `Identity` ZMTP property; over inproc
    /// it is exchanged in the handshake payload.
    #[cfg(feature = "inproc")]
    pub(crate) fn wants_remote_routing_id(&self) -> bool {
        *self == Self::ROUTER
    }

    /// Fallback value for [`SocketOptions::inline_write_max`] when the
    /// user has not configured one.
    ///
    /// REQ, REP and PAIR return `Some(0)` (uncapped inline): their queue
    /// depth is ≤ 1 by construction — REQ/REP run in lockstep and PAIR is
    /// an exclusive 1:1 link. Every other type returns `None` (disabled),
    /// because inline writes would bypass `drain_batch` coalescing on
    /// throughput-shaped workloads.
    ///
    /// [`SocketOptions::inline_write_max`]: crate::SocketOptionsBuilder::inline_write_max
    pub(crate) fn default_inline_write_max(&self) -> Option<usize> {
        let depth_at_most_one = matches!(*self, Self::REQ | Self::REP | Self::PAIR);
        if depth_at_most_one {
            Some(0)
        } else {
            None
        }
    }

    /// Fallback value for [`SocketOptions::out_batch_msgs`] when the user
    /// has not configured one.
    ///
    /// PUB and XPUB return `Some(32)`: fanout sockets need a low per-peer
    /// drain cap so one slow peer's pending queue can't overflow HWM and
    /// trigger drops. All other types return `Some(256)`: point-to-point
    /// sockets have one peer in flight per send and benefit from the
    /// larger per-syscall amortization.
    ///
    /// [`SocketOptions::out_batch_msgs`]: crate::SocketOptionsBuilder::out_batch_msgs
    pub(crate) fn default_out_batch_msgs(&self) -> Option<usize> {
        let is_fanout = matches!(*self, Self::PUB | Self::XPUB);
        Some(if is_fanout { 32 } else { 256 })
    }
}
impl FromStr for SocketType {
type Err = ZmqError;
#[inline]
fn from_str(s: &str) -> Result<Self, ZmqError> {
Self::try_from(s.as_bytes())
}
}
impl TryFrom<&[u8]> for SocketType {
    type Error = ZmqError;

    /// Parses a socket-type name given as raw bytes.
    ///
    /// The match is exact and case-sensitive: only the upper-case names
    /// listed below are accepted. Any other input, including the empty
    /// slice, yields `ZmqError::Other("Unknown socket type")`.
    fn try_from(s: &[u8]) -> Result<Self, Self::Error> {
        match s {
            b"PAIR" => Ok(Self::PAIR),
            b"PUB" => Ok(Self::PUB),
            b"SUB" => Ok(Self::SUB),
            b"REQ" => Ok(Self::REQ),
            b"REP" => Ok(Self::REP),
            b"DEALER" => Ok(Self::DEALER),
            b"ROUTER" => Ok(Self::ROUTER),
            b"PULL" => Ok(Self::PULL),
            b"PUSH" => Ok(Self::PUSH),
            b"XPUB" => Ok(Self::XPUB),
            b"XSUB" => Ok(Self::XSUB),
            b"STREAM" => Ok(Self::STREAM),
            b"SCATTER" => Ok(Self::SCATTER),
            b"GATHER" => Ok(Self::GATHER),
            b"CHANNEL" => Ok(Self::CHANNEL),
            _ => Err(ZmqError::Other("Unknown socket type".into())),
        }
    }
}
impl Display for SocketType {
    /// Writes the canonical ZMTP name returned by [`SocketType::as_str`].
    ///
    /// The name goes through [`fmt::Formatter::pad`] rather than a raw
    /// `write_str`, so width, fill, alignment and precision from the
    /// format spec (for example `{:<8}` in a log table) are applied. A
    /// plain `{}` prints the bare name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.as_str())
    }
}
impl AsRef<str> for SocketType {
fn as_ref(&self) -> &str {
self.as_str()
}
}