rocketmq_error/
lib.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! # RocketMQ Error Handling System
19//!
20//! This crate provides a unified, semantic, and performant error handling system
21//! for the RocketMQ Rust implementation.
22//!
23//! ## New Unified Error System (v0.7.0+)
24//!
25//! The new error system provides:
26//! - **Semantic clarity**: Each error type clearly expresses what went wrong
27//! - **Performance**: Minimal heap allocations, optimized for hot paths
28//! - **Ergonomics**: Automatic error conversions via `From` trait
29//! - **Debuggability**: Rich context for production debugging
30//!
31//! ### Usage
32//!
33//! ```rust
34//! use rocketmq_error::RocketMQError;
35//! use rocketmq_error::RocketMQResult;
36//!
37//! fn send_message(addr: &str) -> RocketMQResult<()> {
38//!     if addr.is_empty() {
39//!         return Err(RocketMQError::network_connection_failed(
40//!             "localhost:9876",
41//!             "invalid address",
42//!         ));
43//!     }
44//!     Ok(())
45//! }
46//! # send_message("localhost:9876").unwrap();
47//! ```
48//!
49//! ## Legacy Error System (Deprecated)
50//!
51//! The legacy `RocketmqError` enum is still available for backward compatibility
52//! but will be removed in a future version. Please migrate to the new unified system.
53
54// New unified error system
55pub mod unified;
56
57// Re-export new error types as primary API
58pub use unified::NetworkError;
59pub use unified::ProtocolError;
60pub use unified::RocketMQError;
61pub use unified::SerializationError;
// Re-export the unified ServiceError under an alias (the plain name is imported privately below)
63pub use unified::ServiceError as UnifiedServiceError;
64pub use unified::ToolsError;
65
66// Legacy error modules (deprecated but kept for compatibility)
67#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
68mod cli_error;
69#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
70mod client_error;
71#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
72mod common_error;
73#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
74mod name_srv_error;
75#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
76mod remoting_error;
77#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
78mod store_error;
79#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
80mod tools_error;
81#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
82mod tui_error;
83
84use std::io;
85
86use thiserror::Error;
87
// Legacy type aliases (deprecated - use RocketMQResult and Result from unified module)
// Kept for backward compatibility with existing code

/// Legacy result alias whose error type is the legacy [`RocketmqError`] enum.
#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;

/// Legacy result alias that is simply `anyhow::Result<T>`.
#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
pub type LegacyResult<T> = anyhow::Result<T>;
95
96// Re-export unified result types as the primary API
97pub use unified::Result;
98pub use unified::RocketMQResult;
99// Import ServiceError for use in legacy RocketmqError enum
100use unified::ServiceError;
101
/// Legacy error enum, kept for backward compatibility.
///
/// `Display` for every variant comes from its `#[error(...)]` format string.
/// Variants holding a `#[from]` field additionally get a `From` conversion
/// from that field's type. A conversion into the unified `RocketMQError` is
/// implemented further down in this file.
#[derive(Debug, Error)]
pub enum RocketmqError {
    // ---- remoting errors ----
    /// Remoting failure described only by a message; displayed verbatim.
    #[error("{0}")]
    RemoteError(String),

    /// Header deserialization failure; the message is displayed verbatim.
    #[error("{0}")]
    DeserializeHeaderError(String),

    /// Connecting to the given address (field 0) failed.
    #[error("connect to {0} failed")]
    RemotingConnectError(String),

    /// Sending a request to the given address (field 0) failed.
    #[error("send request to < {0} > failed")]
    RemotingSendRequestError(String),

    /// Waiting for a response on a channel (field 0) timed out after the
    /// given number of milliseconds (field 1).
    #[error("wait response on the channel < {0}  >, timeout: {1}(ms)")]
    RemotingTimeoutError(String, u64),

    /// Too many requests; the message is appended to a fixed prefix.
    #[error("RemotingTooMuchRequestException: {0}")]
    RemotingTooMuchRequestError(String),

    /// RPC failure carrying a code (field 0) and a message (field 1).
    #[error("RpcException: code: {0}, message: {1}")]
    RpcError(i32, String),

    /// String-parsing failure; the message is displayed verbatim.
    #[error("{0}")]
    FromStrErr(String),

