mod network;
mod protocol;
mod rpc;
mod serialization;
mod tools;
use std::io;
pub use crate::filter_error::FilterError;
pub use network::NetworkError;
pub use protocol::ProtocolError;
pub use rpc::RpcClientError;
pub use serialization::SerializationError;
use thiserror::Error;
pub use tools::ToolsError;
pub use crate::auth_error::AuthError;
#[allow(deprecated)]
pub use crate::client_error::*;
#[allow(deprecated)]
pub use crate::common_error::*;
pub use crate::controller_error::ControllerError;
/// Unified error type for RocketMQ operations.
///
/// Variants fall into two shapes:
/// * wrappers around a domain-specific error, marked `#[error(transparent)]` so that `Display`
///   and `source()` are forwarded to the wrapped value;
/// * self-describing variants that carry the fields interpolated into their own message.
#[derive(Debug, Error)]
pub enum RocketMQError {
    // ---- Wrapped domain errors -------------------------------------------------------------
    /// Wraps a [`NetworkError`].
    #[error(transparent)]
    Network(#[from] NetworkError),

    /// Wraps a [`SerializationError`].
    #[error(transparent)]
    Serialization(#[from] SerializationError),

    /// Wraps a [`ProtocolError`].
    #[error(transparent)]
    Protocol(#[from] ProtocolError),

    /// Wraps an [`RpcClientError`].
    #[error(transparent)]
    Rpc(#[from] RpcClientError),

    /// Wraps an [`AuthError`].
    #[error(transparent)]
    Authentication(#[from] AuthError),

    /// Wraps a [`ControllerError`].
    #[error(transparent)]
    Controller(#[from] ControllerError),

    // ---- Message / broker errors -----------------------------------------------------------
    /// A message property was invalid; the payload is the description shown in the message.
    #[error("Invalid message property: {0}")]
    InvalidProperty(String),

    /// No broker with the given name was found.
    #[error("Broker not found: {name}")]
    BrokerNotFound { name: String },

    /// Registration of the named broker failed for the given reason.
    #[error("Broker registration failed for '{name}': {reason}")]
    BrokerRegistrationFailed { name: String, reason: String },

    /// A broker operation failed with a numeric code and message.
    ///
    /// `broker_addr` is carried as extra context only; it is not part of the rendered message.
    #[error("Broker operation '{operation}' failed: code={code}, message={message}")]
    BrokerOperationFailed {
        operation: &'static str,
        code: i32,
        message: String,
        broker_addr: Option<String>,
    },

    /// The named topic does not exist.
    #[error("Topic '{topic}' does not exist")]
    TopicNotExist { topic: String },

    /// The queue identified by `topic` and `queue_id` does not exist.
    #[error("Queue does not exist: topic='{topic}', queue_id={queue_id}")]
    QueueNotExist { topic: String, queue_id: i32 },

    /// The named subscription group was not found.
    #[error("Subscription group '{group}' not found")]
    SubscriptionGroupNotExist { group: String },

    /// `queue_id` is outside the range `0-{max}` reported in the message for `topic`.
    #[error("Queue {queue_id} out of range (0-{max}) for topic '{topic}'")]
    QueueIdOutOfRange { topic: String, queue_id: i32, max: i32 },

    /// The message body length (`actual`) exceeds `limit`, both in bytes.
    #[error("Message body length {actual} bytes exceeds limit {limit} bytes")]
    MessageTooLarge { actual: usize, limit: usize },

    /// Message validation failed for the given reason.
    #[error("Message validation failed: {reason}")]
    MessageValidationFailed { reason: String },

    /// The retry counter (`current`) exceeded `max` for the consumer group.
    #[error("Retry limit {current}/{max} exceeded for group '{group}'")]
    RetryLimitExceeded { group: String, current: i32, max: i32 },

    /// A transaction message was rejected by broker policy.
    #[error("Transaction message rejected by broker policy")]
    TransactionRejected,

    /// The broker denied permission for the named operation.
    #[error("Broker permission denied: {operation}")]
    BrokerPermissionDenied { operation: String },

    /// The contacted broker is not the master; `master_address` is reported in the message.
    #[error("Not master broker, master address: {master_address}")]
    NotMasterBroker { master_address: String },

    /// Looking up a message at `offset` failed.
    #[error("Message lookup failed at offset {offset}")]
    MessageLookupFailed { offset: i64 },

    /// Sending to the named topic is forbidden.
    #[error("Sending to topic '{topic}' is forbidden")]
    TopicSendingForbidden { topic: String },

    /// An asynchronous task failed; the boxed `source` is exposed as the error's source.
    #[error("Async task '{task}' failed: {context}")]
    BrokerAsyncTaskFailed {
        task: &'static str,
        context: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    // ---- Request / response errors ---------------------------------------------------------
    /// Processing of a request body failed during `operation`.
    #[error("Request body {operation} failed: {reason}")]
    RequestBodyInvalid { operation: &'static str, reason: String },

    /// A request header problem, described by the payload.
    #[error("Request header error: {0}")]
    RequestHeaderError(String),

    /// Processing of a response failed during `operation`.
    #[error("Response {operation} failed: {reason}")]
    ResponseProcessFailed { operation: &'static str, reason: String },

    // ---- Routing errors --------------------------------------------------------------------
    /// No route information was found for the topic.
    #[error("Route information not found for topic '{topic}'")]
    RouteNotFound { topic: String },

    /// Route data for the topic was found to be inconsistent.
    #[error("Route data inconsistency detected for topic '{topic}': {reason}")]
    RouteInconsistent { topic: String, reason: String },

    /// Registering `broker_name` conflicted with existing route state.
    #[error("Broker registration conflict for '{broker_name}': {reason}")]
    RouteRegistrationConflict { broker_name: String, reason: String },

    /// The route state version did not match the expected one.
    #[error("Route state version conflict: expected={expected}, actual={actual}")]
    RouteVersionConflict { expected: u64, actual: u64 },

    /// The named cluster was not found.
    #[error("Cluster '{cluster}' not found")]
    ClusterNotFound { cluster: String },

    // ---- Client lifecycle errors -----------------------------------------------------------
    /// The client has not been started.
    #[error("Client is not started")]
    ClientNotStarted,

    /// The client has already been started.
    #[error("Client is already started")]
    ClientAlreadyStarted,

    /// The client is shutting down.
    #[error("Client is shutting down")]
    ClientShuttingDown,

    /// The client was in state `actual` where `expected` was required.
    #[error("Invalid client state: expected {expected}, got {actual}")]
    ClientInvalidState { expected: &'static str, actual: String },

    /// No producer is available.
    #[error("Producer is not available")]
    ProducerNotAvailable,

    /// No consumer is available.
    #[error("Consumer is not available")]
    ConsumerNotAvailable,

    /// Wraps a [`ToolsError`].
    #[error(transparent)]
    Tools(#[from] ToolsError),

    /// Wraps a [`FilterError`].
    #[error(transparent)]
    Filter(#[from] FilterError),

    // ---- Storage errors --------------------------------------------------------------------
    /// Reading from `path` failed.
    #[error("Storage read failed for '{path}': {reason}")]
    StorageReadFailed { path: String, reason: String },

    /// Writing to `path` failed.
    #[error("Storage write failed for '{path}': {reason}")]
    StorageWriteFailed { path: String, reason: String },

    /// Corrupted data was detected at `path`.
    #[error("Corrupted data detected in '{path}'")]
    StorageCorrupted { path: String },

    /// Storage space ran out at `path`.
    #[error("Out of storage space: {path}")]
    StorageOutOfSpace { path: String },

    /// A lock for `path` could not be acquired.
    #[error("Failed to acquire lock for '{path}'")]
    StorageLockFailed { path: String },

    // ---- Configuration errors --------------------------------------------------------------
    /// Parsing the configuration entry `key` failed.
    #[error("Configuration parse error for '{key}': {reason}")]
    ConfigParseFailed { key: &'static str, reason: String },

    /// The required configuration entry `key` is missing.
    #[error("Required configuration '{key}' is missing")]
    ConfigMissing { key: &'static str },

    /// The configuration entry `key` holds an invalid `value`.
    #[error("Invalid configuration for '{key}': value='{value}', reason={reason}")]
    ConfigInvalidValue {
        key: &'static str,
        value: String,
        reason: String,
    },

    // ---- Controller / consensus errors -----------------------------------------------------
    /// This node is not the leader; the message prints the leader id, or `unknown` when absent.
    #[error("Not leader, current leader is: {}", .leader_id.map_or_else(|| "unknown".to_string(), |id| id.to_string()))]
    ControllerNotLeader { leader_id: Option<u64> },

    /// A Raft consensus error described by `reason`.
    #[error("Raft consensus error: {reason}")]
    ControllerRaftError { reason: String },

    /// A consensus operation did not finish within `timeout_ms` milliseconds.
    #[error("Consensus operation '{operation}' timed out after {timeout_ms}ms")]
    ControllerConsensusTimeout { operation: &'static str, timeout_ms: u64 },

    /// A snapshot operation failed.
    #[error("Snapshot operation failed: {reason}")]
    ControllerSnapshotFailed { reason: String },

    // ---- General errors --------------------------------------------------------------------
    /// Wraps a [`std::io::Error`], prefixing its message with `IO error: `.
    #[error("IO error: {0}")]
    IO(#[from] io::Error),

    /// An argument was illegal; the payload describes why.
    #[error("Illegal argument: {0}")]
    IllegalArgument(String),

    /// `operation` did not finish within `timeout_ms` milliseconds.
    #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
    Timeout { operation: &'static str, timeout_ms: u64 },

    /// An internal error described by the payload.
    #[error("Internal error: {0}")]
    Internal(String),

    /// Wraps a [`ServiceError`], prefixing its message with `Service error: `.
    #[error("Service error: {0}")]
    Service(#[from] ServiceError),

    /// The given ordinal does not map to a RocketMQ version.
    #[error("Invalid RocketMQ version ordinal: {0}")]
    InvalidVersionOrdinal(u32),

    /// Free-form legacy error; rendered as the bare payload.
    #[deprecated(since = "0.7.0", note = "Use specific error types instead")]
    #[error("{0}")]
    Legacy(String),

    /// Something was used before being initialized; the payload says what.
    #[error("Not initialized: {0}")]
    NotInitialized(String),

    /// A message lacks the required property named `property`.
    #[error("Message is missing required property: {property}")]
    MissingRequiredMessageProperty { property: &'static str },
}
/// Convenience constructors and small helpers for [`RocketMQError`].
///
/// Most functions here only forward their arguments to the matching variant or to a constructor
/// on one of the wrapped error types, so call sites do not have to spell out the nesting.
impl RocketMQError {
    // ---- Network ---------------------------------------------------------------------------

    /// Builds a `Network` error via `NetworkError::connection_failed(addr, reason)`.
    #[inline]
    pub fn network_connection_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::Network(NetworkError::connection_failed(addr, reason))
    }

    /// Builds a `Network` error via `NetworkError::request_timeout`, passing the timeout in
    /// whole milliseconds.
    ///
    /// Durations whose millisecond count does not fit in a `u64` saturate at `u64::MAX`.
    #[inline]
    pub fn network_timeout(addr: impl Into<String>, timeout: std::time::Duration) -> Self {
        // `as_millis` yields a `u128`; clamp first so the narrowing cast below can never wrap
        // an oversized duration into a small, misleading value.
        let timeout_ms = timeout.as_millis().min(u128::from(u64::MAX)) as u64;
        Self::Network(NetworkError::request_timeout(addr, timeout_ms))
    }

    /// Builds a `Network` error via `NetworkError::send_failed(addr, reason)`.
    #[inline]
    pub fn network_request_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::Network(NetworkError::send_failed(addr, reason))
    }

    // ---- Serialization ---------------------------------------------------------------------

    /// Builds a `Serialization` error via `SerializationError::decode_failed(format, reason)`.
    #[inline]
    pub fn deserialization_failed(format: &'static str, reason: impl Into<String>) -> Self {
        Self::Serialization(SerializationError::decode_failed(format, reason))
    }

    /// Alias of [`RocketMQError::validation_error`]; both produce the same `Tools` error.
    #[inline]
    pub fn validation_failed(field: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::validation_error(field, reason)
    }

    // ---- Broker ----------------------------------------------------------------------------

    /// Builds a `BrokerOperationFailed` error with no broker address attached.
    ///
    /// Use [`RocketMQError::with_broker_addr`] to attach the address afterwards.
    #[inline]
    pub fn broker_operation_failed(operation: &'static str, code: i32, message: impl Into<String>) -> Self {
        Self::BrokerOperationFailed {
            operation,
            code,
            message: message.into(),
            broker_addr: None,
        }
    }

    // ---- Storage ---------------------------------------------------------------------------

    /// Builds a `StorageReadFailed` error.
    #[inline]
    pub fn storage_read_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::StorageReadFailed {
            path: path.into(),
            reason: reason.into(),
        }
    }

    /// Builds a `StorageWriteFailed` error.
    #[inline]
    pub fn storage_write_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::StorageWriteFailed {
            path: path.into(),
            reason: reason.into(),
        }
    }

    // ---- General / routing / request -------------------------------------------------------

    /// Builds an `IllegalArgument` error.
    #[inline]
    pub fn illegal_argument(message: impl Into<String>) -> Self {
        Self::IllegalArgument(message.into())
    }

    /// Builds a `RouteNotFound` error.
    #[inline]
    pub fn route_not_found(topic: impl Into<String>) -> Self {
        Self::RouteNotFound { topic: topic.into() }
    }

    /// Builds a `RouteRegistrationConflict` error.
    #[inline]
    pub fn route_registration_conflict(broker_name: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::RouteRegistrationConflict {
            broker_name: broker_name.into(),
            reason: reason.into(),
        }
    }

    /// Builds a `ClusterNotFound` error.
    #[inline]
    pub fn cluster_not_found(cluster: impl Into<String>) -> Self {
        Self::ClusterNotFound {
            cluster: cluster.into(),
        }
    }

    /// Builds a `RequestBodyInvalid` error.
    #[inline]
    pub fn request_body_invalid(operation: &'static str, reason: impl Into<String>) -> Self {
        Self::RequestBodyInvalid {
            operation,
            reason: reason.into(),
        }
    }

    /// Builds a `RequestHeaderError` error.
    #[inline]
    pub fn request_header_error(message: impl Into<String>) -> Self {
        Self::RequestHeaderError(message.into())
    }

    /// Builds a `ResponseProcessFailed` error.
    #[inline]
    pub fn response_process_failed(operation: &'static str, reason: impl Into<String>) -> Self {
        Self::ResponseProcessFailed {
            operation,
            reason: reason.into(),
        }
    }

    /// Attaches a broker address to a `BrokerOperationFailed` error, replacing any address
    /// already present.
    ///
    /// Every other variant is returned unchanged, and `addr` is not converted in that case.
    pub fn with_broker_addr(mut self, addr: impl Into<String>) -> Self {
        if let Self::BrokerOperationFailed { broker_addr, .. } = &mut self {
            *broker_addr = Some(addr.into());
        }
        self
    }

    // ---- Tools -----------------------------------------------------------------------------

    /// Builds a `Tools` error via `ToolsError::validation_error(field, reason)`.
    #[inline]
    pub fn validation_error(field: impl Into<String>, reason: impl Into<String>) -> Self {
        Self::Tools(ToolsError::validation_error(field, reason))
    }

    /// Builds a `Tools` error via `ToolsError::topic_not_found(topic)`.
    ///
    /// Note that this does not produce the `TopicNotExist` variant.
    #[inline]
    pub fn topic_not_found(topic: impl Into<String>) -> Self {
        Self::Tools(ToolsError::topic_not_found(topic))
    }

    /// Builds a `Tools` error via `ToolsError::topic_already_exists(topic)`.
    #[inline]
    pub fn topic_already_exists(topic: impl Into<String>) -> Self {
        Self::Tools(ToolsError::topic_already_exists(topic))
    }

    /// Builds a `Tools` error via `ToolsError::nameserver_unreachable(addr)`.
    #[inline]
    pub fn nameserver_unreachable(addr: impl Into<String>) -> Self {
        Self::Tools(ToolsError::nameserver_unreachable(addr))
    }

    /// Builds a `Tools` error via `ToolsError::nameserver_config_invalid(reason)`.
    #[inline]
    pub fn nameserver_config_invalid(reason: impl Into<String>) -> Self {
        Self::Tools(ToolsError::nameserver_config_invalid(reason))
    }

    /// Builds a `NotInitialized` error.
    #[inline]
    pub fn not_initialized(reason: impl Into<String>) -> Self {
        Self::NotInitialized(reason.into())
    }

    // ---- Authentication --------------------------------------------------------------------

    /// Builds an `Authentication` error wrapping `AuthError::AuthenticationFailed`.
    #[inline]
    pub fn authentication_failed(reason: impl Into<String>) -> Self {
        Self::Authentication(AuthError::AuthenticationFailed(reason.into()))
    }

    /// Builds an `Authentication` error wrapping `AuthError::InvalidCredential`.
    #[inline]
    pub fn invalid_credential(reason: impl Into<String>) -> Self {
        Self::Authentication(AuthError::InvalidCredential(reason.into()))
    }

    /// Builds an `Authentication` error wrapping `AuthError::UserNotFound`.
    #[inline]
    pub fn user_not_found(username: impl Into<String>) -> Self {
        Self::Authentication(AuthError::UserNotFound(username.into()))
    }

    /// Builds an `Authentication` error wrapping `AuthError::InvalidSignature`.
    #[inline]
    pub fn invalid_signature(reason: impl Into<String>) -> Self {
        Self::Authentication(AuthError::InvalidSignature(reason.into()))
    }

    // ---- Controller ------------------------------------------------------------------------

    /// Builds a `Controller` error wrapping `ControllerError::NotLeader`.
    ///
    /// Note that this does not produce the `ControllerNotLeader` variant.
    #[inline]
    pub fn controller_not_leader(leader_id: Option<u64>) -> Self {
        Self::Controller(ControllerError::NotLeader { leader_id })
    }

    /// Builds a `Controller` error wrapping `ControllerError::Raft`.
    #[inline]
    pub fn controller_raft_error(reason: impl Into<String>) -> Self {
        Self::Controller(ControllerError::Raft(reason.into()))
    }

    /// Builds a `Controller` error wrapping `ControllerError::MetadataNotFound`.
    #[inline]
    pub fn controller_metadata_not_found(key: impl Into<String>) -> Self {
        Self::Controller(ControllerError::MetadataNotFound { key: key.into() })
    }

    /// Builds a `Controller` error wrapping `ControllerError::InvalidRequest`.
    #[inline]
    pub fn controller_invalid_request(reason: impl Into<String>) -> Self {
        Self::Controller(ControllerError::InvalidRequest(reason.into()))
    }

    /// Builds a `Controller` error wrapping `ControllerError::Timeout`.
    #[inline]
    pub fn controller_timeout(timeout_ms: u64) -> Self {
        Self::Controller(ControllerError::Timeout { timeout_ms })
    }

    /// Builds a `Controller` error wrapping `ControllerError::Shutdown`.
    #[inline]
    pub fn controller_shutdown() -> Self {
        Self::Controller(ControllerError::Shutdown)
    }

    // ---- Filter ----------------------------------------------------------------------------

    /// Builds a `Filter` error via `FilterError::empty_bytes()`.
    #[inline]
    pub fn filter_empty_bytes() -> Self {
        Self::Filter(FilterError::empty_bytes())
    }

    /// Builds a `Filter` error via `FilterError::invalid_bit_length()`.
    #[inline]
    pub fn filter_invalid_bit_length() -> Self {
        Self::Filter(FilterError::invalid_bit_length())
    }

    /// Builds a `Filter` error via `FilterError::bit_length_too_small()`.
    #[inline]
    pub fn filter_bit_length_too_small() -> Self {
        Self::Filter(FilterError::bit_length_too_small())
    }

    /// Builds a `Filter` error via `FilterError::bit_position_out_of_bounds(pos, max)`.
    #[inline]
    pub fn filter_bit_position_out_of_bounds(pos: usize, max: usize) -> Self {
        Self::Filter(FilterError::bit_position_out_of_bounds(pos, max))
    }

    /// Builds a `Filter` error via `FilterError::byte_position_out_of_bounds(pos, max)`.
    #[inline]
    pub fn filter_byte_position_out_of_bounds(pos: usize, max: usize) -> Self {
        Self::Filter(FilterError::byte_position_out_of_bounds(pos, max))
    }

    /// Builds a `Filter` error via `FilterError::uninitialized()`.
    #[inline]
    pub fn filter_uninitialized() -> Self {
        Self::Filter(FilterError::uninitialized())
    }
}
impl From<std::str::Utf8Error> for RocketMQError {
#[inline]
fn from(e: std::str::Utf8Error) -> Self {
Self::Serialization(SerializationError::from(e))
}
}
/// Converts a `serde_json` error by first turning it into a [`SerializationError`] and then
/// wrapping that in the `Serialization` variant. Only compiled with the `with_serde` feature.
#[cfg(feature = "with_serde")]
impl From<serde_json::Error> for RocketMQError {
    #[inline]
    fn from(err: serde_json::Error) -> Self {
        let inner: SerializationError = err.into();
        Self::Serialization(inner)
    }
}
/// Converts a `config::ConfigError` into the `ConfigParseFailed` variant. Only compiled with the
/// `with_config` feature.
#[cfg(feature = "with_config")]
impl From<config::ConfigError> for RocketMQError {
    fn from(err: config::ConfigError) -> Self {
        // No key is extracted from the source error, so a fixed placeholder is recorded and the
        // error's own rendering becomes the reason.
        let reason = err.to_string();
        Self::ConfigParseFailed { key: "unknown", reason }
    }
}
/// Errors describing the lifecycle of a service (start, run, shut down).
#[derive(Debug, Error)]
pub enum ServiceError {
    /// The service is already running.
    #[error("Service is already running")]
    AlreadyRunning,

    /// The service is not running.
    #[error("Service is not running")]
    NotRunning,

    /// Startup failed; the payload is appended to the message.
    #[error("Service startup failed: {0}")]
    StartupFailed(String),

    /// Shutdown failed; the payload is appended to the message.
    #[error("Service shutdown failed: {0}")]
    ShutdownFailed(String),

    /// A service operation timed out.
    #[error("Service operation timeout")]
    Timeout,

    /// The service was interrupted.
    #[error("Service interrupted")]
    Interrupted,
}
/// Result alias whose error type is [`RocketMQError`].
pub type RocketMQResult<T> = std::result::Result<T, RocketMQError>;
/// Result alias for `anyhow::Result<T>`.
///
/// Note that the error type here is `anyhow`'s, not [`RocketMQError`]; use [`RocketMQResult`]
/// when the typed error is wanted.
pub type Result<T> = anyhow::Result<T>;
#[cfg(test)]
mod tests {
    use super::*;

    /// The connection-failed constructor renders a message mentioning "Connection failed".
    #[test]
    fn test_error_creation() {
        let rendered = RocketMQError::network_connection_failed("127.0.0.1:9876", "timeout").to_string();
        assert!(rendered.contains("Connection failed"));
    }

    /// An `io::Error` converts into the `IO` variant.
    #[test]
    fn test_error_conversion() {
        let source = io::Error::new(io::ErrorKind::NotFound, "file not found");
        let converted = RocketMQError::from(source);
        assert!(matches!(converted, RocketMQError::IO(_)));
    }

    /// `with_broker_addr` stores the address on a `BrokerOperationFailed` error.
    #[test]
    fn test_broker_operation_with_addr() {
        let base = RocketMQError::broker_operation_failed("SEND_MESSAGE", 1, "failed");
        match base.with_broker_addr("127.0.0.1:10911") {
            RocketMQError::BrokerOperationFailed { broker_addr, .. } => {
                assert_eq!(broker_addr, Some("127.0.0.1:10911".to_string()));
            }
            _ => panic!("Expected BrokerOperationFailed"),
        }
    }

    /// `TopicNotExist` renders the topic name inside single quotes.
    #[test]
    fn test_topic_not_exist() {
        let topic = String::from("TestTopic");
        let rendered = RocketMQError::TopicNotExist { topic }.to_string();
        assert_eq!(rendered, "Topic 'TestTopic' does not exist");
    }
}