use crate::codec::{Message, ZmtpVersion};
use crate::endpoint::Endpoint;
use crate::ZmqMessage;
use futures::channel::mpsc;
use std::any::Any;
use std::borrow::Cow;
use thiserror::Error;
/// Shorthand for a `Result` whose error type is [`ZmqError`].
pub type ZmqResult<T> = Result<T, ZmqError>;
/// Crate-internal shorthand for a `Result` whose error type is [`CodecError`].
///
/// Only compiled when the `tcp` feature is enabled, or when the `ipc` feature
/// is enabled on a Unix-family target.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
pub(crate) type CodecResult<T> = Result<T, CodecError>;
/// Errors describing an invalid or unsupported endpoint: an address that
/// fails to parse, an unknown transport, or a syntax problem.
///
/// Marked `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum EndpointError {
/// An IP address or port could not be parsed. Wraps the underlying
/// [`std::net::AddrParseError`] and converts from it via `From`.
#[error("Failed to parse IP address or port")]
ParseIpAddr(#[from] std::net::AddrParseError),
/// The transport type is not recognised; the carried string is included
/// in the error message.
#[error("Unknown transport type {0}")]
UnknownTransport(String),
/// A syntax problem, described by the carried static message.
#[error("Invalid Syntax: {0}")]
Syntax(&'static str),
}
/// Errors reported by the codec layer.
///
/// The `&'static str` variants display their carried message verbatim; the
/// variant itself is what distinguishes the category. Several variants only
/// exist when the `curve` feature is enabled.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum CodecError {
/// Command-related failure; displays the carried message as-is.
#[error("{0}")]
Command(&'static str),
/// Greeting-related failure; displays the carried message as-is.
#[error("{0}")]
Greeting(&'static str),
/// Mechanism-related failure; displays the carried message as-is.
#[error("{0}")]
Mechanism(&'static str),
/// Decode failure; displays the carried message as-is.
#[error("{0}")]
Decode(&'static str),
/// An underlying I/O error. Converts from [`std::io::Error`] via `From`
/// and displays exactly as the wrapped error does.
#[error("{0}")]
Io(#[from] std::io::Error),
/// Any other codec failure; displays the carried message as-is.
#[error("{0}")]
Other(&'static str),
/// The peer disconnected.
#[error("peer disconnected")]
PeerDisconnected,
/// CURVE encryption failed (requires the `curve` feature).
#[cfg(feature = "curve")]
#[error("CURVE encrypt failed")]
CurveEncryptFailed,
/// CURVE decryption failed (requires the `curve` feature).
#[cfg(feature = "curve")]
#[error("CURVE decrypt failed")]
CurveDecryptFailed,
/// A MESSAGE frame was too short (requires the `curve` feature).
#[cfg(feature = "curve")]
#[error("MESSAGE frame too short")]
MessageFrameTooShort,
/// A CURVE nonce arrived out of order (requires the `curve` feature).
#[cfg(feature = "curve")]
#[error("CURVE nonce out of order")]
CurveNonceOutOfOrder,
/// The CURVE plaintext was empty (requires the `curve` feature).
#[cfg(feature = "curve")]
#[error("CURVE: empty plaintext")]
CurveEmptyPlaintext,
/// A vectored write returned 0 bytes.
#[error("vectored write returned 0 bytes")]
WriteZero,
}
/// Converts a [`CodecError`] into a [`std::io::Error`].
///
/// * `CodecError::Io` hands back the wrapped I/O error itself, so its kind,
///   raw OS error code and any inner error survive the round trip.
/// * `CodecError::WriteZero` maps to `ErrorKind::WriteZero`.
/// * The CURVE variants (only present with the `curve` feature) map to
///   `ErrorKind::InvalidData`.
/// * Every other variant maps to `ErrorKind::Other`.
///
/// For the non-I/O variants the resulting error carries the `Display` text of
/// the original `CodecError` as its message.
impl From<CodecError> for std::io::Error {
    fn from(e: CodecError) -> Self {
        use std::io::ErrorKind;

        // Return an existing I/O error untouched instead of re-wrapping its
        // text: re-wrapping would discard `raw_os_error()` and the inner error.
        let e = match e {
            CodecError::Io(io) => return io,
            other => other,
        };

        let kind = match &e {
            #[cfg(feature = "curve")]
            CodecError::CurveEncryptFailed
            | CodecError::CurveDecryptFailed
            | CodecError::MessageFrameTooShort
            | CodecError::CurveNonceOutOfOrder
            | CodecError::CurveEmptyPlaintext => ErrorKind::InvalidData,
            CodecError::WriteZero => ErrorKind::WriteZero,
            _ => ErrorKind::Other,
        };
        std::io::Error::new(kind, e.to_string())
    }
}
/// Outcome of joining a task that did not complete normally.
#[non_exhaustive]
#[derive(Debug)]
pub enum JoinError {
    /// The task was cancelled.
    Cancelled,
    /// The task panicked; carries the panic payload.
    Panic(Box<dyn Any + Send + 'static>),
}

impl JoinError {
    /// Returns `true` when this error is [`JoinError::Cancelled`].
    pub fn is_cancelled(&self) -> bool {
        match self {
            Self::Cancelled => true,
            Self::Panic(_) => false,
        }
    }

    /// Returns `true` when this error is [`JoinError::Panic`].
    pub fn is_panic(&self) -> bool {
        matches!(self, Self::Panic(_))
    }
}

impl std::fmt::Display for JoinError {
    /// Writes a short fixed description; the panic payload is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Cancelled => "task cancelled",
            Self::Panic(_) => "task panicked",
        };
        f.write_str(description)
    }
}

impl std::error::Error for JoinError {}
/// Converts a tokio join failure into this crate's [`JoinError`].
///
/// A cancelled task becomes [`JoinError::Cancelled`]; anything else is
/// treated as a panic and its payload is moved into [`JoinError::Panic`].
#[cfg(feature = "tokio")]
impl From<tokio::task::JoinError> for JoinError {
    fn from(err: tokio::task::JoinError) -> Self {
        // Handle the panic case first; `into_panic` consumes the error to
        // extract the payload.
        if !err.is_cancelled() {
            return Self::Panic(err.into_panic());
        }
        Self::Cancelled
    }
}
/// Failure modes of a task as surfaced through [`ZmqError::Task`].
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum TaskError {
/// The task failed with a [`ZmqError`] of its own. Boxed, and convertible
/// from `Box<ZmqError>` via `From`.
#[error("Internal task error: {0}")]
Internal(#[from] Box<ZmqError>),
/// The task panicked. No payload is kept.
#[error("Task panicked")]
Panic,
/// The task was cancelled.
#[error("Task cancelled")]
Cancelled,
}
impl From<JoinError> for TaskError {
fn from(err: JoinError) -> Self {
if err.is_panic() {
TaskError::Panic
} else {
debug_assert!(err.is_cancelled());
TaskError::Cancelled
}
}
}
/// Error value stating that an empty `ZmqMessage` could not be constructed.
#[derive(Debug)]
pub struct ZmqEmptyMessageError;

impl std::fmt::Display for ZmqEmptyMessageError {
    /// Writes the fixed, human-readable description of this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Unable to construct an empty ZmqMessage")
    }
}

impl std::error::Error for ZmqEmptyMessageError {}
/// Crate-private unit marker type.
// NOTE(review): presumably signals that a peer or channel went away; confirm
// against the call sites, which are not visible from this file.
#[derive(Debug)]
pub(crate) struct Disconnected;
/// Failure of a send operation, split by the step that failed.
///
/// Only `Debug` is derived here; this type does not implement `Display` or
/// `std::error::Error` in this file.
#[derive(Debug)]
pub enum SendError {
/// Enqueueing failed; carries a [`Message`].
// NOTE(review): presumably the message that could not be enqueued, handed
// back to the caller; confirm at the construction sites.
Enqueue(Message),
/// Flushing failed.
Flush,
}
/// Top-level error type of the crate; the error half of [`ZmqResult`].
///
/// Converts via `From` from [`EndpointError`], [`std::io::Error`],
/// [`CodecError`] and [`TaskError`]. Marked `#[non_exhaustive]`, so
/// downstream `match`es need a wildcard arm.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum ZmqError {
/// An endpoint was invalid; wraps [`EndpointError`].
#[error("Endpoint Error: {0}")]
Endpoint(#[from] EndpointError),
/// A network-level failure; wraps [`std::io::Error`].
#[error("Network Error: {0}")]
Network(#[from] std::io::Error),
/// No bind exists for the given endpoint.
#[error("Socket bind doesn't exist: {0}")]
NoSuchBind(Endpoint),
/// The given endpoint's address is already in use.
#[error("Address already in use: {0}")]
AddressInUse(Endpoint),
/// The peer's socket type is incompatible.
#[error("Incompatible peer socket type")]
IncompatiblePeer,
/// A codec-layer failure; wraps [`CodecError`].
#[error("Codec Error: {0}")]
Codec(#[from] CodecError),
/// A socket-level failure described by the carried text.
#[error("Socket Error: {0}")]
Socket(Cow<'static, str>),
/// A buffer or queue could not accept more data; displays the carried text
/// verbatim. Produced by the `From` conversions for
/// `mpsc::TrySendError<Message>` and `mpsc::SendError` in this file.
#[error("{0}")]
BufferFull(Cow<'static, str>),
/// A message could not be delivered. Carries the reason and the message;
/// the display text reports `message.len()` as the frame count.
#[error("Failed to deliver message ({} frames) cause of {reason}", message.len())]
ReturnToSender {
reason: Cow<'static, str>,
message: ZmqMessage,
},
/// Several messages could not be delivered. Carries the reason and the
/// messages; the display text reports how many there are.
#[error("Failed to deliver messages ({} messages) cause of {reason}", messages.len())]
ReturnToSenderMultipart {
reason: Cow<'static, str>,
messages: Vec<ZmqMessage>,
},
/// A task failed; wraps [`TaskError`].
#[error("Task Error: {0}")]
Task(#[from] TaskError),
/// Any other failure; displays the carried text verbatim.
#[error("{0}")]
Other(Cow<'static, str>),
/// No message was received.
#[error("No message received")]
NoMessage,
/// The peer identity is invalid: it must be shorter than 256 bytes.
#[error("Invalid peer identity: must be less than 256 bytes in length")]
PeerIdentity,
/// The ZMTP version is unsupported. The version is carried in the variant
/// but is not part of the display text.
#[error("Unsupported ZMTP version")]
UnsupportedVersion(ZmtpVersion),
/// The local and peer security mechanisms differ; both names are reported.
#[error("Mechanism mismatch: we use {ours}, peer uses {peer}")]
MechanismMismatch {
ours: &'static str,
peer: &'static str,
},
/// PLAIN authentication failed for the stated reason.
#[error("PLAIN authentication failed: {reason}")]
PlainAuthFailed { reason: String },
/// The CURVE handshake failed for the stated reason (requires the `curve`
/// feature).
#[cfg(feature = "curve")]
#[error("CURVE handshake failed: {reason}")]
CurveHandshakeFailed { reason: Cow<'static, str> },
/// ZAP denied the connection; carries the status code and status text.
#[error("ZAP denied connection (status {status_code}): {status_text}")]
ZapDenied {
status_code: u16,
status_text: String,
},
/// The ZAP handler timed out.
#[error("ZAP handler timed out")]
ZapTimeout,
/// Both peers claimed the same server/client role.
#[error("ZMTP role conflict: both peers claim the same server/client role")]
ServerRoleConflict,
/// The ZMTP handshake timed out.
#[error("ZMTP handshake timed out")]
HandshakeTimeout,
}
impl From<mpsc::TrySendError<Message>> for ZmqError {
fn from(_: mpsc::TrySendError<Message>) -> Self {
ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
}
}
impl From<mpsc::SendError> for ZmqError {
fn from(_: mpsc::SendError) -> Self {
ZmqError::BufferFull("Failed to send message. Send queue full/broken".into())
}
}