1pub mod unified;
56
57pub use unified::NetworkError;
59pub use unified::ProtocolError;
60pub use unified::RocketMQError;
61pub use unified::SerializationError;
62pub use unified::ServiceError as UnifiedServiceError;
64pub use unified::ToolsError;
65
66#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
68mod cli_error;
69#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
70mod client_error;
71#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
72mod common_error;
73#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
74mod name_srv_error;
75#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
76mod remoting_error;
77#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
78mod store_error;
79#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
80mod tools_error;
81#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
82mod tui_error;
83
84use std::io;
85
86use thiserror::Error;
87
/// Result alias of the legacy error system; the error type is [`RocketmqError`].
#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;
92
/// Legacy result alias that is simply `anyhow::Result<T>`.
#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
pub type LegacyResult<T> = anyhow::Result<T>;
95
96pub use unified::Result;
98pub use unified::RocketMQResult;
99use unified::ServiceError;
101
/// Legacy catch-all error enum.
///
/// `Display` for each variant is produced by its `#[error(...)]` attribute, and
/// variants carrying `#[from]` get a `From` conversion from the wrapped type.
#[derive(Debug, Error)]
pub enum RocketmqError {
    /// Free-form remote error; displays the carried text as-is.
    #[error("{0}")]
    RemoteError(String),

    /// Header deserialization failure; displays the carried text as-is.
    #[error("{0}")]
    DeserializeHeaderError(String),

    /// Connecting to the given address failed.
    #[error("connect to {0} failed")]
    RemotingConnectError(String),

    /// Sending a request to the given address failed.
    #[error("send request to < {0} > failed")]
    RemotingSendRequestError(String),

    /// Waiting for a response timed out: `(channel, timeout in milliseconds)`.
    #[error("wait response on the channel < {0} >, timeout: {1}(ms)")]
    RemotingTimeoutError(String, u64),

    /// Too many in-flight requests; carries a description.
    #[error("RemotingTooMuchRequestException: {0}")]
    RemotingTooMuchRequestError(String),

    /// RPC failure: `(code, message)`.
    #[error("RpcException: code: {0}, message: {1}")]
    RpcError(i32, String),

    /// String parsing failure; displays the carried text as-is.
    #[error("{0}")]
    FromStrErr(String),

