use alloc::sync::Arc;
use core::{iter, marker::PhantomData, ops::ControlFlow};
#[cfg(feature = "futures")]
use ::futures::{FutureExt, TryStreamExt};
#[cfg(feature = "futures")]
use async_trait::async_trait;
use bitflags::bitflags;
use derive_more::From;
use num_traits::PrimInt;
use crate::{
ZmqError, ZmqResult,
context::Context,
ffi::RawSocket,
message::{Message, MultipartMessage, Sendable},
sealed, zmq_sys_crate,
};
#[cfg(feature = "draft-api")]
mod channel;
#[cfg(feature = "draft-api")]
mod client;
mod dealer;
#[cfg(feature = "draft-api")]
mod dish;
#[cfg(feature = "draft-api")]
mod gather;
pub(crate) mod monitor;
mod pair;
#[cfg(feature = "draft-api")]
mod peer;
mod publish;
mod pull;
mod push;
#[cfg(feature = "draft-api")]
mod radio;
mod reply;
mod request;
mod router;
#[cfg(feature = "draft-api")]
mod scatter;
#[cfg(feature = "draft-api")]
mod server;
mod stream;
mod subscribe;
mod xpublish;
mod xsubscribe;
#[cfg(feature = "builder")]
pub use builder::SocketBuilder;
#[cfg(feature = "draft-api")]
pub use channel::ChannelSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use channel::builder::ChannelBuilder;
#[cfg(feature = "draft-api")]
pub use client::ClientSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use client::builder::ClientBuilder;
pub use dealer::DealerSocket;
#[cfg(feature = "builder")]
pub use dealer::builder::DealerBuilder;
#[cfg(feature = "draft-api")]
pub use dish::DishSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use dish::builder::DishBuilder;
#[cfg(feature = "draft-api")]
pub use gather::GatherSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use gather::builder::GatherBuilder;
use monitor::Monitor;
pub use monitor::{HandshakeProtocolError, MonitorReceiver, MonitorSocket, MonitorSocketEvent};
pub use pair::PairSocket;
#[cfg(feature = "builder")]
pub use pair::builder::PairBuilder;
#[cfg(feature = "draft-api")]
pub use peer::PeerSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use peer::builder::PeerBuilder;
pub use publish::PublishSocket;
#[cfg(feature = "builder")]
pub use publish::builder::PublishBuilder;
pub use pull::PullSocket;
#[cfg(feature = "builder")]
pub use pull::builder::PullBuilder;
pub use push::PushSocket;
#[cfg(feature = "builder")]
pub use push::builder::PushBuilder;
#[cfg(feature = "draft-api")]
pub use radio::RadioSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use radio::builder::RadioBuilder;
pub use reply::ReplySocket;
#[cfg(feature = "builder")]
pub use reply::builder::ReplyBuilder;
pub use request::RequestSocket;
#[cfg(feature = "builder")]
pub use request::builder::RequestBuilder;
#[cfg(feature = "draft-api")]
pub use router::RouterNotify;
pub use router::RouterSocket;
#[cfg(feature = "builder")]
pub use router::builder::RouterBuilder;
#[cfg(feature = "draft-api")]
pub use scatter::ScatterSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use scatter::builder::ScatterBuilder;
#[cfg(feature = "draft-api")]
pub use server::ServerSocket;
#[cfg(all(feature = "draft-api", feature = "builder"))]
pub use server::builder::ServerBuilder;
pub use stream::StreamSocket;
#[cfg(feature = "builder")]
pub use stream::builder::StreamBuilder;
pub use subscribe::SubscribeSocket;
#[cfg(feature = "builder")]
pub use subscribe::builder::SubscribeBuilder;
pub use xpublish::XPublishSocket;
#[cfg(feature = "builder")]
pub use xpublish::builder::XPublishBuilder;
#[doc(hidden)]
pub use xsubscribe::XSubscribeSocket;
#[cfg(feature = "builder")]
pub use xsubscribe::builder::XSubscribeBuilder;
#[cfg(zmq_has = "gssapi")]
use crate::security::GssApiNametype;
use crate::{auth::ZapDomain, security::SecurityMechanism};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SocketType {
Pair,
Publish,
Subscribe,
Request,
Reply,
Dealer,
Router,
Pull,
Push,
XPublish,
XSubscribe,
Stream,
#[cfg(feature = "draft-api")]
Server,
#[cfg(feature = "draft-api")]
Client,
#[cfg(feature = "draft-api")]
Radio,
#[cfg(feature = "draft-api")]
Dish,
#[cfg(feature = "draft-api")]
Gather,
#[cfg(feature = "draft-api")]
Scatter,
#[cfg(feature = "draft-api")]
Datagram,
#[cfg(feature = "draft-api")]
Peer,
#[cfg(feature = "draft-api")]
Channel,
}
impl From<SocketType> for i32 {
fn from(value: SocketType) -> Self {
match value {
SocketType::Pair => zmq_sys_crate::ZMQ_PAIR as i32,
SocketType::Publish => zmq_sys_crate::ZMQ_PUB as i32,
SocketType::Subscribe => zmq_sys_crate::ZMQ_SUB as i32,
SocketType::Request => zmq_sys_crate::ZMQ_REQ as i32,
SocketType::Reply => zmq_sys_crate::ZMQ_REP as i32,
SocketType::Dealer => zmq_sys_crate::ZMQ_DEALER as i32,
SocketType::Router => zmq_sys_crate::ZMQ_ROUTER as i32,
SocketType::Pull => zmq_sys_crate::ZMQ_PULL as i32,
SocketType::Push => zmq_sys_crate::ZMQ_PUSH as i32,
SocketType::XPublish => zmq_sys_crate::ZMQ_XPUB as i32,
SocketType::XSubscribe => zmq_sys_crate::ZMQ_XSUB as i32,
SocketType::Stream => zmq_sys_crate::ZMQ_STREAM as i32,
#[cfg(feature = "draft-api")]
SocketType::Server => zmq_sys_crate::ZMQ_SERVER as i32,
#[cfg(feature = "draft-api")]
SocketType::Client => zmq_sys_crate::ZMQ_CLIENT as i32,
#[cfg(feature = "draft-api")]
SocketType::Radio => zmq_sys_crate::ZMQ_RADIO as i32,
#[cfg(feature = "draft-api")]
SocketType::Dish => zmq_sys_crate::ZMQ_DISH as i32,
#[cfg(feature = "draft-api")]
SocketType::Gather => zmq_sys_crate::ZMQ_GATHER as i32,
#[cfg(feature = "draft-api")]
SocketType::Scatter => zmq_sys_crate::ZMQ_SCATTER as i32,
#[cfg(feature = "draft-api")]
SocketType::Datagram => zmq_sys_crate::ZMQ_DGRAM as i32,
#[cfg(feature = "draft-api")]
SocketType::Peer => zmq_sys_crate::ZMQ_PEER as i32,
#[cfg(feature = "draft-api")]
SocketType::Channel => zmq_sys_crate::ZMQ_CHANNEL as i32,
}
}
}
#[cfg(test)]
mod socket_type_tests {
use rstest::*;
use super::SocketType;
use crate::zmq_sys_crate;
#[rstest]
#[case(SocketType::Pair, zmq_sys_crate::ZMQ_PAIR as i32)]
#[case(SocketType::Publish, zmq_sys_crate::ZMQ_PUB as i32)]
#[case(SocketType::Subscribe, zmq_sys_crate::ZMQ_SUB as i32)]
#[case(SocketType::Request, zmq_sys_crate::ZMQ_REQ as i32)]
#[case(SocketType::Reply, zmq_sys_crate::ZMQ_REP as i32)]
#[case(SocketType::Dealer, zmq_sys_crate::ZMQ_DEALER as i32)]
#[case(SocketType::Router, zmq_sys_crate::ZMQ_ROUTER as i32)]
#[case(SocketType::Pull, zmq_sys_crate::ZMQ_PULL as i32)]
#[case(SocketType::Push, zmq_sys_crate::ZMQ_PUSH as i32)]
#[case(SocketType::XPublish, zmq_sys_crate::ZMQ_XPUB as i32)]
#[case(SocketType::XSubscribe, zmq_sys_crate::ZMQ_XSUB as i32)]
#[case(SocketType::Stream, zmq_sys_crate::ZMQ_STREAM as i32)]
#[cfg_attr(feature = "draft-api", case(SocketType::Server, zmq_sys_crate::ZMQ_SERVER as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Client, zmq_sys_crate::ZMQ_CLIENT as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Radio, zmq_sys_crate::ZMQ_RADIO as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Dish, zmq_sys_crate::ZMQ_DISH as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Gather, zmq_sys_crate::ZMQ_GATHER as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Scatter, zmq_sys_crate::ZMQ_SCATTER as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Datagram, zmq_sys_crate::ZMQ_DGRAM as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Peer, zmq_sys_crate::ZMQ_PEER as i32))]
#[cfg_attr(feature = "draft-api", case(SocketType::Channel, zmq_sys_crate::ZMQ_CHANNEL as i32))]
fn converts_to_raw(#[case] socket_type: SocketType, #[case] raw: i32) {
assert_eq!(<SocketType as Into<i32>>::into(socket_type), raw);
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
#[non_exhaustive]
pub enum SocketOption {
Type,
Affinity,
FileDescriptor,
UseFd,
BindToDevice,
TypeOfService,
IPv6,
ConnectTimeout,
#[cfg(feature = "draft-api")]
HelloMessage,
HandshakeInterval,
LastEndpoint,
HeartbeatInterval,
HeartbeatTimeToLive,
HeartbeatTimeout,
ReconnectInterval,
ReconnectIntervalMax,
#[cfg(feature = "draft-api")]
HiccupMessage,
#[cfg(feature = "draft-api")]
ReconnectStop,
#[cfg(feature = "draft-api")]
DisconnectMessage,
Events,
Linger,
Immediate,
Conflate,
ReceiveMore,
Backlog,
MaxMessageSize,
ReceiveBuffer,
ReceiveTimeout,
ReceiveHighWatermark,
SendBuffer,
SendTimeout,
SendHighWatermark,
ThreadSafe,
#[cfg(feature = "draft-api")]
Metadata,
#[cfg(feature = "draft-api")]
InBatchSize,
#[cfg(feature = "draft-api")]
OutBatchSize,
#[cfg(feature = "draft-api")]
Priority,
#[cfg(feature = "draft-api")]
BusyPoll,
RoutingId,
RouterMandatory,
RouterHandover,
ProbeRouter,
ConnectRoutingId,
#[cfg(feature = "draft-api")]
RouterNotify,
RequestCorrelate,
RequestRelaxed,
Subscribe,
Unsubscribe,
InvertMatching,
#[cfg(feature = "draft-api")]
OnlyFirstSubscribe,
#[cfg(feature = "draft-api")]
XsubVerboseUnsubscribe,
#[cfg(feature = "draft-api")]
TopicsCount,
XpubVerbose,
XpubVerboser,
XpubNoDrop,
XpubManual,
XpubWelcomeMessage,
#[cfg(feature = "draft-api")]
XpubManualLastValue,
StreamNotify,
Rate,
RecoveryInterval,
MulticastHops,
MulticastMaxTransportDataUnitSize,
#[cfg(feature = "draft-api")]
MulticastLoop,
TcpKeepalive,
TcpKeepaliveCount,
TcpKeepaliveIdle,
TcpKeepaliveInterval,
TcpAcceptFilter,
MaxTcpRetransmitTimeout,
#[cfg(zmq_has = "ipc")]
IpcFilterProcessId,
#[cfg(zmq_has = "ipc")]
IpcFilterUserId,
#[cfg(zmq_has = "ipc")]
IpcFilterGroupId,
#[cfg(zmq_has = "vmci")]
VmciBufferSize,
#[cfg(zmq_has = "vmci")]
VmciBufferMinSize,
#[cfg(zmq_has = "vmci")]
VmciBufferMaxSize,
#[cfg(zmq_has = "vmci")]
VmciConntectTimeout,
#[cfg(zmq_has = "norm")]
NormMode,
#[cfg(zmq_has = "norm")]
NormUnicastNack,
#[cfg(zmq_has = "norm")]
NormBufferSize,
#[cfg(zmq_has = "norm")]
NormSegmentSize,
#[cfg(zmq_has = "norm")]
NormBlockSize,
#[cfg(zmq_has = "norm")]
NormNumnParity,
#[cfg(zmq_has = "norm")]
NormNumnAutoParity,
#[cfg(zmq_has = "norm")]
NormPush,
ZapDomain,
#[cfg(feature = "draft-api")]
ZapEnforceDomain,
SocksProxy,
#[cfg(feature = "draft-api")]
SocksUsername,
#[cfg(feature = "draft-api")]
SocksPassword,
Mechanism,
PlainServer,
PlainUsername,
PlainPassword,
#[cfg(zmq_has = "curve")]
CurvePublicKey,
#[cfg(zmq_has = "curve")]
CurveSecretKey,
#[cfg(zmq_has = "curve")]
CurveServer,
#[cfg(zmq_has = "curve")]
CurveServerKey,
#[cfg(zmq_has = "gssapi")]
GssApiServer,
#[cfg(zmq_has = "gssapi")]
GssApiPrincipal,
#[cfg(zmq_has = "gssapi")]
GssApiServicePrincipal,
#[cfg(zmq_has = "gssapi")]
GssApiPlainText,
#[cfg(zmq_has = "gssapi")]
GssApiPrincipalNametype,
#[cfg(zmq_has = "gssapi")]
GssApiServicePrincipalNametype,
}
impl From<SocketOption> for i32 {
fn from(value: SocketOption) -> Self {
match value {
SocketOption::Affinity => zmq_sys_crate::ZMQ_AFFINITY as i32,
SocketOption::RoutingId => zmq_sys_crate::ZMQ_ROUTING_ID as i32,
SocketOption::Subscribe => zmq_sys_crate::ZMQ_SUBSCRIBE as i32,
SocketOption::Unsubscribe => zmq_sys_crate::ZMQ_UNSUBSCRIBE as i32,
SocketOption::Rate => zmq_sys_crate::ZMQ_RATE as i32,
SocketOption::RecoveryInterval => zmq_sys_crate::ZMQ_RECOVERY_IVL as i32,
SocketOption::SendBuffer => zmq_sys_crate::ZMQ_SNDBUF as i32,
SocketOption::ReceiveBuffer => zmq_sys_crate::ZMQ_RCVBUF as i32,
SocketOption::ReceiveMore => zmq_sys_crate::ZMQ_RCVMORE as i32,
SocketOption::FileDescriptor => zmq_sys_crate::ZMQ_FD as i32,
SocketOption::Events => zmq_sys_crate::ZMQ_EVENTS as i32,
SocketOption::Type => zmq_sys_crate::ZMQ_TYPE as i32,
SocketOption::Linger => zmq_sys_crate::ZMQ_LINGER as i32,
SocketOption::ReconnectInterval => zmq_sys_crate::ZMQ_RECONNECT_IVL as i32,
SocketOption::Backlog => zmq_sys_crate::ZMQ_BACKLOG as i32,
SocketOption::ReconnectIntervalMax => zmq_sys_crate::ZMQ_RECONNECT_IVL_MAX as i32,
SocketOption::MaxMessageSize => zmq_sys_crate::ZMQ_MAXMSGSIZE as i32,
SocketOption::SendHighWatermark => zmq_sys_crate::ZMQ_SNDHWM as i32,
SocketOption::ReceiveHighWatermark => zmq_sys_crate::ZMQ_RCVHWM as i32,
SocketOption::MulticastHops => zmq_sys_crate::ZMQ_MULTICAST_HOPS as i32,
SocketOption::ReceiveTimeout => zmq_sys_crate::ZMQ_RCVTIMEO as i32,
SocketOption::SendTimeout => zmq_sys_crate::ZMQ_SNDTIMEO as i32,
SocketOption::LastEndpoint => zmq_sys_crate::ZMQ_LAST_ENDPOINT as i32,
SocketOption::RouterMandatory => zmq_sys_crate::ZMQ_ROUTER_MANDATORY as i32,
SocketOption::TcpKeepalive => zmq_sys_crate::ZMQ_TCP_KEEPALIVE as i32,
SocketOption::TcpKeepaliveCount => zmq_sys_crate::ZMQ_TCP_KEEPALIVE_CNT as i32,
SocketOption::TcpKeepaliveIdle => zmq_sys_crate::ZMQ_TCP_KEEPALIVE_IDLE as i32,
SocketOption::TcpKeepaliveInterval => zmq_sys_crate::ZMQ_TCP_KEEPALIVE_INTVL as i32,
SocketOption::TcpAcceptFilter => zmq_sys_crate::ZMQ_TCP_ACCEPT_FILTER as i32,
SocketOption::Immediate => zmq_sys_crate::ZMQ_IMMEDIATE as i32,
SocketOption::XpubVerbose => zmq_sys_crate::ZMQ_XPUB_VERBOSE as i32,
SocketOption::IPv6 => zmq_sys_crate::ZMQ_IPV6 as i32,
SocketOption::Mechanism => zmq_sys_crate::ZMQ_MECHANISM as i32,
SocketOption::PlainServer => zmq_sys_crate::ZMQ_PLAIN_SERVER as i32,
SocketOption::PlainUsername => zmq_sys_crate::ZMQ_PLAIN_USERNAME as i32,
SocketOption::PlainPassword => zmq_sys_crate::ZMQ_PLAIN_PASSWORD as i32,
#[cfg(zmq_has = "curve")]
SocketOption::CurvePublicKey => zmq_sys_crate::ZMQ_CURVE_PUBLICKEY as i32,
#[cfg(zmq_has = "curve")]
SocketOption::CurveSecretKey => zmq_sys_crate::ZMQ_CURVE_SECRETKEY as i32,
#[cfg(zmq_has = "curve")]
SocketOption::CurveServer => zmq_sys_crate::ZMQ_CURVE_SERVER as i32,
#[cfg(zmq_has = "curve")]
SocketOption::CurveServerKey => zmq_sys_crate::ZMQ_CURVE_SERVERKEY as i32,
SocketOption::ProbeRouter => zmq_sys_crate::ZMQ_PROBE_ROUTER as i32,
SocketOption::RequestCorrelate => zmq_sys_crate::ZMQ_REQ_CORRELATE as i32,
SocketOption::RequestRelaxed => zmq_sys_crate::ZMQ_REQ_RELAXED as i32,
SocketOption::Conflate => zmq_sys_crate::ZMQ_CONFLATE as i32,
SocketOption::ZapDomain => zmq_sys_crate::ZMQ_ZAP_DOMAIN as i32,
SocketOption::RouterHandover => zmq_sys_crate::ZMQ_ROUTER_HANDOVER as i32,
SocketOption::TypeOfService => zmq_sys_crate::ZMQ_TOS as i32,
SocketOption::IpcFilterProcessId => zmq_sys_crate::ZMQ_IPC_FILTER_PID as i32,
SocketOption::IpcFilterUserId => zmq_sys_crate::ZMQ_IPC_FILTER_UID as i32,
SocketOption::IpcFilterGroupId => zmq_sys_crate::ZMQ_IPC_FILTER_GID as i32,
SocketOption::ConnectRoutingId => zmq_sys_crate::ZMQ_CONNECT_ROUTING_ID as i32,
#[cfg(zmq_has = "gssapi")]
SocketOption::GssApiServer => zmq_sys_crate::ZMQ_GSSAPI_SERVER as i32,
#[cfg(zmq_has = "gssapi")]
SocketOption::GssApiPrincipal => zmq_sys_crate::ZMQ_GSSAPI_PRINCIPAL as i32,
#[cfg(zmq_has = "gssapi")]
SocketOption::GssApiServicePrincipal => {
zmq_sys_crate::ZMQ_GSSAPI_SERVICE_PRINCIPAL as i32
}
#[cfg(zmq_has = "gssapi")]
SocketOption::GssApiPlainText => zmq_sys_crate::ZMQ_GSSAPI_PLAINTEXT as i32,
SocketOption::HandshakeInterval => zmq_sys_crate::ZMQ_HANDSHAKE_IVL as i32,
SocketOption::SocksProxy => zmq_sys_crate::ZMQ_SOCKS_PROXY as i32,
SocketOption::XpubNoDrop => zmq_sys_crate::ZMQ_XPUB_NODROP as i32,
SocketOption::XpubManual => zmq_sys_crate::ZMQ_XPUB_MANUAL as i32,
SocketOption::XpubWelcomeMessage => zmq_sys_crate::ZMQ_XPUB_WELCOME_MSG as i32,
SocketOption::StreamNotify => zmq_sys_crate::ZMQ_STREAM_NOTIFY as i32,
SocketOption::InvertMatching => zmq_sys_crate::ZMQ_INVERT_MATCHING as i32,
SocketOption::HeartbeatInterval => zmq_sys_crate::ZMQ_HEARTBEAT_IVL as i32,
SocketOption::HeartbeatTimeToLive => zmq_sys_crate::ZMQ_HEARTBEAT_TTL as i32,
SocketOption::HeartbeatTimeout => zmq_sys_crate::ZMQ_HEARTBEAT_TIMEOUT as i32,
SocketOption::XpubVerboser => zmq_sys_crate::ZMQ_XPUB_VERBOSER as i32,
SocketOption::ConnectTimeout => zmq_sys_crate::ZMQ_CONNECT_TIMEOUT as i32,
SocketOption::MaxTcpRetransmitTimeout => zmq_sys_crate::ZMQ_TCP_MAXRT as i32,
SocketOption::MulticastMaxTransportDataUnitSize => {
zmq_sys_crate::ZMQ_MULTICAST_MAXTPDU as i32
}
SocketOption::ThreadSafe => zmq_sys_crate::ZMQ_THREAD_SAFE as i32,
#[cfg(zmq_has = "vmci")]
SocketOption::VmciBufferSize => zmq_sys_crate::ZMQ_VMCI_BUFFER_SIZE as i32,
#[cfg(zmq_has = "vmci")]
SocketOption::VmciBufferMinSize => zmq_sys_crate::ZMQ_VMCI_BUFFER_MIN_SIZE as i32,
#[cfg(zmq_has = "vmci")]
SocketOption::VmciBufferMaxSize => zmq_sys_crate::ZMQ_VMCI_BUFFER_MAX_SIZE as i32,
#[cfg(zmq_has = "vmci")]
SocketOption::VmciConntectTimeout => zmq_sys_crate::ZMQ_VMCI_CONNECT_TIMEOUT as i32,
SocketOption::UseFd => zmq_sys_crate::ZMQ_USE_FD as i32,
#[cfg(zmq_has = "gssapi")]
SocketOption::GssApiPrincipalNametype => {
zmq_sys_crate::ZMQ_GSSAPI_PRINCIPAL_NAMETYPE as i32
}
#[cfg(zmq_has = "gssapi")]
SocketOption::GssApiServicePrincipalNametype => {
zmq_sys_crate::ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE as i32
}
SocketOption::BindToDevice => zmq_sys_crate::ZMQ_BINDTODEVICE as i32,
#[cfg(feature = "draft-api")]
SocketOption::ZapEnforceDomain => zmq_sys_crate::ZMQ_ZAP_ENFORCE_DOMAIN as i32,
#[cfg(feature = "draft-api")]
SocketOption::Metadata => zmq_sys_crate::ZMQ_METADATA as i32,
#[cfg(feature = "draft-api")]
SocketOption::MulticastLoop => zmq_sys_crate::ZMQ_MULTICAST_LOOP as i32,
#[cfg(feature = "draft-api")]
SocketOption::RouterNotify => zmq_sys_crate::ZMQ_ROUTER_NOTIFY as i32,
#[cfg(feature = "draft-api")]
SocketOption::XpubManualLastValue => zmq_sys_crate::ZMQ_XPUB_MANUAL_LAST_VALUE as i32,
#[cfg(feature = "draft-api")]
SocketOption::SocksUsername => zmq_sys_crate::ZMQ_SOCKS_USERNAME as i32,
#[cfg(feature = "draft-api")]
SocketOption::SocksPassword => zmq_sys_crate::ZMQ_SOCKS_PASSWORD as i32,
#[cfg(feature = "draft-api")]
SocketOption::InBatchSize => zmq_sys_crate::ZMQ_IN_BATCH_SIZE as i32,
#[cfg(feature = "draft-api")]
SocketOption::OutBatchSize => zmq_sys_crate::ZMQ_OUT_BATCH_SIZE as i32,
#[cfg(feature = "draft-api")]
SocketOption::OnlyFirstSubscribe => zmq_sys_crate::ZMQ_ONLY_FIRST_SUBSCRIBE as i32,
#[cfg(feature = "draft-api")]
SocketOption::ReconnectStop => zmq_sys_crate::ZMQ_RECONNECT_STOP as i32,
#[cfg(feature = "draft-api")]
SocketOption::HelloMessage => zmq_sys_crate::ZMQ_HELLO_MSG as i32,
#[cfg(feature = "draft-api")]
SocketOption::DisconnectMessage => zmq_sys_crate::ZMQ_DISCONNECT_MSG as i32,
#[cfg(feature = "draft-api")]
SocketOption::Priority => zmq_sys_crate::ZMQ_PRIORITY as i32,
#[cfg(feature = "draft-api")]
SocketOption::BusyPoll => zmq_sys_crate::ZMQ_BUSY_POLL as i32,
#[cfg(feature = "draft-api")]
SocketOption::HiccupMessage => zmq_sys_crate::ZMQ_HICCUP_MSG as i32,
#[cfg(feature = "draft-api")]
SocketOption::XsubVerboseUnsubscribe => {
zmq_sys_crate::ZMQ_XSUB_VERBOSE_UNSUBSCRIBE as i32
}
#[cfg(feature = "draft-api")]
SocketOption::TopicsCount => zmq_sys_crate::ZMQ_TOPICS_COUNT as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormMode => zmq_sys_crate::ZMQ_NORM_MODE as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormUnicastNack => zmq_sys_crate::ZMQ_NORM_UNICAST_NACK as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormBufferSize => zmq_sys_crate::ZMQ_NORM_BUFFER_SIZE as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormSegmentSize => zmq_sys_crate::ZMQ_NORM_SEGMENT_SIZE as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormBlockSize => zmq_sys_crate::ZMQ_NORM_BLOCK_SIZE as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormNumnParity => zmq_sys_crate::ZMQ_NORM_NUM_PARITY as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormNumnAutoParity => zmq_sys_crate::ZMQ_NORM_NUM_AUTOPARITY as i32,
#[cfg(zmq_has = "norm")]
SocketOption::NormPush => zmq_sys_crate::ZMQ_NORM_PUSH as i32,
}
}
}
#[cfg(test)]
mod socket_option_tests {
use rstest::*;
use super::SocketOption;
use crate::zmq_sys_crate;
#[rstest]
#[case(SocketOption::Affinity, zmq_sys_crate::ZMQ_AFFINITY as i32)]
#[case(SocketOption::RoutingId, zmq_sys_crate::ZMQ_ROUTING_ID as i32)]
#[case(SocketOption::Subscribe, zmq_sys_crate::ZMQ_SUBSCRIBE as i32)]
#[case(SocketOption::Unsubscribe, zmq_sys_crate::ZMQ_UNSUBSCRIBE as i32)]
#[case(SocketOption::Rate, zmq_sys_crate::ZMQ_RATE as i32)]
#[case(SocketOption::RecoveryInterval, zmq_sys_crate::ZMQ_RECOVERY_IVL as i32)]
#[case(SocketOption::SendBuffer, zmq_sys_crate::ZMQ_SNDBUF as i32)]
#[case(SocketOption::ReceiveBuffer, zmq_sys_crate::ZMQ_RCVBUF as i32)]
#[case(SocketOption::ReceiveMore, zmq_sys_crate::ZMQ_RCVMORE as i32)]
#[case(SocketOption::FileDescriptor, zmq_sys_crate::ZMQ_FD as i32)]
#[case(SocketOption::Events, zmq_sys_crate::ZMQ_EVENTS as i32)]
#[case(SocketOption::Type, zmq_sys_crate::ZMQ_TYPE as i32)]
#[case(SocketOption::Linger, zmq_sys_crate::ZMQ_LINGER as i32)]
#[case(SocketOption::ReconnectInterval, zmq_sys_crate::ZMQ_RECONNECT_IVL as i32)]
#[case(SocketOption::Backlog, zmq_sys_crate::ZMQ_BACKLOG as i32)]
#[case(SocketOption::ReconnectIntervalMax, zmq_sys_crate::ZMQ_RECONNECT_IVL_MAX as i32)]
#[case(SocketOption::MaxMessageSize, zmq_sys_crate::ZMQ_MAXMSGSIZE as i32)]
#[case(SocketOption::SendHighWatermark, zmq_sys_crate::ZMQ_SNDHWM as i32)]
#[case(SocketOption::ReceiveHighWatermark, zmq_sys_crate::ZMQ_RCVHWM as i32)]
#[case(SocketOption::MulticastHops, zmq_sys_crate::ZMQ_MULTICAST_HOPS as i32)]
#[case(SocketOption::ReceiveTimeout, zmq_sys_crate::ZMQ_RCVTIMEO as i32)]
#[case(SocketOption::SendTimeout, zmq_sys_crate::ZMQ_SNDTIMEO as i32)]
#[case(SocketOption::LastEndpoint, zmq_sys_crate::ZMQ_LAST_ENDPOINT as i32)]
#[case(SocketOption::RouterMandatory, zmq_sys_crate::ZMQ_ROUTER_MANDATORY as i32)]
#[case(SocketOption::TcpKeepalive, zmq_sys_crate::ZMQ_TCP_KEEPALIVE as i32)]
#[case(SocketOption::TcpKeepaliveCount, zmq_sys_crate::ZMQ_TCP_KEEPALIVE_CNT as i32)]
#[case(SocketOption::TcpKeepaliveIdle, zmq_sys_crate::ZMQ_TCP_KEEPALIVE_IDLE as i32)]
#[case(SocketOption::TcpKeepaliveInterval, zmq_sys_crate::ZMQ_TCP_KEEPALIVE_INTVL as i32)]
#[case(SocketOption::TcpAcceptFilter, zmq_sys_crate::ZMQ_TCP_ACCEPT_FILTER as i32)]
#[case(SocketOption::Immediate, zmq_sys_crate::ZMQ_IMMEDIATE as i32)]
#[case(SocketOption::XpubVerbose, zmq_sys_crate::ZMQ_XPUB_VERBOSE as i32)]
#[case(SocketOption::IPv6, zmq_sys_crate::ZMQ_IPV6 as i32)]
#[case(SocketOption::Mechanism, zmq_sys_crate::ZMQ_MECHANISM as i32)]
#[case(SocketOption::PlainServer, zmq_sys_crate::ZMQ_PLAIN_SERVER as i32)]
#[case(SocketOption::PlainUsername, zmq_sys_crate::ZMQ_PLAIN_USERNAME as i32)]
#[case(SocketOption::PlainPassword, zmq_sys_crate::ZMQ_PLAIN_PASSWORD as i32)]
#[cfg_attr(zmq_has = "curve", case(SocketOption::CurvePublicKey, zmq_sys_crate::ZMQ_CURVE_PUBLICKEY as i32))]
#[cfg_attr(zmq_has = "curve", case(SocketOption::CurveSecretKey, zmq_sys_crate::ZMQ_CURVE_SECRETKEY as i32))]
#[cfg_attr(zmq_has = "curve", case(SocketOption::CurveServer, zmq_sys_crate::ZMQ_CURVE_SERVER as i32))]
#[cfg_attr(zmq_has = "curve", case(SocketOption::CurveServerKey, zmq_sys_crate::ZMQ_CURVE_SERVERKEY as i32))]
#[case(SocketOption::ProbeRouter, zmq_sys_crate::ZMQ_PROBE_ROUTER as i32)]
#[case(SocketOption::RequestCorrelate, zmq_sys_crate::ZMQ_REQ_CORRELATE as i32)]
#[case(SocketOption::RequestRelaxed, zmq_sys_crate::ZMQ_REQ_RELAXED as i32)]
#[case(SocketOption::Conflate, zmq_sys_crate::ZMQ_CONFLATE as i32)]
#[case(SocketOption::ZapDomain, zmq_sys_crate::ZMQ_ZAP_DOMAIN as i32)]
#[case(SocketOption::RouterHandover, zmq_sys_crate::ZMQ_ROUTER_HANDOVER as i32)]
#[case(SocketOption::TypeOfService, zmq_sys_crate::ZMQ_TOS as i32)]
#[case(SocketOption::IpcFilterProcessId, zmq_sys_crate::ZMQ_IPC_FILTER_PID as i32)]
#[case(SocketOption::IpcFilterUserId, zmq_sys_crate::ZMQ_IPC_FILTER_UID as i32)]
#[case(SocketOption::IpcFilterGroupId, zmq_sys_crate::ZMQ_IPC_FILTER_GID as i32)]
#[case(SocketOption::ConnectRoutingId, zmq_sys_crate::ZMQ_CONNECT_ROUTING_ID as i32)]
#[cfg_attr(zmq_has = "gssapi", case(SocketOption::GssApiServer, zmq_sys_crate::ZMQ_GSSAPI_SERVER as i32))]
#[cfg_attr(zmq_has = "gssapi", case(SocketOption::GssApiPrincipal, zmq_sys_crate::ZMQ_GSSAPI_PRINCIPAL as i32))]
#[cfg_attr(zmq_has = "gssapi", case(SocketOption::GssApiServicePrincipal, zmq_sys_crate::ZMQ_GSSAPI_SERVICE_PRINCIPAL as i32))]
#[cfg_attr(zmq_has = "gssapi", case(SocketOption::GssApiPlainText, zmq_sys_crate::ZMQ_GSSAPI_PLAINTEXT as i32))]
#[case(SocketOption::HandshakeInterval, zmq_sys_crate::ZMQ_HANDSHAKE_IVL as i32)]
#[case(SocketOption::SocksProxy, zmq_sys_crate::ZMQ_SOCKS_PROXY as i32)]
#[case(SocketOption::XpubNoDrop, zmq_sys_crate::ZMQ_XPUB_NODROP as i32)]
#[case(SocketOption::XpubManual, zmq_sys_crate::ZMQ_XPUB_MANUAL as i32)]
#[case(SocketOption::XpubWelcomeMessage, zmq_sys_crate::ZMQ_XPUB_WELCOME_MSG as i32)]
#[case(SocketOption::StreamNotify, zmq_sys_crate::ZMQ_STREAM_NOTIFY as i32)]
#[case(SocketOption::InvertMatching, zmq_sys_crate::ZMQ_INVERT_MATCHING as i32)]
#[case(SocketOption::HeartbeatInterval, zmq_sys_crate::ZMQ_HEARTBEAT_IVL as i32)]
#[case(SocketOption::HeartbeatTimeToLive, zmq_sys_crate::ZMQ_HEARTBEAT_TTL as i32)]
#[case(SocketOption::HeartbeatTimeout, zmq_sys_crate::ZMQ_HEARTBEAT_TIMEOUT as i32)]
#[case(SocketOption::XpubVerboser, zmq_sys_crate::ZMQ_XPUB_VERBOSER as i32)]
#[case(SocketOption::ConnectTimeout, zmq_sys_crate::ZMQ_CONNECT_TIMEOUT as i32)]
#[case(SocketOption::MaxTcpRetransmitTimeout, zmq_sys_crate::ZMQ_TCP_MAXRT as i32)]
#[case(SocketOption::MulticastMaxTransportDataUnitSize, zmq_sys_crate::ZMQ_MULTICAST_MAXTPDU as i32)]
#[case(SocketOption::ThreadSafe, zmq_sys_crate::ZMQ_THREAD_SAFE as i32)]
#[cfg_attr(zmq_has = "vmci", case(SocketOption::VmciBufferSize, zmq_sys_crate::ZMQ_VMCI_BUFFER_SIZE as i32))]
#[cfg_attr(zmq_has = "vmci", case(SocketOption::VmciBufferMinSize, zmq_sys_crate::ZMQ_VMCI_BUFFER_MIN_SIZE as i32))]
#[cfg_attr(zmq_has = "vmci", case(SocketOption::VmciBufferMaxSize, zmq_sys_crate::ZMQ_VMCI_BUFFER_MAX_SIZE as i32))]
#[cfg_attr(zmq_has = "vmci", case(SocketOption::VmciConntectTimeout, zmq_sys_crate::ZMQ_VMCI_CONNECT_TIMEOUT as i32))]
#[case(SocketOption::UseFd, zmq_sys_crate::ZMQ_USE_FD as i32)]
#[cfg_attr(zmq_has = "gssapi", case(SocketOption::GssApiPrincipalNametype, zmq_sys_crate::ZMQ_GSSAPI_PRINCIPAL_NAMETYPE as i32))]
#[cfg_attr(zmq_has = "gssapi", case(SocketOption::GssApiServicePrincipalNametype, zmq_sys_crate::ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE as i32))]
#[case(SocketOption::BindToDevice, zmq_sys_crate::ZMQ_BINDTODEVICE as i32)]
#[cfg_attr(feature = "draft-api", case(SocketOption::ZapEnforceDomain, zmq_sys_crate::ZMQ_ZAP_ENFORCE_DOMAIN as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::Metadata, zmq_sys_crate::ZMQ_METADATA as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::MulticastLoop, zmq_sys_crate::ZMQ_MULTICAST_LOOP as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::RouterNotify, zmq_sys_crate::ZMQ_ROUTER_NOTIFY as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::XpubManualLastValue, zmq_sys_crate::ZMQ_XPUB_MANUAL_LAST_VALUE as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::SocksUsername, zmq_sys_crate::ZMQ_SOCKS_USERNAME as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::SocksPassword, zmq_sys_crate::ZMQ_SOCKS_PASSWORD as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::InBatchSize, zmq_sys_crate::ZMQ_IN_BATCH_SIZE as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::OutBatchSize, zmq_sys_crate::ZMQ_OUT_BATCH_SIZE as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::OnlyFirstSubscribe, zmq_sys_crate::ZMQ_ONLY_FIRST_SUBSCRIBE as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::ReconnectStop, zmq_sys_crate::ZMQ_RECONNECT_STOP as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::HelloMessage, zmq_sys_crate::ZMQ_HELLO_MSG as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::DisconnectMessage, zmq_sys_crate::ZMQ_DISCONNECT_MSG as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::Priority, zmq_sys_crate::ZMQ_PRIORITY as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::BusyPoll, zmq_sys_crate::ZMQ_BUSY_POLL as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::HiccupMessage, zmq_sys_crate::ZMQ_HICCUP_MSG as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::XsubVerboseUnsubscribe, zmq_sys_crate::ZMQ_XSUB_VERBOSE_UNSUBSCRIBE as i32))]
#[cfg_attr(feature = "draft-api", case(SocketOption::TopicsCount, zmq_sys_crate::ZMQ_TOPICS_COUNT as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormMode, zmq_sys_crate::ZMQ_NORM_MODE as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormUnicastNack, zmq_sys_crate::ZMQ_NORM_UNICAST_NACK as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormBufferSize, zmq_sys_crate::ZMQ_NORM_BUFFER_SIZE as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormSegmentSize, zmq_sys_crate::ZMQ_NORM_SEGMENT_SIZE as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormBlockSize, zmq_sys_crate::ZMQ_NORM_BLOCK_SIZE as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormNumnParity, zmq_sys_crate::ZMQ_NORM_NUM_PARITY as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormNumnAutoParity, zmq_sys_crate::ZMQ_NORM_NUM_AUTOPARITY as i32))]
#[cfg_attr(zmq_has = "norm", case(SocketOption::NormPush, zmq_sys_crate::ZMQ_NORM_PUSH as i32))]
fn converts_to_raw(#[case] option: SocketOption, #[case] expected: i32) {
assert_eq!(<SocketOption as Into<i32>>::into(option), expected);
}
}
pub struct Socket<T: sealed::SocketType> {
context: Context,
pub(crate) socket: Arc<RawSocket>,
marker: PhantomData<T>,
}
impl<T: sealed::SocketType> Socket<T> {
pub fn from_context(context: &Context) -> ZmqResult<Self> {
let socket = RawSocket::from_ctx(&context.inner, T::raw_socket_type() as i32)?;
Ok(Self {
context: context.clone(),
socket: socket.into(),
marker: PhantomData,
})
}
pub fn set_sockopt_bytes<V>(&self, option: SocketOption, value: V) -> ZmqResult<()>
where
V: AsRef<[u8]>,
{
self.socket.set_sockopt_bytes(option.into(), value.as_ref())
}
pub fn set_sockopt_string<V>(&self, option: SocketOption, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.socket
.set_sockopt_string(option.into(), value.as_ref())
}
pub fn set_sockopt_int<V>(&self, option: SocketOption, value: V) -> ZmqResult<()>
where
V: PrimInt,
{
self.socket.set_sockopt_int(option.into(), value)
}
pub fn set_sockopt_bool(&self, option: SocketOption, value: bool) -> ZmqResult<()> {
self.socket.set_sockopt_bool(option.into(), value)
}
#[cfg(zmq_has = "curve")]
pub(crate) fn get_sockopt_curve(&self, option: SocketOption) -> ZmqResult<Vec<u8>> {
self.socket.get_sockopt_curve(option.into())
}
pub fn get_sockopt_bytes(&self, option: SocketOption) -> ZmqResult<Vec<u8>> {
self.socket.get_sockopt_bytes(option.into())
}
pub fn get_sockopt_string(&self, option: SocketOption) -> ZmqResult<String> {
self.socket.get_sockopt_string(option.into())
}
pub fn get_sockopt_int<V>(&self, option: SocketOption) -> ZmqResult<V>
where
V: PrimInt + Default,
{
self.socket.get_sockopt_int(option.into())
}
pub fn get_sockopt_bool(&self, option: SocketOption) -> ZmqResult<bool> {
self.socket.get_sockopt_bool(option.into())
}
pub fn set_affinity(&self, value: u64) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::Affinity, value)
}
pub fn affinity(&self) -> ZmqResult<u64> {
self.get_sockopt_int(SocketOption::Affinity)
}
pub fn set_backlog(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::Backlog, value)
}
pub fn backlog(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::Backlog)
}
#[cfg(feature = "draft-api")]
pub fn set_busy_poll(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::BusyPoll, value)
}
pub fn set_connect_timeout(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ConnectTimeout, value)
}
pub fn connect_timeout(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::ConnectTimeout)
}
pub fn events(&self) -> ZmqResult<PollEvents> {
self.get_sockopt_int::<i32>(SocketOption::Events)?
.try_into()
.map(PollEvents::from_bits_truncate)
.map_err(|_err| ZmqError::InvalidArgument)
}
#[cfg(zmq_has = "gssapi")]
pub fn set_gssapi_plaintext(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::GssApiPlainText, value)
}
#[cfg(zmq_has = "gssapi")]
pub fn gssapi_plaintext(&self) -> ZmqResult<bool> {
self.get_sockopt_bool(SocketOption::GssApiPlainText)
}
#[cfg(zmq_has = "gssapi")]
pub fn set_gssapi_service_principal_nametype(&self, value: GssApiNametype) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::GssApiServicePrincipalNametype, value as i32)
}
#[cfg(zmq_has = "gssapi")]
pub fn gssapi_service_principal_nametype(&self) -> ZmqResult<GssApiNametype> {
self.get_sockopt_int::<i32>(SocketOption::GssApiServicePrincipalNametype)
.and_then(GssApiNametype::try_from)
}
#[cfg(zmq_has = "gssapi")]
pub fn set_gssapi_principal<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::GssApiPrincipal, value.as_ref())
}
#[cfg(zmq_has = "gssapi")]
pub fn gssapi_principal(&self) -> ZmqResult<String> {
self.socket
.get_sockopt_gssapi(SocketOption::GssApiPrincipal.into())
}
#[cfg(zmq_has = "gssapi")]
pub fn set_gssapi_principal_nametype(&self, value: GssApiNametype) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::GssApiPrincipalNametype, value as i32)
}
#[cfg(zmq_has = "gssapi")]
pub fn gssapi_principal_nametype(&self) -> ZmqResult<GssApiNametype> {
self.get_sockopt_int::<i32>(SocketOption::GssApiPrincipalNametype)
.and_then(GssApiNametype::try_from)
}
pub fn set_handshake_interval(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::HandshakeInterval, value)
}
pub fn handshake_interval(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::HandshakeInterval)
}
pub fn set_heartbeat_interval(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::HeartbeatInterval, value)
}
pub fn heartbeat_interval(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::HeartbeatInterval)
}
pub fn set_heartbeat_timeout(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::HeartbeatTimeout, value)
}
pub fn heartbeat_timeout(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::HeartbeatTimeout)
}
pub fn set_heartbeat_timetolive(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::HeartbeatTimeToLive, value)
}
pub fn heartbeat_timetolive(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::HeartbeatTimeToLive)
}
pub fn set_immediate(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::Immediate, value)
}
pub fn immediate(&self) -> ZmqResult<bool> {
self.get_sockopt_bool(SocketOption::Immediate)
}
pub fn set_ipv6(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::IPv6, value)
}
pub fn ipv6(&self) -> ZmqResult<bool> {
self.get_sockopt_bool(SocketOption::IPv6)
}
pub fn set_linger(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::Linger, value)
}
pub fn linger(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::Linger)
}
pub fn last_endpoint(&self) -> ZmqResult<String> {
self.get_sockopt_string(SocketOption::LastEndpoint)
}
pub fn set_max_message_size(&self, value: i64) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::MaxMessageSize, value)
}
pub fn max_message_size(&self) -> ZmqResult<i64> {
self.get_sockopt_int(SocketOption::MaxMessageSize)
}
pub fn set_security_mechanism(&self, security: &SecurityMechanism) -> ZmqResult<()> {
security.apply(self)
}
pub fn security_mechanism(&self) -> ZmqResult<SecurityMechanism> {
SecurityMechanism::try_from(self)
}
pub fn set_multicast_hops(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::MulticastHops, value)
}
pub fn multicast_hops(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::MulticastHops)
}
pub fn set_rate(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::Rate, value)
}
pub fn rate(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::Rate)
}
pub fn set_receive_buffer(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ReceiveBuffer, value)
}
pub fn receive_buffer(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::ReceiveBuffer)
}
pub fn set_receive_highwater_mark(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ReceiveHighWatermark, value)
}
pub fn receive_highwater_mark(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::ReceiveHighWatermark)
}
pub fn set_receive_timeout(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ReceiveTimeout, value)
}
pub fn receive_timeout(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::ReceiveTimeout)
}
pub fn set_reconnect_interval(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ReconnectInterval, value)
}
pub fn reconnect_interval(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::ReconnectInterval)
}
pub fn set_reconnect_interval_max(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ReconnectIntervalMax, value)
}
pub fn reconnect_interval_max(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::ReconnectIntervalMax)
}
#[cfg(feature = "draft-api")]
pub fn set_reconnect_stop(&self, value: ReconnectStop) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::ReconnectStop, value.bits())
}
#[cfg(feature = "draft-api")]
pub fn reconnect_stop(&self) -> ZmqResult<ReconnectStop> {
self.get_sockopt_int(SocketOption::ReconnectStop)
.map(ReconnectStop::from_bits_truncate)
}
pub fn set_recovery_interval(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::RecoveryInterval, value)
}
pub fn recovery_interval(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::RecoveryInterval)
}
pub fn set_send_buffer(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::SendBuffer, value)
}
pub fn send_buffer(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::SendBuffer)
}
pub fn set_send_highwater_mark(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::SendHighWatermark, value)
}
pub fn send_highwater_mark(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::SendHighWatermark)
}
pub fn set_send_timeout(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::SendTimeout, value)
}
pub fn send_timeout(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::SendTimeout)
}
#[cfg(feature = "draft-api")]
pub fn set_socks_proxy<V>(&self, value: Option<V>) -> ZmqResult<()>
where
V: AsRef<str>,
{
match value {
None => self.set_sockopt_string(SocketOption::SocksUsername, ""),
Some(ref_value) => self.set_sockopt_string(SocketOption::SocksProxy, ref_value),
}
}
#[cfg(feature = "draft-api")]
pub fn socks_proxy(&self) -> ZmqResult<String> {
self.get_sockopt_string(SocketOption::SocksProxy)
}
#[cfg(feature = "draft-api")]
pub fn set_socks_username<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::SocksUsername, value.as_ref())
}
#[cfg(feature = "draft-api")]
pub fn socks_username(&self) -> ZmqResult<String> {
self.get_sockopt_string(SocketOption::SocksUsername)
}
#[cfg(feature = "draft-api")]
pub fn set_socks_password<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::SocksPassword, value.as_ref())
}
#[cfg(feature = "draft-api")]
pub fn socks_password(&self) -> ZmqResult<String> {
self.get_sockopt_string(SocketOption::SocksPassword)
}
pub fn set_tcp_keepalive(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::TcpKeepalive, value)
}
pub fn tcp_keepalive(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::TcpKeepalive)
}
pub fn set_tcp_keepalive_count(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::TcpKeepaliveCount, value)
}
pub fn tcp_keepalive_count(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::TcpKeepaliveCount)
}
pub fn set_tcp_keepalive_idle(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::TcpKeepaliveIdle, value)
}
pub fn tcp_keepalive_idle(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::TcpKeepaliveIdle)
}
pub fn set_tcp_keepalive_interval(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::TcpKeepaliveInterval, value)
}
pub fn tcp_keepalive_interval(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::TcpKeepaliveInterval)
}
pub fn set_tcp_max_retransmit_timeout(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::MaxTcpRetransmitTimeout, value)
}
pub fn tcp_max_retransmit_timeout(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::MaxTcpRetransmitTimeout)
}
pub fn set_type_of_service(&self, value: i32) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::TypeOfService, value)
}
pub fn type_of_service(&self) -> ZmqResult<i32> {
self.get_sockopt_int(SocketOption::TypeOfService)
}
pub fn set_zap_domain(&self, domain: &ZapDomain) -> ZmqResult<()> {
domain.apply(self)
}
pub fn zap_domain(&self) -> ZmqResult<ZapDomain> {
self.get_sockopt_string(SocketOption::ZapDomain)
.map(ZapDomain::from)
}
pub fn bind<E>(&self, endpoint: E) -> ZmqResult<()>
where
E: AsRef<str>,
{
self.socket.bind(endpoint.as_ref())
}
pub fn unbind<E>(&self, endpoint: E) -> ZmqResult<()>
where
E: AsRef<str>,
{
self.socket.unbind(endpoint.as_ref())
}
pub fn connect<E>(&self, endpoint: E) -> ZmqResult<()>
where
E: AsRef<str>,
{
self.socket.connect(endpoint.as_ref())
}
pub fn disconnect<E>(&self, endpoint: E) -> ZmqResult<()>
where
E: AsRef<str>,
{
self.socket.disconnect(endpoint.as_ref())
}
pub fn monitor<F>(&self, events: F) -> ZmqResult<MonitorSocket>
where
F: Into<MonitorFlags>,
{
let fd = self.get_sockopt_int::<usize>(SocketOption::FileDescriptor)?;
let monitor_endpoint = format!("inproc://monitor.s-{fd}");
self.socket
.monitor(&monitor_endpoint, events.into().bits() as i32)?;
let monitor = RawSocket::from_ctx(
self.context.as_raw(),
<Monitor as sealed::SocketType>::raw_socket_type() as i32,
)?;
monitor.connect(&monitor_endpoint)?;
Ok(Socket {
context: self.context.clone(),
socket: monitor.into(),
marker: PhantomData,
})
}
pub fn poll<E>(&self, events: E, timeout_ms: i64) -> ZmqResult<PollEvents>
where
E: Into<PollEvents>,
{
self.socket
.poll(events.into(), timeout_ms)?
.try_into()
.map(PollEvents::from_bits_truncate)
.map_err(|_err| ZmqError::InvalidArgument)
}
}
#[repr(transparent)]
#[derive(Debug, Clone, Copy, From, Default, PartialEq, Eq, PartialOrd, Ord)]
#[from(u16)]
pub struct MonitorFlags(u16);
bitflags! {
impl MonitorFlags: u16 {
const Connected = 0b0000_0000_0000_0001;
const ConnectDelayed = 0b0000_0000_0000_0010;
const ConnectRetried = 0b0000_0000_0000_0100;
const Listening = 0b0000_0000_0000_1000;
const BindFailed = 0b0000_0000_0001_0000;
const Accepted = 0b0000_0000_0010_0000;
const AcceptFailed = 0b0000_0000_0100_0000;
const Closed = 0b0000_0000_1000_0000;
const CloseFailed = 0b0000_0001_0000_0000;
const Disconnected = 0b0000_0010_0000_0000;
const MonitorStopped = 0b0000_0100_0000_0000;
const HandshakeFailedNoDetail = 0b0000_1000_0000_0000;
const HandshakeSucceeded = 0b0001_0000_0000_0000;
const HandshakeFailedProtocol = 0b0010_0000_0000_0000;
const HandshakeFailedAuth = 0b0100_0000_0000_0000;
}
}
#[repr(transparent)]
#[derive(Debug, Clone, Copy, From, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct RecvFlags(i32);
bitflags! {
impl RecvFlags: i32 {
const DONT_WAIT = 0b00000001;
}
}
#[cfg_attr(feature = "futures", async_trait)]
pub trait Receiver {
fn recv_msg<F>(&self, flags: F) -> ZmqResult<Message>
where
F: Into<RecvFlags> + Copy;
#[cfg(feature = "futures")]
async fn recv_msg_async(&self) -> Option<Message>;
}
#[cfg_attr(feature = "futures", async_trait)]
impl<T> Receiver for Socket<T>
where
T: sealed::SocketType + sealed::ReceiverFlag + Unpin,
Socket<T>: Sync,
{
fn recv_msg<F>(&self, flags: F) -> ZmqResult<Message>
where
F: Into<RecvFlags> + Copy,
{
self.socket
.recv(flags.into().bits())
.map(Message::from_raw_msg)
}
#[cfg(feature = "futures")]
async fn recv_msg_async(&self) -> Option<Message> {
futures::MessageReceivingFuture { receiver: self }.now_or_never()
}
}
#[cfg_attr(feature = "futures", async_trait)]
pub trait MultipartReceiver: Receiver {
fn recv_multipart<F>(&self, flags: F) -> ZmqResult<MultipartMessage>
where
F: Into<RecvFlags> + Copy,
{
iter::repeat_with(|| self.recv_msg(flags))
.try_fold(
MultipartMessage::new(),
|mut parts, zmq_result| match zmq_result {
Err(e) => ControlFlow::Break(Err(e)),
Ok(zmq_msg) => {
let got_more = zmq_msg.get_more();
parts.push_back(zmq_msg);
if got_more {
ControlFlow::Continue(parts)
} else {
ControlFlow::Break(Ok(parts))
}
}
},
)
.break_value()
.unwrap()
}
#[cfg(feature = "futures")]
async fn recv_multipart_async(&self) -> MultipartMessage {
::futures::stream::repeat_with(|| Ok(self.recv_msg_async()))
.try_fold(MultipartMessage::new(), |mut parts, zmq_msg| async move {
if let Some(msg) = zmq_msg.await {
let got_more = msg.get_more();
parts.push_back(msg);
if !got_more {
return Err(parts);
}
}
Ok(parts)
})
.await
.unwrap_err()
}
}
#[repr(transparent)]
#[derive(Debug, Clone, Copy, From, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct SendFlags(i32);
bitflags! {
impl SendFlags: i32 {
const DONT_WAIT = 0b00000001;
const SEND_MORE = 0b00000010;
}
}
#[cfg_attr(feature = "futures", async_trait)]
pub trait Sender {
fn send_msg<M, F>(&self, msg: M, flags: F) -> ZmqResult<()>
where
M: Into<Message>,
F: Into<SendFlags> + Copy;
#[cfg(feature = "futures")]
async fn send_msg_async<M, F>(&self, msg: M, flags: F) -> Option<()>
where
M: Into<Message> + Clone + Send,
F: Into<SendFlags> + Copy + Send;
}
#[cfg_attr(feature = "futures", async_trait)]
impl<T> Sender for Socket<T>
where
T: sealed::SocketType + sealed::SenderFlag + Unpin,
Socket<T>: Sync,
{
fn send_msg<M, F>(&self, msg: M, flags: F) -> ZmqResult<()>
where
M: Into<Message>,
F: Into<SendFlags> + Copy,
{
msg.into().send(self, flags.into().bits())
}
#[cfg(feature = "futures")]
async fn send_msg_async<M, F>(&self, msg: M, flags: F) -> Option<()>
where
M: Into<Message> + Clone + Send,
F: Into<SendFlags> + Copy + Send,
{
futures::MessageSendingFuture {
receiver: self,
message: msg,
flags: flags.into(),
}
.now_or_never()
}
}
#[cfg_attr(feature = "futures", async_trait)]
pub trait MultipartSender: Sender {
fn send_multipart<M, F>(&self, iter: M, flags: F) -> ZmqResult<()>
where
M: Into<MultipartMessage>,
F: Into<SendFlags> + Copy,
{
let mut last_part: Option<Message> = None;
for part in iter.into() {
let maybe_last = last_part.take();
if let Some(last) = maybe_last {
self.send_msg(last, flags.into() | SendFlags::SEND_MORE)?;
}
last_part = Some(part);
}
if let Some(last) = last_part {
self.send_msg(last, flags)
} else {
Ok(())
}
}
#[cfg(feature = "futures")]
async fn send_multipart_async<M, F>(&self, multipart: M, flags: F) -> Option<()>
where
M: Into<MultipartMessage> + Send,
F: Into<SendFlags> + Copy + Send,
{
let mut last_part = None;
for part in multipart.into() {
let maybe_last = last_part.take();
if let Some(last) = maybe_last {
self.send_msg_async(last, flags.into() | SendFlags::SEND_MORE)
.await?;
}
last_part = Some(part);
}
if let Some(last) = last_part {
self.send_msg_async(last, flags.into()).await
} else {
None
}
}
}
#[cfg(feature = "futures")]
mod futures {
use core::{pin::Pin, task::Poll};
use super::{RecvFlags, SendFlags, Socket};
use crate::{
message::{Message, Sendable},
sealed,
};
pub(super) struct MessageSendingFuture<'a, T, M>
where
T: sealed::SocketType + sealed::SenderFlag + Unpin,
M: Into<Message> + Clone + Send,
{
pub(super) receiver: &'a Socket<T>,
pub(super) message: M,
pub(super) flags: SendFlags,
}
#[rustversion::attr(before(1.87), allow(clippy::needless_lifetimes))]
impl<'a, T, M> Future for MessageSendingFuture<'a, T, M>
where
T: sealed::SocketType + sealed::SenderFlag + Unpin,
M: Into<Message> + Clone + Send,
{
type Output = ();
fn poll(self: Pin<&mut Self>, _ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
let message = self.message.clone().into();
message
.send(self.receiver, self.flags.bits())
.map_or(Poll::Pending, Poll::Ready)
}
}
pub(super) struct MessageReceivingFuture<'a, T>
where
T: sealed::SocketType + sealed::ReceiverFlag + Unpin,
{
pub(super) receiver: &'a Socket<T>,
}
impl<T> Future for MessageReceivingFuture<'_, T>
where
T: sealed::SocketType + sealed::ReceiverFlag + Unpin,
{
type Output = Message;
fn poll(self: Pin<&mut Self>, _ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.receiver
.socket
.recv(RecvFlags::DONT_WAIT.bits())
.map(Message::from_raw_msg)
.map_or(Poll::Pending, Poll::Ready)
}
}
}
#[repr(transparent)]
#[derive(Debug, Clone, Copy, From, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct PollEvents(i16);
bitflags! {
impl PollEvents: i16 {
const POLL_IN = 0b0000_0001;
const POLL_OUT = 0b0000_0010;
const POLL_ERR = 0b0000_0100;
const POLL_PRI = 0b0000_1000;
}
}
#[cfg(feature = "draft-api")]
#[repr(transparent)]
#[derive(Debug, Clone, Copy, From, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReconnectStop(i32);
#[cfg(feature = "draft-api")]
bitflags! {
impl ReconnectStop: i32 {
const CONNECTION_REFUSED = zmq_sys_crate::ZMQ_RECONNECT_STOP_CONN_REFUSED as i32;
const HANDSHAKE_FAILED = zmq_sys_crate::ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED as i32;
const AFTER_DISCONNECT = zmq_sys_crate::ZMQ_RECONNECT_STOP_AFTER_DISCONNECT as i32;
}}
#[cfg(test)]
mod socket_tests {
use std::{thread, time::Duration};
#[cfg(feature = "draft-api")]
use rstest::*;
#[cfg(feature = "draft-api")]
use super::ReconnectStop;
use super::{
DealerSocket, MonitorFlags, MonitorSocketEvent, PairSocket, PollEvents, SendFlags,
};
#[cfg(zmq_has = "gssapi")]
use crate::security::GssApiNametype;
use crate::{
prelude::{Context, MonitorReceiver, Sender, ZmqResult},
security::SecurityMechanism,
};
#[test]
fn set_affinity_sets_affinity() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_affinity(42)?;
assert_eq!(socket.affinity()?, 42);
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_backlog_sets_backlog() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_backlog(42)?;
assert_eq!(socket.backlog()?, 42);
Ok(())
}
#[test]
fn set_connect_timeout_sets_connect_timeout() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_connect_timeout(42)?;
assert_eq!(socket.connect_timeout()?, 42);
Ok(())
}
#[test]
fn events_when_no_events_available() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
assert_eq!(socket.events()?, PollEvents::empty());
Ok(())
}
#[test]
fn events_when_connected() -> ZmqResult<()> {
let context = Context::new()?;
let endpoint = "inproc://test";
let server_socket = PairSocket::from_context(&context)?;
server_socket.bind(endpoint)?;
let client_socket = PairSocket::from_context(&context)?;
client_socket.connect(endpoint)?;
assert_eq!(client_socket.events()?, PollEvents::POLL_OUT);
Ok(())
}
#[cfg(zmq_has = "gssapi")]
#[test]
fn set_gssapi_plaintext_sets_gssapi_plaintext() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_gssapi_plaintext(true)?;
assert!(socket.gssapi_plaintext()?);
Ok(())
}
#[cfg(zmq_has = "gssapi")]
#[test]
fn set_gssapi_service_principal_sets_gssapi_service_principal() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_gssapi_service_principal_nametype(GssApiNametype::NtUsername)?;
assert_eq!(
socket.gssapi_service_principal_nametype()?,
GssApiNametype::NtUsername
);
Ok(())
}
#[cfg(zmq_has = "gssapi")]
#[test]
fn set_gssapi_principal_sets_gssapi_principal() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_gssapi_principal("test")?;
assert_eq!(socket.gssapi_principal()?, "test");
Ok(())
}
#[cfg(zmq_has = "gssapi")]
#[test]
fn set_gssapi_principal_nametype_sets_gssapi_principal_nametype() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_gssapi_principal_nametype(GssApiNametype::NtHostbased)?;
assert_eq!(
socket.gssapi_principal_nametype()?,
GssApiNametype::NtHostbased
);
Ok(())
}
#[test]
fn set_handshake_interval_sets_handshake_interval() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_handshake_interval(42)?;
assert_eq!(socket.handshake_interval()?, 42);
Ok(())
}
#[test]
fn set_heartbeat_interval_sets_heartbeat_interval() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_heartbeat_interval(42)?;
assert_eq!(socket.heartbeat_interval()?, 42);
Ok(())
}
#[test]
fn set_heartbeat_timeout_sets_heartbeat_timeout() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_heartbeat_timeout(42)?;
assert_eq!(socket.heartbeat_timeout()?, 42);
Ok(())
}
#[test]
fn set_heartbeat_timetolive_sets_heartbeat_ttl() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_heartbeat_timetolive(42_000)?;
assert_eq!(socket.heartbeat_timetolive()?, 42_000);
Ok(())
}
#[test]
fn set_immediate_sets_immediate() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_immediate(true)?;
assert!(socket.immediate()?);
Ok(())
}
#[test]
fn set_ipv6_sets_ipv6() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_ipv6(true)?;
assert!(socket.ipv6()?);
Ok(())
}
#[test]
fn set_linger_sets_linger() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_linger(42)?;
assert_eq!(socket.linger()?, 42);
Ok(())
}
#[test]
fn last_endpoint_when_not_bound_or_connected() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
assert_eq!(socket.last_endpoint()?, "");
Ok(())
}
#[test]
fn last_endpoint_when_bound() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.bind("inproc://last-endpoint-test")?;
assert_eq!(socket.last_endpoint()?, "inproc://last-endpoint-test");
Ok(())
}
#[test]
fn set_max_msg_size_sets_max_msg_size() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_max_message_size(42)?;
assert_eq!(socket.max_message_size()?, 42);
Ok(())
}
#[test]
fn set_security_mechanism_set_security_mechanism() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_security_mechanism(&SecurityMechanism::Plain {
username: "username".into(),
password: "supersecret".into(),
})?;
assert_eq!(
socket.security_mechanism()?,
SecurityMechanism::Plain {
username: "username".into(),
password: "supersecret".into()
}
);
Ok(())
}
#[test]
fn set_multicast_hops_sets_multicast_hops() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_multicast_hops(42)?;
assert_eq!(socket.multicast_hops()?, 42);
Ok(())
}
#[test]
fn set_rate_sets_rate() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_rate(42)?;
assert_eq!(socket.rate()?, 42);
Ok(())
}
#[test]
fn set_receive_buffer_sets_receive_buffer() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_receive_buffer(42)?;
assert_eq!(socket.receive_buffer()?, 42);
Ok(())
}
#[test]
fn set_receive_high_watermark_sets_receive_high_watermark() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_receive_highwater_mark(42)?;
assert_eq!(socket.receive_highwater_mark()?, 42);
Ok(())
}
#[test]
fn set_receive_timeout_sets_receive_timeout() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_receive_timeout(42)?;
assert_eq!(socket.receive_timeout()?, 42);
Ok(())
}
#[test]
fn set_reconnect_interval_sets_reconnect_interval() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_reconnect_interval(42)?;
assert_eq!(socket.reconnect_interval()?, 42);
Ok(())
}
#[test]
fn set_reconnect_interval_max_sets_reconnect_interval_max() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_reconnect_interval_max(42)?;
assert_eq!(socket.reconnect_interval_max()?, 42);
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_reconnect_stop_sets_reconnect_stop() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_reconnect_stop(
ReconnectStop::AFTER_DISCONNECT | ReconnectStop::CONNECTION_REFUSED,
)?;
assert_eq!(
socket.reconnect_stop()?,
ReconnectStop::AFTER_DISCONNECT | ReconnectStop::CONNECTION_REFUSED
);
Ok(())
}
#[test]
fn set_recoveery_interval_sets_recovery_interval() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_recovery_interval(42)?;
assert_eq!(socket.recovery_interval()?, 42);
Ok(())
}
#[test]
fn set_send_buffer_sets_send_buffer() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_send_buffer(42)?;
assert_eq!(socket.send_buffer()?, 42);
Ok(())
}
#[test]
fn set_send_high_watermark_sets_send_high_watermark() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_send_highwater_mark(42)?;
assert_eq!(socket.send_highwater_mark()?, 42);
Ok(())
}
#[test]
fn set_send_timeout_sets_send_timeout() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_send_timeout(42)?;
assert_eq!(socket.send_timeout()?, 42);
Ok(())
}
#[cfg(feature = "draft-api")]
#[rstest]
#[case(None)]
#[case(Some("asdf"))]
fn set_socks_proxy_sets_proxy_value(#[case] socks_proxy: Option<&str>) -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_socks_proxy(socks_proxy)?;
assert_eq!(socket.socks_proxy()?, socks_proxy.unwrap_or(""));
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_socks_username_sets_proxy_username() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_socks_username("username")?;
assert_eq!(socket.socks_username()?, "username");
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_socks_password_sets_proxy_password() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_socks_password("password")?;
assert_eq!(socket.socks_password()?, "password");
Ok(())
}
#[test]
fn set_tcp_keepalive_sets_tcp_keepalive() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_tcp_keepalive(1)?;
assert_eq!(socket.tcp_keepalive()?, 1);
Ok(())
}
#[test]
fn set_tcp_keepalive_count_sets_tcp_keepalive_count() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_tcp_keepalive_count(42)?;
assert_eq!(socket.tcp_keepalive_count()?, 42);
Ok(())
}
#[test]
fn set_tcp_keepalive_idle_sets_tcp_keepalive_idle() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_tcp_keepalive_idle(42)?;
assert_eq!(socket.tcp_keepalive_idle()?, 42);
Ok(())
}
#[test]
fn set_tcp_keepalive_interval_sets_tcp_keepalive_interval() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_tcp_keepalive_interval(42)?;
assert_eq!(socket.tcp_keepalive_interval()?, 42);
Ok(())
}
#[test]
fn set_tcp_max_retransmit_timout_set_retransmit_timeout() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_tcp_max_retransmit_timeout(42)?;
assert_eq!(socket.tcp_max_retransmit_timeout()?, 42);
Ok(())
}
#[test]
fn set_type_of_service_sets_type_of_service() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_type_of_service(42)?;
assert_eq!(socket.type_of_service()?, 42);
Ok(())
}
#[test]
fn set_zap_domain_sets_zap_domain() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
socket.set_zap_domain(&"zap".into())?;
assert_eq!(socket.zap_domain()?, "zap".into());
Ok(())
}
#[test]
fn unbind_unbinds_endpoint() -> ZmqResult<()> {
let context = Context::new()?;
let endpoint = "inproc://unbind-test";
let socket = PairSocket::from_context(&context)?;
socket.bind(endpoint)?;
assert!(socket.unbind(endpoint).is_ok());
Ok(())
}
#[test]
fn connect_connects_to_endpoint() -> ZmqResult<()> {
let context = Context::new()?;
let endpoint = "inproc://connect-test";
let server_socket = PairSocket::from_context(&context)?;
server_socket.bind(endpoint)?;
let client_socket = PairSocket::from_context(&context)?;
assert!(client_socket.connect(endpoint).is_ok());
Ok(())
}
#[test]
fn disconnect_disconnects_from_endpoint() -> ZmqResult<()> {
let context = Context::new()?;
let endpoint = "inproc://disconnect-test";
let server_socket = PairSocket::from_context(&context)?;
server_socket.bind(endpoint)?;
let client_socket = PairSocket::from_context(&context)?;
client_socket.connect(endpoint)?;
assert!(client_socket.disconnect(endpoint).is_ok());
Ok(())
}
#[test]
fn monitor_sets_up_socket_monitor() -> ZmqResult<()> {
let context = Context::new()?;
let dealer_server = DealerSocket::from_context(&context)?;
dealer_server.bind("tcp://127.0.0.1:*")?;
let client_endpoint = dealer_server.last_endpoint()?;
thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(10));
}
});
let dealer_client = DealerSocket::from_context(&context)?;
let dealer_monitor = dealer_client.monitor(MonitorFlags::Connected)?;
dealer_client.connect(client_endpoint)?;
loop {
match dealer_monitor.recv_monitor_event() {
Err(_) => continue,
Ok(event) => {
assert_eq!(event, MonitorSocketEvent::Connected);
break;
}
}
}
Ok(())
}
#[cfg(feature = "futures")]
#[test]
fn monitor_sets_up_async_socket_monitor() -> ZmqResult<()> {
let context = Context::new()?;
let dealer_server = DealerSocket::from_context(&context)?;
dealer_server.bind("tcp://127.0.0.1:*")?;
let client_endpoint = dealer_server.last_endpoint()?;
thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(10));
}
});
futures::executor::block_on(async {
let dealer_client = DealerSocket::from_context(&context)?;
let dealer_monitor = dealer_client.monitor(MonitorFlags::Connected)?;
dealer_client.connect(client_endpoint)?;
loop {
match dealer_monitor.recv_monitor_event_async().await {
Some(event) => {
assert_eq!(event, MonitorSocketEvent::Connected);
break;
}
_ => continue,
}
}
Ok(())
})
}
#[test]
fn poll_on_socket_when_no_events_available() -> ZmqResult<()> {
let context = Context::new()?;
let socket = PairSocket::from_context(&context)?;
assert_eq!(socket.poll(PollEvents::all(), 0)?, PollEvents::empty());
Ok(())
}
#[test]
fn poll_on_socket_when_event_available() -> ZmqResult<()> {
let context = Context::new()?;
let endpoint = "inproc://poll-test";
let pair_server = PairSocket::from_context(&context)?;
pair_server.bind(endpoint)?;
let pair_client = PairSocket::from_context(&context)?;
pair_client.connect(endpoint)?;
pair_server.send_msg("msg1", SendFlags::empty())?;
pair_server.send_msg("msg2", SendFlags::empty())?;
pair_server.send_msg("msg3", SendFlags::empty())?;
assert_eq!(pair_client.poll(PollEvents::all(), 0)?, PollEvents::POLL_IN);
Ok(())
}
}
#[cfg(feature = "builder")]
pub(crate) mod builder {
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use crate::{
ZmqResult, auth::ZapDomain, context::Context, sealed, security::SecurityMechanism,
socket::Socket,
};
#[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)]
#[builder(
pattern = "owned",
name = "SocketBuilder",
public,
build_fn(skip, error = "ZmqError"),
derive(PartialEq, Eq, Hash, Clone, serde::Serialize, serde::Deserialize)
)]
#[builder_struct_attr(doc = "Builder for [`Socket`].\n\n")]
#[allow(dead_code)]
struct SocketConfig {
#[cfg(feature = "draft-api")]
#[builder(default = false)]
busy_poll: bool,
#[builder(setter(into), default = 0)]
connect_timeout: i32,
#[builder(setter(into), default = 30_000)]
handshake_interval: i32,
#[builder(setter(into), default = 0)]
heartbeat_interval: i32,
#[builder(setter(into), default = 0)]
heartbeat_timeout: i32,
#[builder(setter(into), default = 0)]
heartbeat_timetolive: i32,
#[builder(default = false)]
immediate: bool,
#[builder(default = false)]
ipv6: bool,
#[builder(setter(into), default = 0)]
linger: i32,
#[builder(setter(into), default = -1)]
max_message_size: i64,
#[builder(setter(into), default = -1)]
receive_buffer: i32,
#[builder(setter(into), default = 1_000)]
receive_highwater_mark: i32,
#[builder(setter(into), default = -1)]
receive_timeout: i32,
#[builder(setter(into), default = 100)]
reconnect_interval: i32,
#[builder(setter(into), default = 0)]
reconnect_interval_max: i32,
#[builder(setter(into), default = -1)]
send_buffer: i32,
#[builder(setter(into), default = 1_000)]
send_highwater_mark: i32,
#[builder(setter(into), default = -1)]
send_timeout: i32,
#[builder(setter(into))]
zap_domain: ZapDomain,
#[builder(default = "SecurityMechanism::Null")]
security_mechanism: SecurityMechanism,
}
impl SocketBuilder {
pub fn apply<T>(self, socket: &Socket<T>) -> ZmqResult<()>
where
T: sealed::SocketType,
{
#[cfg(feature = "draft-api")]
self.busy_poll
.iter()
.try_for_each(|busy_poll| socket.set_busy_poll(*busy_poll))?;
self.connect_timeout
.iter()
.try_for_each(|connect_timeout| socket.set_connect_timeout(*connect_timeout))?;
self.handshake_interval
.iter()
.try_for_each(|handshake_interval| {
socket.set_handshake_interval(*handshake_interval)
})?;
self.heartbeat_interval
.iter()
.try_for_each(|heartbeat_interval| {
socket.set_heartbeat_interval(*heartbeat_interval)
})?;
self.heartbeat_timeout
.iter()
.try_for_each(|heartbeat_timeout| {
socket.set_heartbeat_timeout(*heartbeat_timeout)
})?;
self.heartbeat_timetolive
.iter()
.try_for_each(|heartbeat_timetolive| {
socket.set_heartbeat_timetolive(*heartbeat_timetolive)
})?;
self.immediate
.iter()
.try_for_each(|immediate| socket.set_immediate(*immediate))?;
self.ipv6
.iter()
.try_for_each(|ipv6| socket.set_ipv6(*ipv6))?;
self.linger
.iter()
.try_for_each(|linger| socket.set_linger(*linger))?;
self.max_message_size
.iter()
.try_for_each(|max_message_size| socket.set_max_message_size(*max_message_size))?;
self.receive_buffer
.iter()
.try_for_each(|receive_buffer| socket.set_receive_buffer(*receive_buffer))?;
self.receive_highwater_mark
.iter()
.try_for_each(|receive_highwater_mark| {
socket.set_receive_highwater_mark(*receive_highwater_mark)
})?;
self.receive_timeout
.iter()
.try_for_each(|receive_timeout| socket.set_receive_timeout(*receive_timeout))?;
self.reconnect_interval
.iter()
.try_for_each(|reconnect_interval| {
socket.set_reconnect_interval(*reconnect_interval)
})?;
self.reconnect_interval_max
.iter()
.try_for_each(|reconnect_interval_max| {
socket.set_reconnect_interval_max(*reconnect_interval_max)
})?;
self.send_buffer
.iter()
.try_for_each(|send_buffer| socket.set_send_buffer(*send_buffer))?;
self.send_highwater_mark
.iter()
.try_for_each(|send_highwater_mark| {
socket.set_send_highwater_mark(*send_highwater_mark)
})?;
self.send_timeout
.iter()
.try_for_each(|send_timeout| socket.set_send_timeout(*send_timeout))?;
self.zap_domain
.iter()
.try_for_each(|zap_domain| socket.set_zap_domain(zap_domain))?;
self.security_mechanism
.iter()
.try_for_each(|security_mechanism| {
socket.set_security_mechanism(security_mechanism)
})?;
Ok(())
}
pub fn build_from_context<T>(self, context: &Context) -> ZmqResult<Socket<T>>
where
T: sealed::SocketType,
{
let socket = Socket::<T>::from_context(context)?;
self.apply(&socket)?;
Ok(socket)
}
}
#[cfg(test)]
mod socket_builder_tests {
use super::SocketBuilder;
use crate::{
auth::ZapDomain,
prelude::{Context, PairSocket, ZmqResult},
security::SecurityMechanism,
};
#[test]
fn default_socket_builder() -> ZmqResult<()> {
let context = Context::new()?;
let builder = SocketBuilder::default();
let socket = PairSocket::from_context(&context)?;
builder.apply(&socket)?;
assert_eq!(socket.connect_timeout()?, 0);
assert_eq!(socket.handshake_interval()?, 30_000);
assert_eq!(socket.heartbeat_interval()?, 0);
assert_eq!(socket.heartbeat_timeout()?, -1);
assert_eq!(socket.heartbeat_timetolive()?, 0);
assert!(!socket.immediate()?);
assert!(!socket.ipv6()?);
assert_eq!(socket.linger()?, -1);
assert_eq!(socket.max_message_size()?, -1);
assert_eq!(socket.receive_buffer()?, -1);
assert_eq!(socket.receive_highwater_mark()?, 1_000);
assert_eq!(socket.receive_timeout()?, -1);
assert_eq!(socket.reconnect_interval()?, 100);
assert_eq!(socket.reconnect_interval_max()?, 0);
assert_eq!(socket.send_buffer()?, -1);
assert_eq!(socket.send_highwater_mark()?, 1_000);
assert_eq!(socket.send_timeout()?, -1);
assert_eq!(socket.zap_domain()?, ZapDomain::new("".into()));
assert_eq!(socket.security_mechanism()?, SecurityMechanism::Null);
Ok(())
}
#[test]
fn builder_with_custom_setttings() -> ZmqResult<()> {
let context = Context::new()?;
let builder = SocketBuilder::default()
.connect_timeout(42)
.handshake_interval(21)
.heartbeat_interval(666)
.heartbeat_timeout(1337)
.heartbeat_timetolive(420)
.immediate(true)
.ipv6(true)
.linger(1337)
.max_message_size(1337)
.receive_buffer(1337)
.receive_highwater_mark(1337)
.receive_timeout(1337)
.reconnect_interval(1337)
.reconnect_interval_max(1337)
.send_buffer(1337)
.send_highwater_mark(1337)
.send_timeout(1337)
.zap_domain(ZapDomain::new("test".into()))
.security_mechanism(SecurityMechanism::Plain {
username: "username".into(),
password: "supersecret".into(),
});
let socket = PairSocket::from_context(&context)?;
builder.apply(&socket)?;
assert_eq!(socket.connect_timeout()?, 42);
assert_eq!(socket.handshake_interval()?, 21);
assert_eq!(socket.heartbeat_interval()?, 666);
assert_eq!(socket.heartbeat_timeout()?, 1337);
assert_eq!(socket.heartbeat_timetolive()?, 400);
assert!(socket.immediate()?);
assert!(socket.ipv6()?);
assert_eq!(socket.linger()?, 1337);
assert_eq!(socket.max_message_size()?, 1337);
assert_eq!(socket.receive_buffer()?, 1337);
assert_eq!(socket.receive_highwater_mark()?, 1337);
assert_eq!(socket.receive_timeout()?, 1337);
assert_eq!(socket.reconnect_interval()?, 1337);
assert_eq!(socket.reconnect_interval_max()?, 1337);
assert_eq!(socket.send_buffer()?, 1337);
assert_eq!(socket.send_highwater_mark()?, 1337);
assert_eq!(socket.send_timeout()?, 1337);
assert_eq!(socket.zap_domain()?, ZapDomain::new("test".into()));
assert_eq!(
socket.security_mechanism()?,
SecurityMechanism::Plain {
username: "username".into(),
password: "supersecret".into()
}
);
Ok(())
}
}
}