pub mod unified;
pub mod auth_error;
pub mod controller_error;
pub mod filter_error;
pub mod client_error;
pub use client_error::ClientError;
pub use controller_error::ControllerError;
pub use controller_error::ControllerResult;
pub use filter_error::FilterError;
pub use unified::AuthError;
pub use unified::NetworkError;
pub use unified::ProtocolError;
pub use unified::RocketMQError;
pub use unified::RpcClientError;
pub use unified::SerializationError;
pub use unified::ServiceError as UnifiedServiceError;
pub use unified::ToolsError;
#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
mod cli_error;
#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
mod common_error;
#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
mod name_srv_error;
#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
mod remoting_error;
#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
mod store_error;
#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
mod tui_error;
use std::io;
use thiserror::Error;
/// Deprecated result alias whose error type is the legacy [`RocketmqError`] enum.
#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;
/// Deprecated result alias that simply forwards to `anyhow::Result<T>`.
#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
pub type LegacyResult<T> = anyhow::Result<T>;
pub use unified::Result;
pub use unified::RocketMQResult;
use unified::ServiceError;
/// Error enum used by the deprecated `LegacyRocketMQResult` alias.
///
/// Each variant's `Display` text is produced by its `#[error(...)]` attribute.
/// A `From<RocketmqError>` conversion into `unified::RocketMQError` is
/// implemented further down in this file.
#[derive(Debug, Error)]
pub enum RocketmqError {
/// Free-form message, displayed verbatim.
#[error("{0}")]
RemoteError(String),
/// Free-form message, displayed verbatim; converted to a "header" decode
/// failure by the `From` impl in this file.
#[error("{0}")]
DeserializeHeaderError(String),
/// Connection failure; the payload is the target named in the message.
#[error("connect to {0} failed")]
RemotingConnectError(String),
/// Failure to send a request; the payload is the target named in the message.
#[error("send request to < {0} > failed")]
RemotingSendRequestError(String),
/// Timed out waiting for a response: `(channel, timeout in milliseconds)`.
#[error("wait response on the channel < {0} >, timeout: {1}(ms)")]
RemotingTimeoutError(String, u64),
/// Message displayed with a `RemotingTooMuchRequestException: ` prefix.
#[error("RemotingTooMuchRequestException: {0}")]
RemotingTooMuchRequestError(String),
/// RPC failure: `(code, message)`.
#[error("RpcException: code: {0}, message: {1}")]
RpcError(i32, String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
FromStrErr(String),
/// Wraps a `std::io::Error` (convertible via `From`); note that it is shown
/// with `Debug` formatting (`{0:?}`), not `Display`.
#[error("{0:?}")]
Io(#[from] io::Error),
/// Protocol decoding mismatch: `(extFields length, header length)`.
#[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
DecodingError(usize, usize),
/// Wraps a `std::str::Utf8Error` (convertible via `From`).
#[error("UTF-8 decoding error: {0}")]
Utf8Error(#[from] std::str::Utf8Error),
/// Message displayed with a `RemotingCommandDecoderError:` prefix.
#[error("RemotingCommandDecoderError:{0}")]
RemotingCommandDecoderError(String),
/// Message displayed with a `RemotingCommandEncoderError:` prefix.
#[error("RemotingCommandEncoderError:{0}")]
RemotingCommandEncoderError(String),
/// Unsupported serialize type, identified by its `u8` value.
#[error("Not support serialize type: {0}")]
NotSupportSerializeType(u8),
/// Message displayed with a `ConnectionInvalid: ` prefix.
#[error("ConnectionInvalid: {0}")]
ConnectionInvalid(String),
/// Two-part payload `(i32, String)` displayed as `{0}-{1}`.
#[error("AbortProcessError: {0}-{1}")]
AbortProcessError(i32, String),
/// Message displayed with a `Channel Send Request failed: ` prefix.
#[error("Channel Send Request failed: {0}")]
ChannelSendRequestFailed(String),
/// Message displayed with a `Channel recv Request failed: ` prefix.
#[error("Channel recv Request failed: {0}")]
ChannelRecvRequestFailed(String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
IllegalArgument(String),
/// Wraps a [`ClientErr`] (convertible via `From`) and displays it unchanged.
#[error("{0}")]
MQClientErr(#[from] ClientErr),
/// Wraps an [`MQBrokerErr`] (convertible via `From`) and displays it unchanged.
#[error("{0}")]
MQClientBrokerError(#[from] MQBrokerErr),
/// Wraps a [`RequestTimeoutErr`] (convertible via `From`) and displays it unchanged.
#[error("{0}")]
RequestTimeoutError(#[from] RequestTimeoutErr),
/// Payload order is `(code, broker address, message)`.
#[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
OffsetNotFoundError(i32, String, String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
IllegalArgumentError(String),
/// Wraps a `serde_json::Error`; only compiled with the `with_serde` feature.
#[error("{0}")]
#[cfg(feature = "with_serde")]
SerdeJsonError(#[from] serde_json::Error),
/// Free-form message, displayed verbatim.
#[error("{0}")]
UnsupportedOperationException(String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
IpError(String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
ChannelError(String),
/// Payload order is `(code, message, broker address)` -- note that this differs
/// from `OffsetNotFoundError`, which is why the format string uses `{2}` before `{1}`.
#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
MQBrokerError(i32, String, String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
NoneError(String),
/// Free-form message, displayed verbatim.
#[error("{0}")]
TokioHandlerError(String),
/// Wraps a `config::ConfigError`; only compiled with the `with_config` feature.
#[error("Config parse error: {0}")]
#[cfg(feature = "with_config")]
ConfigError(#[from] config::ConfigError),
/// Failed sub-command: `(command, detail)`.
#[error("{0} command failed , {1}")]
SubCommand(String, String),
/// Wraps the unified `ServiceError` (convertible via `From`) and displays it unchanged.
#[error("{0}")]
ServiceTaskError(#[from] ServiceError),
/// Free-form message, displayed verbatim.
#[error("{0}")]
StoreCustomError(String),
}
/// Broker-reported failure: a response code, the broker's error text and,
/// optionally, the broker address.
///
/// `Display` prints the pre-rendered `message` field, which the constructors
/// build from the other fields so the error is never rendered as an empty string.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct MQBrokerErr {
    response_code: i32,
    error_message: Option<String>,
    broker_addr: Option<String>,
    message: String,
}

impl MQBrokerErr {
    /// Creates an error without a broker address.
    ///
    /// The `Display` text is `CODE: <response_code> DESC: <error_message>`.
    pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
        let error_message = error_message.into();
        let message = format!("CODE: {} DESC: {}", response_code, error_message);
        Self {
            response_code,
            error_message: Some(error_message),
            broker_addr: None,
            message,
        }
    }

    /// Creates an error that also records the broker address.
    ///
    /// The `Display` text is
    /// `CODE: <response_code> DESC: <error_message> BROKER: <broker_addr>`.
    pub fn new_with_broker(
        response_code: i32,
        error_message: impl Into<String>,
        broker_addr: impl Into<String>,
    ) -> Self {
        let broker_addr = broker_addr.into();
        let error_message = error_message.into();
        let message = format!(
            "CODE: {} DESC: {} BROKER: {}",
            response_code, error_message, broker_addr
        );
        Self {
            response_code,
            error_message: Some(error_message),
            broker_addr: Some(broker_addr),
            message,
        }
    }

    /// Returns the response code supplied at construction.
    pub fn response_code(&self) -> i32 {
        self.response_code
    }

    /// Returns the error text supplied at construction, if any.
    pub fn error_message(&self) -> Option<&String> {
        self.error_message.as_ref()
    }

    /// Returns the broker address, or `None` when built with [`MQBrokerErr::new`].
    pub fn broker_addr(&self) -> Option<&String> {
        self.broker_addr.as_ref()
    }
}
/// Expands to `Err(RocketmqError::MQClientBrokerError(..))`.
///
/// * `client_broker_err!(code, message, broker_addr)` builds the inner error with
///   `MQBrokerErr::new_with_broker`.
/// * `client_broker_err!(code, message)` builds it with `MQBrokerErr::new`.
///
/// In both forms the code expression is cast with `as i32`.
#[macro_export]
macro_rules! client_broker_err {
    // Three arguments: code, message and broker address.
    ($code:expr, $msg:expr, $addr:expr) => {{
        std::result::Result::Err(
            $crate::RocketmqError::MQClientBrokerError(
                $crate::MQBrokerErr::new_with_broker($code as i32, $msg, $addr),
            ),
        )
    }};
    // Two arguments: code and message only.
    ($code:expr, $msg:expr) => {{
        std::result::Result::Err(
            $crate::RocketmqError::MQClientBrokerError(
                $crate::MQBrokerErr::new($code as i32, $msg),
            ),
        )
    }};
}
/// Client-side failure carrying a response code and the originating error text.
///
/// `Display` prints the pre-rendered `message` field, which the constructors
/// derive from their arguments.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct ClientErr {
    response_code: i32,
    error_message: Option<String>,
    message: String,
}

impl ClientErr {
    /// Creates an error with response code `-1`.
    ///
    /// The `Display` text is `error_message` itself, so the error text reaches
    /// logs and callers instead of a fixed placeholder.
    pub fn new(error_message: impl Into<String>) -> Self {
        let error_message = error_message.into();
        Self {
            response_code: -1,
            message: error_message.clone(),
            error_message: Some(error_message),
        }
    }

    /// Creates an error with an explicit response code.
    ///
    /// The `Display` text is `CODE: <response_code> DESC: <error_message>`.
    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
        let error_message = error_message.into();
        let message = format!("CODE: {} DESC: {}", response_code, error_message);
        Self {
            response_code,
            error_message: Some(error_message),
            message,
        }
    }

    /// Returns the response code (`-1` when built with [`ClientErr::new`]).
    pub fn response_code(&self) -> i32 {
        self.response_code
    }

    /// Returns the error text supplied at construction, if any.
    pub fn error_message(&self) -> Option<&String> {
        self.error_message.as_ref()
    }
}
/// Expands to `Err(RocketmqError::MQClientErr(..))`.
///
/// * `mq_client_err_legacy!(code, fmt, args...)` formats the message with
///   `format!` and builds the inner error with `ClientErr::new_with_code`.
/// * `mq_client_err_legacy!(code, message)` uses `ClientErr::new_with_code`.
/// * `mq_client_err_legacy!(message)` uses `ClientErr::new`.
///
/// Every arm yields the same error type; code expressions are cast with `as i32`.
#[deprecated(
    since = "0.7.0",
    note = "Use unified error system and macros from rocketmq-client instead"
)]
#[macro_export]
macro_rules! mq_client_err_legacy {
    // Code plus a format string and its arguments.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        // Uses the crate-root `RocketmqError` / `ClientErr`, like the other two
        // arms, so all forms of the macro produce the same error type.
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};
    // Code plus a ready-made message.
    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, $error_message),
        ))
    }};
    // Message only.
    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
/// Request-timeout failure carrying a response code and the originating error text.
///
/// `Display` prints the pre-rendered `message` field, which the constructors
/// derive from their arguments.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct RequestTimeoutErr {
    response_code: i32,
    error_message: Option<String>,
    message: String,
}

impl RequestTimeoutErr {
    /// Creates an error with response code `-1`.
    ///
    /// The `Display` text is `error_message` itself.
    pub fn new(error_message: impl Into<String>) -> Self {
        let error_message = error_message.into();
        Self {
            response_code: -1,
            message: error_message.clone(),
            error_message: Some(error_message),
        }
    }

    /// Creates an error with an explicit response code.
    ///
    /// The `Display` text is `CODE: <response_code> DESC: <error_message>`.
    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
        let error_message = error_message.into();
        let message = format!("CODE: {} DESC: {}", response_code, error_message);
        Self {
            response_code,
            error_message: Some(error_message),
            message,
        }
    }

    /// Returns the response code (`-1` when built with [`RequestTimeoutErr::new`]).
    pub fn response_code(&self) -> i32 {
        self.response_code
    }

    /// Returns the error text supplied at construction, if any.
    pub fn error_message(&self) -> Option<&String> {
        self.error_message.as_ref()
    }
}
/// Expands to `Err(RocketmqError::RequestTimeoutError(..))`.
///
/// * `request_timeout_err!(code, fmt, args...)` formats the message with
///   `format!` and builds the inner error with `RequestTimeoutErr::new_with_code`.
/// * `request_timeout_err!(code, message)` uses `RequestTimeoutErr::new_with_code`.
/// * `request_timeout_err!(message)` uses `RequestTimeoutErr::new`.
///
/// Code expressions are cast with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Code plus a format string and its arguments.
    ($code:expr, $template:expr, $($piece:expr),*) => {{
        let rendered = format!($template, $($piece),*);
        std::result::Result::Err(
            $crate::RocketmqError::RequestTimeoutError(
                $crate::RequestTimeoutErr::new_with_code($code as i32, rendered),
            ),
        )
    }};
    // Code plus a ready-made message.
    ($code:expr, $msg:expr) => {{
        std::result::Result::Err(
            $crate::RocketmqError::RequestTimeoutError(
                $crate::RequestTimeoutErr::new_with_code($code as i32, $msg),
            ),
        )
    }};
    // Message only.
    ($msg:expr) => {{
        std::result::Result::Err(
            $crate::RocketmqError::RequestTimeoutError(
                $crate::RequestTimeoutErr::new($msg),
            ),
        )
    }};
}
/// Deprecated service lifecycle error enum; the deprecation note points to
/// `unified::ServiceError` as its replacement.
#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
#[derive(Debug, thiserror::Error)]
pub enum LegacyServiceError {
/// Displays as "Service is already running".
#[error("Service is already running")]
AlreadyRunning,
/// Displays as "Service is not running".
#[error("Service is not running")]
NotRunning,
/// Startup failure; the payload is appended to the message.
#[error("Service startup failed: {0}")]
StartupFailed(String),
/// Shutdown failure; the payload is appended to the message.
#[error("Service shutdown failed: {0}")]
ShutdownFailed(String),
/// Displays as "Service operation timeout".
#[error("Service operation timeout")]
Timeout,
/// Displays as "Service interrupted".
#[error("Service interrupted")]
Interrupted,
}
/// Bridges the legacy [`RocketmqError`] enum into the unified error type.
impl From<RocketmqError> for unified::RocketMQError {
    /// Consumes a legacy error and maps each variant onto a unified one.
    ///
    /// The match is exhaustive (no wildcard arm), so adding a legacy variant
    /// forces this conversion to be revisited.
    fn from(err: RocketmqError) -> Self {
        match err {
            // ---- network / connection ----
            RocketmqError::RemoteError(detail) | RocketmqError::ConnectionInvalid(detail) => {
                Self::network_connection_failed("unknown", detail)
            }
            RocketmqError::RemotingConnectError(target) => {
                Self::network_connection_failed(target, "connection failed")
            }
            RocketmqError::RemotingSendRequestError(target) => {
                Self::network_connection_failed(target, "send request failed")
            }
            RocketmqError::ChannelSendRequestFailed(detail)
            | RocketmqError::ChannelRecvRequestFailed(detail) => {
                Self::network_connection_failed("channel", detail)
            }
            RocketmqError::RemotingTimeoutError(target, millis) => {
                let timeout = unified::NetworkError::RequestTimeout {
                    addr: target,
                    timeout_ms: millis,
                };
                Self::Network(timeout)
            }
            // NOTE(review): the wrapped `RequestTimeoutErr` is discarded and the
            // timeout is fixed at 3000 ms here.
            RocketmqError::RequestTimeoutError(_) => Self::Timeout {
                operation: "request",
                timeout_ms: 3000,
            },

            // ---- serialization / protocol ----
            RocketmqError::DeserializeHeaderError(detail) => {
                Self::Serialization(unified::SerializationError::DecodeFailed {
                    format: "header",
                    message: detail,
                })
            }
            RocketmqError::DecodingError(needed, present) => {
                let message = format!("required {} bytes, got {}", needed, present);
                Self::Serialization(unified::SerializationError::DecodeFailed {
                    format: "binary",
                    message,
                })
            }
            RocketmqError::RemotingCommandEncoderError(detail) => {
                Self::Serialization(unified::SerializationError::EncodeFailed {
                    format: "command",
                    message: detail,
                })
            }
            // NOTE(review): the decoder message is dropped and both lengths are
            // reported as 0.
            RocketmqError::RemotingCommandDecoderError(_dropped) => {
                Self::Protocol(unified::ProtocolError::DecodeError {
                    ext_fields_len: 0,
                    header_len: 0,
                })
            }
            RocketmqError::NotSupportSerializeType(kind) => {
                Self::Protocol(unified::ProtocolError::UnsupportedSerializationType {
                    serialize_type: kind,
                })
            }
            RocketmqError::Utf8Error(cause) => {
                Self::Serialization(unified::SerializationError::Utf8Error(cause))
            }
            #[cfg(feature = "with_serde")]
            RocketmqError::SerdeJsonError(cause) => {
                Self::Serialization(unified::SerializationError::JsonError(cause.to_string()))
            }

            // ---- broker / RPC ----
            RocketmqError::MQBrokerError(code, detail, broker) => {
                Self::broker_operation_failed("BROKER_OPERATION", code, detail)
                    .with_broker_addr(broker)
            }
            RocketmqError::MQClientBrokerError(inner) => {
                // A missing error message becomes an empty string.
                let detail = inner.error_message().cloned().unwrap_or_default();
                let converted =
                    Self::broker_operation_failed("BROKER_OPERATION", inner.response_code(), detail);
                // The broker address is attached only when the legacy error has one.
                match inner.broker_addr() {
                    Some(broker) => converted.with_broker_addr(broker.clone()),
                    None => converted,
                }
            }
            RocketmqError::OffsetNotFoundError(code, broker, detail) => {
                Self::broker_operation_failed("OFFSET_NOT_FOUND", code, detail)
                    .with_broker_addr(broker)
            }
            RocketmqError::RpcError(code, detail) => {
                Self::broker_operation_failed("RPC", code, detail)
            }

            // ---- illegal argument ----
            RocketmqError::RemotingTooMuchRequestError(detail)
            | RocketmqError::IllegalArgument(detail)
            | RocketmqError::IllegalArgumentError(detail)
            | RocketmqError::UnsupportedOperationException(detail)
            | RocketmqError::FromStrErr(detail) => Self::illegal_argument(detail),
            RocketmqError::MQClientErr(inner) => {
                // A missing error message becomes an empty string.
                Self::illegal_argument(inner.error_message().cloned().unwrap_or_default())
            }
            RocketmqError::IpError(detail) => {
                Self::illegal_argument(format!("IP error: {}", detail))
            }

            // ---- internal ----
            RocketmqError::ChannelError(detail) => {
                Self::Internal(format!("Channel error: {}", detail))
            }
            RocketmqError::NoneError(detail) => Self::Internal(format!("None error: {}", detail)),
            RocketmqError::TokioHandlerError(detail) => {
                Self::Internal(format!("Tokio handler error: {}", detail))
            }
            RocketmqError::SubCommand(command, detail) => {
                Self::Internal(format!("{} command failed: {}", command, detail))
            }
            RocketmqError::AbortProcessError(code, detail) => {
                Self::Internal(format!("Abort process error {}: {}", code, detail))
            }

            // ---- direct wrappers ----
            RocketmqError::Io(cause) => Self::IO(cause),
            RocketmqError::ServiceTaskError(cause) => Self::Service(cause),
            RocketmqError::StoreCustomError(detail) => Self::StorageReadFailed {
                path: "unknown".to_string(),
                reason: detail,
            },
            #[cfg(feature = "with_config")]
            RocketmqError::ConfigError(cause) => Self::ConfigParseFailed {
                key: "unknown",
                reason: cause.to_string(),
            },
        }
    }
}