Skip to main content

rocketmq_error/
lib.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # RocketMQ Error Handling System
16//!
17//! This crate provides a unified, semantic, and performant error handling system
18//! for the RocketMQ Rust implementation.
19//!
20//! ## New Unified Error System (v0.7.0+)
21//!
22//! The new error system provides:
23//! - **Semantic clarity**: Each error type clearly expresses what went wrong
24//! - **Performance**: Minimal heap allocations, optimized for hot paths
25//! - **Ergonomics**: Automatic error conversions via `From` trait
26//! - **Debuggability**: Rich context for production debugging
27//!
28//! ### Usage
29//!
30//! ```rust
31//! use rocketmq_error::RocketMQError;
32//! use rocketmq_error::RocketMQResult;
33//!
34//! fn send_message(addr: &str) -> RocketMQResult<()> {
35//!     if addr.is_empty() {
36//!         return Err(RocketMQError::network_connection_failed(
//!             addr,
38//!             "invalid address",
39//!         ));
40//!     }
41//!     Ok(())
42//! }
43//! # send_message("localhost:9876").unwrap();
44//! ```
45//!
46//! ## Legacy Error System (Deprecated)
47//!
48//! The legacy `RocketmqError` enum is still available for backward compatibility
49//! but will be removed in a future version. Please migrate to the new unified system.
50
51// New unified error system
52pub mod unified;
53
54// Auth error module
55pub mod auth_error;
56
57// Controller error module
58pub mod controller_error;
59
60// Filter error module
61pub mod filter_error;
62
63// Client error module
64pub mod client_error;
65
// Re-export the new error types as the primary API: client and controller errors first,
// then the filter error and the unified (auth, network, protocol, ...) error types.
69pub use client_error::ClientError;
70pub use controller_error::ControllerError;
71pub use controller_error::ControllerResult;
72// Re-export filter error types
73pub use filter_error::FilterError;
74pub use unified::AuthError;
75pub use unified::NetworkError;
76pub use unified::ProtocolError;
77pub use unified::RocketMQError;
78pub use unified::RpcClientError;
79pub use unified::SerializationError;
80// Re-export result types (but don't conflict with legacy ones below)
81pub use unified::ServiceError as UnifiedServiceError;
82pub use unified::ToolsError;
83
84// Legacy error modules (deprecated but kept for compatibility)
85#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
86mod cli_error;
87#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
88mod common_error;
89#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
90mod name_srv_error;
91#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
92mod remoting_error;
93#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
94mod store_error;
95#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
96mod tui_error;
97
98use std::io;
99
100use thiserror::Error;
101
102// Legacy type aliases (deprecated - use RocketMQResult and Result from unified module)
103// Kept for backward compatibility with existing code
104#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
105pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;
106
107#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
108pub type LegacyResult<T> = anyhow::Result<T>;
109
110// Re-export unified result types as the primary API
111pub use unified::Result;
112pub use unified::RocketMQResult;
113// Import ServiceError for use in legacy RocketmqError enum
114use unified::ServiceError;
115
116#[derive(Debug, Error)]
117pub enum RocketmqError {
118    // remoting errors
119    #[error("{0}")]
120    RemoteError(String),
121
122    #[error("{0}")]
123    DeserializeHeaderError(String),
124
125    #[error("connect to {0} failed")]
126    RemotingConnectError(String),
127
128    #[error("send request to < {0} > failed")]
129    RemotingSendRequestError(String),
130
131    #[error("wait response on the channel < {0}  >, timeout: {1}(ms)")]
132    RemotingTimeoutError(String, u64),
133
134    #[error("RemotingTooMuchRequestException: {0}")]
135    RemotingTooMuchRequestError(String),
136
137    #[error("RpcException: code: {0}, message: {1}")]
138    RpcError(i32, String),
139
140    #[error("{0}")]
141    FromStrErr(String),
142
143    #[error("{0:?}")]
144    Io(#[from] io::Error),
145
146    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
147    DecodingError(usize, usize),
148
149    #[error("UTF-8 decoding error: {0}")]
150    Utf8Error(#[from] std::str::Utf8Error),
151
152    #[error("RemotingCommandDecoderError:{0}")]
153    RemotingCommandDecoderError(String),
154
155    #[error("RemotingCommandEncoderError:{0}")]
156    RemotingCommandEncoderError(String),
157
158    #[error("Not support serialize type: {0}")]
159    NotSupportSerializeType(u8),
160
161    #[error("ConnectionInvalid: {0}")]
162    ConnectionInvalid(String),
163
164    #[error("AbortProcessError: {0}-{1}")]
165    AbortProcessError(i32, String),
166
167    #[error("Channel Send Request failed: {0}")]
168    ChannelSendRequestFailed(String),
169
170    #[error("Channel recv Request failed: {0}")]
171    ChannelRecvRequestFailed(String),
172
173    #[error("{0}")]
174    IllegalArgument(String),
175
176    //client error
177    #[error("{0}")]
178    MQClientErr(#[from] ClientErr),
179
180    #[error("{0}")]
181    MQClientBrokerError(#[from] MQBrokerErr),
182
183    #[error("{0}")]
184    RequestTimeoutError(#[from] RequestTimeoutErr),
185
186    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
187    OffsetNotFoundError(i32, String, String),
188
189    #[error("{0}")]
190    IllegalArgumentError(String),
191
192    #[error("{0}")]
193    #[cfg(feature = "with_serde")]
194    SerdeJsonError(#[from] serde_json::Error),
195
196    #[error("{0}")]
197    UnsupportedOperationException(String),
198
199    #[error("{0}")]
200    IpError(String),
201
202    #[error("{0}")]
203    ChannelError(String),
204
205    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
206    MQBrokerError(i32, String, String),
207
208    #[error("{0}")]
209    NoneError(String),
210
211    #[error("{0}")]
212    TokioHandlerError(String),
213
214    #[error("Config parse error: {0}")]
215    #[cfg(feature = "with_config")]
216    ConfigError(#[from] config::ConfigError),
217
218    #[error("{0} command failed , {1}")]
219    SubCommand(String, String),
220
221    #[error("{0}")]
222    ServiceTaskError(#[from] ServiceError),
223
224    #[error("{0}")]
225    StoreCustomError(String),
226}
227
228#[derive(Error, Debug)]
229#[error("{message}")]
230pub struct MQBrokerErr {
231    response_code: i32,
232    error_message: Option<String>,
233    broker_addr: Option<String>,
234    message: String,
235}
236
237impl MQBrokerErr {
238    pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
239        let error_message = error_message.into();
240        let message = "";
241        Self {
242            response_code,
243            error_message: Some(error_message),
244            broker_addr: None,
245            message: String::from(message),
246        }
247    }
248
249    pub fn new_with_broker(
250        response_code: i32,
251        error_message: impl Into<String>,
252        broker_addr: impl Into<String>,
253    ) -> Self {
254        let broker_addr = broker_addr.into();
255        let error_message = error_message.into();
256        let message = "";
257        Self {
258            response_code,
259            error_message: Some(error_message),
260            broker_addr: Some(broker_addr),
261            message: String::from(message),
262        }
263    }
264
265    pub fn response_code(&self) -> i32 {
266        self.response_code
267    }
268
269    pub fn error_message(&self) -> Option<&String> {
270        self.error_message.as_ref()
271    }
272
273    pub fn broker_addr(&self) -> Option<&String> {
274        self.broker_addr.as_ref()
275    }
276}
277
/// Builds an `Err(RocketmqError::MQClientBrokerError(..))` value.
///
/// Two forms are accepted:
/// * `client_broker_err!(code, message, broker_addr)` - records the broker address.
/// * `client_broker_err!(code, message)` - no broker address.
///
/// In both forms the code expression is converted with `as i32`.
#[macro_export]
macro_rules! client_broker_err {
    // Response code, error message and broker address.
    ($code:expr, $msg:expr, $addr:expr) => {{
        ::std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
            $crate::MQBrokerErr::new_with_broker($code as i32, $msg, $addr),
        ))
    }};
    // Response code and error message only (no broker address).
    ($code:expr, $msg:expr) => {{
        ::std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
            $crate::MQBrokerErr::new($code as i32, $msg),
        ))
    }};
}
294
295#[derive(Error, Debug)]
296#[error("{message}")]
297pub struct ClientErr {
298    response_code: i32,
299    error_message: Option<String>,
300    message: String,
301}
302
303impl ClientErr {
304    pub fn new(error_message: impl Into<String>) -> Self {
305        let error_message = error_message.into();
306        let message = "string";
307        // let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
308        Self {
309            response_code: -1,
310            error_message: Some(error_message),
311            message: String::from(message),
312        }
313    }
314
315    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
316        let error_message = error_message.into();
317        /*let message = FAQUrl::attach_default_url(Some(
318            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
319        ));*/
320        let message = "";
321        Self {
322            response_code,
323            error_message: Some(error_message),
324            message: String::from(message),
325        }
326    }
327
328    pub fn response_code(&self) -> i32 {
329        self.response_code
330    }
331
332    pub fn error_message(&self) -> Option<&String> {
333        self.error_message.as_ref()
334    }
335}
336
/// Legacy macro that builds an `Err(RocketmqError::MQClientErr(..))` value.
///
/// Deprecated in favor of the new unified error system. Three forms are accepted:
/// * `mq_client_err_legacy!(code, "fmt {}", args...)` - formats the message first.
/// * `mq_client_err_legacy!(code, message)` - code plus ready-made message.
/// * `mq_client_err_legacy!(message)` - message only (code defaults to `-1`).
#[deprecated(
    since = "0.7.0",
    note = "Use unified error system and macros from rocketmq-client instead"
)]
#[macro_export]
macro_rules! mq_client_err_legacy {
    // Response code plus a format string and its arguments.
    // This arm builds the same `RocketmqError::MQClientErr(ClientErr)` value
    // as the two arms below, so every form of the macro yields one error type.
    // NOTE(review): confirm no caller relied on a `client_error::MQClientError` result.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    // Response code plus a ready-made error message.
    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    // Error message only, without a response code.
    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
368
369#[derive(Error, Debug)]
370#[error("{message}")]
371pub struct RequestTimeoutErr {
372    response_code: i32,
373    error_message: Option<String>,
374    message: String,
375}
376
377impl RequestTimeoutErr {
378    pub fn new(error_message: impl Into<String>) -> Self {
379        let error_message = error_message.into();
380        //let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
381        let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
382        Self {
383            response_code: -1,
384            error_message: Some(error_message),
385            message: String::from(message),
386        }
387    }
388
389    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
390        let error_message = error_message.into();
391        // let message = FAQUrl::attach_default_url(Some(
392        //     format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
393        // ));
394        let message =
395            "FAQUrl::attach_default_url(Some(format!(\"CODE: {}  DESC: {}\", response_code, error_message,).as_str()))";
396        Self {
397            response_code,
398            error_message: Some(error_message),
399            message: String::from(message),
400        }
401    }
402
403    pub fn response_code(&self) -> i32 {
404        self.response_code
405    }
406
407    pub fn error_message(&self) -> Option<&String> {
408        self.error_message.as_ref()
409    }
410}
411
/// Builds an `Err(RocketmqError::RequestTimeoutError(..))` value.
///
/// Three forms are accepted:
/// * `request_timeout_err!(code, "fmt {}", args...)` - formats the message first.
/// * `request_timeout_err!(code, message)` - code plus ready-made message.
/// * `request_timeout_err!(message)` - message only.
///
/// Whenever a code is given it is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Response code plus a format string and its arguments.
    ($code:expr, $fmt:expr, $($arg:expr),*) => {{
        // Render the message before the code expression is evaluated.
        let rendered = format!($fmt, $($arg),*);
        ::std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new_with_code($code as i32, rendered),
        ))
    }};
    // Response code plus a ready-made error message.
    ($code:expr, $msg:expr) => {{
        ::std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new_with_code($code as i32, $msg),
        ))
    }};
    // Error message only, without a response code.
    ($msg:expr) => {{
        ::std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new($msg),
        ))
    }};
}
439
440//------------------Legacy ServiceError (deprecated)------------------
441
442/// Service error enumeration (LEGACY - deprecated)
443///
444/// Use `unified::ServiceError` instead
445#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
446#[derive(Debug, thiserror::Error)]
447pub enum LegacyServiceError {
448    #[error("Service is already running")]
449    AlreadyRunning,
450
451    #[error("Service is not running")]
452    NotRunning,
453
454    #[error("Service startup failed: {0}")]
455    StartupFailed(String),
456
457    #[error("Service shutdown failed: {0}")]
458    ShutdownFailed(String),
459
460    #[error("Service operation timeout")]
461    Timeout,
462
463    #[error("Service interrupted")]
464    Interrupted,
465}
466
// Note: `unified::ServiceError` is re-exported near the top of this file as `UnifiedServiceError`
468
469//------------------Automatic conversion from legacy to unified------------------
470
471/// Automatic conversion from legacy `RocketmqError` to unified `RocketMQError`
472///
473/// This allows legacy error-producing code to work with new error-expecting code
474impl From<RocketmqError> for unified::RocketMQError {
475    fn from(err: RocketmqError) -> Self {
476        match err {
477            // Network errors
478            RocketmqError::RemoteError(msg) => Self::network_connection_failed("unknown", msg),
479            RocketmqError::RemotingConnectError(addr) => Self::network_connection_failed(addr, "connection failed"),
480            RocketmqError::RemotingSendRequestError(addr) => {
481                Self::network_connection_failed(addr, "send request failed")
482            }
483            RocketmqError::RemotingTimeoutError(addr, timeout) => {
484                Self::Network(unified::NetworkError::RequestTimeout {
485                    addr,
486                    timeout_ms: timeout,
487                })
488            }
489            RocketmqError::RemotingTooMuchRequestError(msg) => Self::illegal_argument(msg),
490
491            // Protocol errors
492            RocketmqError::DeserializeHeaderError(msg) => {
493                Self::Serialization(unified::SerializationError::DecodeFailed {
494                    format: "header",
495                    message: msg,
496                })
497            }
498            RocketmqError::RemotingCommandDecoderError(_msg) => Self::Protocol(unified::ProtocolError::DecodeError {
499                ext_fields_len: 0,
500                header_len: 0,
501            }),
502            RocketmqError::DecodingError(required, available) => {
503                Self::Serialization(unified::SerializationError::DecodeFailed {
504                    format: "binary",
505                    message: format!("required {} bytes, got {}", required, available),
506                })
507            }
508            RocketmqError::NotSupportSerializeType(t) => {
509                Self::Protocol(unified::ProtocolError::UnsupportedSerializationType { serialize_type: t })
510            }
511
512            // Broker errors
513            RocketmqError::MQBrokerError(code, msg, addr) => {
514                Self::broker_operation_failed("BROKER_OPERATION", code, msg).with_broker_addr(addr)
515            }
516            RocketmqError::MQClientBrokerError(err) => {
517                let mut e = Self::broker_operation_failed(
518                    "BROKER_OPERATION",
519                    err.response_code(),
520                    err.error_message().unwrap_or(&String::new()).clone(),
521                );
522                if let Some(addr) = err.broker_addr() {
523                    e = e.with_broker_addr(addr.clone());
524                }
525                e
526            }
527            RocketmqError::OffsetNotFoundError(code, addr, msg) => {
528                Self::broker_operation_failed("OFFSET_NOT_FOUND", code, msg).with_broker_addr(addr)
529            }
530
531            // Client errors
532            RocketmqError::MQClientErr(err) => {
533                Self::illegal_argument(err.error_message().unwrap_or(&String::new()).clone())
534            }
535            RocketmqError::RequestTimeoutError(_) => Self::Timeout {
536                operation: "request",
537                timeout_ms: 3000,
538            },
539
540            // System errors
541            RocketmqError::Io(err) => Self::IO(err),
542            RocketmqError::IllegalArgument(msg) | RocketmqError::IllegalArgumentError(msg) => {
543                Self::illegal_argument(msg)
544            }
545            RocketmqError::UnsupportedOperationException(msg) => Self::illegal_argument(msg),
546            RocketmqError::IpError(msg) => Self::illegal_argument(format!("IP error: {}", msg)),
547            RocketmqError::ChannelError(msg) => Self::Internal(format!("Channel error: {}", msg)),
548            RocketmqError::NoneError(msg) => Self::Internal(format!("None error: {}", msg)),
549            RocketmqError::TokioHandlerError(msg) => Self::Internal(format!("Tokio handler error: {}", msg)),
550            RocketmqError::SubCommand(cmd, msg) => Self::Internal(format!("{} command failed: {}", cmd, msg)),
551            RocketmqError::StoreCustomError(msg) => Self::StorageReadFailed {
552                path: "unknown".to_string(),
553                reason: msg,
554            },
555
556            // Other errors
557            RocketmqError::ServiceTaskError(err) => Self::Service(err),
558
559            #[cfg(feature = "with_serde")]
560            RocketmqError::SerdeJsonError(err) => {
561                Self::Serialization(unified::SerializationError::JsonError(err.to_string()))
562            }
563
564            #[cfg(feature = "with_config")]
565            RocketmqError::ConfigError(err) => Self::ConfigParseFailed {
566                key: "unknown",
567                reason: err.to_string(),
568            },
569
570            // Handle remaining variants
571            RocketmqError::RpcError(code, msg) => Self::broker_operation_failed("RPC", code, msg),
572            RocketmqError::FromStrErr(msg) => Self::illegal_argument(msg),
573            RocketmqError::Utf8Error(err) => Self::Serialization(unified::SerializationError::Utf8Error(err)),
574            RocketmqError::ConnectionInvalid(msg) => Self::network_connection_failed("unknown", msg),
575            RocketmqError::AbortProcessError(code, msg) => {
576                Self::Internal(format!("Abort process error {}: {}", code, msg))
577            }
578            RocketmqError::ChannelSendRequestFailed(msg) => Self::network_connection_failed("channel", msg),
579            RocketmqError::ChannelRecvRequestFailed(msg) => Self::network_connection_failed("channel", msg),
580            RocketmqError::RemotingCommandEncoderError(msg) => {
581                Self::Serialization(unified::SerializationError::EncodeFailed {
582                    format: "command",
583                    message: msg,
584                })
585            }
586        }
587    }
588}