rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
#[cfg(feature = "inproc")]
pub(crate) mod inproc;
#[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
mod ipc;
#[cfg(feature = "tcp")]
mod runtime_tcp;
#[cfg(feature = "tcp")]
mod socks;
#[cfg(feature = "tcp")]
mod tcp;

#[cfg(any(
    feature = "tcp",
    all(feature = "ipc", feature = "tokio", target_family = "unix")
))]
use crate::codec::RuntimeFramedIo as FramedIo;

use crate::endpoint::Endpoint;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;

/// TCP socket knobs to apply on connect/accept. Extracted from
/// `SocketOptions` to keep the transport layer unaware of the full options
/// struct. `None` fields are skipped (OS defaults).
#[cfg(feature = "tcp")]
#[derive(Clone, Default)]
pub(crate) struct TcpConfig {
    pub send_buffer: Option<usize>,
    pub receive_buffer: Option<usize>,
    pub keepalive: Option<bool>,
    pub keepalive_idle: Option<std::time::Duration>,
    pub keepalive_interval: Option<std::time::Duration>,
    pub keepalive_count: Option<u32>,
    pub max_retransmit: Option<std::time::Duration>,
    pub type_of_service: Option<u32>,
    pub bind_to_device: Option<String>,
    pub socks_proxy: Option<String>,
    pub socks_username: Option<String>,
    pub socks_password: Option<String>,
}

#[cfg(feature = "tcp")]
impl TcpConfig {
    pub(crate) fn from_options(opts: &crate::SocketOptions) -> Self {
        Self {
            send_buffer: opts.tcp_send_buffer,
            receive_buffer: opts.tcp_receive_buffer,
            keepalive: opts.tcp_keepalive,
            keepalive_idle: opts.tcp_keepalive_idle,
            keepalive_interval: opts.tcp_keepalive_interval,
            keepalive_count: opts.tcp_keepalive_count,
            max_retransmit: opts.tcp_max_retransmit,
            type_of_service: opts.type_of_service,
            bind_to_device: opts.bind_to_device.clone(),
            socks_proxy: opts.socks_proxy.clone(),
            socks_username: opts.socks_username.clone(),
            socks_password: opts.socks_password.clone(),
        }
    }

    /// Apply every configured knob to `sock`. Errors are logged and ignored
    /// (matching libzmq: option application is best-effort).
    pub(crate) fn apply(&self, sock: socket2::SockRef<'_>) {
        if let Some(n) = self.send_buffer {
            if let Err(e) = sock.set_send_buffer_size(n) {
                log::warn!("set_send_buffer_size({}): {}", n, e);
            }
        }
        if let Some(n) = self.receive_buffer {
            if let Err(e) = sock.set_recv_buffer_size(n) {
                log::warn!("set_recv_buffer_size({}): {}", n, e);
            }
        }
        if let Some(true) = self.keepalive {
            let mut ka = socket2::TcpKeepalive::new();
            if let Some(d) = self.keepalive_idle {
                ka = ka.with_time(d);
            }
            #[cfg(not(any(target_os = "openbsd", target_os = "haiku", target_os = "solaris")))]
            if let Some(d) = self.keepalive_interval {
                ka = ka.with_interval(d);
            }
            #[cfg(not(any(
                target_os = "openbsd",
                target_os = "redox",
                target_os = "solaris",
                windows
            )))]
            if let Some(n) = self.keepalive_count {
                ka = ka.with_retries(n);
            }
            if let Err(e) = sock.set_tcp_keepalive(&ka) {
                log::warn!("set_tcp_keepalive: {}", e);
            }
        }
        if let Some(tos) = self.type_of_service {
            if let Err(e) = sock.set_tos_v4(tos) {
                log::warn!("set_tos_v4({}): {}", tos, e);
            }
        }
        #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
        if let Some(iface) = self.bind_to_device.as_deref() {
            if let Err(e) = sock.bind_device(Some(iface.as_bytes())) {
                log::warn!("bind_device({}): {}", iface, e);
            }
        }
        #[cfg(any(
            target_os = "android",
            target_os = "fuchsia",
            target_os = "linux",
            target_os = "freebsd"
        ))]
        if let Some(d) = self.max_retransmit {
            if let Err(e) = sock.set_tcp_user_timeout(Some(d)) {
                log::warn!("set_tcp_user_timeout: {}", e);
            }
        }
        // Silence unused-field warnings on platforms that don't support them.
        #[cfg(not(any(
            target_os = "android",
            target_os = "fuchsia",
            target_os = "linux",
            target_os = "freebsd"
        )))]
        let _ = &self.max_retransmit;
        #[cfg(not(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
        let _ = &self.bind_to_device;
    }
}

pub struct AcceptStopHandle {
    pub(crate) task: TaskHandle<()>,
    /// Dropped after the task handle, allowing transports to run sync cleanup
    /// (e.g. inproc removes its name from the global registry) when the handle
    /// is dropped.
    _guard: Option<Box<dyn std::any::Any + Send + Sync>>,
}

