Skip to main content

rustzmq2/
error.rs

1use crate::codec::{Message, ZmtpVersion};
2use crate::endpoint::Endpoint;
3use crate::ZmqMessage;
4
5use futures::channel::mpsc;
6use std::any::Any;
7use std::borrow::Cow;
8use thiserror::Error;
9
10// ── Result aliases ────────────────────────────────────────────────────────────
11
12pub type ZmqResult<T> = Result<T, ZmqError>;
13#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
14pub(crate) type CodecResult<T> = Result<T, CodecError>;
15
16// ── Endpoint errors ───────────────────────────────────────────────────────────
17
18/// Represents an error when parsing an [`crate::Endpoint`]
19#[non_exhaustive]
20#[derive(Error, Debug)]
21pub enum EndpointError {
22    #[error("Failed to parse IP address or port")]
23    ParseIpAddr(#[from] std::net::AddrParseError),
24    #[error("Unknown transport type {0}")]
25    UnknownTransport(String),
26    #[error("Invalid Syntax: {0}")]
27    Syntax(&'static str),
28}
29
30// ── Codec errors ──────────────────────────────────────────────────────────────
31
32/// Represents an error when encoding/decoding raw byte buffers and frames
33#[non_exhaustive]
34#[derive(Error, Debug)]
35pub enum CodecError {
36    #[error("{0}")]
37    Command(&'static str),
38    #[error("{0}")]
39    Greeting(&'static str),
40    #[error("{0}")]
41    Mechanism(&'static str),
42    #[error("{0}")]
43    Decode(&'static str),
44    #[error("{0}")]
45    Io(#[from] std::io::Error),
46    #[error("{0}")]
47    Other(&'static str),
48    /// Synthetic marker emitted by the engine's reader task on graceful
49    /// peer EOF. Not a codec error in the traditional sense — socket
50    /// layers use it to trigger `peer_disconnected` cleanup and continue
51    /// their recv loop rather than surfacing to the user.
52    #[error("peer disconnected")]
53    PeerDisconnected,
54    // ── CURVE / message frame errors ──────────────────────────────────────
55    #[cfg(feature = "curve")]
56    #[error("CURVE encrypt failed")]
57    CurveEncryptFailed,
58    #[cfg(feature = "curve")]
59    #[error("CURVE decrypt failed")]
60    CurveDecryptFailed,
61    #[cfg(feature = "curve")]
62    #[error("MESSAGE frame too short")]
63    MessageFrameTooShort,
64    #[cfg(feature = "curve")]
65    #[error("CURVE nonce out of order")]
66    CurveNonceOutOfOrder,
67    #[cfg(feature = "curve")]
68    #[error("CURVE: empty plaintext")]
69    CurveEmptyPlaintext,
70    // ── Writer errors ─────────────────────────────────────────────────────
71    #[error("vectored write returned 0 bytes")]
72    WriteZero,
73}
74
75impl From<CodecError> for std::io::Error {
76    fn from(e: CodecError) -> Self {
77        use std::io::ErrorKind;
78        let kind = match &e {
79            #[cfg(feature = "curve")]
80            CodecError::CurveEncryptFailed
81            | CodecError::CurveDecryptFailed
82            | CodecError::MessageFrameTooShort
83            | CodecError::CurveNonceOutOfOrder
84            | CodecError::CurveEmptyPlaintext => ErrorKind::InvalidData,
85            CodecError::WriteZero => ErrorKind::WriteZero,
86            CodecError::Io(io) => io.kind(),
87            _ => ErrorKind::Other,
88        };
89        std::io::Error::new(kind, e.to_string())
90    }
91}
92
93// ── Task / join errors ────────────────────────────────────────────────────────
94
95/// The error type returned by awaiting a [`crate::async_rt::task::JoinHandle`].
96#[non_exhaustive]
97#[derive(Debug)]
98pub enum JoinError {
99    Cancelled,
100    Panic(Box<dyn Any + Send + 'static>),
101}
102
103impl JoinError {
104    pub fn is_cancelled(&self) -> bool {
105        matches!(self, Self::Cancelled)
106    }
107
108    pub fn is_panic(&self) -> bool {
109        !self.is_cancelled()
110    }
111}
112
113impl std::fmt::Display for JoinError {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        match self {
116            Self::Cancelled => f.write_str("task cancelled"),
117            Self::Panic(_) => f.write_str("task panicked"),
118        }
119    }
120}
121
122impl std::error::Error for JoinError {}
123
124#[cfg(feature = "tokio")]
125impl From<tokio::task::JoinError> for JoinError {
126    fn from(err: tokio::task::JoinError) -> Self {
127        if err.is_cancelled() {
128            Self::Cancelled
129        } else {
130            Self::Panic(err.into_panic())
131        }
132    }
133}
134
135#[non_exhaustive]
136#[derive(Error, Debug)]
137pub enum TaskError {
138    #[error("Internal task error: {0}")]
139    Internal(#[from] Box<ZmqError>),
140    #[error("Task panicked")]
141    Panic,
142    #[error("Task cancelled")]
143    Cancelled,
144}
145
146impl From<JoinError> for TaskError {
147    fn from(err: JoinError) -> Self {
148        if err.is_panic() {
149            TaskError::Panic
150        } else {
151            debug_assert!(err.is_cancelled());
152            TaskError::Cancelled
153        }
154    }
155}
156
157// ── Message errors ────────────────────────────────────────────────────────────
158
159/// Error returned when attempting to construct a [`ZmqMessage`](crate::ZmqMessage) with no frames.
160#[derive(Debug)]
161pub struct ZmqEmptyMessageError;
162
163impl std::fmt::Display for ZmqEmptyMessageError {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        write!(f, "Unable to construct an empty ZmqMessage")
166    }
167}
168
169impl std::error::Error for ZmqEmptyMessageError {}
170
171// ── Engine / send errors ──────────────────────────────────────────────────────
172
173/// Writer task exited before the target flush was reached.
174#[derive(Debug)]
175pub(crate) struct Disconnected;
176
177/// Error kinds returned when sending a message through the engine.
178///
179/// `Enqueue` means the outbound channel was closed before the message was
180/// accepted — the message is returned so the caller can report
181/// `ReturnToSender`. `Flush` means the writer task exited after enqueue but
182/// before flushing — the message belongs to the writer and isn't recoverable.
183/// Only `send_flushed` can produce `Flush`.
184#[derive(Debug)]
185pub enum SendError {
186    Enqueue(Message),
187    Flush,
188}
189
190// ── Top-level ZMQ error ───────────────────────────────────────────────────────
191
192#[non_exhaustive]
193#[derive(Error, Debug)]
194pub enum ZmqError {
195    #[error("Endpoint Error: {0}")]
196    Endpoint(#[from] EndpointError),
197    #[error("Network Error: {0}")]
198    Network(#[from] std::io::Error),
199    #[error("Socket bind doesn't exist: {0}")]
200    NoSuchBind(Endpoint),
201    /// libzmq's `EADDRINUSE`: bind failed because the endpoint is already
202    /// claimed (inproc name collision, TCP port in use, etc.).
203    #[error("Address already in use: {0}")]
204    AddressInUse(Endpoint),
205    /// libzmq's `EPROTOTYPE`: the peer's socket type is not compatible
206    /// with ours (e.g. REQ ↔ PUB). See [`SocketType::compatible`](crate::SocketType::compatible).
207    #[error("Incompatible peer socket type")]
208    IncompatiblePeer,
209    #[error("Codec Error: {0}")]
210    Codec(#[from] CodecError),
211    #[error("Socket Error: {0}")]
212    Socket(Cow<'static, str>),
213    #[error("{0}")]
214    BufferFull(Cow<'static, str>),
215    #[error("Failed to deliver message ({} frames) cause of {reason}", message.len())]
216    ReturnToSender {
217        reason: Cow<'static, str>,
218        message: ZmqMessage,
219    },
220    // ReturnToSenderMultipart exists separately because the internal Message enum
221    // is not yet part of the public API. Once it is, these two variants can be unified.
222    #[error("Failed to deliver messages ({} messages) cause of {reason}", messages.len())]
223    ReturnToSenderMultipart {
224        reason: Cow<'static, str>,
225        messages: Vec<ZmqMessage>,
226    },
227    #[error("Task Error: {0}")]
228    Task(#[from] TaskError),
229    #[error("{0}")]
230    Other(Cow<'static, str>),
231    #[error("No message received")]
232    NoMessage,
233    #[error("Invalid peer identity: must be less than 256 bytes in length")]
234    PeerIdentity,
235    #[error("Unsupported ZMTP version")]
236    UnsupportedVersion(ZmtpVersion),
237    // ── Security ─────────────────────────────────────────────────────────────
238    /// The two peers advertised incompatible security mechanisms.
239    #[error("Mechanism mismatch: we use {ours}, peer uses {peer}")]
240    MechanismMismatch {
241        ours: &'static str,
242        peer: &'static str,
243    },
244    /// PLAIN authentication failed (wrong credentials or server rejection).
245    #[error("PLAIN authentication failed: {reason}")]
246    PlainAuthFailed { reason: String },
247    /// CURVE handshake failed (crypto or protocol error).
248    #[cfg(feature = "curve")]
249    #[error("CURVE handshake failed: {reason}")]
250    CurveHandshakeFailed { reason: Cow<'static, str> },
251    /// ZAP handler denied the connection.
252    #[error("ZAP denied connection (status {status_code}): {status_text}")]
253    ZapDenied {
254        status_code: u16,
255        status_text: String,
256    },
257    /// ZAP handler did not respond within the timeout.
258    #[error("ZAP handler timed out")]
259    ZapTimeout,
260    /// Both peers claimed the same server/client role.
261    #[error("ZMTP role conflict: both peers claim the same server/client role")]
262    ServerRoleConflict,
263    /// The ZMTP handshake did not complete within `handshake_interval`.
264    #[error("ZMTP handshake timed out")]
265    HandshakeTimeout,
266}
267
268impl From<mpsc::TrySendError<Message>> for ZmqError {
269    fn from(_: mpsc::TrySendError<Message>) -> Self {
270        ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
271    }
272}
273
274impl From<mpsc::SendError> for ZmqError {
275    fn from(_: mpsc::SendError) -> Self {
276        ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
277    }
278}