rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Generic bidirectional proxy between two sockets with optional capture.

use crate::socket::{CaptureSocket, SocketRecv, SocketSend};
use crate::ZmqResult;

use futures::{select, FutureExt};

/// Bidirectional proxy between a frontend and backend socket.
///
/// Forwards messages from `frontend` to `backend` and vice versa.
/// Runs until one of the sockets returns an error.
///
/// Mirrors libzmq's [`zmq_proxy(3)`](https://libzmq.readthedocs.io/en/latest/zmq_proxy.html).
/// To also tee every message to a third socket for monitoring/logging,
/// use [`proxy_with_capture`].
///
/// # Example — XSUB/XPUB pub-sub forwarder
///
/// ```rust,no_run
/// use rustzmq2::prelude::*;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut frontend = rustzmq2::XSubSocket::new();
///     frontend.bind("tcp://*:5559").await?;     // publishers connect here
///
///     let mut backend = rustzmq2::XPubSocket::new();
///     backend.bind("tcp://*:5560").await?;      // subscribers connect here
///
///     rustzmq2::proxy(frontend, backend).await?;
///     Ok(())
/// }
/// ```
pub async fn proxy<Frontend, Backend>(mut frontend: Frontend, mut backend: Backend) -> ZmqResult<()>
where
    Frontend: SocketSend + SocketRecv,
    Backend: SocketSend + SocketRecv,
{
    loop {
        select! {
            frontend_mess = frontend.recv().fuse() => {
                backend.send(frontend_mess?).await?;
            },
            backend_mess = backend.recv().fuse() => {
                frontend.send(backend_mess?).await?;
            }
        };
    }
}

/// Bidirectional proxy that also forwards every message to a capture
/// socket before relaying — useful for monitoring or logging traffic.
///
/// Runs until one of the sockets returns an error.
///
/// Mirrors libzmq's
/// [`zmq_proxy_steerable(3)`](https://libzmq.readthedocs.io/en/latest/zmq_proxy_steerable.html).
///
/// # Example — broker with traffic capture
///
/// ```rust,no_run
/// use rustzmq2::prelude::*;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut frontend = rustzmq2::RouterSocket::new();
///     frontend.bind("tcp://127.0.0.1:5559").await?;
///
///     let mut backend = rustzmq2::DealerSocket::new();
///     backend.bind("tcp://127.0.0.1:5560").await?;
///
///     // Tee every message into a PUB so an external monitor can subscribe.
///     let mut capture = rustzmq2::PubSocket::new();
///     capture.bind("tcp://127.0.0.1:9999").await?;
///
///     rustzmq2::proxy_with_capture(frontend, backend, capture).await?;
///     Ok(())
/// }
/// ```
pub async fn proxy_with_capture<Frontend, Backend, Capture>(
    mut frontend: Frontend,
    mut backend: Backend,
    mut capture: Capture,
) -> ZmqResult<()>
where
    Frontend: SocketSend + SocketRecv,
    Backend: SocketSend + SocketRecv,
    Capture: CaptureSocket,
{
    loop {
        select! {
            frontend_mess = frontend.recv().fuse() => {
                let message = frontend_mess?;
                capture.send(message.clone()).await?;
                backend.send(message).await?;
            },
            backend_mess = backend.recv().fuse() => {
                let message = backend_mess?;
                capture.send(message.clone()).await?;
                frontend.send(message).await?;
            }
        };
    }
}