rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Shared infrastructure for socket types backed by `GenericSocketBackend`
//! (PUSH, PULL, DEALER, ROUTER, REQ, REP, PAIR, CHANNEL, SCATTER, GATHER).
//! PUB/XPUB/SUB have custom backends and manage their own state directly.

use crate::endpoint::Endpoint;
use crate::engine::backend::GenericSocketBackend;
use crate::reconnect::{ReconnectConfig, ReconnectHandle};
use crate::socket::common::SocketCommon;
use crate::{SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqResult};

use std::collections::HashMap;
use std::sync::Arc;

pub(crate) struct SocketCore {
    pub(crate) common: SocketCommon<GenericSocketBackend>,
    pub(crate) reconnect_handles: HashMap<Endpoint, ReconnectHandle>,
    /// Counter for the amortized cooperative yield on the send hot path.
    /// Each `send` implementation bumps this and lets [`Self::after_send`]
    /// decide whether to actually yield.
    ///
    /// Stored as `AtomicU32` so split-socket halves (e.g. `DealerSendHalf`,
    /// `RouterSendHalf`) can share it behind an `Arc` without locking.
    /// Relaxed ordering is sufficient — this is a heuristic, not a
    /// synchronization primitive.
    send_count: std::sync::atomic::AtomicU32,
}

impl SocketCore {
    pub(crate) fn new(socket_type: SocketType, mut options: SocketOptions) -> Self {
        // Resolve the inline-write knob now that we know the socket type.
        // `None` (the SocketOptions default) means "use the per-type
        // default"; `Some(_)` is a user override that we leave intact.
        if options.inline_write_max.is_none() {
            options.inline_write_max = Some(socket_type.default_inline_write_max());
        }
        // Same shape for `out_batch_msgs`: outer None → per-type default,
        // outer Some(_) → user override. PUB / XPUB stay at 32 (per-peer
        // HWM safety under fanout); everything else gets 256 (point-to-
        // point sockets benefit from larger writev amortization).
        if options.out_batch_msgs.is_none() {
            options.out_batch_msgs = Some(socket_type.default_out_batch_msgs());
        }
        let backend = Arc::new(GenericSocketBackend::with_options(socket_type, options));
        Self {
            common: SocketCommon::new(backend),
            reconnect_handles: HashMap::new(),
            send_count: std::sync::atomic::AtomicU32::new(0),
        }
    }

    /// Amortized cooperative yield shorthand for socket `send` impls.
    /// Call at the end of `send` to bump the counter and yield every
    /// [`crate::async_rt::task::SOCKET_SEND_YIELD_EVERY`] sends.
    ///
    /// Takes `&self` (not `&mut`) so split-socket send halves behind
    /// `Arc<_>` can call it.
    #[inline]
    pub(crate) async fn after_send(&self) {
        use std::sync::atomic::Ordering;
        let n = self
            .send_count
            .fetch_add(1, Ordering::Relaxed)
            .wrapping_add(1);
        if n % crate::async_rt::task::SOCKET_SEND_YIELD_EVERY == 0 {
            crate::async_rt::task::yield_now().await;
        }
    }

    /// Connect to `endpoint`, register the peer, emit a `Connected`
    /// monitor event, and spawn the reconnect task.
    pub(crate) async fn connect_endpoint(&mut self, endpoint: Endpoint) -> ZmqResult<()> {
        let connect_timeout = self.common.backend.socket_options().connect_timeout;
        let (resolved, peer_id) = crate::socket::handshake::connect_peer_forever(
            endpoint.clone(),
            self.common.backend.clone(),
            connect_timeout,
        )
        .await?;

        if let Some(monitor) = self.common.backend.monitor().lock().as_mut() {
            let _ = monitor.try_send(SocketEvent::Connected(resolved, peer_id.clone()));
        }

        let b = self.common.backend.clone();
        let register_fn: crate::reconnect::RegisterDisconnectFn =
            Box::new(move |id, notifier| b.register_disconnect_notifier(id, notifier));

        let opts = self.common.backend.socket_options();
        let handle = crate::reconnect::spawn_reconnect_task(
            endpoint.clone(),
            self.common.backend.clone(),
            peer_id,
            register_fn,
            ReconnectConfig {
                initial_interval: opts.reconnect_interval,
                max_interval: opts.reconnect_interval_max,
                backoff_multiplier: 2.0,
            },
        );
        self.reconnect_handles.insert(endpoint, handle);
        Ok(())
    }

    /// Drain all peer outbound queues up to the linger timeout.
    pub(crate) async fn linger_drain(&self) {
        let opts = self.common.backend.socket_options();
        crate::engine::registry::drain_registry(self.common.backend.registry(), opts).await;
    }
}

impl Drop for SocketCore {
    fn drop(&mut self) {
        for (_, handle) in self.reconnect_handles.drain() {
            handle.shutdown();
        }
        self.common.backend.shutdown();
    }
}