rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Front-end socket façade: the `Socket` trait, configuration, events.
//!
//! The I/O machinery lives in `crate::engine`; this module's job is to
//! describe the shape of a ZMQ socket to user code (bind / connect /
//! unbind / close / monitor) and to carry configuration to the backend.

mod backend;
pub(crate) mod common;
pub(crate) mod core;
mod event;
pub mod family;
pub(crate) mod handshake;
pub(crate) mod kind;
mod options;
mod socket_type;
pub(crate) mod topic_router;

pub use backend::{CaptureSocket, MultiPeerBackend, SocketBackend, SocketRecv, SocketSend};
pub use event::SocketEvent;
pub use options::{ReconnectStop, SocketBuilder, SocketOptions};
pub use socket_type::SocketType;

use crate::endpoint::Endpoint;
use crate::transport::{self, AcceptStopHandle, TransportIo};
use crate::{ZmqError, ZmqResult};

use futures::channel::mpsc;

use std::collections::HashMap;
use std::sync::Arc;

/// Core trait implemented by every ZMQ socket type.
///
/// Provides `bind`, `connect`, `unbind`, `monitor`, and `close`. Use
/// [`SocketSend`] and [`SocketRecv`] to send and receive messages.
pub trait Socket: Sized + Send + common::HasCommon<Backend = <Self as Socket>::Backend> {
    type Backend: MultiPeerBackend + 'static;

    /// Create a socket with default options.
    fn new() -> Self {
        Self::with_options(SocketOptions::default())
    }

    /// Create a [`SocketBuilder`] to configure the socket before constructing it.
    fn builder() -> options::SocketBuilder<Self> {
        options::SocketBuilder::new()
    }

    /// Create a socket with the given options.
    fn with_options(options: SocketOptions) -> Self;

    #[doc(hidden)]
    fn backend(&self) -> Arc<<Self as Socket>::Backend> {
        self.common().backend.clone()
    }

    /// Binds to the endpoint and starts a coroutine to accept new connections
    /// on it. Returns the resolved bound endpoint.
    ///
    /// `endpoint` accepts anything that converts into an [`Endpoint`]:
    /// `&str`, `String`, or an already-parsed `Endpoint`. Implement
    /// `TryFrom<MyType> for Endpoint` to extend this set.
    ///
    /// See [`zmq_bind(3)`](https://libzmq.readthedocs.io/en/latest/zmq_bind.html).
    fn bind<E>(
        &mut self,
        endpoint: E,
    ) -> impl std::future::Future<Output = ZmqResult<Endpoint>> + Send
    where
        E: TryInto<Endpoint> + Send,
        E::Error: Into<ZmqError>,
    {
        async move {
            let endpoint = endpoint.try_into().map_err(Into::into)?;
            #[cfg(feature = "inproc")]
            if matches!(endpoint, Endpoint::Inproc(_)) {
                warn_inproc_ignored_options(self, &endpoint);
            }
            let cloned_backend = self.backend();

            let cback = move |result: ZmqResult<TransportIo>| {
                let b = cloned_backend.clone();
                async move {
                    let result = match result {
                        #[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
                        Ok(TransportIo::Framed(io, ep)) => {
                            let peer_addr = Some(ep.to_string());
                            handshake::peer_connected(*io, b.clone(), None, peer_addr)
                                .await
                                .map(|peer_id| (ep, peer_id))
                        }
                        #[cfg(feature = "inproc")]
                        Ok(TransportIo::Inproc(peer)) => {
                            let ep = peer.endpoint.clone();
                            // Placeholder id: ROUTER swaps it for the remote's
                            // advertised routing_id inside peer_connected_inproc
                            // once the handshake resolves.
                            let peer_id = PeerIdentity::new();
                            b.clone()
                                .peer_connected_inproc(&peer_id, peer, Some(ep.clone()))
                                .await
                                .map(|()| (ep, peer_id))
                        }
                        Err(e) => Err(e),
                    };
                    match result {
                        Ok((ep, peer_id)) => {
                            if let Some(monitor) = b.monitor().lock().as_mut() {
                                let _ = monitor.try_send(SocketEvent::Accepted(ep, peer_id));
                            }
                        }
                        Err(e) => {
                            if let Some(monitor) = b.monitor().lock().as_mut() {
                                let _ = monitor.try_send(SocketEvent::AcceptFailed(e));
                            }
                        }
                    }
                }
            };

            let backend = self.backend();
            let backlog = backend.socket_options().backlog;
            let ipv6 = backend.socket_options().ipv6;
            #[cfg(feature = "tcp")]
            let tcp_cfg = crate::transport::TcpConfig::from_options(backend.socket_options());
            #[cfg(not(feature = "tcp"))]
            let tcp_cfg: () = ();
            let (endpoint, stop_handle) =
                transport::begin_accept(endpoint, backlog, ipv6, tcp_cfg, cback).await?;

            if let Some(monitor) = self.backend().monitor().lock().as_mut() {
                let _ = monitor.try_send(SocketEvent::Listening(endpoint.clone()));
            }
            self.binds().insert(endpoint.clone(), stop_handle);
            self.set_last_endpoint(endpoint.clone());
            Ok(endpoint)
        }
    }

