rocketmq_error/
lib.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
33use std::io;
34
/// Shorthand for `Result<T, RocketmqError>`, so signatures in this crate's API can omit the
/// error type.
pub type RocketMQResult<T> = std::result::Result<T, RocketmqError>;
36
37use thiserror::Error;
38
/// Crate-wide error enum; it is the error type of `RocketMQResult`.
///
/// The `Display` text of every variant comes from its `#[error("...")]` attribute
/// (generated by the `thiserror` derive). Variants marked `#[from]` additionally get a
/// `From` conversion, so the wrapped error can be propagated with `?`.
#[derive(Debug, Error)]
pub enum RocketmqError {
    // remoting errors
    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    RemoteError(String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    DeserializeHeaderError(String),

    /// Displays as `connect to {0} failed`.
    /// Field 0 is presumably the remote address — confirm against callers.
    #[error("connect to {0} failed")]
    RemotingConnectError(String),

    /// Displays as `send request to < {0} > failed`.
    /// Field 0 is presumably the remote address — confirm against callers.
    #[error("send request to < {0} > failed")]
    RemotingSendRequestError(String),

    /// Field 0 is printed as the channel, field 1 as the timeout in milliseconds.
    #[error("wait response on the channel < {0}  >, timeout: {1}(ms)")]
    RemotingTimeoutError(String, u64),

    /// Displays the wrapped message prefixed with `RemotingTooMuchRequestException: `.
    #[error("RemotingTooMuchRequestException: {0}")]
    RemotingTooMuchRequestError(String),

    /// Field 0 is printed as the code, field 1 as the message.
    #[error("RpcException: code: {0}, message: {1}")]
    RpcError(i32, String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    FromStrErr(String),

    /// Wraps a `std::io::Error`; convertible via `From` because of `#[from]`.
    ///
    /// NOTE(review): the `Display` text uses the inner error's `Debug` form (`{0:?}`),
    /// unlike the other wrapping variants — confirm this is intentional.
    #[error("{0:?}")]
    Io(#[from] io::Error),

    /// Field 0 is printed as the `extFields` length, field 1 as the header length.
    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
    DecodingError(usize, usize),

    /// Wraps a `std::str::Utf8Error`; convertible via `From` because of `#[from]`.
    #[error("UTF-8 decoding error: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),

    /// Displays the wrapped message prefixed with `RemotingCommandDecoderError:`.
    #[error("RemotingCommandDecoderError:{0}")]
    RemotingCommandDecoderError(String),

    /// Displays the wrapped message prefixed with `RemotingCommandEncoderError:`.
    #[error("RemotingCommandEncoderError:{0}")]
    RemotingCommandEncoderError(String),

    /// Field 0 is the serialize-type value that is reported as unsupported.
    #[error("Not support serialize type: {0}")]
    NotSupportSerializeType(u8),

    /// Displays the wrapped message prefixed with `ConnectionInvalid: `.
    #[error("ConnectionInvalid: {0}")]
    ConnectionInvalid(String),

    /// Displays as `AbortProcessError: {0}-{1}` (an `i32` followed by a message).
    #[error("AbortProcessError: {0}-{1}")]
    AbortProcessError(i32, String),

    /// Displays the wrapped message prefixed with `Channel Send Request failed: `.
    #[error("Channel Send Request failed: {0}")]
    ChannelSendRequestFailed(String),

    /// Displays the wrapped message prefixed with `Channel recv Request failed: `.
    #[error("Channel recv Request failed: {0}")]
    ChannelRecvRequestFailed(String),

    /// Displays the wrapped message unchanged.
    ///
    /// NOTE(review): `IllegalArgumentError` further down has the same payload and
    /// format string — the two look like duplicates.
    #[error("{0}")]
    IllegalArgument(String),

    // client errors
    /// Wraps a `ClientErr` and forwards its `Display` text; convertible via `From`.
    #[error("{0}")]
    MQClientErr(#[from] ClientErr),

    /// Wraps an `MQBrokerErr` and forwards its `Display` text; convertible via `From`.
    #[error("{0}")]
    MQClientBrokerError(#[from] MQBrokerErr),

    /// Wraps a `RequestTimeoutErr` and forwards its `Display` text; convertible via `From`.
    #[error("{0}")]
    RequestTimeoutError(#[from] RequestTimeoutErr),

    /// Field 0 is printed as the code, field 1 as the broker address, field 2 as the
    /// message.
    ///
    /// NOTE(review): `MQBrokerError` uses the same wording but prints field 2 as the
    /// broker address and field 1 as the message — confirm that callers of both
    /// variants pass their strings in the order each format string expects.
    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
    OffsetNotFoundError(i32, String, String),

    /// Displays the wrapped message unchanged (same shape as `IllegalArgument`).
    #[error("{0}")]
    IllegalArgumentError(String),

    /// Displays the wrapped message prefixed with `Serialization error: `.
    #[error("Serialization error: {0}")]
    JsonError(String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    UnsupportedOperationException(String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    IpError(String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    ChannelError(String),

    /// Field 0 is printed as the code, field 2 as the broker address, field 1 as the
    /// message (note the order differs from `OffsetNotFoundError`).
    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
    MQBrokerError(i32, String, String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    NoneError(String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    TokioHandlerError(String),

    /// Displays the wrapped message unchanged.
    #[error("{0}")]
    ConfigError(String),
}
139
/// Error payload carried by `RocketmqError::MQClientBrokerError`.
///
/// `Display` prints only the `message` field (see the `#[error("{message}")]` attribute);
/// the other fields are exposed through accessor methods.
#[derive(Error, Debug)]
#[error("{message}")]
pub struct MQBrokerErr {
    // Numeric code passed to the constructor.
    response_code: i32,
    // Error description passed to the constructor.
    error_message: Option<String>,
    // Broker address; only set by `new_with_broker`.
    broker_addr: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
148
149impl MQBrokerErr {
150    pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
151        let error_message = error_message.into();
152        /*let message = FAQUrl::attach_default_url(Some(
153            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
154        ));*/
155        let message = "";
156        Self {
157            response_code,
158            error_message: Some(error_message),
159            broker_addr: None,
160            message: String::from(message),
161        }
162    }
163
164    pub fn new_with_broker(
165        response_code: i32,
166        error_message: impl Into<String>,
167        broker_addr: impl Into<String>,
168    ) -> Self {
169        let broker_addr = broker_addr.into();
170        let error_message = error_message.into();
171        /*let message = FAQUrl::attach_default_url(Some(
172            format!(
173                "CODE: {}  DESC: {} BROKER: {}",
174                response_code, error_message, broker_addr
175            )
176            .as_str(),
177        ));*/
178        let message = "";
179        Self {
180            response_code,
181            error_message: Some(error_message),
182            broker_addr: Some(broker_addr),
183            message: String::from(message),
184        }
185    }
186
187    pub fn response_code(&self) -> i32 {
188        self.response_code
189    }
190
191    pub fn error_message(&self) -> Option<&String> {
192        self.error_message.as_ref()
193    }
194
195    pub fn broker_addr(&self) -> Option<&String> {
196        self.broker_addr.as_ref()
197    }
198}
199
/// Builds an `Err(RocketmqError::MQClientBrokerError(..))` value.
///
/// Accepted forms:
/// - `client_broker_err!(code, message)` — built with `MQBrokerErr::new`.
/// - `client_broker_err!(code, message, broker_addr)` — built with
///   `MQBrokerErr::new_with_broker`.
///
/// In both forms `code` is converted with `as i32`.
#[macro_export]
macro_rules! client_broker_err {
    // Response code and message only; no broker address is recorded.
    ($code:expr, $msg:expr) => {{
        let broker_err = $crate::MQBrokerErr::new($code as i32, $msg);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
    // Response code, message and the address of the broker involved.
    ($code:expr, $msg:expr, $addr:expr) => {{
        let broker_err = $crate::MQBrokerErr::new_with_broker($code as i32, $msg, $addr);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
}
219
/// Error payload carried by `RocketmqError::MQClientErr`.
///
/// `Display` prints only the `message` field (see the `#[error("{message}")]` attribute).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct ClientErr {
    // Numeric code; `ClientErr::new` stores -1 here.
    response_code: i32,
    // Error description passed to the constructor.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
227
228impl ClientErr {
229    pub fn new(error_message: impl Into<String>) -> Self {
230        let error_message = error_message.into();
231        let message = "string";
232        // let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
233        Self {
234            response_code: -1,
235            error_message: Some(error_message),
236            message: String::from(message),
237        }
238    }
239
240    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
241        let error_message = error_message.into();
242        /*let message = FAQUrl::attach_default_url(Some(
243            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
244        ));*/
245        let message = "";
246        Self {
247            response_code,
248            error_message: Some(error_message),
249            message: String::from(message),
250        }
251    }
252
253    pub fn response_code(&self) -> i32 {
254        self.response_code
255    }
256
257    pub fn error_message(&self) -> Option<&String> {
258        self.error_message.as_ref()
259    }
260}
261
/// Builds an `Err(RocketmqError::MQClientErr(..))` value.
///
/// Accepted forms:
/// - `mq_client_err!(code, "fmt {}", arg, ...)` — the message is produced with `format!`.
/// - `mq_client_err!(code, message)` — built with `ClientErr::new_with_code`.
/// - `mq_client_err!(message)` — built with `ClientErr::new`.
///
/// Whenever a `code` is given it is converted with `as i32`.
#[macro_export]
macro_rules! mq_client_err {
    // Response code plus a format string and its arguments.
    // This arm used to expand to `$crate::client_error::...` paths, unlike the two arms
    // below; it now targets the same `$crate::RocketmqError` / `$crate::ClientErr` items.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    // Response code plus a ready-made message.
    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    // Message only; no response code is supplied.
    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
289
/// Error payload carried by `RocketmqError::RequestTimeoutError`.
///
/// `Display` prints only the `message` field (see the `#[error("{message}")]` attribute).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct RequestTimeoutErr {
    // Numeric code; `RequestTimeoutErr::new` stores -1 here.
    response_code: i32,
    // Error description passed to the constructor.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
297
298impl RequestTimeoutErr {
299    pub fn new(error_message: impl Into<String>) -> Self {
300        let error_message = error_message.into();
301        //let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
302        let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
303        Self {
304            response_code: -1,
305            error_message: Some(error_message),
306            message: String::from(message),
307        }
308    }
309
310    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
311        let error_message = error_message.into();
312        // let message = FAQUrl::attach_default_url(Some(
313        //     format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
314        // ));
315        let message = "FAQUrl::attach_default_url(Some(format!(\"CODE: {}  DESC: {}\", \
316                       response_code, error_message,).as_str()))";
317        Self {
318            response_code,
319            error_message: Some(error_message),
320            message: String::from(message),
321        }
322    }
323
324    pub fn response_code(&self) -> i32 {
325        self.response_code
326    }
327
328    pub fn error_message(&self) -> Option<&String> {
329        self.error_message.as_ref()
330    }
331}
332
/// Builds an `Err(RocketmqError::RequestTimeoutError(..))` value.
///
/// Accepted forms:
/// - `request_timeout_err!(code, "fmt {}", arg, ...)` — the message is produced with
///   `format!`.
/// - `request_timeout_err!(code, message)` — built with
///   `RequestTimeoutErr::new_with_code`.
/// - `request_timeout_err!(message)` — built with `RequestTimeoutErr::new`.
///
/// Whenever a `code` is given it is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Response code plus a format string and its arguments.
    ($code:expr, $template:expr, $($piece:expr),*) => {{
        let text = format!($template, $($piece),*);
        let timeout = $crate::RequestTimeoutErr::new_with_code($code as i32, text);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout))
    }};
    // Response code plus a ready-made message.
    ($code:expr, $msg:expr) => {{
        let timeout = $crate::RequestTimeoutErr::new_with_code($code as i32, $msg);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout))
    }};
    // Message only; no response code is supplied.
    ($msg:expr) => {{
        let timeout = $crate::RequestTimeoutErr::new($msg);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout))
    }};
}