    /// Wrapped I/O error, displayed with its `Debug` representation.
    #[error("{0:?}")]
    Io(#[from] io::Error),

    /// Protocol decoding failure: extFields length (field 0) versus header
    /// length (field 1).
    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
    DecodingError(usize, usize),

    /// Wrapped UTF-8 decoding error.
    #[error("UTF-8 decoding error: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),

    /// Remoting command decoder failure with a message.
    #[error("RemotingCommandDecoderError:{0}")]
    RemotingCommandDecoderError(String),

    /// Remoting command encoder failure with a message.
    #[error("RemotingCommandEncoderError:{0}")]
    RemotingCommandEncoderError(String),

    /// The serialize type byte (field 0) is not supported.
    #[error("Not support serialize type: {0}")]
    NotSupportSerializeType(u8),

    /// A connection was found to be invalid; carries a message.
    #[error("ConnectionInvalid: {0}")]
    ConnectionInvalid(String),

    /// Processing was aborted; carries a code (field 0) and message (field 1).
    #[error("AbortProcessError: {0}-{1}")]
    AbortProcessError(i32, String),

    /// Sending a request over a channel failed.
    #[error("Channel Send Request failed: {0}")]
    ChannelSendRequestFailed(String),

    /// Receiving a request over a channel failed.
    #[error("Channel recv Request failed: {0}")]
    ChannelRecvRequestFailed(String),

    /// Illegal argument; the message is displayed verbatim.
    #[error("{0}")]
    IllegalArgument(String),

    // ---- client errors ----
    /// Wrapped [`ClientErr`]; displays the inner error.
    #[error("{0}")]
    MQClientErr(#[from] ClientErr),

    /// Wrapped [`MQBrokerErr`]; displays the inner error.
    #[error("{0}")]
    MQClientBrokerError(#[from] MQBrokerErr),

    /// Wrapped [`RequestTimeoutErr`]; displays the inner error.
    #[error("{0}")]
    RequestTimeoutError(#[from] RequestTimeoutErr),

    /// Offset lookup failure. Field order is (code, broker address, message).
    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
    OffsetNotFoundError(i32, String, String),

    /// Illegal argument; the message is displayed verbatim.
    #[error("{0}")]
    IllegalArgumentError(String),

    /// Wrapped `serde_json` error; only present with the `with_serde` feature.
    #[error("{0}")]
    #[cfg(feature = "with_serde")]
    SerdeJsonError(#[from] serde_json::Error),

    /// Unsupported operation; the message is displayed verbatim.
    #[error("{0}")]
    UnsupportedOperationException(String),

    /// IP-related failure; the message is displayed verbatim.
    #[error("{0}")]
    IpError(String),

    /// Channel-related failure; the message is displayed verbatim.
    #[error("{0}")]
    ChannelError(String),

    /// Broker failure. Field order is (code, message, broker address) —
    /// note this differs from `OffsetNotFoundError`.
    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
    MQBrokerError(i32, String, String),

    /// A value was unexpectedly absent; the message is displayed verbatim.
    #[error("{0}")]
    NoneError(String),

    /// Tokio handler failure; the message is displayed verbatim.
    #[error("{0}")]
    TokioHandlerError(String),

    /// Wrapped `config` crate error; only present with the `with_config` feature.
    #[error("Config parse error: {0}")]
    #[cfg(feature = "with_config")]
    ConfigError(#[from] config::ConfigError),

    /// A sub-command (field 0) failed with a message (field 1).
    #[error("{0} command failed , {1}")]
    SubCommand(String, String),

    /// Wrapped unified `ServiceError`; displays the inner error.
    #[error("{0}")]
    ServiceTaskError(#[from] ServiceError),

    /// Store-specific failure; the message is displayed verbatim.
    #[error("{0}")]
    StoreCustomError(String),
}
213
/// Error reported for a broker-side failure.
///
/// `Display` prints the `message` field (see the `#[error("{message}")]`
/// attribute); the remaining fields are exposed through getter methods.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct MQBrokerErr {
    // Numeric response code supplied by the constructor.
    response_code: i32,
    // Raw error description supplied by the constructor.
    error_message: Option<String>,
    // Broker address, when the constructor was given one.
    broker_addr: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
222
223impl MQBrokerErr {
224    pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
225        let error_message = error_message.into();
226        /*let message = FAQUrl::attach_default_url(Some(
227            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
228        ));*/
229        let message = "";
230        Self {
231            response_code,
232            error_message: Some(error_message),
233            broker_addr: None,
234            message: String::from(message),
235        }
236    }
237
238    pub fn new_with_broker(
239        response_code: i32,
240        error_message: impl Into<String>,
241        broker_addr: impl Into<String>,
242    ) -> Self {
243        let broker_addr = broker_addr.into();
244        let error_message = error_message.into();
245        /*let message = FAQUrl::attach_default_url(Some(
246            format!(
247                "CODE: {}  DESC: {} BROKER: {}",
248                response_code, error_message, broker_addr
249            )
250            .as_str(),
251        ));*/
252        let message = "";
253        Self {
254            response_code,
255            error_message: Some(error_message),
256            broker_addr: Some(broker_addr),
257            message: String::from(message),
258        }
259    }
260
261    pub fn response_code(&self) -> i32 {
262        self.response_code
263    }
264
265    pub fn error_message(&self) -> Option<&String> {
266        self.error_message.as_ref()
267    }
268
269    pub fn broker_addr(&self) -> Option<&String> {
270        self.broker_addr.as_ref()
271    }
272}
273
/// Expands to `Err(RocketmqError::MQClientBrokerError(..))`.
///
/// Accepted forms:
/// - `client_broker_err!(code, message)` builds the error via
///   `MQBrokerErr::new`.
/// - `client_broker_err!(code, message, broker_addr)` builds the error via
///   `MQBrokerErr::new_with_broker`.
///
/// In both forms `code` is converted with `as i32`.
#[macro_export]
macro_rules! client_broker_err {
    // Response code and error message only (no broker address).
    ($code:expr, $msg:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
            $crate::MQBrokerErr::new($code as i32, $msg),
        ))
    }};
    // Response code, error message and the broker address.
    ($code:expr, $msg:expr, $addr:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
            $crate::MQBrokerErr::new_with_broker($code as i32, $msg, $addr),
        ))
    }};
}
293
/// Error reported for a client-side failure.
///
/// `Display` prints the `message` field (see the `#[error("{message}")]`
/// attribute); the remaining fields are exposed through getter methods.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct ClientErr {
    // Numeric response code supplied by the constructor.
    response_code: i32,
    // Raw error description supplied by the constructor.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
