1use std::io;
34
35pub type RocketMQResult<T> = std::result::Result<T, RocketmqError>;
36
37pub type Result<T> = anyhow::Result<T>;
38
39use thiserror::Error;
40
41#[derive(Debug, Error)]
42pub enum RocketmqError {
43 #[error("{0}")]
45 RemoteError(String),
46
47 #[error("{0}")]
48 DeserializeHeaderError(String),
49
50 #[error("connect to {0} failed")]
51 RemotingConnectError(String),
52
53 #[error("send request to < {0} > failed")]
54 RemotingSendRequestError(String),
55
56 #[error("wait response on the channel < {0} >, timeout: {1}(ms)")]
57 RemotingTimeoutError(String, u64),
58
59 #[error("RemotingTooMuchRequestException: {0}")]
60 RemotingTooMuchRequestError(String),
61
62 #[error("RpcException: code: {0}, message: {1}")]
63 RpcError(i32, String),
64
65 #[error("{0}")]
66 FromStrErr(String),
67
68 #[error("{0:?}")]
69 Io(#[from] io::Error),
70
71 #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
72 DecodingError(usize, usize),
73
74 #[error("UTF-8 decoding error: {0}")]
75 Utf8Error(#[from] std::str::Utf8Error),
76
77 #[error("RemotingCommandDecoderError:{0}")]
78 RemotingCommandDecoderError(String),
79
80 #[error("RemotingCommandEncoderError:{0}")]
81 RemotingCommandEncoderError(String),
82
83 #[error("Not support serialize type: {0}")]
84 NotSupportSerializeType(u8),
85
86 #[error("ConnectionInvalid: {0}")]
87 ConnectionInvalid(String),
88
89 #[error("AbortProcessError: {0}-{1}")]
90 AbortProcessError(i32, String),
91
92 #[error("Channel Send Request failed: {0}")]
93 ChannelSendRequestFailed(String),
94
95 #[error("Channel recv Request failed: {0}")]
96 ChannelRecvRequestFailed(String),
97
98 #[error("{0}")]
99 IllegalArgument(String),
100
101 #[error("{0}")]
103 MQClientErr(#[from] ClientErr),
104
105 #[error("{0}")]
106 MQClientBrokerError(#[from] MQBrokerErr),
107
108 #[error("{0}")]
109 RequestTimeoutError(#[from] RequestTimeoutErr),
110
111 #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
112 OffsetNotFoundError(i32, String, String),
113
114 #[error("{0}")]
115 IllegalArgumentError(String),
116
117 #[error("Serialization error: {0}")]
118 JsonError(String),
119
120 #[error("{0}")]
121 UnsupportedOperationException(String),
122
123 #[error("{0}")]
124 IpError(String),
125
126 #[error("{0}")]
127 ChannelError(String),
128
129 #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
130 MQBrokerError(i32, String, String),
131
132 #[error("{0}")]
133 NoneError(String),
134
135 #[error("{0}")]
136 TokioHandlerError(String),
137
138 #[error("{0}")]
139 ConfigError(String),
140
141 #[error("{0} command failed , {1}")]
142 SubCommand(String, String),
143
144 #[error("{0}")]
145 ServiceTaskError(#[from] ServiceError),
146}
147
148#[derive(Error, Debug)]
149#[error("{message}")]
150pub struct MQBrokerErr {
151 response_code: i32,
152 error_message: Option<String>,
153 broker_addr: Option<String>,
154 message: String,
155}
156
157impl MQBrokerErr {
158 pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
159 let error_message = error_message.into();
160 let message = "";
164 Self {
165 response_code,
166 error_message: Some(error_message),
167 broker_addr: None,
168 message: String::from(message),
169 }
170 }
171
172 pub fn new_with_broker(
173 response_code: i32,
174 error_message: impl Into<String>,
175 broker_addr: impl Into<String>,
176 ) -> Self {
177 let broker_addr = broker_addr.into();
178 let error_message = error_message.into();
179 let message = "";
187 Self {
188 response_code,
189 error_message: Some(error_message),
190 broker_addr: Some(broker_addr),
191 message: String::from(message),
192 }
193 }
194
195 pub fn response_code(&self) -> i32 {
196 self.response_code
197 }
198
199 pub fn error_message(&self) -> Option<&String> {
200 self.error_message.as_ref()
201 }
202
203 pub fn broker_addr(&self) -> Option<&String> {
204 self.broker_addr.as_ref()
205 }
206}
207
/// Builds an `Err(RocketmqError::MQClientBrokerError(..))` value.
///
/// Forms:
/// - `client_broker_err!(code, message, broker_addr)` uses `MQBrokerErr::new_with_broker`.
/// - `client_broker_err!(code, message)` uses `MQBrokerErr::new`.
///
/// `code` is converted with `as i32`.
#[macro_export]
macro_rules! client_broker_err {
    // Code, message and broker address.
    ($response_code:expr, $error_message:expr, $broker_addr:expr) => {{
        let broker_err = $crate::MQBrokerErr::new_with_broker(
            $response_code as i32,
            $error_message,
            $broker_addr,
        );
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
    // Code and message only.
    ($response_code:expr, $error_message:expr) => {{
        let broker_err = $crate::MQBrokerErr::new($response_code as i32, $error_message);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
}
227
228#[derive(Error, Debug)]
229#[error("{message}")]
230pub struct ClientErr {
231 response_code: i32,
232 error_message: Option<String>,
233 message: String,
234}
235
236impl ClientErr {
237 pub fn new(error_message: impl Into<String>) -> Self {
238 let error_message = error_message.into();
239 let message = "string";
240 Self {
242 response_code: -1,
243 error_message: Some(error_message),
244 message: String::from(message),
245 }
246 }
247
248 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
249 let error_message = error_message.into();
250 let message = "";
254 Self {
255 response_code,
256 error_message: Some(error_message),
257 message: String::from(message),
258 }
259 }
260
261 pub fn response_code(&self) -> i32 {
262 self.response_code
263 }
264
265 pub fn error_message(&self) -> Option<&String> {
266 self.error_message.as_ref()
267 }
268}
269
/// Builds an `Err(RocketmqError::MQClientErr(..))` value.
///
/// Forms, tried in this order:
/// - `mq_client_err!(code, "fmt {}", args...)` formats the description, then stores it
///   together with `code`.
/// - `mq_client_err!(code, description)` stores `description` together with `code`.
/// - `mq_client_err!(description)` uses `ClientErr::new`, which stores response code `-1`.
///
/// `code` is converted with `as i32`.
#[macro_export]
macro_rules! mq_client_err {
    // Code plus format string and arguments.
    // This arm used to expand to `$crate::client_error::...` paths, unlike the other arms;
    // it now produces the same `RocketmqError::MQClientErr` as they do.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    // Code plus ready-made description.
    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    // Description only.
    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
297
298#[derive(Error, Debug)]
299#[error("{message}")]
300pub struct RequestTimeoutErr {
301 response_code: i32,
302 error_message: Option<String>,
303 message: String,
304}
305
306impl RequestTimeoutErr {
307 pub fn new(error_message: impl Into<String>) -> Self {
308 let error_message = error_message.into();
309 let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
311 Self {
312 response_code: -1,
313 error_message: Some(error_message),
314 message: String::from(message),
315 }
316 }
317
318 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
319 let error_message = error_message.into();
320 let message = "FAQUrl::attach_default_url(Some(format!(\"CODE: {} DESC: {}\", \
324 response_code, error_message,).as_str()))";
325 Self {
326 response_code,
327 error_message: Some(error_message),
328 message: String::from(message),
329 }
330 }
331
332 pub fn response_code(&self) -> i32 {
333 self.response_code
334 }
335
336 pub fn error_message(&self) -> Option<&String> {
337 self.error_message.as_ref()
338 }
339}
340
/// Builds an `Err(RocketmqError::RequestTimeoutError(..))` value.
///
/// Forms, tried in this order:
/// - `request_timeout_err!(code, "fmt {}", args...)` formats the description first, then
///   stores it together with `code`.
/// - `request_timeout_err!(code, description)` stores `description` together with `code`.
/// - `request_timeout_err!(description)` uses `RequestTimeoutErr::new`.
///
/// `code` is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Code plus format string and arguments.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let description = format!($fmt, $($arg),*);
        let timeout_err =
            $crate::RequestTimeoutErr::new_with_code($response_code as i32, description);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout_err))
    }};
    // Code plus ready-made description.
    ($response_code:expr, $error_message:expr) => {{
        let timeout_err =
            $crate::RequestTimeoutErr::new_with_code($response_code as i32, $error_message);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout_err))
    }};
    // Description only.
    ($error_message:expr) => {{
        let timeout_err = $crate::RequestTimeoutErr::new($error_message);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout_err))
    }};
}
368
369#[derive(Debug, thiserror::Error)]
373pub enum ServiceError {
374 #[error("Service is already running")]
375 AlreadyRunning,
376
377 #[error("Service is not running")]
378 NotRunning,
379
380 #[error("Service startup failed: {0}")]
381 StartupFailed(String),
382
383 #[error("Service shutdown failed: {0}")]
384 ShutdownFailed(String),
385
386 #[error("Service operation timeout")]
387 Timeout,
388
389 #[error("Service interrupted")]
390 Interrupted,
391}