use std::time::Duration;
use crate::{Blob, CoreState, ZmqError};
pub const SNDHWM: i32 = 23;
pub const RCVHWM: i32 = 24;
pub const LINGER: i32 = 17;
pub const SUBSCRIBE: i32 = 6;
pub const UNSUBSCRIBE: i32 = 7;
pub const ROUTING_ID: i32 = 5; pub const RECONNECT_IVL: i32 = 18; pub const RECONNECT_IVL_MAX: i32 = 21; pub const RCVTIMEO: i32 = 27;
pub const SNDTIMEO: i32 = 28;
pub const LAST_ENDPOINT: i32 = 32;
pub const TCP_KEEPALIVE: i32 = 34;
pub const TCP_KEEPALIVE_IDLE: i32 = 35;
pub const TCP_KEEPALIVE_CNT: i32 = 36;
pub const TCP_KEEPALIVE_INTVL: i32 = 37;
pub const HEARTBEAT_IVL: i32 = 38; pub const HEARTBEAT_TIMEOUT: i32 = 39; pub const HEARTBEAT_TTL: i32 = 40; pub const HANDSHAKE_IVL: i32 = 41;
pub const ROUTER_MANDATORY: i32 = 33;
pub const AUTO_DELIMITER: i32 = 42;
pub const ZAP_DOMAIN: i32 = 55; pub const PLAIN_SERVER: i32 = 44;
pub const PLAIN_USERNAME: i32 = 45;
pub const PLAIN_PASSWORD: i32 = 46;
pub const NOISE_XX_ENABLED: i32 = 1202; pub const NOISE_XX_STATIC_SECRET_KEY: i32 = 1200; pub const NOISE_XX_REMOTE_STATIC_PUBLIC_KEY: i32 = 1201;
pub const CURVE_SERVER: i32 = 47; pub const CURVE_SECRET_KEY: i32 = 49; pub const CURVE_SERVER_KEY: i32 = 48;
pub const MAXMSGSIZE: i32 = 22;
pub const MAX_CONNECTIONS: i32 = 1000;
#[cfg(feature = "io-uring")]
pub const IO_URING_SNDZEROCOPY: i32 = 1170;
#[cfg(feature = "io-uring")]
pub const IO_URING_RCVMULTISHOT: i32 = 1171;
pub const TCP_CORK: i32 = 1172;
pub const IO_URING_SESSION_ENABLED: i32 = 1175;
pub const DEFAULT_RECONNECT_IVL_MS: u64 = 1000;
#[derive(Debug, Clone)]
pub(crate) struct SocketOptions {
pub rcvhwm: usize,
pub sndhwm: usize,
pub rcvtimeo: Option<Duration>,
pub sndtimeo: Option<Duration>,
pub linger: Option<Duration>, pub reconnect_ivl: Option<Duration>, pub reconnect_ivl_max: Option<Duration>, pub backlog: Option<u32>,
pub routing_id: Option<Blob>,
pub socket_type_name: String, pub tcp_keepalive_enabled: i32, pub tcp_keepalive_idle: Option<Duration>,
pub tcp_keepalive_count: Option<u32>,
pub tcp_keepalive_interval: Option<Duration>,
pub tcp_nodelay: bool, pub max_connections: Option<usize>,
pub maxmsgsize: i64,
pub heartbeat_ivl: Option<Duration>,
pub heartbeat_timeout: Option<Duration>,
pub handshake_ivl: Option<Duration>,
pub router_mandatory: bool,
pub tcp_cork: bool,
pub io_uring: IOURingSocketOptions,
pub zap_domain: Option<String>, pub plain_options: PlainMechanismSocketOptions,
#[cfg(feature = "curve")]
pub curve_options: CurveMechanismSocketOptions,
#[cfg(feature = "noise_xx")]
pub noise_xx_options: NoiseXxSocketOptions,
}
impl Default for SocketOptions {
fn default() -> Self {
Self {
rcvhwm: 256,
sndhwm: 256,
rcvtimeo: None, sndtimeo: None, linger: Some(Duration::ZERO), reconnect_ivl: Some(Duration::from_millis(DEFAULT_RECONNECT_IVL_MS)),
reconnect_ivl_max: Some(Duration::ZERO), backlog: None,
routing_id: None,
socket_type_name: "UNKNOWN".to_string(), tcp_keepalive_enabled: 0, tcp_keepalive_idle: None,
tcp_keepalive_count: None,
tcp_keepalive_interval: None,
tcp_nodelay: true, max_connections: Some(1024),
maxmsgsize: -1, heartbeat_ivl: None, heartbeat_timeout: None,
handshake_ivl: None,
router_mandatory: false, tcp_cork: false,
io_uring: Default::default(),
zap_domain: None,
plain_options: Default::default(),
#[cfg(feature = "noise_xx")]
noise_xx_options: NoiseXxSocketOptions::default(),
#[cfg(feature = "curve")]
curve_options: CurveMechanismSocketOptions::default(),
}
}
}
#[derive(Debug, Clone)]
pub struct IOURingSocketOptions {
pub send_zerocopy: bool,
pub recv_multishot: bool,
pub session_enabled: bool,
}
impl Default for IOURingSocketOptions {
fn default() -> Self {
Self {
session_enabled: false,
send_zerocopy: false,
recv_multishot: false,
}
}
}
#[cfg(feature = "curve")]
#[derive(Debug, Clone, Default)]
pub struct CurveMechanismSocketOptions {
pub enabled: bool,
pub server_role: bool, pub secret_key: Option<[u8; 32]>,
pub server_public_key: Option<[u8; 32]>, }
#[cfg(feature = "noise_xx")]
#[derive(Debug, Clone, Default)]
pub struct NoiseXxSocketOptions {
pub enabled: bool,
pub static_secret_key_bytes: Option<[u8; 32]>,
pub remote_static_public_key_bytes: Option<[u8; 32]>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct TcpTransportConfig {
pub tcp_nodelay: bool,
pub keepalive_time: Option<Duration>,
pub keepalive_interval: Option<Duration>,
pub keepalive_count: Option<u32>,
}
#[derive(Debug, Clone, Default)]
pub struct PlainMechanismSocketOptions {
pub enabled: bool,
pub server_role: Option<bool>, pub username: Option<String>, pub password: Option<String>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct ZmtpEngineConfig {
pub routing_id: Option<Blob>,
pub socket_type_name: String,
pub security_enabled: bool,
pub heartbeat_ivl: Option<Duration>,
pub heartbeat_timeout: Option<Duration>,
pub handshake_timeout: Option<Duration>,
pub rcvtimeo: Option<Duration>,
pub sndtimeo: Option<Duration>,
pub use_send_zerocopy: bool,
pub use_recv_multishot: bool,
pub use_cork: bool,
#[cfg(feature = "noise_xx")]
pub use_noise_xx: bool,
#[cfg(feature = "noise_xx")]
pub noise_xx_local_sk_bytes_for_engine: Option<[u8; 32]>,
#[cfg(feature = "noise_xx")]
pub noise_xx_remote_pk_bytes_for_engine: Option<[u8; 32]>,
#[cfg(feature = "curve")]
pub use_curve: bool,
#[cfg(feature = "curve")]
pub curve_local_secret_key: Option<[u8; 32]>,
#[cfg(feature = "curve")]
pub curve_remote_public_key: Option<[u8; 32]>,
pub use_plain: bool,
pub plain_username_for_engine: Option<String>,
pub plain_password_for_engine: Option<String>,
pub max_msg_size: i64,
}
impl From<&SocketOptions> for ZmtpEngineConfig {
fn from(options: &SocketOptions) -> Self {
let security_enabled = options.plain_options.enabled
|| {
#[cfg(feature = "noise_xx")]
{ options.noise_xx_options.enabled }
#[cfg(not(feature = "noise_xx"))]
{ false }
}
|| {
#[cfg(feature = "curve")]
{ options.curve_options.enabled }
#[cfg(not(feature = "curve"))]
{ false }
};
ZmtpEngineConfig {
routing_id: options.routing_id.clone(),
socket_type_name: options.socket_type_name.clone(),
security_enabled,
heartbeat_ivl: options.heartbeat_ivl,
heartbeat_timeout: options.heartbeat_timeout,
handshake_timeout: options.handshake_ivl,
rcvtimeo: options.rcvtimeo,
sndtimeo: options.sndtimeo,
use_send_zerocopy: options.io_uring.send_zerocopy,
use_recv_multishot: options.io_uring.recv_multishot,
use_cork: options.tcp_cork,
#[cfg(feature = "noise_xx")]
use_noise_xx: options.noise_xx_options.enabled,
#[cfg(feature = "noise_xx")]
noise_xx_local_sk_bytes_for_engine: options.noise_xx_options.static_secret_key_bytes,
#[cfg(feature = "noise_xx")]
noise_xx_remote_pk_bytes_for_engine: options.noise_xx_options.remote_static_public_key_bytes,
#[cfg(feature = "curve")]
use_curve: options.curve_options.enabled,
#[cfg(feature = "curve")]
curve_local_secret_key: options.curve_options.secret_key,
#[cfg(feature = "curve")]
curve_remote_public_key: options.curve_options.server_public_key,
use_plain: options.plain_options.enabled,
plain_username_for_engine: options.plain_options.username.clone(),
plain_password_for_engine: options.plain_options.password.clone(),
max_msg_size: options.maxmsgsize,
}
}
}
pub(crate) fn parse_i32_option(value: &[u8]) -> Result<i32, ZmqError> {
let arr: [u8; 4] = value
.try_into()
.map_err(|_| ZmqError::InvalidOptionValue(0))?;
Ok(i32::from_ne_bytes(arr)) }
pub(crate) fn parse_bool_option(value: &[u8]) -> Result<bool, ZmqError> {
Ok(parse_i32_option(value)? == 1)
}
pub(crate) fn parse_duration_ms_option(value: &[u8]) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value)?;
match val {
-1 => Ok(None), 0.. => Ok(Some(Duration::from_millis(val as u64))), _ => Err(ZmqError::InvalidOptionValue(0)), }
}
pub(crate) fn parse_secs_duration_option(value: &[u8]) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value)?;
match val {
0..=i32::MAX => Ok(Some(Duration::from_secs(val as u64))),
_ => Err(ZmqError::InvalidOptionValue(0)),
}
}
pub(crate) fn parse_timeout_option(
value: &[u8],
option_id: i32,
) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value).map_err(|_| ZmqError::InvalidOptionValue(option_id))?;
match val {
-1 => Ok(None), 0 => Ok(Some(Duration::ZERO)), 1.. => Ok(Some(Duration::from_millis(val as u64))), _ => Err(ZmqError::InvalidOptionValue(option_id)), }
}
pub(crate) fn parse_linger_option(value: &[u8]) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value)?;
match val {
-1 => Ok(None), 0.. => Ok(Some(Duration::from_millis(val as u64))), _ => Err(ZmqError::InvalidOptionValue(LINGER)), }
}
pub(crate) fn parse_u32_option(value: &[u8]) -> Result<Option<u32>, ZmqError> {
let val = parse_i32_option(value)?; match val {
0..=i32::MAX => Ok(Some(val as u32)),
_ => Err(ZmqError::InvalidOptionValue(0)),
}
}
pub(crate) fn parse_keepalive_mode_option(value: &[u8]) -> Result<i32, ZmqError> {
let val = parse_i32_option(value)?;
if val >= -1 && val <= 1 {
Ok(val)
} else {
Err(ZmqError::InvalidOptionValue(TCP_KEEPALIVE))
}
}
pub(crate) fn parse_blob_option(value: &[u8]) -> Result<Blob, ZmqError> {
if value.len() > 255 {
Err(ZmqError::InvalidOptionValue(ROUTING_ID)) } else {
Ok(Blob::from(value.to_vec())) }
}
pub(crate) fn parse_heartbeat_option(
value: &[u8],
option_id: i32,
) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value).map_err(|_| ZmqError::InvalidOptionValue(option_id))?;
match val {
0 => Ok(None), 1.. => Ok(Some(Duration::from_millis(val as u64))), _ => Err(ZmqError::InvalidOptionValue(option_id)), }
}
pub(crate) fn parse_handshake_option(
value: &[u8],
option_id: i32,
) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value).map_err(|_| ZmqError::InvalidOptionValue(option_id))?;
match val {
0 => Ok(None),
1.. => Ok(Some(Duration::from_millis(val as u64))), _ => Err(ZmqError::InvalidOptionValue(option_id)), }
}
pub(crate) fn parse_reconnect_ivl_option(value: &[u8]) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value)?;
match val {
-1 => Ok(None), 0 => Ok(None), 1.. => Ok(Some(Duration::from_millis(val as u64))),
_ => Err(ZmqError::InvalidOptionValue(RECONNECT_IVL)),
}
}
pub(crate) fn parse_reconnect_ivl_max_option(value: &[u8]) -> Result<Option<Duration>, ZmqError> {
let val = parse_i32_option(value)?;
match val {
0 => Ok(Some(Duration::ZERO)), 1.. => Ok(Some(Duration::from_millis(val as u64))),
_ => Err(ZmqError::InvalidOptionValue(RECONNECT_IVL_MAX)),
}
}
pub(crate) fn parse_maxmsgsize_option(value: &[u8]) -> Result<i64, ZmqError> {
let arr: [u8; 8] = value
.try_into()
.map_err(|_| ZmqError::InvalidOptionValue(MAXMSGSIZE))?;
let v = i64::from_ne_bytes(arr);
if v < -1 {
return Err(ZmqError::InvalidOptionValue(MAXMSGSIZE));
}
Ok(v)
}
pub(crate) fn parse_max_connections_option(
value: &[u8],
option_id: i32,
) -> Result<Option<usize>, ZmqError> {
let val = parse_i32_option(value).map_err(|_| ZmqError::InvalidOptionValue(option_id))?;
match val {
-1 => Ok(None), 0 => Err(ZmqError::InvalidOptionValue(option_id)), 1.. => Ok(Some(val as usize)),
_ => Err(ZmqError::InvalidOptionValue(option_id)),
}
}
pub(crate) fn parse_key_option<const N: usize>(
value: &[u8],
option_id: i32,
) -> Result<[u8; N], ZmqError> {
value.try_into().map_err(|_e| {
tracing::error!(
option_id = option_id,
expected_len = N,
actual_len = value.len(),
"Invalid key length provided for socket option."
);
ZmqError::InvalidOptionValue(option_id) })
}
pub(crate) fn parse_string_option(value: &[u8], option_id: i32) -> Result<String, ZmqError> {
String::from_utf8(value.to_vec()).map_err(|_| ZmqError::InvalidOptionValue(option_id))
}
pub(crate) fn apply_core_option_value(
options: &mut SocketOptions, option_id: i32,
value: &[u8],
) -> Result<(), ZmqError> {
tracing::debug!(
option_id,
value_len = value.len(),
"Applying core socket option"
);
match option_id {
SNDHWM => options.sndhwm = parse_i32_option(value)?.max(0) as usize,
RCVHWM => options.rcvhwm = parse_i32_option(value)?.max(0) as usize,
LINGER => options.linger = parse_linger_option(value)?,
ROUTING_ID => options.routing_id = Some(parse_blob_option(value)?),
RECONNECT_IVL => options.reconnect_ivl = parse_reconnect_ivl_option(value)?,
RECONNECT_IVL_MAX => options.reconnect_ivl_max = parse_reconnect_ivl_max_option(value)?,
RCVTIMEO => options.rcvtimeo = parse_timeout_option(value, option_id)?,
SNDTIMEO => options.sndtimeo = parse_timeout_option(value, option_id)?,
TCP_KEEPALIVE => options.tcp_keepalive_enabled = parse_keepalive_mode_option(value)?,
TCP_KEEPALIVE_IDLE => options.tcp_keepalive_idle = parse_secs_duration_option(value)?,
TCP_KEEPALIVE_CNT => options.tcp_keepalive_count = parse_u32_option(value)?,
TCP_KEEPALIVE_INTVL => options.tcp_keepalive_interval = parse_secs_duration_option(value)?,
HEARTBEAT_IVL => options.heartbeat_ivl = parse_heartbeat_option(value, option_id)?,
HEARTBEAT_TIMEOUT => options.heartbeat_timeout = parse_heartbeat_option(value, option_id)?,
HANDSHAKE_IVL => options.handshake_ivl = parse_handshake_option(value, option_id)?,
MAXMSGSIZE => options.maxmsgsize = parse_maxmsgsize_option(value)?,
MAX_CONNECTIONS => options.max_connections = parse_max_connections_option(value, option_id)?,
TCP_CORK => options.tcp_cork = parse_bool_option(value)?,
ZAP_DOMAIN => options.zap_domain = Some(parse_string_option(value, option_id)?),
PLAIN_SERVER => {
options.plain_options.server_role = Some(parse_bool_option(value)?);
options.plain_options.enabled = true; }
PLAIN_USERNAME => {
options.plain_options.username = Some(parse_string_option(value, option_id)?);
options.plain_options.enabled = true;
}
PLAIN_PASSWORD => {
options.plain_options.password = Some(parse_string_option(value, option_id)?);
options.plain_options.enabled = true;
}
#[cfg(feature = "curve")]
CURVE_SERVER => {
options.curve_options.server_role = parse_bool_option(value)?;
options.curve_options.enabled = true; }
#[cfg(feature = "curve")]
CURVE_SECRET_KEY => {
options.curve_options.secret_key = Some(parse_key_option::<32>(value, option_id)?);
options.curve_options.enabled = true;
}
#[cfg(feature = "curve")]
CURVE_SERVER_KEY => {
options.curve_options.server_public_key = Some(parse_key_option::<32>(value, option_id)?);
options.curve_options.enabled = true;
}
#[cfg(feature = "noise_xx")]
NOISE_XX_ENABLED => options.noise_xx_options.enabled = parse_bool_option(value)?,
#[cfg(feature = "noise_xx")]
NOISE_XX_STATIC_SECRET_KEY => options.noise_xx_options.static_secret_key_bytes = Some(parse_key_option::<32>(value, option_id)?),
#[cfg(feature = "noise_xx")]
NOISE_XX_REMOTE_STATIC_PUBLIC_KEY => options.noise_xx_options.remote_static_public_key_bytes = Some(parse_key_option::<32>(value, option_id)?),
#[cfg(feature = "io-uring")]
IO_URING_SESSION_ENABLED => options.io_uring.session_enabled = parse_bool_option(value)?,
#[cfg(feature = "io-uring")]
IO_URING_SNDZEROCOPY => options.io_uring.send_zerocopy = parse_bool_option(value)?,
#[cfg(feature = "io-uring")]
IO_URING_RCVMULTISHOT => options.io_uring.recv_multishot = parse_bool_option(value)?,
SUBSCRIBE | UNSUBSCRIBE | LAST_ENDPOINT | ROUTER_MANDATORY |
AUTO_DELIMITER | 16 => return Err(ZmqError::UnsupportedOption(option_id)),
_ => return Err(ZmqError::InvalidOption(option_id)), }
Ok(())
}
pub(crate) fn retrieve_core_option_value(
options: &SocketOptions, core_s_reader: &CoreState, option_id: i32,
) -> Result<Vec<u8>, ZmqError> {
match option_id {
SNDHWM => Ok((options.sndhwm as i32).to_ne_bytes().to_vec()),
RCVHWM => Ok((options.rcvhwm as i32).to_ne_bytes().to_vec()),
LINGER => Ok(options.linger.map_or(-1, |d| d.as_millis().try_into().unwrap_or(i32::MAX)).to_ne_bytes().to_vec()),
ROUTING_ID => options.routing_id.as_ref().map(|b| b.to_vec()).ok_or(ZmqError::Internal("Option ROUTING_ID not set".into())),
RECONNECT_IVL => Ok(options.reconnect_ivl.map_or(0, |d| d.as_millis() as i32).to_ne_bytes().to_vec()), RECONNECT_IVL_MAX => Ok(options.reconnect_ivl_max.map_or(0, |d| d.as_millis() as i32).to_ne_bytes().to_vec()), RCVTIMEO => Ok(options.rcvtimeo.map_or(-1, |d| d.as_millis().try_into().unwrap_or(i32::MAX)).to_ne_bytes().to_vec()),
SNDTIMEO => Ok(options.sndtimeo.map_or(-1, |d| d.as_millis().try_into().unwrap_or(i32::MAX)).to_ne_bytes().to_vec()),
LAST_ENDPOINT => Ok(core_s_reader.last_bound_endpoint.as_deref().unwrap_or("").as_bytes().to_vec()),
TCP_KEEPALIVE => Ok(options.tcp_keepalive_enabled.to_ne_bytes().to_vec()),
TCP_KEEPALIVE_IDLE => Ok(options.tcp_keepalive_idle.map_or(0, |d| d.as_secs() as i32).to_ne_bytes().to_vec()),
TCP_KEEPALIVE_CNT => Ok(options.tcp_keepalive_count.map_or(0, |c| c as i32).to_ne_bytes().to_vec()),
TCP_KEEPALIVE_INTVL => Ok(options.tcp_keepalive_interval.map_or(0, |d| d.as_secs() as i32).to_ne_bytes().to_vec()),
HEARTBEAT_IVL => Ok(options.heartbeat_ivl.map_or(0, |d| d.as_millis() as i32).to_ne_bytes().to_vec()),
HEARTBEAT_TIMEOUT => Ok(options.heartbeat_timeout.map_or(0, |d| d.as_millis() as i32).to_ne_bytes().to_vec()),
HANDSHAKE_IVL => Ok(options.handshake_ivl.map_or(0, |d| d.as_millis() as i32).to_ne_bytes().to_vec()),
MAXMSGSIZE => Ok(options.maxmsgsize.to_ne_bytes().to_vec()),
MAX_CONNECTIONS => Ok(options.max_connections.map_or(-1, |v| v as i32).to_ne_bytes().to_vec()),
TCP_CORK => Ok((options.tcp_cork as i32).to_ne_bytes().to_vec()),
ZAP_DOMAIN => options.zap_domain.as_ref().map(|s| s.as_bytes().to_vec()).ok_or(ZmqError::Internal("Option ZAP_DOMAIN not set".into())),
PLAIN_SERVER => options.plain_options.server_role.map(|b| (b as i32).to_ne_bytes().to_vec()).ok_or(ZmqError::Internal("Option PLAIN_SERVER not set".into())),
PLAIN_USERNAME => options.plain_options.username.as_ref().map(|s| s.as_bytes().to_vec()).ok_or(ZmqError::Internal("Option PLAIN_USERNAME not set".into())),
PLAIN_PASSWORD => Err(ZmqError::PermissionDenied("PLAIN_PASSWORD is write-only".into())),
#[cfg(feature = "noise_xx")]
NOISE_XX_ENABLED => Ok((options.noise_xx_options.enabled as i32).to_ne_bytes().to_vec()),
#[cfg(feature = "noise_xx")]
NOISE_XX_STATIC_SECRET_KEY => Err(ZmqError::PermissionDenied("NOISE_XX_STATIC_SECRET_KEY is write-only".into())),
#[cfg(feature = "noise_xx")]
NOISE_XX_REMOTE_STATIC_PUBLIC_KEY => options.noise_xx_options.remote_static_public_key_bytes.map(|k| k.to_vec()).ok_or(ZmqError::Internal("Option NOISE_XX_REMOTE_STATIC_PUBLIC_KEY not set".into())),
#[cfg(feature = "io-uring")]
IO_URING_SESSION_ENABLED => Ok((options.io_uring.session_enabled as i32).to_ne_bytes().to_vec()),
#[cfg(feature = "io-uring")]
IO_URING_SNDZEROCOPY => Ok((options.io_uring.send_zerocopy as i32).to_ne_bytes().to_vec()),
#[cfg(feature = "io-uring")]
IO_URING_RCVMULTISHOT => Ok((options.io_uring.recv_multishot as i32).to_ne_bytes().to_vec()),
16 => Ok((core_s_reader.socket_type as i32).to_ne_bytes().to_vec()),
SUBSCRIBE | UNSUBSCRIBE | ROUTER_MANDATORY | AUTO_DELIMITER => Err(ZmqError::UnsupportedOption(option_id)),
_ => Err(ZmqError::InvalidOption(option_id)),
}
}