    /// Wrapped I/O error. Note that it is displayed with `Debug` formatting (`{0:?}`).
    #[error("{0:?}")]
    Io(#[from] io::Error),

    /// Protocol decoding failure: `(extFields length, header length)`.
    #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
    DecodingError(usize, usize),

    /// Wrapped UTF-8 decoding error.
    #[error("UTF-8 decoding error: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),

    /// Remoting command decoder failure; carries a description.
    #[error("RemotingCommandDecoderError:{0}")]
    RemotingCommandDecoderError(String),

    /// Remoting command encoder failure; carries a description.
    #[error("RemotingCommandEncoderError:{0}")]
    RemotingCommandEncoderError(String),

    /// The given serialize-type byte is not supported.
    #[error("Not support serialize type: {0}")]
    NotSupportSerializeType(u8),

    /// The connection is invalid; carries a description.
    #[error("ConnectionInvalid: {0}")]
    ConnectionInvalid(String),

    /// Processing was aborted: `(code, message)`.
    #[error("AbortProcessError: {0}-{1}")]
    AbortProcessError(i32, String),

    /// Sending a request over a channel failed; carries a description.
    #[error("Channel Send Request failed: {0}")]
    ChannelSendRequestFailed(String),

    /// Receiving a request over a channel failed; carries a description.
    #[error("Channel recv Request failed: {0}")]
    ChannelRecvRequestFailed(String),

    /// Illegal argument; displays the carried text as-is.
    #[error("{0}")]
    IllegalArgument(String),

    /// Wrapped [`ClientErr`].
    #[error("{0}")]
    MQClientErr(#[from] ClientErr),

    /// Wrapped [`MQBrokerErr`].
    #[error("{0}")]
    MQClientBrokerError(#[from] MQBrokerErr),

    /// Wrapped [`RequestTimeoutErr`].
    #[error("{0}")]
    RequestTimeoutError(#[from] RequestTimeoutErr),

    /// Offset lookup failure: `(code, broker address, message)`.
    #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
    OffsetNotFoundError(i32, String, String),

    /// Illegal argument; same payload and display as [`RocketmqError::IllegalArgument`].
    #[error("{0}")]
    IllegalArgumentError(String),

    /// Wrapped `serde_json` error (only with the `with_serde` feature).
    #[error("{0}")]
    #[cfg(feature = "with_serde")]
    SerdeJsonError(#[from] serde_json::Error),

    /// Unsupported operation; displays the carried text as-is.
    #[error("{0}")]
    UnsupportedOperationException(String),

    /// IP-related error; displays the carried text as-is.
    #[error("{0}")]
    IpError(String),

    /// Channel error; displays the carried text as-is.
    #[error("{0}")]
    ChannelError(String),

    /// Broker failure: `(code, message, broker address)`.
    ///
    /// The field order differs from [`RocketmqError::OffsetNotFoundError`]; the
    /// format string compensates by printing `{2}` before `{1}`.
    #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
    MQBrokerError(i32, String, String),

    /// A value was unexpectedly absent; displays the carried text as-is.
    #[error("{0}")]
    NoneError(String),

    /// Tokio handler error; displays the carried text as-is.
    #[error("{0}")]
    TokioHandlerError(String),

    /// Wrapped `config` crate error (only with the `with_config` feature).
    #[error("Config parse error: {0}")]
    #[cfg(feature = "with_config")]
    ConfigError(#[from] config::ConfigError),

    /// A sub-command failed: `(command, message)`.
    #[error("{0} command failed , {1}")]
    SubCommand(String, String),

    /// Wrapped unified [`ServiceError`].
    #[error("{0}")]
    ServiceTaskError(#[from] ServiceError),

    /// Store-specific error; displays the carried text as-is.
    #[error("{0}")]
    StoreCustomError(String),
}
213
/// Error reported for a broker-side failure.
///
/// `Display` prints only the `message` field (see the `#[error("{message}")]` attribute).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct MQBrokerErr {
    // Response code supplied to the constructor.
    response_code: i32,
    // Error text supplied to the constructor.
    error_message: Option<String>,
    // Broker address, when the constructor was given one.
    broker_addr: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
222
223impl MQBrokerErr {
224 pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
225 let error_message = error_message.into();
226 let message = "";
230 Self {
231 response_code,
232 error_message: Some(error_message),
233 broker_addr: None,
234 message: String::from(message),
235 }
236 }
237
238 pub fn new_with_broker(
239 response_code: i32,
240 error_message: impl Into<String>,
241 broker_addr: impl Into<String>,
242 ) -> Self {
243 let broker_addr = broker_addr.into();
244 let error_message = error_message.into();
245 let message = "";
253 Self {
254 response_code,
255 error_message: Some(error_message),
256 broker_addr: Some(broker_addr),
257 message: String::from(message),
258 }
259 }
260
261 pub fn response_code(&self) -> i32 {
262 self.response_code
263 }
264
265 pub fn error_message(&self) -> Option<&String> {
266 self.error_message.as_ref()
267 }
268
269 pub fn broker_addr(&self) -> Option<&String> {
270 self.broker_addr.as_ref()
271 }
272}
273
/// Builds `Err(RocketmqError::MQClientBrokerError(..))`.
///
/// Accepted forms:
/// * `client_broker_err!(code, message)`
/// * `client_broker_err!(code, message, broker_addr)`
///
/// `code` is converted with `as i32` before being handed to [`MQBrokerErr`].
#[macro_export]
macro_rules! client_broker_err {
    // Code and message only: no broker address is recorded.
    ($code:expr, $msg:expr) => {{
        let broker_err = $crate::MQBrokerErr::new($code as i32, $msg);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
    // Code, message and the address of the broker that reported the failure.
    ($code:expr, $msg:expr, $addr:expr) => {{
        let broker_err = $crate::MQBrokerErr::new_with_broker($code as i32, $msg, $addr);
        std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(broker_err))
    }};
}
293
/// Error raised on the client side.
///
/// `Display` prints only the `message` field (see the `#[error("{message}")]` attribute).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct ClientErr {
    // Response code; `ClientErr::new` stores -1 here.
    response_code: i32,
    // Error text supplied to the constructor.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
301
302impl ClientErr {
303 pub fn new(error_message: impl Into<String>) -> Self {
304 let error_message = error_message.into();
305 let message = "string";
306 Self {
308 response_code: -1,
309 error_message: Some(error_message),
310 message: String::from(message),
311 }
312 }
313
314 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
315 let error_message = error_message.into();
316 let message = "";
320 Self {
321 response_code,
322 error_message: Some(error_message),
323 message: String::from(message),
324 }
325 }
326
327 pub fn response_code(&self) -> i32 {
328 self.response_code
329 }
330
331 pub fn error_message(&self) -> Option<&String> {
332 self.error_message.as_ref()
333 }
334}
335
/// Builds `Err(RocketmqError::MQClientErr(..))`.
///
/// Accepted forms:
/// * `mq_client_err_legacy!(code, "format {}", args...)` formats the message first
/// * `mq_client_err_legacy!(code, message)`
/// * `mq_client_err_legacy!(message)` uses [`ClientErr::new`], which stores code `-1`
///
/// `code` is converted with `as i32`.
#[deprecated(
    since = "0.7.0",
    note = "Use unified error system and macros from rocketmq-client instead"
)]
#[macro_export]
macro_rules! mq_client_err_legacy {
    // Response code plus a format string with arguments.
    // Every arm builds `$crate::RocketmqError::MQClientErr`, so all three
    // forms yield the same error type and none depends on a private module path.
    ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
        let formatted_msg = format!($fmt, $($arg),*);
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code($response_code as i32, formatted_msg),
        ))
    }};

