rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
use crate::codec::{Message, ZmtpVersion};
use crate::endpoint::Endpoint;
use crate::ZmqMessage;

use futures::channel::mpsc;
use std::any::Any;
use std::borrow::Cow;
use thiserror::Error;

// ── Result aliases ────────────────────────────────────────────────────────────

pub type ZmqResult<T> = Result<T, ZmqError>;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
pub(crate) type CodecResult<T> = Result<T, CodecError>;

// ── Endpoint errors ───────────────────────────────────────────────────────────

/// Represents an error when parsing an [`crate::Endpoint`]
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum EndpointError {
    #[error("Failed to parse IP address or port")]
    ParseIpAddr(#[from] std::net::AddrParseError),
    #[error("Unknown transport type {0}")]
    UnknownTransport(String),
    #[error("Invalid Syntax: {0}")]
    Syntax(&'static str),
}

// ── Codec errors ──────────────────────────────────────────────────────────────

/// Represents an error when encoding/decoding raw byte buffers and frames
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum CodecError {
    #[error("{0}")]
    Command(&'static str),
    #[error("{0}")]
    Greeting(&'static str),
    #[error("{0}")]
    Mechanism(&'static str),
    #[error("{0}")]
    Decode(&'static str),
    #[error("{0}")]
    Io(#[from] std::io::Error),
    #[error("{0}")]
    Other(&'static str),
    /// Synthetic marker emitted by the engine's reader task on graceful
    /// peer EOF. Not a codec error in the traditional sense — socket
    /// layers use it to trigger `peer_disconnected` cleanup and continue
    /// their recv loop rather than surfacing to the user.
    #[error("peer disconnected")]
    PeerDisconnected,
    // ── CURVE / message frame errors ──────────────────────────────────────
    #[cfg(feature = "curve")]
    #[error("CURVE encrypt failed")]
    CurveEncryptFailed,
    #[cfg(feature = "curve")]
    #[error("CURVE decrypt failed")]
    CurveDecryptFailed,
    #[cfg(feature = "curve")]
    #[error("MESSAGE frame too short")]
    MessageFrameTooShort,
    #[cfg(feature = "curve")]
    #[error("CURVE nonce out of order")]
    CurveNonceOutOfOrder,
    #[cfg(feature = "curve")]
    #[error("CURVE: empty plaintext")]
    CurveEmptyPlaintext,
    // ── Writer errors ─────────────────────────────────────────────────────
    #[error("vectored write returned 0 bytes")]
    WriteZero,
}

impl From<CodecError> for std::io::Error {
    fn from(e: CodecError) -> Self {
        use std::io::ErrorKind;
        let kind = match &e {
            #[cfg(feature = "curve")]
            CodecError::CurveEncryptFailed
            | CodecError::CurveDecryptFailed
            | CodecError::MessageFrameTooShort
            | CodecError::CurveNonceOutOfOrder
            | CodecError::CurveEmptyPlaintext => ErrorKind::InvalidData,
            CodecError::WriteZero => ErrorKind::WriteZero,
            CodecError::Io(io) => io.kind(),
            _ => ErrorKind::Other,
        };
        std::io::Error::new(kind, e.to_string())
    }
}

// ── Task / join errors ────────────────────────────────────────────────────────

/// The error type returned by awaiting a [`crate::async_rt::task::JoinHandle`].
#[non_exhaustive]
#[derive(Debug)]
pub enum JoinError {
    Cancelled,
    Panic(Box<dyn Any + Send + 'static>),
}

impl JoinError {
    pub fn is_cancelled(&self) -> bool {
        matches!(self, Self::Cancelled)
    }

    pub fn is_panic(&self) -> bool {
        !self.is_cancelled()
    }
}

impl std::fmt::Display for JoinError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Cancelled => f.write_str("task cancelled"),
            Self::Panic(_) => f.write_str("task panicked"),
        }
    }
}

impl std::error::Error for JoinError {}

#[cfg(feature = "tokio")]
impl From<tokio::task::JoinError> for JoinError {
    fn from(err: tokio::task::JoinError) -> Self {
        if err.is_cancelled() {
            Self::Cancelled
        } else {
            Self::Panic(err.into_panic())
        }
    }
}

#[non_exhaustive]
#[derive(Error, Debug)]
pub enum TaskError {
    #[error("Internal task error: {0}")]
    Internal(#[from] Box<ZmqError>),
    #[error("Task panicked")]
    Panic,
    #[error("Task cancelled")]
    Cancelled,
}

impl From<JoinError> for TaskError {
    fn from(err: JoinError) -> Self {
        if err.is_panic() {
            TaskError::Panic
        } else {
            debug_assert!(err.is_cancelled());
            TaskError::Cancelled
        }
    }
}

// ── Message errors ────────────────────────────────────────────────────────────

/// Error returned when attempting to construct a [`ZmqMessage`](crate::ZmqMessage) with no frames.
#[derive(Debug)]
pub struct ZmqEmptyMessageError;

impl std::fmt::Display for ZmqEmptyMessageError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Unable to construct an empty ZmqMessage")
    }
}

impl std::error::Error for ZmqEmptyMessageError {}

// ── Engine / send errors ──────────────────────────────────────────────────────

/// Writer task exited before the target flush was reached.
#[derive(Debug)]
pub(crate) struct Disconnected;

/// Error kinds returned when sending a message through the engine.
///
/// `Enqueue` means the outbound channel was closed before the message was
/// accepted — the message is returned so the caller can report
/// `ReturnToSender`. `Flush` means the writer task exited after enqueue but
/// before flushing — the message belongs to the writer and isn't recoverable.
/// Only `send_flushed` can produce `Flush`.
#[derive(Debug)]
pub enum SendError {
    Enqueue(Message),
    Flush,
}

// ── Top-level ZMQ error ───────────────────────────────────────────────────────

#[non_exhaustive]
#[derive(Error, Debug)]
pub enum ZmqError {
    #[error("Endpoint Error: {0}")]
    Endpoint(#[from] EndpointError),
    #[error("Network Error: {0}")]
    Network(#[from] std::io::Error),
    #[error("Socket bind doesn't exist: {0}")]
    NoSuchBind(Endpoint),
    /// libzmq's `EADDRINUSE`: bind failed because the endpoint is already
    /// claimed (inproc name collision, TCP port in use, etc.).
    #[error("Address already in use: {0}")]
    AddressInUse(Endpoint),
    /// libzmq's `EPROTOTYPE`: the peer's socket type is not compatible
    /// with ours (e.g. REQ ↔ PUB). See [`SocketType::compatible`](crate::SocketType::compatible).
    #[error("Incompatible peer socket type")]
    IncompatiblePeer,
    #[error("Codec Error: {0}")]
    Codec(#[from] CodecError),
    #[error("Socket Error: {0}")]
    Socket(Cow<'static, str>),
    #[error("{0}")]
    BufferFull(Cow<'static, str>),
    #[error("Failed to deliver message ({} frames) cause of {reason}", message.len())]
    ReturnToSender {
        reason: Cow<'static, str>,
        message: ZmqMessage,
    },
    // ReturnToSenderMultipart exists separately because the internal Message enum
    // is not yet part of the public API. Once it is, these two variants can be unified.
    #[error("Failed to deliver messages ({} messages) cause of {reason}", messages.len())]
    ReturnToSenderMultipart {
        reason: Cow<'static, str>,
        messages: Vec<ZmqMessage>,
    },
    #[error("Task Error: {0}")]
    Task(#[from] TaskError),
    #[error("{0}")]
    Other(Cow<'static, str>),
    #[error("No message received")]
    NoMessage,
    #[error("Invalid peer identity: must be less than 256 bytes in length")]
    PeerIdentity,
    #[error("Unsupported ZMTP version")]
    UnsupportedVersion(ZmtpVersion),
    // ── Security ─────────────────────────────────────────────────────────────
    /// The two peers advertised incompatible security mechanisms.
    #[error("Mechanism mismatch: we use {ours}, peer uses {peer}")]
    MechanismMismatch {
        ours: &'static str,
        peer: &'static str,
    },
    /// PLAIN authentication failed (wrong credentials or server rejection).
    #[error("PLAIN authentication failed: {reason}")]
    PlainAuthFailed { reason: String },
    /// CURVE handshake failed (crypto or protocol error).
    #[cfg(feature = "curve")]
    #[error("CURVE handshake failed: {reason}")]
    CurveHandshakeFailed { reason: Cow<'static, str> },
    /// ZAP handler denied the connection.
    #[error("ZAP denied connection (status {status_code}): {status_text}")]
    ZapDenied {
        status_code: u16,
        status_text: String,
    },
    /// ZAP handler did not respond within the timeout.
    #[error("ZAP handler timed out")]
    ZapTimeout,
    /// Both peers claimed the same server/client role.
    #[error("ZMTP role conflict: both peers claim the same server/client role")]
    ServerRoleConflict,
    /// The ZMTP handshake did not complete within `handshake_interval`.
    #[error("ZMTP handshake timed out")]
    HandshakeTimeout,
}

impl From<mpsc::TrySendError<Message>> for ZmqError {
    fn from(_: mpsc::TrySendError<Message>) -> Self {
        ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
    }
}

impl From<mpsc::SendError> for ZmqError {
    fn from(_: mpsc::SendError) -> Self {
        ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
    }
}