rocketmq_error/
lib.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17/*
18 * Licensed to the Apache Software Foundation (ASF) under one or more
19 * contributor license agreements.  See the NOTICE file distributed with
20 * this work for additional information regarding copyright ownership.
21 * The ASF licenses this file to You under the Apache License, Version 2.0
22 * (the "License"); you may not use this file except in compliance with
23 * the License.  You may obtain a copy of the License at
24 *
25 *     http://www.apache.org/licenses/LICENSE-2.0
26 *
27 * Unless required by applicable law or agreed to in writing, software
28 * distributed under the License is distributed on an "AS IS" BASIS,
29 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30 * See the License for the specific language governing permissions and
31 * limitations under the License.
32 */
33use std::io;
34
35pub type RocketMQResult<T> = std::result::Result<T, RocketmqError>;
36
37pub type Result<T> = anyhow::Result<T>;
38
39use thiserror::Error;
40
41#[derive(Debug, Error)]
42pub enum RocketmqError {
43    // remoting errors
44    #[error("{0}")]
45    RemoteError(String),
46
47    #[error("{0}")]
48    DeserializeHeaderError(String),
49
50    #[error("connect to {0} failed")]
51    RemotingConnectError(String),
52
53    #[error("send request to < {0} > failed")]
54    RemotingSendRequestError(String),
55
56    #[error("wait response on the channel < {0}  >, timeout: {1}(ms)")]
57    RemotingTimeoutError(String, u64),
58
59    #[error("RemotingTooMuchRequestException: {0}")]
60    RemotingTooMuchRequestError(String),
61
62    #[error("RpcException: code: {0}, message: {1}")]
63    RpcError(i32, String),
64
65    #[error("{0}")]
66    FromStrErr(String),
67
68    #[error("{0:?}")]
69    Io(#[from] io::Error),
70
71    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
72    DecodingError(usize, usize),
73
74    #[error("UTF-8 decoding error: {0}")]
75    Utf8Error(#[from] std::str::Utf8Error),
76
77    #[error("RemotingCommandDecoderError:{0}")]
78    RemotingCommandDecoderError(String),
79
80    #[error("RemotingCommandEncoderError:{0}")]
81    RemotingCommandEncoderError(String),
82
83    #[error("Not support serialize type: {0}")]
84    NotSupportSerializeType(u8),
85
86    #[error("ConnectionInvalid: {0}")]
87    ConnectionInvalid(String),
88
89    #[error("AbortProcessError: {0}-{1}")]
90    AbortProcessError(i32, String),
91
92    #[error("Channel Send Request failed: {0}")]
93    ChannelSendRequestFailed(String),
94
95    #[error("Channel recv Request failed: {0}")]
96    ChannelRecvRequestFailed(String),
97
98    #[error("{0}")]
99    IllegalArgument(String),
100
101    //client error
102    #[error("{0}")]
103    MQClientErr(#[from] ClientErr),
104
105    #[error("{0}")]
106    MQClientBrokerError(#[from] MQBrokerErr),
107
108    #[error("{0}")]
109    RequestTimeoutError(#[from] RequestTimeoutErr),
110
111    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
112    OffsetNotFoundError(i32, String, String),
113
114    #[error("{0}")]
115    IllegalArgumentError(String),
116
117    #[error("Serialization error: {0}")]
118    JsonError(String),
119
120    #[error("{0}")]
121    UnsupportedOperationException(String),
122
123    #[error("{0}")]
124    IpError(String),
125
126    #[error("{0}")]
127    ChannelError(String),
128
129    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
130    MQBrokerError(i32, String, String),
131
132    #[error("{0}")]
133    NoneError(String),
134
135    #[error("{0}")]
136    TokioHandlerError(String),
137
138    #[error("{0}")]
139    ConfigError(String),
140
141    #[error("{0} command failed , {1}")]
142    SubCommand(String, String),
143
144    #[error("{0}")]
145    ServiceTaskError(#[from] ServiceError),
146}
147
148#[derive(Error, Debug)]
149#[error("{message}")]
150pub struct MQBrokerErr {
151    response_code: i32,
152    error_message: Option<String>,
153    broker_addr: Option<String>,
154    message: String,
155}
156
157impl MQBrokerErr {
158    pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
159        let error_message = error_message.into();
160        /*let message = FAQUrl::attach_default_url(Some(
161            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
162        ));*/
163        let message = "";
164        Self {
165            response_code,
166            error_message: Some(error_message),
167            broker_addr: None,
168            message: String::from(message),
169        }
170    }
171
172    pub fn new_with_broker(
173        response_code: i32,
174        error_message: impl Into<String>,
175        broker_addr: impl Into<String>,
176    ) -> Self {
177        let broker_addr = broker_addr.into();
178        let error_message = error_message.into();
179        /*let message = FAQUrl::attach_default_url(Some(
180            format!(
181                "CODE: {}  DESC: {} BROKER: {}",
182                response_code, error_message, broker_addr
183            )
184            .as_str(),
185        ));*/
186        let message = "";
187        Self {
188            response_code,
189            error_message: Some(error_message),
190            broker_addr: Some(broker_addr),
191            message: String::from(message),
192        }
193    }
194
195    pub fn response_code(&self) -> i32 {
196        self.response_code
197    }
198
199    pub fn error_message(&self) -> Option<&String> {
200        self.error_message.as_ref()
201    }
202
203    pub fn broker_addr(&self) -> Option<&String> {
204        self.broker_addr.as_ref()
205    }
206}
207
208#[macro_export]
209macro_rules! client_broker_err {
210    // Handle errors with a custom ResponseCode and formatted string
211    ($response_code:expr, $error_message:expr, $broker_addr:expr) => {{
212        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
213            $crate::MQBrokerErr::new_with_broker(
214                $response_code as i32,
215                $error_message,
216                $broker_addr,
217            ),
218        ))
219    }};
220    // Handle errors without a ResponseCode, using only the error message
221    ($response_code:expr, $error_message:expr) => {{
222        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
223            $crate::MQBrokerErr::new($response_code as i32, $error_message),
224        ))
225    }};
226}
227
228#[derive(Error, Debug)]
229#[error("{message}")]
230pub struct ClientErr {
231    response_code: i32,
232    error_message: Option<String>,
233    message: String,
234}
235
236impl ClientErr {
237    pub fn new(error_message: impl Into<String>) -> Self {
238        let error_message = error_message.into();
239        let message = "string";
240        // let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
241        Self {
242            response_code: -1,
243            error_message: Some(error_message),
244            message: String::from(message),
245        }
246    }
247
248    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
249        let error_message = error_message.into();
250        /*let message = FAQUrl::attach_default_url(Some(
251            format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
252        ));*/
253        let message = "";
254        Self {
255            response_code,
256            error_message: Some(error_message),
257            message: String::from(message),
258        }
259    }
260
261    pub fn response_code(&self) -> i32 {
262        self.response_code
263    }
264
265    pub fn error_message(&self) -> Option<&String> {
266        self.error_message.as_ref()
267    }
268}
269
270// Create a macro to simplify error creation
271#[macro_export]
272macro_rules! mq_client_err {
273    // Handle errors with a custom ResponseCode and formatted string
274    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
275        let formatted_msg = format!($fmt, $($arg),*);
276        std::result::Result::Err($crate::client_error::MQClientError::MQClientErr(
277            $crate::client_error::ClientErr::new_with_code($response_code as i32, formatted_msg),
278        ))
279    }};
280
281    ($response_code:expr, $error_message:expr) => {{
282        std::result::Result::Err($crate::RocketmqError::MQClientErr(
283            $crate::ClientErr::new_with_code(
284                $response_code as i32,
285                $error_message,
286            ),
287        ))
288    }};
289
290    // Handle errors without a ResponseCode, using only the error message
291    ($error_message:expr) => {{
292        std::result::Result::Err($crate::RocketmqError::MQClientErr(
293            $crate::ClientErr::new($error_message),
294        ))
295    }};
296}
297
298#[derive(Error, Debug)]
299#[error("{message}")]
300pub struct RequestTimeoutErr {
301    response_code: i32,
302    error_message: Option<String>,
303    message: String,
304}
305
306impl RequestTimeoutErr {
307    pub fn new(error_message: impl Into<String>) -> Self {
308        let error_message = error_message.into();
309        //let message = FAQUrl::attach_default_url(Some(error_message.as_str()));
310        let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
311        Self {
312            response_code: -1,
313            error_message: Some(error_message),
314            message: String::from(message),
315        }
316    }
317
318    pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
319        let error_message = error_message.into();
320        // let message = FAQUrl::attach_default_url(Some(
321        //     format!("CODE: {}  DESC: {}", response_code, error_message,).as_str(),
322        // ));
323        let message = "FAQUrl::attach_default_url(Some(format!(\"CODE: {}  DESC: {}\", \
324                       response_code, error_message,).as_str()))";
325        Self {
326            response_code,
327            error_message: Some(error_message),
328            message: String::from(message),
329        }
330    }
331
332    pub fn response_code(&self) -> i32 {
333        self.response_code
334    }
335
336    pub fn error_message(&self) -> Option<&String> {
337        self.error_message.as_ref()
338    }
339}
340
341#[macro_export]
342macro_rules! request_timeout_err {
343    // Handle errors with a custom ResponseCode and formatted string
344    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
345        let formatted_msg = format!($fmt, $($arg),*);
346        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
347            $crate::RequestTimeoutErr::new_with_code(
348                $response_code as i32,
349                formatted_msg,
350            ),
351        ))
352    }};
353    ($response_code:expr, $error_message:expr) => {{
354        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
355            $crate::RequestTimeoutErr::new_with_code(
356                $response_code as i32,
357                $error_message,
358            ),
359        ))
360    }};
361    // Handle errors without a ResponseCode, using only the error message
362    ($error_message:expr) => {{
363        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
364            $crate::RequestTimeoutErr::new($error_message),
365        ))
366    }};
367}
368
369//------------------ServiceError------------------
370
371/// Service error enumeration
372#[derive(Debug, thiserror::Error)]
373pub enum ServiceError {
374    #[error("Service is already running")]
375    AlreadyRunning,
376
377    #[error("Service is not running")]
378    NotRunning,
379
380    #[error("Service startup failed: {0}")]
381    StartupFailed(String),
382
383    #[error("Service shutdown failed: {0}")]
384    ShutdownFailed(String),
385
386    #[error("Service operation timeout")]
387    Timeout,
388
389    #[error("Service interrupted")]
390    Interrupted,
391}