#[cfg(feature = "inproc")]
pub(crate) mod inproc;
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
mod ipc;
#[cfg(feature = "tcp")]
mod runtime_tcp;
#[cfg(feature = "tcp")]
mod socks;
#[cfg(feature = "tcp")]
mod tcp;
#[cfg(any(
feature = "tcp",
all(feature = "ipc", feature = "tokio", target_family = "unix")
))]
use crate::codec::RuntimeFramedIo as FramedIo;
use crate::endpoint::Endpoint;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;
#[cfg(feature = "tcp")]
#[derive(Clone, Default)]
pub(crate) struct TcpConfig {
pub send_buffer: Option<usize>,
pub receive_buffer: Option<usize>,
pub keepalive: Option<bool>,
pub keepalive_idle: Option<std::time::Duration>,
pub keepalive_interval: Option<std::time::Duration>,
pub keepalive_count: Option<u32>,
pub max_retransmit: Option<std::time::Duration>,
pub type_of_service: Option<u32>,
pub bind_to_device: Option<String>,
pub socks_proxy: Option<String>,
pub socks_username: Option<String>,
pub socks_password: Option<String>,
}
#[cfg(feature = "tcp")]
impl TcpConfig {
pub(crate) fn from_options(opts: &crate::SocketOptions) -> Self {
Self {
send_buffer: opts.tcp_send_buffer,
receive_buffer: opts.tcp_receive_buffer,
keepalive: opts.tcp_keepalive,
keepalive_idle: opts.tcp_keepalive_idle,
keepalive_interval: opts.tcp_keepalive_interval,
keepalive_count: opts.tcp_keepalive_count,
max_retransmit: opts.tcp_max_retransmit,
type_of_service: opts.type_of_service,
bind_to_device: opts.bind_to_device.clone(),
socks_proxy: opts.socks_proxy.clone(),
socks_username: opts.socks_username.clone(),
socks_password: opts.socks_password.clone(),
}
}
pub(crate) fn apply(&self, sock: socket2::SockRef<'_>) {
if let Some(n) = self.send_buffer {
if let Err(e) = sock.set_send_buffer_size(n) {
log::warn!("set_send_buffer_size({}): {}", n, e);
}
}
if let Some(n) = self.receive_buffer {
if let Err(e) = sock.set_recv_buffer_size(n) {
log::warn!("set_recv_buffer_size({}): {}", n, e);
}
}
if let Some(true) = self.keepalive {
let mut ka = socket2::TcpKeepalive::new();
if let Some(d) = self.keepalive_idle {
ka = ka.with_time(d);
}
#[cfg(not(any(target_os = "openbsd", target_os = "haiku", target_os = "solaris")))]
if let Some(d) = self.keepalive_interval {
ka = ka.with_interval(d);
}
#[cfg(not(any(
target_os = "openbsd",
target_os = "redox",
target_os = "solaris",
windows
)))]
if let Some(n) = self.keepalive_count {
ka = ka.with_retries(n);
}
if let Err(e) = sock.set_tcp_keepalive(&ka) {
log::warn!("set_tcp_keepalive: {}", e);
}
}
if let Some(tos) = self.type_of_service {
if let Err(e) = sock.set_tos_v4(tos) {
log::warn!("set_tos_v4({}): {}", tos, e);
}
}
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
if let Some(iface) = self.bind_to_device.as_deref() {
if let Err(e) = sock.bind_device(Some(iface.as_bytes())) {
log::warn!("bind_device({}): {}", iface, e);
}
}
#[cfg(any(
target_os = "android",
target_os = "fuchsia",
target_os = "linux",
target_os = "freebsd"
))]
if let Some(d) = self.max_retransmit {
if let Err(e) = sock.set_tcp_user_timeout(Some(d)) {
log::warn!("set_tcp_user_timeout: {}", e);
}
}
#[cfg(not(any(
target_os = "android",
target_os = "fuchsia",
target_os = "linux",
target_os = "freebsd"
)))]
let _ = &self.max_retransmit;
#[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
let _ = &self.bind_to_device;
}
}
pub struct AcceptStopHandle {
pub(crate) task: TaskHandle<()>,
_guard: Option<Box<dyn std::any::Any + Send + Sync>>,
}
impl AcceptStopHandle {
#[cfg(any(
all(feature = "tcp", feature = "tokio"),
all(feature = "tcp", feature = "smol"),
all(feature = "ipc", feature = "tokio", target_family = "unix"),
))]
pub(crate) fn new(task: TaskHandle<()>) -> Self {
Self { task, _guard: None }
}
#[cfg(feature = "inproc")]
pub(crate) fn with_guard(
task: TaskHandle<()>,
guard: Box<dyn std::any::Any + Send + Sync>,
) -> Self {
Self {
task,
_guard: Some(guard),
}
}
}
pub(crate) enum TransportIo {
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
Framed(Box<FramedIo>, Endpoint),
#[cfg(feature = "inproc")]
Inproc(inproc::InprocPeer),
}
pub(crate) async fn connect(
endpoint: &Endpoint,
#[cfg(feature = "tcp")] connect_timeout: Option<std::time::Duration>,
#[cfg(not(feature = "tcp"))] _connect_timeout: Option<std::time::Duration>,
#[cfg(feature = "tcp")] tcp_cfg: &TcpConfig,
#[cfg(not(feature = "tcp"))] _tcp_cfg: &(),
) -> ZmqResult<TransportIo> {
match endpoint {
Endpoint::Tcp(_host, _port) => {
#[cfg(feature = "tcp")]
{
let (io, ep) = tcp::connect(_host, *_port, connect_timeout, tcp_cfg).await?;
Ok(TransportIo::Framed(Box::new(io), ep))
}
#[cfg(not(feature = "tcp"))]
panic!("feature \"tcp\" is not enabled")
}
Endpoint::Ipc(_path) => {
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
{
if let Some(path) = _path {
let (io, ep) = ipc::connect(path).await?;
Ok(TransportIo::Framed(Box::new(io), ep))
} else {
Err(crate::error::ZmqError::Socket(
"Cannot connect to an unnamed ipc socket".into(),
))
}
}
#[cfg(not(all(feature = "ipc", feature = "tokio", target_family = "unix")))]
panic!("IPC transport is not available on this platform")
}
Endpoint::Inproc(_name) => {
#[cfg(feature = "inproc")]
{
let peer = inproc::connect(_name).await?;
Ok(TransportIo::Inproc(peer))
}
#[cfg(not(feature = "inproc"))]
panic!("feature \"inproc\" is not enabled")
}
}
}
pub(crate) async fn begin_accept<T>(
endpoint: Endpoint,
#[cfg(feature = "tcp")] backlog: u32,
#[cfg(not(feature = "tcp"))] _backlog: u32,
#[cfg(feature = "tcp")] ipv6: bool,
#[cfg(not(feature = "tcp"))] _ipv6: bool,
#[cfg(feature = "tcp")] tcp_cfg: TcpConfig,
#[cfg(not(feature = "tcp"))] _tcp_cfg: (),
#[allow(unused_variables)] cback: impl Fn(ZmqResult<TransportIo>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
T: std::future::Future<Output = ()> + Send + 'static,
{
match endpoint {
Endpoint::Tcp(_host, _port) => {
#[cfg(feature = "tcp")]
{
let wrapped = move |r: ZmqResult<(FramedIo, Endpoint)>| {
cback(r.map(|(io, ep)| TransportIo::Framed(Box::new(io), ep)))
};
tcp::begin_accept(_host, _port, backlog, ipv6, tcp_cfg, wrapped).await
}
#[cfg(not(feature = "tcp"))]
panic!("feature \"tcp\" is not enabled")
}
Endpoint::Ipc(_path) => {
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
{
if let Some(path) = _path {
let wrapped = move |r: ZmqResult<(FramedIo, Endpoint)>| {
cback(r.map(|(io, ep)| TransportIo::Framed(Box::new(io), ep)))
};
return ipc::begin_accept(&path, wrapped).await;
} else {
Err(crate::error::ZmqError::Socket(
"Cannot begin accepting peers at an unnamed ipc socket".into(),
))
}
}
#[cfg(not(all(feature = "ipc", feature = "tokio", target_family = "unix")))]
panic!("IPC transport is not available on this platform")
}
Endpoint::Inproc(_name) => {
#[cfg(feature = "inproc")]
{
let wrapped =
move |r: ZmqResult<inproc::InprocPeer>| cback(r.map(TransportIo::Inproc));
inproc::begin_accept(_name, wrapped).await
}
#[cfg(not(feature = "inproc"))]
panic!("feature \"inproc\" is not enabled")
}
}
}