    #[doc(hidden)]
    fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle> {
        &mut self.common_mut().binds
    }

    #[doc(hidden)]
    fn set_last_endpoint(&mut self, endpoint: Endpoint) {
        self.common_mut().last_endpoint = Some(endpoint);
    }

    /// Returns the last endpoint resolved by [`bind`](Socket::bind), or `None`
    /// if the socket has not been bound.
    fn last_endpoint(&self) -> Option<&Endpoint> {
        self.common().last_endpoint.as_ref()
    }

    /// Stop accepting connections on the given endpoint.
    fn unbind(
        &mut self,
        endpoint: Endpoint,
    ) -> impl std::future::Future<Output = ZmqResult<()>> + Send {
        async move {
            let stop_handle = self.binds().remove(&endpoint);
            let stop_handle = stop_handle.ok_or(ZmqError::NoSuchBind(endpoint))?;
            stop_handle.task.shutdown().await
        }
    }

    /// Stop accepting connections on all bound endpoints.
    fn unbind_all(&mut self) -> impl std::future::Future<Output = Vec<ZmqError>> + Send {
        async move {
            let mut errs = Vec::new();
            let endpoints: Vec<_> = self.binds().keys().cloned().collect();
            for endpoint in endpoints {
                if let Err(err) = self.unbind(endpoint).await {
                    errs.push(err);
                }
            }
            errs
        }
    }

    /// Connects to the given endpoint.
    ///
    /// `endpoint` accepts anything that converts into an [`Endpoint`]:
    /// `&str`, `String`, or an already-parsed `Endpoint`. Implement
    /// `TryFrom<MyType> for Endpoint` to extend this set.
    ///
    /// See [`zmq_connect(3)`](https://libzmq.readthedocs.io/en/latest/zmq_connect.html).
    fn connect<E>(&mut self, endpoint: E) -> impl std::future::Future<Output = ZmqResult<()>> + Send
    where
        E: TryInto<Endpoint> + Send,
        E::Error: Into<ZmqError>,
    {
        async move {
            let endpoint = endpoint.try_into().map_err(Into::into)?;
            #[cfg(feature = "inproc")]
            if matches!(endpoint, Endpoint::Inproc(_)) {
                warn_inproc_ignored_options(self, &endpoint);
            }
            let backend = self.backend();
            let (resolved, peer_id) =
                handshake::connect_peer_forever(endpoint, backend, None).await?;
            if let Some(monitor) = self.backend().monitor().lock().as_mut() {
                let _ = monitor.try_send(SocketEvent::Connected(resolved, peer_id));
            }
            Ok(())
        }
    }