impl AcceptStopHandle {
    #[cfg(any(
        all(feature = "tcp", feature = "tokio"),
        all(feature = "tcp", feature = "smol"),
        all(feature = "ipc", feature = "tokio", target_family = "unix"),
    ))]
    pub(crate) fn new(task: TaskHandle<()>) -> Self {
        Self { task, _guard: None }
    }

    #[cfg(feature = "inproc")]
    pub(crate) fn with_guard(
        task: TaskHandle<()>,
        guard: Box<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        Self {
            task,
            _guard: Some(guard),
        }
    }
}

/// The result of a successful `connect` call. Wire transports return a
/// `FramedIo` ready for the ZMTP handshake; inproc returns a channel pair
/// that bypasses wire encoding entirely.
pub(crate) enum TransportIo {
    /// TCP or IPC: framed stream + sink, needs ZMTP handshake.
    #[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
    Framed(Box<FramedIo>, Endpoint),
    /// In-process: direct channel pair, no handshake needed.
    #[cfg(feature = "inproc")]
    Inproc(inproc::InprocPeer),
}

/// Connect to the given endpoint, returning a `TransportIo` appropriate for
/// the transport (framed wire or direct inproc channel).
pub(crate) async fn connect(
    endpoint: &Endpoint,
    #[cfg(feature = "tcp")] connect_timeout: Option<std::time::Duration>,
    #[cfg(not(feature = "tcp"))] _connect_timeout: Option<std::time::Duration>,
    #[cfg(feature = "tcp")] tcp_cfg: &TcpConfig,
    #[cfg(not(feature = "tcp"))] _tcp_cfg: &(),
) -> ZmqResult<TransportIo> {
    match endpoint {
        Endpoint::Tcp(_host, _port) => {
            #[cfg(feature = "tcp")]
            {
                let (io, ep) = tcp::connect(_host, *_port, connect_timeout, tcp_cfg).await?;
                Ok(TransportIo::Framed(Box::new(io), ep))
            }
            #[cfg(not(feature = "tcp"))]
            panic!("feature \"tcp\" is not enabled")
        }
        Endpoint::Ipc(_path) => {
            #[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
            {
                if let Some(path) = _path {
                    let (io, ep) = ipc::connect(path).await?;
                    Ok(TransportIo::Framed(Box::new(io), ep))
                } else {
                    Err(crate::error::ZmqError::Socket(
                        "Cannot connect to an unnamed ipc socket".into(),
                    ))
                }
            }
            #[cfg(not(all(feature = "ipc", feature = "tokio", target_family = "unix")))]
            panic!("IPC transport is not available on this platform")
        }
        Endpoint::Inproc(_name) => {
            #[cfg(feature = "inproc")]
            {
                let peer = inproc::connect(_name).await?;
                Ok(TransportIo::Inproc(peer))
            }
            #[cfg(not(feature = "inproc"))]
            panic!("feature \"inproc\" is not enabled")
        }
    }
}

/// Spawn an accept task at the given endpoint.
///
/// `cback` is invoked for each accepted connection with a `TransportIo` value.
/// Returns the resolved bound endpoint and a handle to stop the accept task.
pub(crate) async fn begin_accept<T>(
    endpoint: Endpoint,
    #[cfg(feature = "tcp")] backlog: u32,
    #[cfg(not(feature = "tcp"))] _backlog: u32,
    #[cfg(feature = "tcp")] ipv6: bool,
    #[cfg(not(feature = "tcp"))] _ipv6: bool,
    #[cfg(feature = "tcp")] tcp_cfg: TcpConfig,
    #[cfg(not(feature = "tcp"))] _tcp_cfg: (),
    #[allow(unused_variables)] cback: impl Fn(ZmqResult<TransportIo>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
    T: std::future::Future<Output = ()> + Send + 'static,
{
    match endpoint {
        Endpoint::Tcp(_host, _port) => {
            #[cfg(feature = "tcp")]
            {
                let wrapped = move |r: ZmqResult<(FramedIo, Endpoint)>| {
                    cback(r.map(|(io, ep)| TransportIo::Framed(Box::new(io), ep)))
                };
                tcp::begin_accept(_host, _port, backlog, ipv6, tcp_cfg, wrapped).await
            }
            #[cfg(not(feature = "tcp"))]
            panic!("feature \"tcp\" is not enabled")
        }
        Endpoint::Ipc(_path) => {
            #[cfg(all(feature = "ipc", feature = "tokio", target_family = "unix"))]
            {
                if let Some(path) = _path {
                    let wrapped = move |r: ZmqResult<(FramedIo, Endpoint)>| {
                        cback(r.map(|(io, ep)| TransportIo::Framed(Box::new(io), ep)))
                    };
                    return ipc::begin_accept(&path, wrapped).await;
                } else {
                    Err(crate::error::ZmqError::Socket(
                        "Cannot begin accepting peers at an unnamed ipc socket".into(),
                    ))
                }
            }
            #[cfg(not(all(feature = "ipc", feature = "tokio", target_family = "unix")))]
            panic!("IPC transport is not available on this platform")
        }
        Endpoint::Inproc(_name) => {
            #[cfg(feature = "inproc")]
            {
                let wrapped =
                    move |r: ZmqResult<inproc::InprocPeer>| cback(r.map(TransportIo::Inproc));
                inproc::begin_accept(_name, wrapped).await
            }
            #[cfg(not(feature = "inproc"))]
            panic!("feature \"inproc\" is not enabled")
        }
    }
}