301
302impl ClientErr {
303    pub fn new(error_message: impl Into<String>) -> Self {
304        let error_message = error_message.into();
305        let message = "string";
306        // let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
307        Self {
308            response_code: -1,
309            error_message: Some(error_message),
310            message: String::from(message),
311        }
312    }
313
314    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
315        let error_message = error_message.into();
316        /*let message = FAQUrl::attach_default_url(Some(
317            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
318        ));*/
319        let message = "";
320        Self {
321            response_code,
322            error_message: Some(error_message),
323            message: String::from(message),
324        }
325    }
326
327    pub fn response_code(&self) -> i32 {
328        self.response_code
329    }
330
331    pub fn error_message(&self) -> Option<&String> {
332        self.error_message.as_ref()
333    }
334}
335
// Legacy macro - deprecated in favor of new unified error system
/// Expands to `Err(RocketmqError::MQClientErr(..))`.
///
/// Accepted forms:
/// - `mq_client_err_legacy!(code, fmt, args...)` formats the message with
///   `format!` and builds the error via `ClientErr::new_with_code`.
/// - `mq_client_err_legacy!(code, message)` builds the error via
///   `ClientErr::new_with_code`.
/// - `mq_client_err_legacy!(message)` builds the error via `ClientErr::new`.
///
/// `code` is converted with `as i32`. Every form resolves its types through
/// the crate-root exports, so all three yield the same error type and the
/// macro can be expanded from other crates.
#[deprecated(
    since = "0.7.0",
    note = "Use unified error system and macros from rocketmq-client instead"
)]
#[macro_export]
macro_rules! mq_client_err_legacy {
    // Handle errors with a custom ResponseCode and formatted string
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        // Use the crate-root types (not the private `client_error` module) so
        // this arm matches the two arms below.
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    // Handle errors with a custom ResponseCode and a ready-made message
    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    // Handle errors without a ResponseCode, using only the error message
    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
367
/// Error reported when a request times out.
///
/// `Display` prints the `message` field (see the `#[error("{message}")]`
/// attribute); the remaining fields are exposed through getter methods.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct RequestTimeoutErr {
    // Numeric response code supplied by the constructor.
    response_code: i32,
    // Raw error description supplied by the constructor.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
375
376impl RequestTimeoutErr {
377    pub fn new(error_message: impl Into<String>) -> Self {
378        let error_message = error_message.into();
379        //let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
380        let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
381        Self {
382            response_code: -1,
383            error_message: Some(error_message),
384            message: String::from(message),
385        }
386    }
387
388    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
389        let error_message = error_message.into();
390        // let message = FAQUrl::attach_default_url(Some(
391        //     format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
392        // ));
393        let message = "FAQUrl::attach_default_url(Some(format!(\"CODE: {}  DESC: {}\", \
394                       response_code, error_message,).as_str()))";
395        Self {
396            response_code,
397            error_message: Some(error_message),
398            message: String::from(message),
399        }
400    }
401
402    pub fn response_code(&self) -> i32 {
403        self.response_code
404    }
405
406    pub fn error_message(&self) -> Option<&String> {
407        self.error_message.as_ref()
408    }
409}
410
/// Expands to `Err(RocketmqError::RequestTimeoutError(..))`.
///
/// Accepted forms:
/// - `request_timeout_err!(code, fmt, args...)` formats the message with
///   `format!` and builds the error via `RequestTimeoutErr::new_with_code`.
/// - `request_timeout_err!(code, message)` builds the error via
///   `RequestTimeoutErr::new_with_code`.
/// - `request_timeout_err!(message)` builds the error via
///   `RequestTimeoutErr::new`.
///
/// `code` is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Response code plus a format string and its arguments.
    ($code:expr, $template:expr, $($piece:expr),*) => {{
        // Format first, then build the error, so evaluation order is message → code.
        let rendered = format!($template, $($piece),*);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new_with_code($code as i32, rendered),
        ))
    }};
    // Response code plus a ready-made message.
    ($code:expr, $msg:expr) => {{
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new_with_code($code as i32, $msg),
        ))
    }};
    // Message only; no response code is supplied.
    ($msg:expr) => {{
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
            $crate::RequestTimeoutErr::new($msg),
        ))
    }};
}
438
439//------------------Legacy ServiceError (deprecated)------------------
440
/// Service error enumeration (LEGACY - deprecated)
///
/// Use `unified::ServiceError` instead
#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
#[derive(Debug, thiserror::Error)]
pub enum LegacyServiceError {
    /// The service is already running.
    #[error("Service is already running")]
    AlreadyRunning,

