Skip to main content

rocketmq_error/
lib.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # RocketMQ Error Handling System
16//!
17//! This crate provides a unified, semantic, and performant error handling system
18//! for the RocketMQ Rust implementation.
19//!
20//! ## New Unified Error System (v0.7.0+)
21//!
22//! The new error system provides:
23//! - **Semantic clarity**: Each error type clearly expresses what went wrong
24//! - **Performance**: Minimal heap allocations, optimized for hot paths
25//! - **Ergonomics**: Automatic error conversions via `From` trait
26//! - **Debuggability**: Rich context for production debugging
27//!
28//! ### Usage
29//!
30//! ```rust
31//! use rocketmq_error::RocketMQError;
32//! use rocketmq_error::RocketMQResult;
33//!
34//! fn send_message(addr: &str) -> RocketMQResult<()> {
35//!     if addr.is_empty() {
36//!         return Err(RocketMQError::network_connection_failed(
//!             addr,
38//!             "invalid address",
39//!         ));
40//!     }
41//!     Ok(())
42//! }
43//! # send_message("localhost:9876").unwrap();
44//! ```
45//!
46//! ## Legacy Error System (Deprecated)
47//!
48//! The legacy `RocketmqError` enum is still available for backward compatibility
49//! but will be removed in a future version. Please migrate to the new unified system.
50
51// New unified error system
52pub mod unified;
53
54// Auth error module
55pub mod auth_error;
56
57// Controller error module
58pub mod controller_error;
59
60// Filter error module
61pub mod filter_error;
62
63// Client error module
64pub mod client_error;
65
66// Re-export new error types as primary API
67// Re-export auth error types from unified module
68// Re-export controller error types
69pub use client_error::ClientError;
70pub use controller_error::ControllerError;
71pub use controller_error::ControllerResult;
72// Re-export filter error types
73pub use filter_error::FilterError;
74pub use unified::AuthError;
75pub use unified::NetworkError;
76pub use unified::ProtocolError;
77pub use unified::RocketMQError;
78pub use unified::RpcClientError;
79pub use unified::SerializationError;
80// Re-export result types (but don't conflict with legacy ones below)
81pub use unified::ServiceError as UnifiedServiceError;
82pub use unified::ToolsError;
83
84// Legacy error modules (deprecated but kept for compatibility)
85#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
86mod cli_error;
87#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
88mod common_error;
89#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
90mod name_srv_error;
91#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
92mod remoting_error;
93#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
94mod store_error;
95#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
96mod tui_error;
97
98use std::io;
99
100use thiserror::Error;
101
102// Legacy type aliases (deprecated - use RocketMQResult and Result from unified module)
103// Kept for backward compatibility with existing code
104#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
105pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;
106
107#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
108pub type LegacyResult<T> = anyhow::Result<T>;
109
110// Re-export unified result types as the primary API
111pub use unified::Result;
112pub use unified::RocketMQResult;
113// Import ServiceError for use in legacy RocketmqError enum
114use unified::ServiceError;
115
116#[derive(Debug, Error)]
117pub enum RocketmqError {
118    // remoting errors
119    #[error("{0}")]
120    RemoteError(String),
121
122    #[error("{0}")]
123    DeserializeHeaderError(String),
124
125    #[error("connect to {0} failed")]
126    RemotingConnectError(String),
127
128    #[error("send request to < {0} > failed")]
129    RemotingSendRequestError(String),
130
131    #[error("wait response on the channel < {0}  >, timeout: {1}(ms)")]
132    RemotingTimeoutError(String, u64),
133
134    #[error("RemotingTooMuchRequestException: {0}")]
135    RemotingTooMuchRequestError(String),
136
137    #[error("RpcException: code: {0}, message: {1}")]
138    RpcError(i32, String),
139
140    #[error("{0}")]
141    FromStrErr(String),
142
143    #[error("{0:?}")]
144    Io(#[from] io::Error),
145
146    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
147    DecodingError(usize, usize),
148
149    #[error("UTF-8 decoding error: {0}")]
150    Utf8Error(#[from] std::str::Utf8Error),
151
152    #[error("RemotingCommandDecoderError:{0}")]
153    RemotingCommandDecoderError(String),
154
155    #[error("RemotingCommandEncoderError:{0}")]
156    RemotingCommandEncoderError(String),
157
158    #[error("Not support serialize type: {0}")]
159    NotSupportSerializeType(u8),
160
161    #[error("ConnectionInvalid: {0}")]
162    ConnectionInvalid(String),
163
164    #[error("AbortProcessError: {0}-{1}")]
165    AbortProcessError(i32, String),
166
167    #[error("Channel Send Request failed: {0}")]
168    ChannelSendRequestFailed(String),
169
170    #[error("Channel recv Request failed: {0}")]
171    ChannelRecvRequestFailed(String),
172
173    #[error("{0}")]
174    IllegalArgument(String),
175
176    //client error
177    #[error("{0}")]
178    MQClientErr(#[from] ClientErr),
179
180    #[error("{0}")]
181    MQClientBrokerError(#[from] MQBrokerErr),
182
183    #[error("{0}")]
184    RequestTimeoutError(#[from] RequestTimeoutErr),
185
186    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
187    OffsetNotFoundError(i32, String, String),
188
189    #[error("{0}")]
190    IllegalArgumentError(String),
191
192    #[error("{0}")]
193    #[cfg(feature = "with_serde")]
194    SerdeJsonError(#[from] serde_json::Error),
195
196    #[error("{0}")]
197    UnsupportedOperationException(String),
198
199    #[error("{0}")]
200    IpError(String),
201
202    #[error("{0}")]
203    ChannelError(String),
204
205    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
206    MQBrokerError(i32, String, String),
207
208    #[error("{0}")]
209    NoneError(String),
210
211    #[error("{0}")]
212    TokioHandlerError(String),
213
214    #[error("Config parse error: {0}")]
215    #[cfg(feature = "with_config")]
216    ConfigError(#[from] config::ConfigError),
217
218    #[error("{0} command failed , {1}")]
219    SubCommand(String, String),
220
221    #[error("{0}")]
222    ServiceTaskError(#[from] ServiceError),
223
224    #[error("{0}")]
225    StoreCustomError(String),
226}
227
228#[derive(Error, Debug)]
229#[error("{message}")]
230pub struct MQBrokerErr {
231    response_code: i32,
232    error_message: Option<String>,
233    broker_addr: Option<String>,
234    message: String,
235}
236
237impl MQBrokerErr {
238    pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
239        let error_message = error_message.into();
240        let message = format!("CODE: {response_code} DESC: {error_message}");
241        Self {
242            response_code,
243            error_message: Some(error_message),
244            broker_addr: None,
245            message,
246        }
247    }
248
249    pub fn new_with_broker(
250        response_code: i32,
251        error_message: impl Into<String>,
252        broker_addr: impl Into<String>,
253    ) -> Self {
254        let broker_addr = broker_addr.into();
255        let error_message = error_message.into();
256        let message = format!("CODE: {response_code} DESC: {error_message} BROKER: {broker_addr}");
257        Self {
258            response_code,
259            error_message: Some(error_message),
260            broker_addr: Some(broker_addr),
261            message,
262        }
263    }
264
265    pub fn response_code(&self) -> i32 {
266        self.response_code
267    }
268
269    pub fn error_message(&self) -> Option<&String> {
270        self.error_message.as_ref()
271    }
272
273    pub fn broker_addr(&self) -> Option<&String> {
274        self.broker_addr.as_ref()
275    }
276}
277
/// Builds `Err(RocketmqError::MQClientBrokerError(..))` from a broker response.
///
/// Accepted forms (a trailing comma is allowed in both):
///
/// - `client_broker_err!(code, message, broker_addr)` — uses
///   `MQBrokerErr::new_with_broker`.
/// - `client_broker_err!(code, message)` — uses `MQBrokerErr::new`; no broker
///   address is recorded.
///
/// `code` is converted with `as i32`, so response-code enums and narrower
/// integers can be passed directly.
#[macro_export]
macro_rules! client_broker_err {
    // Response code, description, and the address of the broker.
    ($response_code:expr, $error_message:expr, $broker_addr:expr $(,)?) => {{
        // Absolute path so a `std` module in the calling crate cannot shadow it.
        ::std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
            $crate::MQBrokerErr::new_with_broker($response_code as i32, $error_message, $broker_addr),
        ))
    }};
    // Response code and description only.
    ($response_code:expr, $error_message:expr $(,)?) => {{
        ::std::result::Result::Err($crate::RocketmqError::MQClientBrokerError($crate::MQBrokerErr::new(
            $response_code as i32,
            $error_message,
        )))
    }};
}
294
295#[derive(Error, Debug)]
296#[error("{message}")]
297pub struct ClientErr {
298    response_code: i32,
299    error_message: Option<String>,
300    message: String,
301}
302
303impl ClientErr {
304    pub fn new(error_message: impl Into<String>) -> Self {
305        let error_message = error_message.into();
306        let message = error_message.clone();
307        Self {
308            response_code: -1,
309            error_message: Some(error_message),
310            message,
311        }
312    }
313
314    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
315        let error_message = error_message.into();
316        let message = format!("CODE: {response_code} DESC: {error_message}");
317        Self {
318            response_code,
319            error_message: Some(error_message),
320            message,
321        }
322    }
323
324    pub fn response_code(&self) -> i32 {
325        self.response_code
326    }
327
328    pub fn error_message(&self) -> Option<&String> {
329        self.error_message.as_ref()
330    }
331}
332
// Legacy macro - deprecated in favor of new unified error system
/// Builds `Err(RocketmqError::MQClientErr(..))` from a client-side failure.
///
/// Accepted forms:
///
/// - `mq_client_err_legacy!(code, "fmt {}", arg, ...)` — formats the message
///   and uses `ClientErr::new_with_code`.
/// - `mq_client_err_legacy!(code, message)` — uses `ClientErr::new_with_code`.
/// - `mq_client_err_legacy!(message)` — uses `ClientErr::new`.
///
/// `code` is converted with `as i32`. Every form yields the same error type.
#[deprecated(
    since = "0.7.0",
    note = "Use unified error system and macros from rocketmq-client instead"
)]
#[macro_export]
macro_rules! mq_client_err_legacy {
    // Response code plus a format string and its arguments.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = ::std::format!($fmt, $($arg),*);
        // Same variant and constructor as the other arms, so all forms of the
        // macro produce one consistent error type.
        ::std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    // Response code plus a ready-made message.
    ($response_code:expr, $error_message:expr) => {{
        ::std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    // Message only; no response code.
    ($error_message:expr) => {{
        ::std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
364
365#[derive(Error, Debug)]
366#[error("{message}")]
367pub struct RequestTimeoutErr {
368    response_code: i32,
369    error_message: Option<String>,
370    message: String,
371}
372
373impl RequestTimeoutErr {
374    pub fn new(error_message: impl Into<String>) -> Self {
375        let error_message = error_message.into();
376        //let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
377        let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
378        Self {
379            response_code: -1,
380            error_message: Some(error_message),
381            message: String::from(message),
382        }
383    }
384
385    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
386        let error_message = error_message.into();
387        // let message = FAQUrl::attach_default_url(Some(
388        //     format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
389        // ));
390        let message =
391            "FAQUrl::attach_default_url(Some(format!(\"CODE: {}  DESC: {}\", response_code, error_message,).as_str()))";
392        Self {
393            response_code,
394            error_message: Some(error_message),
395            message: String::from(message),
396        }
397    }
398
399    pub fn response_code(&self) -> i32 {
400        self.response_code
401    }
402
403    pub fn error_message(&self) -> Option<&String> {
404        self.error_message.as_ref()
405    }
406}
407
/// Builds `Err(RocketmqError::RequestTimeoutError(..))`.
///
/// Accepted forms:
///
/// - `request_timeout_err!(code, "fmt {}", arg, ...)` — formats the message
///   and uses `RequestTimeoutErr::new_with_code`.
/// - `request_timeout_err!(code, message)` — uses
///   `RequestTimeoutErr::new_with_code`.
/// - `request_timeout_err!(message)` — uses `RequestTimeoutErr::new`.
///
/// `code` is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Code + format string + format arguments.
    ($code:expr, $template:expr, $($param:expr),*) => {{
        let rendered = format!($template, $($param),*);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new_with_code($code as i32, rendered),
        ))
    }};
    // Code + ready-made message.
    ($code:expr, $msg:expr) => {{
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new_with_code($code as i32, $msg),
        ))
    }};
    // Message only, no response code.
    ($msg:expr) => {{
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new($msg),
        ))
    }};
}
435
436//------------------Legacy ServiceError (deprecated)------------------
437
438/// Service error enumeration (LEGACY - deprecated)
439///
440/// Use `unified::ServiceError` instead
441#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
442#[derive(Debug, thiserror::Error)]
443pub enum LegacyServiceError {
444    #[error("Service is already running")]
445    AlreadyRunning,
446
447    #[error("Service is not running")]
448    NotRunning,
449
450    #[error("Service startup failed: {0}")]
451    StartupFailed(String),
452
453    #[error("Service shutdown failed: {0}")]
454    ShutdownFailed(String),
455
456    #[error("Service operation timeout")]
457    Timeout,
458
459    #[error("Service interrupted")]
460    Interrupted,
461}
462
// Note: `unified::ServiceError` is re-exported near the top of this file as `UnifiedServiceError`.
464
465//------------------Automatic conversion from legacy to unified------------------
466
467/// Automatic conversion from legacy `RocketmqError` to unified `RocketMQError`
468///
469/// This allows legacy error-producing code to work with new error-expecting code
470impl From<RocketmqError> for unified::RocketMQError {
471    fn from(err: RocketmqError) -> Self {
472        match err {
473            // Network errors
474            RocketmqError::RemoteError(msg) => Self::network_connection_failed("unknown", msg),
475            RocketmqError::RemotingConnectError(addr) => Self::network_connection_failed(addr, "connection failed"),
476            RocketmqError::RemotingSendRequestError(addr) => {
477                Self::network_connection_failed(addr, "send request failed")
478            }
479            RocketmqError::RemotingTimeoutError(addr, timeout) => {
480                Self::Network(unified::NetworkError::RequestTimeout {
481                    addr,
482                    timeout_ms: timeout,
483                })
484            }
485            RocketmqError::RemotingTooMuchRequestError(msg) => Self::illegal_argument(msg),
486
487            // Protocol errors
488            RocketmqError::DeserializeHeaderError(msg) => {
489                Self::Serialization(unified::SerializationError::DecodeFailed {
490                    format: "header",
491                    message: msg,
492                })
493            }
494            RocketmqError::RemotingCommandDecoderError(_msg) => Self::Protocol(unified::ProtocolError::DecodeError {
495                ext_fields_len: 0,
496                header_len: 0,
497            }),
498            RocketmqError::DecodingError(required, available) => {
499                Self::Serialization(unified::SerializationError::DecodeFailed {
500                    format: "binary",
501                    message: format!("required {} bytes, got {}", required, available),
502                })
503            }
504            RocketmqError::NotSupportSerializeType(t) => {
505                Self::Protocol(unified::ProtocolError::UnsupportedSerializationType { serialize_type: t })
506            }
507
508            // Broker errors
509            RocketmqError::MQBrokerError(code, msg, addr) => {
510                Self::broker_operation_failed("BROKER_OPERATION", code, msg).with_broker_addr(addr)
511            }
512            RocketmqError::MQClientBrokerError(err) => {
513                let mut e = Self::broker_operation_failed(
514                    "BROKER_OPERATION",
515                    err.response_code(),
516                    err.error_message().unwrap_or(&String::new()).clone(),
517                );
518                if let Some(addr) = err.broker_addr() {
519                    e = e.with_broker_addr(addr.clone());
520                }
521                e
522            }
523            RocketmqError::OffsetNotFoundError(code, addr, msg) => {
524                Self::broker_operation_failed("OFFSET_NOT_FOUND", code, msg).with_broker_addr(addr)
525            }
526
527            // Client errors
528            RocketmqError::MQClientErr(err) => {
529                Self::illegal_argument(err.error_message().unwrap_or(&String::new()).clone())
530            }
531            RocketmqError::RequestTimeoutError(_) => Self::Timeout {
532                operation: "request",
533                timeout_ms: 3000,
534            },
535
536            // System errors
537            RocketmqError::Io(err) => Self::IO(err),
538            RocketmqError::IllegalArgument(msg) | RocketmqError::IllegalArgumentError(msg) => {
539                Self::illegal_argument(msg)
540            }
541            RocketmqError::UnsupportedOperationException(msg) => Self::illegal_argument(msg),
542            RocketmqError::IpError(msg) => Self::illegal_argument(format!("IP error: {}", msg)),
543            RocketmqError::ChannelError(msg) => Self::Internal(format!("Channel error: {}", msg)),
544            RocketmqError::NoneError(msg) => Self::Internal(format!("None error: {}", msg)),
545            RocketmqError::TokioHandlerError(msg) => Self::Internal(format!("Tokio handler error: {}", msg)),
546            RocketmqError::SubCommand(cmd, msg) => Self::Internal(format!("{} command failed: {}", cmd, msg)),
547            RocketmqError::StoreCustomError(msg) => Self::StorageReadFailed {
548                path: "unknown".to_string(),
549                reason: msg,
550            },
551
552            // Other errors
553            RocketmqError::ServiceTaskError(err) => Self::Service(err),
554
555            #[cfg(feature = "with_serde")]
556            RocketmqError::SerdeJsonError(err) => {
557                Self::Serialization(unified::SerializationError::JsonError(err.to_string()))
558            }
559
560            #[cfg(feature = "with_config")]
561            RocketmqError::ConfigError(err) => Self::ConfigParseFailed {
562                key: "unknown",
563                reason: err.to_string(),
564            },
565
566            // Handle remaining variants
567            RocketmqError::RpcError(code, msg) => Self::broker_operation_failed("RPC", code, msg),
568            RocketmqError::FromStrErr(msg) => Self::illegal_argument(msg),
569            RocketmqError::Utf8Error(err) => Self::Serialization(unified::SerializationError::Utf8Error(err)),
570            RocketmqError::ConnectionInvalid(msg) => Self::network_connection_failed("unknown", msg),
571            RocketmqError::AbortProcessError(code, msg) => {
572                Self::Internal(format!("Abort process error {}: {}", code, msg))
573            }
574            RocketmqError::ChannelSendRequestFailed(msg) => Self::network_connection_failed("channel", msg),
575            RocketmqError::ChannelRecvRequestFailed(msg) => Self::network_connection_failed("channel", msg),
576            RocketmqError::RemotingCommandEncoderError(msg) => {
577                Self::Serialization(unified::SerializationError::EncodeFailed {
578                    format: "command",
579                    message: msg,
580                })
581            }
582        }
583    }
584}
585
#[cfg(test)]
mod tests {
    use super::*;