    // Response code plus a ready-made message.
    ($response_code:expr, $error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new_with_code(
                $response_code as i32,
                $error_message,
            ),
        ))
    }};

    // Message only.
    ($error_message:expr) => {{
        std::result::Result::Err($crate::RocketmqError::MQClientErr(
            $crate::ClientErr::new($error_message),
        ))
    }};
}
367
/// Error raised when a request times out.
///
/// `Display` prints only the `message` field (see the `#[error("{message}")]` attribute).
#[derive(Error, Debug)]
#[error("{message}")]
pub struct RequestTimeoutErr {
    // Response code; `RequestTimeoutErr::new` stores -1 here.
    response_code: i32,
    // Error text supplied to the constructor.
    error_message: Option<String>,
    // Text rendered by `Display`.
    message: String,
}
375
376impl RequestTimeoutErr {
377 pub fn new(error_message: impl Into<String>) -> Self {
378 let error_message = error_message.into();
379 let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
381 Self {
382 response_code: -1,
383 error_message: Some(error_message),
384 message: String::from(message),
385 }
386 }
387
388 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
389 let error_message = error_message.into();
390 let message = "FAQUrl::attach_default_url(Some(format!(\"CODE: {} DESC: {}\", \
394 response_code, error_message,).as_str()))";
395 Self {
396 response_code,
397 error_message: Some(error_message),
398 message: String::from(message),
399 }
400 }
401
402 pub fn response_code(&self) -> i32 {
403 self.response_code
404 }
405
406 pub fn error_message(&self) -> Option<&String> {
407 self.error_message.as_ref()
408 }
409}
410
/// Builds `Err(RocketmqError::RequestTimeoutError(..))`.
///
/// Accepted forms:
/// * `request_timeout_err!(message)` uses [`RequestTimeoutErr::new`], which stores code `-1`
/// * `request_timeout_err!(code, message)`
/// * `request_timeout_err!(code, "format {}", args...)` formats the message first
///
/// `code` is converted with `as i32`.
#[macro_export]
macro_rules! request_timeout_err {
    // Message only.
    ($msg:expr) => {{
        let timeout_err = $crate::RequestTimeoutErr::new($msg);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout_err))
    }};
    // Response code plus a ready-made message.
    ($code:expr, $msg:expr) => {{
        let timeout_err = $crate::RequestTimeoutErr::new_with_code($code as i32, $msg);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout_err))
    }};
    // Response code plus a format string with arguments; the text is built
    // before the code expression is evaluated.
    ($code:expr, $template:expr, $($part:expr),*) => {{
        let rendered = format!($template, $($part),*);
        let timeout_err = $crate::RequestTimeoutErr::new_with_code($code as i32, rendered);
        std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(timeout_err))
    }};
}
438
/// Legacy service lifecycle errors, superseded by `unified::ServiceError`.
#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
#[derive(Debug, thiserror::Error)]
pub enum LegacyServiceError {
    /// The service is already running.
    #[error("Service is already running")]
    AlreadyRunning,

    /// The service is not running.
    #[error("Service is not running")]
    NotRunning,

    /// Startup failed; carries a description.
    #[error("Service startup failed: {0}")]
    StartupFailed(String),

    /// Shutdown failed; carries a description.
    #[error("Service shutdown failed: {0}")]
    ShutdownFailed(String),

    /// A service operation timed out.
    #[error("Service operation timeout")]
    Timeout,

