// rustzmq2 0.1.0
//
// A native async Rust implementation of ZeroMQ.
// Documentation
//! ZAP (`ZeroMQ` Authentication Protocol, RFC 27) — Rust async handler variant.
//!
//! Instead of routing requests over an `inproc://zeromq.zap.01` ZMQ socket,
//! we call a process-global async handler callback directly, so the handler
//! lives in the same process and async runtime. This avoids the bootstrap
//! chicken-and-egg problem and the overhead of socket round-trips.

use crate::codec::mechanism::ZmqMechanism;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::codec::ZmqGreeting;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::mechanism::SessionState;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::{ZmqError, ZmqResult};

use parking_lot::RwLock;
use std::future::Future;
use std::pin::Pin;

/// A request sent to the ZAP handler for authentication.
///
/// The handler receives the request by value. Which of the optional fields
/// are populated depends on the mechanism, as noted on each field.
// NOTE(review): the derived `Debug` prints `password` verbatim, so logging a
// request with `{:?}` exposes PLAIN credentials. Consider redacting it.
#[derive(Debug, Clone)]
pub struct ZapRequest {
    /// ZAP domain from the connecting socket's options.
    pub domain: String,
    /// Peer's remote address (IP:port string).
    pub address: String,
    /// ZMTP mechanism in use.
    pub mechanism: ZmqMechanism,
    /// PLAIN username, if applicable.
    pub username: Option<String>,
    /// PLAIN password, if applicable.
    pub password: Option<String>,
    /// CURVE client long-term public key (32 bytes), if mechanism is CURVE.
    pub client_pubkey: Option<Vec<u8>>,
}

/// Verdict returned by the ZAP handler for a single request.
///
/// [`ZapResponse::allow`] and [`ZapResponse::deny`] build the canonical
/// 200/400 responses. Every field is public, so a response carrying a custom
/// `status_code` / `user_id` can also be constructed directly.
#[derive(Debug, Clone)]
pub struct ZapResponse {
    /// 200 = allow; 400 = client error (deny); 500 = server error.
    pub status_code: u16,
    /// Text accompanying `status_code`: `"OK"` for [`ZapResponse::allow`],
    /// the caller-supplied reason for [`ZapResponse::deny`].
    pub status_text: String,
    /// Opaque user identifier returned to the application.
    pub user_id: String,
}

impl ZapResponse {
    /// Build a 200 response that admits the peer under the identity `user_id`.
    pub fn allow(user_id: impl Into<String>) -> Self {
        let user_id = user_id.into();
        let status_text = String::from("OK");
        Self {
            status_code: 200,
            status_text,
            user_id,
        }
    }

    /// Build a 400 response that rejects the peer.
    ///
    /// `reason` becomes the status text (it may be empty); the user id is
    /// left empty.
    pub fn deny(reason: impl Into<String>) -> Self {
        let status_text = reason.into();
        Self {
            status_code: 400,
            status_text,
            user_id: String::new(),
        }
    }
}

/// Heap-allocated, type-erased future that resolves to the handler's
/// [`ZapResponse`].
type BoxFuture = Pin<Box<dyn Future<Output = ZapResponse> + Send + 'static>>;
/// Type-erased handler: takes a [`ZapRequest`] by value and returns a
/// [`BoxFuture`].
type HandlerFn = Box<dyn Fn(ZapRequest) -> BoxFuture + Send + Sync + 'static>;

/// Global handler slot. `None` means no handler is registered.
static ZAP_HANDLER: RwLock<Option<HandlerFn>> = RwLock::new(None);

/// Register a ZAP authentication handler for all connections.
///
/// The handler is called for every new connection where
/// [`zap_domain`](crate::SocketBuilder::zap_domain) is set on the socket.
/// Return `ZapResponse { status_code: 200, .. }` to allow; any other code
/// denies. Handlers have 5 seconds to respond before the connection is
/// rejected with a timeout error.
///
/// Register **before** any socket binds or connects — connections that
/// arrive without a registered handler are permitted (permissive default,
/// matching libzmq).
///
/// Only one handler may be registered at a time. Subsequent calls replace
/// the previous handler.
///
/// # Example
///
/// ```rust,no_run
/// use rustzmq2::{set_zap_handler, ZapRequest, ZapResponse};
///
/// set_zap_handler(|req: ZapRequest| async move {
///     match req.username.as_deref() {
///         Some("alice") => ZapResponse::allow("alice"),
///         _ => ZapResponse::deny("unauthorized"),
///     }
/// });
/// ```
pub fn set_zap_handler<F, Fut>(handler: F)
where
    F: Fn(ZapRequest) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ZapResponse> + Send + 'static,
{
    let boxed: HandlerFn = Box::new(move |req| Box::pin(handler(req)));
    *ZAP_HANDLER.write() = Some(boxed);
}

/// Unregister the global ZAP handler, if one is set.
///
/// Once cleared, connections are permitted without authentication
/// (permissive default) until a new handler is registered.
///
/// Handy in test teardown so handler state doesn't leak between cases.
///
/// ```rust,no_run
/// use rustzmq2::{clear_zap_handler, set_zap_handler, ZapResponse};
///
/// set_zap_handler(|_req| async { ZapResponse::allow("user") });
/// // ... run test ...
/// clear_zap_handler();
/// ```
pub fn clear_zap_handler() {
    let mut slot = ZAP_HANDLER.write();
    *slot = None;
}

/// Call the registered ZAP handler (if any) to authenticate a connection.
///
/// The request passed to the handler is assembled from `domain`,
/// `peer_addr`, the mechanism in `peer_greeting`, the username/password held
/// in `state`, and `client_pubkey`.
///
/// Returns `Ok(())` if allowed (or if no handler is registered — permissive
/// default, matching libzmq behaviour when no ZAP handler is bound).
///
/// # Errors
///
/// * [`ZmqError::ZapTimeout`] if the handler's future does not complete
///   within 5 seconds.
/// * [`ZmqError::ZapDenied`] carrying the handler's status code and text if
///   the response's `status_code` is anything other than 200.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
pub(crate) async fn zap_check(
    domain: &str,
    peer_greeting: &ZmqGreeting,
    state: &SessionState,
    peer_addr: &str,
    client_pubkey: Option<Vec<u8>>,
) -> ZmqResult<()> {
    /// How long a handler may take before the connection is rejected.
    const HANDLER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    // Only the creation of the future happens under the read lock; the guard
    // is released at the end of this block, before anything is awaited.
    // NOTE(review): the handler closure itself runs while the read lock is
    // held, so a handler that synchronously re-registers or clears the
    // handler would presumably block on the write lock — confirm.
    let pending = {
        let registered = ZAP_HANDLER.read();
        match registered.as_ref() {
            // Permissive default: nothing registered, nothing to ask. The
            // request is deliberately not built on this path so it costs no
            // allocations.
            None => return Ok(()),
            Some(handler) => handler(ZapRequest {
                domain: domain.to_string(),
                address: peer_addr.to_string(),
                mechanism: peer_greeting.mechanism,
                username: state.username.clone(),
                password: state.password.clone(),
                client_pubkey,
            }),
        }
    };

    let resp = crate::async_rt::task::timeout(HANDLER_TIMEOUT, pending)
        .await
        .map_err(|_e| ZmqError::ZapTimeout)?;

    if resp.status_code == 200 {
        Ok(())
    } else {
        Err(ZmqError::ZapDenied {
            status_code: resp.status_code,
            status_text: resp.status_text,
        })
    }
}