    /// `ClientErr::new` displays exactly the text it was given.
    #[test]
    fn client_err_display_preserves_original_message() {
        let text = "Topic of the message does not match its target message queue";
        let err = ClientErr::new(text);

        assert_eq!(err.to_string(), text);
        assert_eq!(err.response_code(), -1);
        assert_eq!(err.error_message().map(String::as_str), Some(text));
    }

    /// `ClientErr::new_with_code` prefixes the description with the code.
    #[test]
    fn client_err_with_code_display_includes_code_and_description() {
        let description = "No route info of this topic";
        let err = ClientErr::new_with_code(17, description);

        assert_eq!(err.to_string(), "CODE: 17 DESC: No route info of this topic");
        assert_eq!(err.response_code(), 17);
        assert_eq!(err.error_message().map(String::as_str), Some(description));
    }

    /// `MQBrokerErr::new_with_broker` renders code, description and broker.
    #[test]
    fn broker_err_display_includes_code_description_and_broker_when_present() {
        let description = "topic does not exist";
        let broker = "127.0.0.1:10911";
        let err = MQBrokerErr::new_with_broker(14, description, broker);

        assert_eq!(
            err.to_string(),
            "CODE: 14 DESC: topic does not exist BROKER: 127.0.0.1:10911"
        );
        assert_eq!(err.response_code(), 14);
        assert_eq!(err.error_message().map(String::as_str), Some(description));
        assert_eq!(err.broker_addr().map(String::as_str), Some(broker));
    }

    /// The message-only arm of the legacy macro keeps the message visible.
    #[test]
    #[allow(deprecated)]
    fn macro_created_client_error_keeps_message_visible() {
        let outcome: LegacyRocketMQResult<()> = crate::mq_client_err_legacy!("send failed");
        let err = outcome.expect_err("macro should create a client error");

        assert_eq!(err.to_string(), "send failed");
    }
}