    /// The service is not running.
    #[error("Service is not running")]
    NotRunning,

    /// Startup failed; carries a description appended to the display text.
    #[error("Service startup failed: {0}")]
    StartupFailed(String),

    /// Shutdown failed; carries a description appended to the display text.
    #[error("Service shutdown failed: {0}")]
    ShutdownFailed(String),

    /// A service operation timed out.
    #[error("Service operation timeout")]
    Timeout,

    /// The service was interrupted.
    #[error("Service interrupted")]
    Interrupted,
}
465
// Note: the unified ServiceError is re-exported near the top of this file as `UnifiedServiceError`
467
468//------------------Automatic conversion from legacy to unified------------------
469
470/// Automatic conversion from legacy `RocketmqError` to unified `RocketMQError`
471///
472/// This allows legacy error-producing code to work with new error-expecting code
473impl From<RocketmqError> for unified::RocketMQError {
474    fn from(err: RocketmqError) -> Self {
475        match err {
476            // Network errors
477            RocketmqError::RemoteError(msg) => Self::network_connection_failed("unknown", msg),
478            RocketmqError::RemotingConnectError(addr) => {
479                Self::network_connection_failed(addr, "connection failed")
480            }
481            RocketmqError::RemotingSendRequestError(addr) => {
482                Self::network_connection_failed(addr, "send request failed")
483            }
484            RocketmqError::RemotingTimeoutError(addr, timeout) => {
485                Self::Network(unified::NetworkError::RequestTimeout {
486                    addr: addr.to_string(),
487                    timeout_ms: timeout,
488                })
489            }
490            RocketmqError::RemotingTooMuchRequestError(msg) => Self::illegal_argument(msg),
491
492            // Protocol errors
493            RocketmqError::DeserializeHeaderError(msg) => {
494                Self::Serialization(unified::SerializationError::DecodeFailed {
495                    format: "header",
496                    message: msg,
497                })
498            }
499            RocketmqError::RemotingCommandDecoderError(_msg) => {
500                Self::Protocol(unified::ProtocolError::DecodeError {
501                    ext_fields_len: 0,
502                    header_len: 0,
503                })
504            }
505            RocketmqError::DecodingError(required, available) => {
506                Self::Serialization(unified::SerializationError::DecodeFailed {
507                    format: "binary",
508                    message: format!("required {} bytes, got {}", required, available),
509                })
510            }
511            RocketmqError::NotSupportSerializeType(t) => {
512                Self::Protocol(unified::ProtocolError::UnsupportedSerializationType {
513                    serialize_type: t,
514                })
515            }
516
517            // Broker errors
518            RocketmqError::MQBrokerError(code, msg, addr) => {
519                Self::broker_operation_failed("BROKER_OPERATION", code, msg).with_broker_addr(addr)
520            }
521            RocketmqError::MQClientBrokerError(err) => {
522                let mut e = Self::broker_operation_failed(
523                    "BROKER_OPERATION",
524                    err.response_code(),
525                    err.error_message().unwrap_or(&String::new()).clone(),
526                );
527                if let Some(addr) = err.broker_addr() {
528                    e = e.with_broker_addr(addr.clone());
529                }
530                e
531            }
532            RocketmqError::OffsetNotFoundError(code, addr, msg) => {
533                Self::broker_operation_failed("OFFSET_NOT_FOUND", code, msg).with_broker_addr(addr)
534            }
535
536            // Client errors
537            RocketmqError::MQClientErr(err) => {
538                Self::illegal_argument(err.error_message().unwrap_or(&String::new()).clone())
539            }
540            RocketmqError::RequestTimeoutError(_) => Self::Timeout {
541                operation: "request",
542                timeout_ms: 3000,
543            },
544
545            // System errors
546            RocketmqError::Io(err) => Self::IO(err),
547            RocketmqError::IllegalArgument(msg) | RocketmqError::IllegalArgumentError(msg) => {
548                Self::illegal_argument(msg)
549            }
550            RocketmqError::UnsupportedOperationException(msg) => Self::illegal_argument(msg),
551            RocketmqError::IpError(msg) => Self::illegal_argument(format!("IP error: {}", msg)),
552            RocketmqError::ChannelError(msg) => Self::Internal(format!("Channel error: {}", msg)),
553            RocketmqError::NoneError(msg) => Self::Internal(format!("None error: {}", msg)),
554            RocketmqError::TokioHandlerError(msg) => {
555                Self::Internal(format!("Tokio handler error: {}", msg))
556            }
557            RocketmqError::SubCommand(cmd, msg) => {
558                Self::Internal(format!("{} command failed: {}", cmd, msg))
559            }
560            RocketmqError::StoreCustomError(msg) => Self::StorageReadFailed {
561                path: "unknown".to_string(),
562                reason: msg,
563            },
564
565            // Other errors
566            RocketmqError::ServiceTaskError(err) => Self::Service(err),
567
568            #[cfg(feature = "with_serde")]
569            RocketmqError::SerdeJsonError(err) => {
570                Self::Serialization(unified::SerializationError::JsonError(err.to_string()))
571            }
572
573            #[cfg(feature = "with_config")]
574            RocketmqError::ConfigError(err) => Self::ConfigParseFailed {
575                key: "unknown",
576                reason: err.to_string(),
577            },
578
579            // Handle remaining variants
580            RocketmqError::RpcError(code, msg) => Self::broker_operation_failed("RPC", code, msg),
581            RocketmqError::FromStrErr(msg) => Self::illegal_argument(msg),
582            RocketmqError::Utf8Error(err) => {
583                Self::Serialization(unified::SerializationError::Utf8Error(err))
584            }
585            RocketmqError::ConnectionInvalid(msg) => {
586                Self::network_connection_failed("unknown", msg)
587            }
588            RocketmqError::AbortProcessError(code, msg) => {
589                Self::Internal(format!("Abort process error {}: {}", code, msg))
590            }
591            RocketmqError::ChannelSendRequestFailed(msg) => {
592                Self::network_connection_failed("channel", msg)
593            }
594            RocketmqError::ChannelRecvRequestFailed(msg) => {
595                Self::network_connection_failed("channel", msg)
596            }
597            RocketmqError::RemotingCommandEncoderError(msg) => {
598                Self::Serialization(unified::SerializationError::EncodeFailed {
599                    format: "command",
600                    message: msg,
601                })
602            }
603        }
604    }
605}