//! In-process transport.
//!
//! Peers exchange messages over a synchronous channel: after the handshake,
//! each side holds the remote's `InprocInboundTx` and injects messages
//! directly into the remote socket's inproc inbound channel, bypassing the
//! async executor on the send path. A `tokio::sync::Notify` (or runtime
//! equivalent) wakes the remote's `recv_next` only when it is parked.
//!
//! # Connection protocol
//! 1. The server side calls `begin_accept(name, cback)` which registers the
//!    name in a global [`REGISTRY`] and spawns an accept task.
//! 2. The client side calls `connect(name)` which creates a matched
//!    [`InprocPeer`] pair and sends the server half through the registry to
//!    the accept task.
//! 3. Both sides perform a two-round async handshake inside
//!    `peer_connected_inproc`:
//!    - Round 1: each sends its `InprocChannelInfo` (sync tx + notify) to the other.
//!    - Round 2: each sends its assigned `PeerKey` to the other.
//!
//!    After the handshake both sides hold a fully-wired `InprocEngine` that
//!    injects directly into the remote's sync inproc inbound channel.
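//!
//! A minimal sketch of the bind/connect flow (illustrative only — these
//! functions are crate-internal, and the callback body is elided):
//!
//! ```ignore
//! // Server side: register "pipe" in REGISTRY and spawn the accept task.
//! let (endpoint, stop_handle) = begin_accept("pipe".to_string(), |peer| async move {
//!     // `peer: ZmqResult<InprocPeer>` — run the two-round handshake here.
//! })
//! .await?;
//!
//! // Client side: matched immediately if "pipe" is bound; otherwise the
//! // server half is parked in PENDING until a matching `begin_accept`.
//! let client_peer = connect("pipe").await?;
//! ```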

use super::AcceptStopHandle;
use crate::async_rt;
use crate::endpoint::Endpoint;
use crate::error::ZmqError;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;

use flume as ac;
use futures::channel::oneshot;
use futures::{select, FutureExt};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// ── Global name registry ──────────────────────────────────────────────────────

/// Maps bound names to handoff senders. `connect()` looks up the name and
/// sends an `InprocPeer` to the server-side accept task.
static REGISTRY: Lazy<Mutex<HashMap<String, ac::Sender<InprocPeer>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

/// Connections that arrived before a matching `begin_accept`. Drained into
/// the accept task's `handoff_tx` when `begin_accept` registers the name.
/// Matches libzmq's `pend_connection` / `connect_pending` flow
/// (`libzmq/src/ctx.cpp:743-779`). Keyed by endpoint name.
static PENDING: Lazy<Mutex<HashMap<String, Vec<InprocPeer>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));
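
// Connect-before-bind, traced for illustration:
//
//   connect("queue")      -> no REGISTRY entry; server half parked in
//                            PENDING["queue"], client half returned.
//   begin_accept("queue") -> inserts into REGISTRY, drains PENDING["queue"]
//                            into the accept task.
//   connect("queue")      -> REGISTRY hit; server half sent straight to the
//                            accept task's handoff channel.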

// ── InprocChannelInfo ─────────────────────────────────────────────────────────

/// Bundled inproc channel info exchanged during handshake Round 1.
/// Contains the sync sender, the notify handle needed to wake the remote's
/// `recv_next` without going through the async executor, and the socket
/// type so the peer can reject incompatible pairings (REQ↔PUB, etc.)
/// before any user messages flow.
///
/// `routing_id` carries the local side's configured `ZMQ_ROUTING_ID`
/// (`opts.peer_id`). It's the inproc analogue of the `Identity` ZMTP
/// property exchanged over TCP. Whether the remote *uses* it depends on
/// the remote's socket type — only ROUTER does, matching libzmq's
/// `options.recv_routing_id` rule.
pub(crate) struct InprocChannelInfo {
    pub(crate) tx: crate::engine::InprocInboundTx,
    pub(crate) notify: Arc<crate::async_rt::notify::RuntimeNotify>,
    pub(crate) socket_type: crate::SocketType,
    pub(crate) routing_id: Option<crate::PeerIdentity>,
}

// ── InprocPeer ────────────────────────────────────────────────────────────────

/// Handshake token for one end of an inproc connection.
///
/// Both sides exchange their `InprocChannelInfo` (Round 1) and `PeerKey`
/// (Round 2) via oneshot pairs so each can build a fully-wired `InprocEngine`.
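///
/// A sketch of one side's handshake, assuming `my_info` and `my_key` were
/// built locally (the real sequencing lives in `peer_connected_inproc`):
///
/// ```ignore
/// // Oneshot sends never block, so both sides can send both rounds
/// // before awaiting the other's halves without deadlocking.
/// peer.send_inbound.send(my_info).ok();
/// let remote_info = peer.recv_inbound.await?;
/// peer.send_key.send(my_key).ok();
/// let remote_key = peer.recv_key.await?;
/// ```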
pub(crate) struct InprocPeer {
    pub(crate) endpoint: Endpoint,
    /// Round 1 — send our `InprocChannelInfo` to the remote.
    pub(crate) send_inbound: oneshot::Sender<InprocChannelInfo>,
    /// Round 1 — receive the remote's `InprocChannelInfo`.
    pub(crate) recv_inbound: oneshot::Receiver<InprocChannelInfo>,
    /// Round 2 — send our assigned `PeerKey` to the remote.
    pub(crate) send_key: oneshot::Sender<crate::engine::registry::PeerKey>,
    /// Round 2 — receive the remote's assigned `PeerKey`.
    pub(crate) recv_key: oneshot::Receiver<crate::engine::registry::PeerKey>,
}

/// Create a matched pair of `InprocPeer`s. Returns `(client, server)`.
fn peer_pair(name: &str) -> (InprocPeer, InprocPeer) {
    let (a_inbound_tx, b_inbound_rx) = oneshot::channel();
    let (b_inbound_tx, a_inbound_rx) = oneshot::channel();
    let (a_key_tx, b_key_rx) = oneshot::channel();
    let (b_key_tx, a_key_rx) = oneshot::channel();
    let ep = Endpoint::Inproc(name.to_string());
    let client = InprocPeer {
        endpoint: ep.clone(),
        send_inbound: a_inbound_tx,
        recv_inbound: a_inbound_rx,
        send_key: a_key_tx,
        recv_key: a_key_rx,
    };
    let server = InprocPeer {
        endpoint: ep,
        send_inbound: b_inbound_tx,
        recv_inbound: b_inbound_rx,
        send_key: b_key_tx,
        recv_key: b_key_rx,
    };
    (client, server)
}

// ── Public transport functions ────────────────────────────────────────────────

pub(crate) async fn connect(name: &str) -> ZmqResult<InprocPeer> {
    let (client, server) = peer_pair(name);

    // Hold the REGISTRY lock across the PENDING write so that a concurrent
    // `begin_accept` can't race past us: either we see the live handoff_tx
    // here, or our peer lands in PENDING and `begin_accept` drains it.
    let tx: ac::Sender<InprocPeer> = {
        let registry = REGISTRY.lock().unwrap();
        match registry.get(name).cloned() {
            Some(tx) => tx,
            None => {
                // Unbound: park the server end for a future `begin_accept`.
                PENDING
                    .lock()
                    .unwrap()
                    .entry(name.to_string())
                    .or_default()
                    .push(server);
                return Ok(client);
            }
        }
    };

    // Bound case: hand the server end to the accept task.
    tx.send_async(server)
        .await
        .map_err(|_e| ZmqError::Socket("inproc: accept task closed before connect".into()))?;
    Ok(client)
}

/// Dropped when the inproc `AcceptStopHandle` is dropped, synchronously
/// removing the name from the global registry so the address is immediately
/// available for re-bind.
struct InprocRegistryGuard(String);

impl Drop for InprocRegistryGuard {
    fn drop(&mut self) {
        REGISTRY.lock().unwrap().remove(&self.0);
    }
}
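
// For example, dropping the bind's `AcceptStopHandle` makes an immediate
// re-bind of the same name succeed (`cb` / `cb2` are placeholder callbacks):
//
// ```ignore
// let (_ep, stop) = begin_accept("name".to_string(), cb).await?;
// drop(stop); // guard removes "name" from REGISTRY synchronously
// let (_ep2, _stop2) = begin_accept("name".to_string(), cb2).await?; // ok
// ```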

pub(crate) async fn begin_accept<T>(
    name: String,
    cback: impl Fn(ZmqResult<InprocPeer>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
    T: std::future::Future<Output = ()> + Send + 'static,
{
    let (handoff_tx, handoff_rx) = ac::bounded::<InprocPeer>(8);
    let (stop_tx, stop_rx) = oneshot::channel::<()>();

    // Register AND drain any pending peers under the same REGISTRY lock so
    // a concurrent `connect()` cannot land in PENDING between our insert
    // and our drain and be lost.
    let pending_peers: Vec<InprocPeer> = {
        let mut registry = REGISTRY.lock().unwrap();
        if registry.contains_key(&name) {
            return Err(ZmqError::AddressInUse(Endpoint::Inproc(name)));
        }
        registry.insert(name.clone(), handoff_tx.clone());
        PENDING.lock().unwrap().remove(&name).unwrap_or_default()
    };
    let endpoint = Endpoint::Inproc(name.clone());

    let task_handle = async_rt::task::spawn(async move {
        // Deliver connections that arrived before the bind. Invoking the
        // callback per peer here, rather than pushing them through the
        // bounded handoff channel, avoids a stall: nothing drains that
        // channel until the select loop below runs, so `send_async` would
        // wait forever if more than 8 peers were pending.
        for peer in pending_peers {
            async_rt::task::spawn(cback(Ok(peer)));
        }
        let mut stop_rx = stop_rx.fuse();
        loop {
            select! {
                incoming = handoff_rx.recv_async().fuse() => {
                    match incoming {
                        Ok(peer) => {
                            async_rt::task::spawn(cback(Ok(peer)));
                        }
                        Err(_) => break,
                    }
                }
                _ = stop_rx => break,
            }
        }
        Ok(())
    });

    let guard: Box<dyn std::any::Any + Send + Sync> = Box::new(InprocRegistryGuard(name));
    Ok((
        endpoint,
        AcceptStopHandle::with_guard(TaskHandle::new(stop_tx, task_handle), guard),
    ))
}