1use crate::codec::{Message, ZmtpVersion};
2use crate::endpoint::Endpoint;
3use crate::ZmqMessage;
4
5use futures::channel::mpsc;
6use std::any::Any;
7use std::borrow::Cow;
8use thiserror::Error;
9
10pub type ZmqResult<T> = Result<T, ZmqError>;
/// Shorthand for a [`Result`] whose error type is [`CodecError`].
///
/// Only compiled when the `tcp` feature is enabled, or when the `ipc` feature is enabled on
/// a Unix-family target.
#[cfg(any(all(target_family = "unix", feature = "ipc"), feature = "tcp"))]
pub(crate) type CodecResult<T> = std::result::Result<T, CodecError>;
15
16#[non_exhaustive]
20#[derive(Error, Debug)]
21pub enum EndpointError {
22 #[error("Failed to parse IP address or port")]
23 ParseIpAddr(#[from] std::net::AddrParseError),
24 #[error("Unknown transport type {0}")]
25 UnknownTransport(String),
26 #[error("Invalid Syntax: {0}")]
27 Syntax(&'static str),
28}
29
30#[non_exhaustive]
34#[derive(Error, Debug)]
35pub enum CodecError {
36 #[error("{0}")]
37 Command(&'static str),
38 #[error("{0}")]
39 Greeting(&'static str),
40 #[error("{0}")]
41 Mechanism(&'static str),
42 #[error("{0}")]
43 Decode(&'static str),
44 #[error("{0}")]
45 Io(#[from] std::io::Error),
46 #[error("{0}")]
47 Other(&'static str),
48 #[error("peer disconnected")]
53 PeerDisconnected,
54 #[cfg(feature = "curve")]
56 #[error("CURVE encrypt failed")]
57 CurveEncryptFailed,
58 #[cfg(feature = "curve")]
59 #[error("CURVE decrypt failed")]
60 CurveDecryptFailed,
61 #[cfg(feature = "curve")]
62 #[error("MESSAGE frame too short")]
63 MessageFrameTooShort,
64 #[cfg(feature = "curve")]
65 #[error("CURVE nonce out of order")]
66 CurveNonceOutOfOrder,
67 #[cfg(feature = "curve")]
68 #[error("CURVE: empty plaintext")]
69 CurveEmptyPlaintext,
70 #[error("vectored write returned 0 bytes")]
72 WriteZero,
73}
74
75impl From<CodecError> for std::io::Error {
76 fn from(e: CodecError) -> Self {
77 use std::io::ErrorKind;
78 let kind = match &e {
79 #[cfg(feature = "curve")]
80 CodecError::CurveEncryptFailed
81 | CodecError::CurveDecryptFailed
82 | CodecError::MessageFrameTooShort
83 | CodecError::CurveNonceOutOfOrder
84 | CodecError::CurveEmptyPlaintext => ErrorKind::InvalidData,
85 CodecError::WriteZero => ErrorKind::WriteZero,
86 CodecError::Io(io) => io.kind(),
87 _ => ErrorKind::Other,
88 };
89 std::io::Error::new(kind, e.to_string())
90 }
91}
92
/// Why a task ended without completing normally: it was cancelled or it panicked.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a wildcard arm when
/// matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum JoinError {
    /// The task was cancelled.
    Cancelled,
    /// The task panicked; the boxed value is the payload carried with the panic.
    Panic(Box<dyn Any + Send>),
}
102
103impl JoinError {
104 pub fn is_cancelled(&self) -> bool {
105 matches!(self, Self::Cancelled)
106 }
107
108 pub fn is_panic(&self) -> bool {
109 !self.is_cancelled()
110 }
111}
112
113impl std::fmt::Display for JoinError {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 match self {
116 Self::Cancelled => f.write_str("task cancelled"),
117 Self::Panic(_) => f.write_str("task panicked"),
118 }
119 }
120}
121
122impl std::error::Error for JoinError {}
123
/// Translates a Tokio join failure into this crate's [`JoinError`].
///
/// Only compiled when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
impl From<tokio::task::JoinError> for JoinError {
    fn from(err: tokio::task::JoinError) -> Self {
        // `try_into_panic` yields the payload only when the task panicked; otherwise it
        // returns the error unchanged, which is treated as a cancellation.
        match err.try_into_panic() {
            Ok(payload) => Self::Panic(payload),
            Err(_) => Self::Cancelled,
        }
    }
}
134
135#[non_exhaustive]
136#[derive(Error, Debug)]
137pub enum TaskError {
138 #[error("Internal task error: {0}")]
139 Internal(#[from] Box<ZmqError>),
140 #[error("Task panicked")]
141 Panic,
142 #[error("Task cancelled")]
143 Cancelled,
144}
145
146impl From<JoinError> for TaskError {
147 fn from(err: JoinError) -> Self {
148 if err.is_panic() {
149 TaskError::Panic
150 } else {
151 debug_assert!(err.is_cancelled());
152 TaskError::Cancelled
153 }
154 }
155}
156
/// Error reported when an empty `ZmqMessage` cannot be constructed.
#[derive(Debug)]
pub struct ZmqEmptyMessageError;

/// Always renders the same fixed sentence.
impl std::fmt::Display for ZmqEmptyMessageError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Unable to construct an empty ZmqMessage")
    }
}

/// No underlying source error exists, so the trait's default methods are used.
impl std::error::Error for ZmqEmptyMessageError {}
170
/// Crate-internal unit marker type.
///
/// NOTE(review): presumably signals that a peer or channel has gone away. This is inferred
/// from the name only; confirm against the code that uses it.
#[derive(Debug)]
pub(crate) struct Disconnected;
176
177#[derive(Debug)]
185pub enum SendError {
186 Enqueue(Message),
187 Flush,
188}
189
190#[non_exhaustive]
193#[derive(Error, Debug)]
194pub enum ZmqError {
195 #[error("Endpoint Error: {0}")]
196 Endpoint(#[from] EndpointError),
197 #[error("Network Error: {0}")]
198 Network(#[from] std::io::Error),
199 #[error("Socket bind doesn't exist: {0}")]
200 NoSuchBind(Endpoint),
201 #[error("Address already in use: {0}")]
204 AddressInUse(Endpoint),
205 #[error("Incompatible peer socket type")]
208 IncompatiblePeer,
209 #[error("Codec Error: {0}")]
210 Codec(#[from] CodecError),
211 #[error("Socket Error: {0}")]
212 Socket(Cow<'static, str>),
213 #[error("{0}")]
214 BufferFull(Cow<'static, str>),
215 #[error("Failed to deliver message ({} frames) cause of {reason}", message.len())]
216 ReturnToSender {
217 reason: Cow<'static, str>,
218 message: ZmqMessage,
219 },
220 #[error("Failed to deliver messages ({} messages) cause of {reason}", messages.len())]
223 ReturnToSenderMultipart {
224 reason: Cow<'static, str>,
225 messages: Vec<ZmqMessage>,
226 },
227 #[error("Task Error: {0}")]
228 Task(#[from] TaskError),
229 #[error("{0}")]
230 Other(Cow<'static, str>),
231 #[error("No message received")]
232 NoMessage,
233 #[error("Invalid peer identity: must be less than 256 bytes in length")]
234 PeerIdentity,
235 #[error("Unsupported ZMTP version")]
236 UnsupportedVersion(ZmtpVersion),
237 #[error("Mechanism mismatch: we use {ours}, peer uses {peer}")]
240 MechanismMismatch {
241 ours: &'static str,
242 peer: &'static str,
243 },
244 #[error("PLAIN authentication failed: {reason}")]
246 PlainAuthFailed { reason: String },
247 #[cfg(feature = "curve")]
249 #[error("CURVE handshake failed: {reason}")]
250 CurveHandshakeFailed { reason: Cow<'static, str> },
251 #[error("ZAP denied connection (status {status_code}): {status_text}")]
253 ZapDenied {
254 status_code: u16,
255 status_text: String,
256 },
257 #[error("ZAP handler timed out")]
259 ZapTimeout,
260 #[error("ZMTP role conflict: both peers claim the same server/client role")]
262 ServerRoleConflict,
263 #[error("ZMTP handshake timed out")]
265 HandshakeTimeout,
266}
267
268impl From<mpsc::TrySendError<Message>> for ZmqError {
269 fn from(_: mpsc::TrySendError<Message>) -> Self {
270 ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
271 }
272}
273
274impl From<mpsc::SendError> for ZmqError {
275 fn from(_: mpsc::SendError) -> Self {
276 ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
277 }
278}