1use std::io;
34
/// Result alias whose error type is fixed to [`RocketmqError`].
pub type RocketMQResult<T> = std::result::Result<T, RocketmqError>;
36
37use thiserror::Error;
38
/// Error enum used as the error type of `RocketMQResult`.
///
/// The `Display` text of every variant is produced by the `#[error(..)]` template
/// attached to it; variants marked `#[from]` also get a `From` conversion from the
/// wrapped type.
#[derive(Debug, Error)]
pub enum RocketmqError {
    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    RemoteError(String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    DeserializeHeaderError(String),

    /// The payload is interpolated as the connection target in the message.
    #[error("connect to {0} failed")]
    RemotingConnectError(String),

    /// The payload is interpolated as the request destination in the message.
    #[error("send request to < {0} > failed")]
    RemotingSendRequestError(String),

    /// Field 0 is shown as the channel, field 1 as the timeout in milliseconds.
    #[error("wait response on the channel < {0} >, timeout: {1}(ms)")]
    RemotingTimeoutError(String, u64),

    /// The payload is appended after a fixed `RemotingTooMuchRequestException: ` prefix.
    #[error("RemotingTooMuchRequestException: {0}")]
    RemotingTooMuchRequestError(String),

    /// Field 0 is shown as the code, field 1 as the message.
    #[error("RpcException: code: {0}, message: {1}")]
    RpcError(i32, String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    FromStrErr(String),

    /// Wraps an `io::Error`; note the template uses the `Debug` rendering (`{0:?}`).
    #[error("{0:?}")]
    Io(#[from] io::Error),

    /// Field 0 is reported as the extFields length, field 1 as the header length.
    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
    DecodingError(usize, usize),

    /// Wraps a `std::str::Utf8Error`.
    #[error("UTF-8 decoding error: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),

    /// The payload is appended after a fixed `RemotingCommandDecoderError:` prefix.
    #[error("RemotingCommandDecoderError:{0}")]
    RemotingCommandDecoderError(String),

    /// The payload is appended after a fixed `RemotingCommandEncoderError:` prefix.
    #[error("RemotingCommandEncoderError:{0}")]
    RemotingCommandEncoderError(String),

    /// The payload is the serialize-type value that was not supported.
    #[error("Not support serialize type: {0}")]
    NotSupportSerializeType(u8),

    /// The payload is appended after a fixed `ConnectionInvalid: ` prefix.
    #[error("ConnectionInvalid: {0}")]
    ConnectionInvalid(String),

    /// Both fields are shown, joined by `-`.
    #[error("AbortProcessError: {0}-{1}")]
    AbortProcessError(i32, String),

    /// The payload is appended after a fixed `Channel Send Request failed: ` prefix.
    #[error("Channel Send Request failed: {0}")]
    ChannelSendRequestFailed(String),

    /// The payload is appended after a fixed `Channel recv Request failed: ` prefix.
    #[error("Channel recv Request failed: {0}")]
    ChannelRecvRequestFailed(String),

    /// Carries a message that is displayed verbatim.
    // NOTE(review): `IllegalArgumentError` below has the same payload and template;
    // confirm whether both variants are needed.
    #[error("{0}")]
    IllegalArgument(String),

    /// Wraps a `ClientErr` and displays it unchanged.
    #[error("{0}")]
    MQClientErr(#[from] ClientErr),

    /// Wraps an `MQBrokerErr` and displays it unchanged.
    #[error("{0}")]
    MQClientBrokerError(#[from] MQBrokerErr),

    /// Wraps a `RequestTimeoutErr` and displays it unchanged.
    #[error("{0}")]
    RequestTimeoutError(#[from] RequestTimeoutErr),

    /// Field 0 is shown as the code, field 1 as the broker address, field 2 as the message.
    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
    OffsetNotFoundError(i32, String, String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    IllegalArgumentError(String),

    /// The payload is appended after a fixed `Serialization error: ` prefix.
    #[error("Serialization error: {0}")]
    JsonError(String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    UnsupportedOperationException(String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    IpError(String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    ChannelError(String),

    /// Field 0 is shown as the code, field 1 as the message, field 2 as the broker address.
    // NOTE(review): the placeholder order ({2} before {1}) differs from
    // `OffsetNotFoundError`, which uses the same wording; confirm the field order
    // against the call sites.
    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
    MQBrokerError(i32, String, String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    NoneError(String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    TokioHandlerError(String),

    /// Carries a message that is displayed verbatim.
    #[error("{0}")]
    ConfigError(String),
}
139
/// Error value wrapped by `RocketmqError::MQClientBrokerError`.
///
/// `Display` prints only the `message` field (see the `#[error]` template).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct MQBrokerErr {
    // Numeric response code; returned by `response_code()`.
    response_code: i32,
    // Description supplied by the caller; returned by `error_message()`.
    error_message: Option<String>,
    // Broker address, set only by `new_with_broker`; returned by `broker_addr()`.
    broker_addr: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
148
149impl MQBrokerErr {
150 pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
151 let error_message = error_message.into();
152 let message = "";
156 Self {
157 response_code,
158 error_message: Some(error_message),
159 broker_addr: None,
160 message: String::from(message),
161 }
162 }
163
164 pub fn new_with_broker(
165 response_code: i32,
166 error_message: impl Into<String>,
167 broker_addr: impl Into<String>,
168 ) -> Self {
169 let broker_addr = broker_addr.into();
170 let error_message = error_message.into();
171 let message = "";
179 Self {
180 response_code,
181 error_message: Some(error_message),
182 broker_addr: Some(broker_addr),
183 message: String::from(message),
184 }
185 }
186
187 pub fn response_code(&self) -> i32 {
188 self.response_code
189 }
190
191 pub fn error_message(&self) -> Option<&String> {
192 self.error_message.as_ref()
193 }
194
195 pub fn broker_addr(&self) -> Option<&String> {
196 self.broker_addr.as_ref()
197 }
198}
199
/// Expands to `Err(RocketmqError::MQClientBrokerError(..))`.
///
/// Accepted forms:
/// - `client_broker_err!(code, message, broker_addr)` builds the error with
///   `MQBrokerErr::new_with_broker`;
/// - `client_broker_err!(code, message)` builds it with `MQBrokerErr::new`.
///
/// The code expression is converted with `as i32`.
#[macro_export]
macro_rules! client_broker_err {
    ($code:expr, $desc:expr, $addr:expr) => {{
        let broker_err = $crate::MQBrokerErr::new_with_broker($code as i32, $desc, $addr);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
    ($code:expr, $desc:expr) => {{
        let broker_err = $crate::MQBrokerErr::new($code as i32, $desc);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
}
219
/// Error value wrapped by `RocketmqError::MQClientErr`.
///
/// `Display` prints only the `message` field (see the `#[error]` template).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct ClientErr {
    // Numeric response code; `ClientErr::new` stores -1 here.
    response_code: i32,
    // Description supplied by the caller; returned by `error_message()`.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
227
228impl ClientErr {
229 pub fn new(error_message: impl Into<String>) -> Self {
230 let error_message = error_message.into();
231 let message = "string";
232 Self {
234 response_code: -1,
235 error_message: Some(error_message),
236 message: String::from(message),
237 }
238 }
239
240 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
241 let error_message = error_message.into();
242 let message = "";
246 Self {
247 response_code,
248 error_message: Some(error_message),
249 message: String::from(message),
250 }
251 }
252
253 pub fn response_code(&self) -> i32 {
254 self.response_code
255 }
256
257 pub fn error_message(&self) -> Option<&String> {
258 self.error_message.as_ref()
259 }
260}
261
/// Expands to `Err(RocketmqError::MQClientErr(..))`.
///
/// Accepted forms:
/// - `mq_client_err!(code, "fmt {}", args...)` formats the message with `format!`
///   and builds the error with `ClientErr::new_with_code`;
/// - `mq_client_err!(code, message)` builds it with `ClientErr::new_with_code`;
/// - `mq_client_err!(message)` builds it with `ClientErr::new`.
///
/// The code expression is converted with `as i32`.
#[macro_export]
macro_rules! mq_client_err {
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        // Uses the same crate-root paths as the other two arms, so all three arms
        // produce the same `RocketmqError::MQClientErr` variant.
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
289
/// Error value wrapped by `RocketmqError::RequestTimeoutError`.
///
/// `Display` prints only the `message` field (see the `#[error]` template).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct RequestTimeoutErr {
    // Numeric response code; `RequestTimeoutErr::new` stores -1 here.
    response_code: i32,
    // Description supplied by the caller; returned by `error_message()`.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
297
298impl RequestTimeoutErr {
299 pub fn new(error_message: impl Into<String>) -> Self {
300 let error_message = error_message.into();
301 let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
303 Self {
304 response_code: -1,
305 error_message: Some(error_message),
306 message: String::from(message),
307 }
308 }
309
310 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
311 let error_message = error_message.into();
312 let message = "FAQUrl::attach_default_url(Some(format!(\"CODE: {} DESC: {}\", \
316 response_code, error_message,).as_str()))";
317 Self {
318 response_code,
319 error_message: Some(error_message),
320 message: String::from(message),
321 }
322 }
323
324 pub fn response_code(&self) -> i32 {
325 self.response_code
326 }
327
328 pub fn error_message(&self) -> Option<&String> {
329 self.error_message.as_ref()
330 }
331}
332
/// Expands to `Err(RocketmqError::RequestTimeoutError(..))`.
///
/// Accepted forms:
/// - `request_timeout_err!(code, "fmt {}", args...)` formats the message with
///   `format!` and builds the error with `RequestTimeoutErr::new_with_code`;
/// - `request_timeout_err!(code, message)` builds it with
///   `RequestTimeoutErr::new_with_code`;
/// - `request_timeout_err!(message)` builds it with `RequestTimeoutErr::new`.
///
/// The code expression is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    ($code:expr, $template:expr, $($piece:expr),*) => {{
        // The message is formatted before the code expression is evaluated.
        let text = format!($template, $($piece),*);
        let timeout = $crate::RequestTimeoutErr::new_with_code($code as i32, text);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout))
    }};
    ($code:expr, $text:expr) => {{
        let timeout = $crate::RequestTimeoutErr::new_with_code($code as i32, $text);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout))
    }};
    ($text:expr) => {{
        let timeout = $crate::RequestTimeoutErr::new($text);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout))
    }};
}