    /// The service was interrupted.
    #[error("Service interrupted")]
    Interrupted,
}
465
466impl From<RocketmqError> for unified::RocketMQError {
474 fn from(err: RocketmqError) -> Self {
475 match err {
476 RocketmqError::RemoteError(msg) => Self::network_connection_failed("unknown", msg),
478 RocketmqError::RemotingConnectError(addr) => {
479 Self::network_connection_failed(addr, "connection failed")
480 }
481 RocketmqError::RemotingSendRequestError(addr) => {
482 Self::network_connection_failed(addr, "send request failed")
483 }
484 RocketmqError::RemotingTimeoutError(addr, timeout) => {
485 Self::Network(unified::NetworkError::RequestTimeout {
486 addr: addr.to_string(),
487 timeout_ms: timeout,
488 })
489 }
490 RocketmqError::RemotingTooMuchRequestError(msg) => Self::illegal_argument(msg),
491
492 RocketmqError::DeserializeHeaderError(msg) => {
494 Self::Serialization(unified::SerializationError::DecodeFailed {
495 format: "header",
496 message: msg,
497 })
498 }
499 RocketmqError::RemotingCommandDecoderError(_msg) => {
500 Self::Protocol(unified::ProtocolError::DecodeError {
501 ext_fields_len: 0,
502 header_len: 0,
503 })
504 }
505 RocketmqError::DecodingError(required, available) => {
506 Self::Serialization(unified::SerializationError::DecodeFailed {
507 format: "binary",
508 message: format!("required {} bytes, got {}", required, available),
509 })
510 }
511 RocketmqError::NotSupportSerializeType(t) => {
512 Self::Protocol(unified::ProtocolError::UnsupportedSerializationType {
513 serialize_type: t,
514 })
515 }
516
517 RocketmqError::MQBrokerError(code, msg, addr) => {
519 Self::broker_operation_failed("BROKER_OPERATION", code, msg).with_broker_addr(addr)
520 }
521 RocketmqError::MQClientBrokerError(err) => {
522 let mut e = Self::broker_operation_failed(
523 "BROKER_OPERATION",
524 err.response_code(),
525 err.error_message().unwrap_or(&String::new()).clone(),
526 );
527 if let Some(addr) = err.broker_addr() {
528 e = e.with_broker_addr(addr.clone());
529 }
530 e
531 }
532 RocketmqError::OffsetNotFoundError(code, addr, msg) => {
533 Self::broker_operation_failed("OFFSET_NOT_FOUND", code, msg).with_broker_addr(addr)
534 }
535
536 RocketmqError::MQClientErr(err) => {
538 Self::illegal_argument(err.error_message().unwrap_or(&String::new()).clone())
539 }
540 RocketmqError::RequestTimeoutError(_) => Self::Timeout {
541 operation: "request",
542 timeout_ms: 3000,
543 },
544
545 RocketmqError::Io(err) => Self::IO(err),
547 RocketmqError::IllegalArgument(msg) | RocketmqError::IllegalArgumentError(msg) => {
548 Self::illegal_argument(msg)
549 }
550 RocketmqError::UnsupportedOperationException(msg) => Self::illegal_argument(msg),
551 RocketmqError::IpError(msg) => Self::illegal_argument(format!("IP error: {}", msg)),
552 RocketmqError::ChannelError(msg) => Self::Internal(format!("Channel error: {}", msg)),
553 RocketmqError::NoneError(msg) => Self::Internal(format!("None error: {}", msg)),
554 RocketmqError::TokioHandlerError(msg) => {
555 Self::Internal(format!("Tokio handler error: {}", msg))
556 }
557 RocketmqError::SubCommand(cmd, msg) => {
558 Self::Internal(format!("{} command failed: {}", cmd, msg))
559 }
560 RocketmqError::StoreCustomError(msg) => Self::StorageReadFailed {
561 path: "unknown".to_string(),
562 reason: msg,
563 },
564
565 RocketmqError::ServiceTaskError(err) => Self::Service(err),
567
568 #[cfg(feature = "with_serde")]
569 RocketmqError::SerdeJsonError(err) => {
570 Self::Serialization(unified::SerializationError::JsonError(err.to_string()))
571 }
572
573 #[cfg(feature = "with_config")]
574 RocketmqError::ConfigError(err) => Self::ConfigParseFailed {
575 key: "unknown",
576 reason: err.to_string(),
577 },
578
579 RocketmqError::RpcError(code, msg) => Self::broker_operation_failed("RPC", code, msg),
581 RocketmqError::FromStrErr(msg) => Self::illegal_argument(msg),
582 RocketmqError::Utf8Error(err) => {
583 Self::Serialization(unified::SerializationError::Utf8Error(err))
584 }
585 RocketmqError::ConnectionInvalid(msg) => {
586 Self::network_connection_failed("unknown", msg)
587 }
588 RocketmqError::AbortProcessError(code, msg) => {
589 Self::Internal(format!("Abort process error {}: {}", code, msg))
590 }
591 RocketmqError::ChannelSendRequestFailed(msg) => {
592 Self::network_connection_failed("channel", msg)
593 }
594 RocketmqError::ChannelRecvRequestFailed(msg) => {
595 Self::network_connection_failed("channel", msg)
596 }
597 RocketmqError::RemotingCommandEncoderError(msg) => {
598 Self::Serialization(unified::SerializationError::EncodeFailed {
599 format: "command",
600 message: msg,
601 })
602 }
603 }
604 }
605}