rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! XSUB socket.
//!
//! Raw subscriber side of the PUB/SUB pattern. Where [`SubSocket`](crate::SubSocket)
//! offers the high-level `subscribe(&str)` / `unsubscribe(&str)` API,
//! XSUB lets the application drive the subscription wire protocol
//! directly: to subscribe to topic `T`, send a single-frame
//! [`ZmqMessage`] whose first byte is `0x01` followed by the prefix
//! bytes; to unsubscribe, prefix with `0x00`.
//!
//! Any other first byte is forwarded upstream as a regular message —
//! this is how XSUB peers in a shovel/forwarder configuration relay
//! application data back toward the publisher side.
//!
//! Subscriptions are cached locally and replayed on connect and
//! reconnect (same as SUB), so the publisher is always (re)told about
//! the active filters after a hiccup.
//!
//! Compatible with PUB and XPUB. See
//! [RFC 29](https://rfc.zeromq.org/spec/29/) for the wire contract and
//! `xsub.cpp` in libzmq for the reference implementation.

use crate::engine::backend::{GenericSocketBackend, HasRegistry};
use crate::reconnect::{ReconnectConfig, ReconnectHandle};
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::kind::sub::{SubBackendMsgType, SubSocketBackend};
use crate::{
    MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions, SocketRecv, SocketSend,
    SocketType, ZmqMessage, ZmqResult,
};

use std::sync::Arc;

/// Extended subscribe socket (XSUB). Raw counterpart of [`SubSocket`](crate::SubSocket).
///
/// `send` accepts `0x01`-prefixed SUBSCRIBE frames, `0x00`-prefixed
/// UNSUBSCRIBE frames, or arbitrary application messages that are
/// forwarded to all connected publishers. `recv` delivers messages from
/// any connected publisher (no local filtering — that happens on the
/// publisher side).
///
/// Use XSUB in forwarder/shovel devices where both the subscription
/// events and the messages need to flow through application code.
///
/// See [RFC 29](https://rfc.zeromq.org/spec/29/) and
/// [`zmq_socket(3)`](https://libzmq.readthedocs.io/en/latest/zmq_socket.html).
pub struct XSubSocket {
    common: SocketCommon<SubSocketBackend>,
    reconnect_handles: Vec<ReconnectHandle>,
    /// Running send counter for the amortized cooperative yield. XSUB
    /// doesn't use `SocketCore`, so it carries its own counter.
    send_count: u32,
}

impl crate::socket::family::sealed::Sealed for XSubSocket {}
impl crate::socket::family::Subscriber for XSubSocket {}

impl Drop for XSubSocket {
    fn drop(&mut self) {
        for handle in self.reconnect_handles.drain(..) {
            handle.shutdown();
        }
        self.common.backend.shutdown();
    }
}

impl HasCommon for XSubSocket {
    type Backend = SubSocketBackend;
    fn common(&self) -> &SocketCommon<Self::Backend> {
        &self.common
    }
    fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
        &mut self.common
    }
}

impl Socket for XSubSocket {
    type Backend = SubSocketBackend;

    fn with_options(options: SocketOptions) -> Self {
        let backend = Arc::new(SubSocketBackend::with_options(options, SocketType::XSUB));
        Self {
            common: SocketCommon::new(backend),
            reconnect_handles: Vec::new(),
            send_count: 0,
        }
    }

    async fn linger_drain(&mut self) {
        let opts = self.common.backend.socket_options();
        crate::engine::registry::drain_registry(self.common.backend.registry(), opts).await;
    }

    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let endpoint = endpoint.try_into().map_err(Into::into)?;
        let connect_timeout = self.common.backend.socket_options().connect_timeout;
        let (resolved, peer_id) = crate::socket::handshake::connect_peer_forever(
            endpoint.clone(),
            self.common.backend.clone(),
            connect_timeout,
        )
        .await?;

        if let Some(monitor) = self.common.backend.monitor().lock().as_mut() {
            let _ = monitor.try_send(SocketEvent::Connected(resolved, peer_id.clone()));
        }

        let backend_for_closure = self.common.backend.clone();
        let register_fn: crate::reconnect::RegisterDisconnectFn =
            Box::new(move |peer_id, notifier| {
                backend_for_closure.register_disconnect_notifier(peer_id, notifier);
            });

        let opts = self.common.backend.socket_options();
        let reconnect_handle = crate::reconnect::spawn_reconnect_task(
            endpoint,
            self.common.backend.clone(),
            peer_id,
            register_fn,
            ReconnectConfig {
                initial_interval: opts.reconnect_interval,
                max_interval: opts.reconnect_interval_max,
                backoff_multiplier: 2.0,
            },
        );
        self.reconnect_handles.push(reconnect_handle);

        Ok(())
    }
}

impl SocketSend for XSubSocket {
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        let message = message.into();
        // Mirror libzmq xsub_t::xsend: inspect the first byte of the
        // first frame. 0x01 = SUBSCRIBE, 0x00 = UNSUBSCRIBE, anything
        // else (or empty) = application message. Subsequent multipart
        // frames always go upstream as-is; only frame 0 is parsed.
        let first = message.get(0);
        let (cmd, topic): (Option<SubBackendMsgType>, Option<Vec<u8>>) = match first {
            Some(frame) if !frame.is_empty() => match frame[0] {
                1 => (
                    Some(SubBackendMsgType::SUBSCRIBE),
                    Some(frame[1..].to_vec()),
                ),
                0 => (
                    Some(SubBackendMsgType::UNSUBSCRIBE),
                    Some(frame[1..].to_vec()),
                ),
                _ => (None, None),
            },
            _ => (None, None),
        };

        // Update the cached subscription set before the fanout so a
        // reconnect-replay during the fanout sees the new state.
        if let (Some(cmd_kind), Some(topic_bytes)) = (cmd, topic) {
            let mut subs = self.common.backend.subs.lock();
            match cmd_kind {
                SubBackendMsgType::SUBSCRIBE => {
                    subs.insert(topic_bytes);
                }
                SubBackendMsgType::UNSUBSCRIBE => {
                    subs.remove(&topic_bytes);
                }
            }
        }

        // Fan out the raw frame to every connected publisher.
        let mut peers = Vec::new();
        self.common.backend.registry().snapshot_into(&mut peers);
        let mut dead: Vec<crate::engine::registry::PeerKey> = Vec::new();
        for (key, engine) in &peers {
            if let Err(e) = engine.send_msg(message.clone()).await {
                log::debug!("XSUB: fanout to key={} failed: {:?}", key, e);
                dead.push(*key);
            }
        }
        for key in dead {
            if let Some(id) = self.common.backend.registry().id_for(key) {
                self.common.backend.peer_disconnected(&id);
            }
        }
        // Amortized cooperative yield (see `SocketCore::after_send`).
        self.send_count = self.send_count.wrapping_add(1);
        if self.send_count % crate::async_rt::task::SOCKET_SEND_YIELD_EVERY == 0 {
            crate::async_rt::task::yield_now().await;
        }
        Ok(())
    }
}

impl SocketRecv for XSubSocket {
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let receive_timeout = self.common.backend.socket_options().receive_timeout;
        let (_peer_id, message) = GenericSocketBackend::recv_next_timed(
            &self.common.backend.inbound_rx,
            &*self.common.backend,
            receive_timeout,
        )
        .await?;
        Ok(message)
    }
}