    /// Subscribe to [`SocketEvent`]s. Returns a channel receiver that yields
    /// events as connections are made, accepted, or dropped.
    ///
    /// The receiver is a `futures::channel::mpsc::Receiver`; consume via
    /// [`StreamExt::next`](futures::StreamExt::next). Call this *before*
    /// `bind`/`connect` to avoid missing the first events.
    ///
    /// [`SocketEvent`] is `#[non_exhaustive]` — always include a `_` arm.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use rustzmq2::prelude::*;
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let mut rep = rustzmq2::RepSocket::new();
    ///     let mut events = rep.monitor();
    ///     tokio::spawn(async move {
    ///         while let Some(event) = events.next().await {
    ///             match event {
    ///                 SocketEvent::Listening(ep)    => println!("listening on {ep}"),
    ///                 SocketEvent::Accepted(ep, id) => println!("accept {ep} id={id}"),
    ///                 SocketEvent::Disconnected(id) => println!("peer {id} gone"),
    ///                 _ => {}
    ///             }
    ///         }
    ///     });
    ///     rep.bind("tcp://127.0.0.1:5555").await?;
    ///     Ok(())
    /// }
    /// ```
    fn monitor(&mut self) -> mpsc::Receiver<SocketEvent> {
        let (sender, receiver) = mpsc::channel(1024);
        self.backend().monitor().lock().replace(sender);
        receiver
    }

    /// Drain outbound queues for all connected peers up to the configured
    /// linger timeout before the socket is dropped. Default is a no-op
    /// (linger = `Some(ZERO)` discards immediately); override on socket types
    /// that have access to the peer registry.
    #[doc(hidden)]
    fn linger_drain(&mut self) -> impl std::future::Future<Output = ()> + Send {
        async {}
    }

    /// Stop accepting connections and wait for outbound messages to drain
    /// (subject to the configured [`linger`](crate::SocketOptions) timeout),
    /// then drop the socket.
    ///
    /// Returns `Ok(())` if every bound endpoint shut down cleanly, or
    /// `Err(errs)` with the per-endpoint errors otherwise. The socket is
    /// always dropped — the error list is informational.
    fn close(mut self) -> impl std::future::Future<Output = Result<(), Vec<ZmqError>>> + Send {
        async move {
            let errs = self.unbind_all().await;
            self.linger_drain().await;
            if errs.is_empty() {
                Ok(())
            } else {
                Err(errs)
            }
        }
    }
}

#[cfg(feature = "inproc")]
use crate::PeerIdentity;

/// Emit `log::warn!` + one `SocketEvent::OptionIgnoredOnTransport` per
/// (option, socket) pair when a bind/connect routes through inproc and
/// the socket has options set that inproc ignores (CURVE, PLAIN, ZAP,
/// `handshake_interval`, heartbeat, TCP knobs, etc.). libzmq silently ignores
/// these — we're stricter on discoverability without breaking the
/// "one socket spans transports" model by returning an error.
#[cfg(feature = "inproc")]
fn warn_inproc_ignored_options<S: Socket>(sock: &mut S, endpoint: &Endpoint) {
    let ignored: Vec<&'static str> = sock.backend().socket_options().inproc_ignored_options();
    if ignored.is_empty() {
        return;
    }
    let common = sock.common_mut();
    for opt in ignored {
        if common.ignored_warned.insert(opt) {
            log::warn!(
                "option '{}' is set but has no effect on inproc transport ({}); \
                 configure per-endpoint behavior with separate sockets if needed",
                opt,
                endpoint
            );
            if let Some(monitor) = common.backend.monitor().lock().as_mut() {
                let _ = monitor.try_send(SocketEvent::OptionIgnoredOnTransport {
                    option: opt,
                    endpoint: endpoint.clone(),
                });
            }
        }
    }
}