1pub mod unified;
53
54pub mod auth_error;
56
57pub mod controller_error;
59
60pub mod filter_error;
62
63pub mod client_error;
65
66pub use client_error::ClientError;
70pub use controller_error::ControllerError;
71pub use controller_error::ControllerResult;
72pub use filter_error::FilterError;
74pub use unified::AuthError;
75pub use unified::NetworkError;
76pub use unified::ProtocolError;
77pub use unified::RocketMQError;
78pub use unified::RpcClientError;
79pub use unified::SerializationError;
80pub use unified::ServiceError as UnifiedServiceError;
82pub use unified::ToolsError;
83
84#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
86mod cli_error;
87#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
88mod common_error;
89#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
90mod name_srv_error;
91#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
92mod remoting_error;
93#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
94mod store_error;
95#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
96mod tui_error;
97
98use std::io;
99
100use thiserror::Error;
101
102#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
105pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;
106
107#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
108pub type LegacyResult<T> = anyhow::Result<T>;
109
110pub use unified::Result;
112pub use unified::RocketMQResult;
113use unified::ServiceError;
115
116#[derive(Debug, Error)]
117pub enum RocketmqError {
118 #[error("{0}")]
120 RemoteError(String),
121
122 #[error("{0}")]
123 DeserializeHeaderError(String),
124
125 #[error("connect to {0} failed")]
126 RemotingConnectError(String),
127
128 #[error("send request to < {0} > failed")]
129 RemotingSendRequestError(String),
130
131 #[error("wait response on the channel < {0} >, timeout: {1}(ms)")]
132 RemotingTimeoutError(String, u64),
133
134 #[error("RemotingTooMuchRequestException: {0}")]
135 RemotingTooMuchRequestError(String),
136
137 #[error("RpcException: code: {0}, message: {1}")]
138 RpcError(i32, String),
139
140 #[error("{0}")]
141 FromStrErr(String),
142
143 #[error("{0:?}")]
144 Io(#[from] io::Error),
145
146 #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
147 DecodingError(usize, usize),
148
149 #[error("UTF-8 decoding error: {0}")]
150 Utf8Error(#[from] std::str::Utf8Error),
151
152 #[error("RemotingCommandDecoderError:{0}")]
153 RemotingCommandDecoderError(String),
154
155 #[error("RemotingCommandEncoderError:{0}")]
156 RemotingCommandEncoderError(String),
157
158 #[error("Not support serialize type: {0}")]
159 NotSupportSerializeType(u8),
160
161 #[error("ConnectionInvalid: {0}")]
162 ConnectionInvalid(String),
163
164 #[error("AbortProcessError: {0}-{1}")]
165 AbortProcessError(i32, String),
166
167 #[error("Channel Send Request failed: {0}")]
168 ChannelSendRequestFailed(String),
169
170 #[error("Channel recv Request failed: {0}")]
171 ChannelRecvRequestFailed(String),
172
173 #[error("{0}")]
174 IllegalArgument(String),
175
176 #[error("{0}")]
178 MQClientErr(#[from] ClientErr),
179
180 #[error("{0}")]
181 MQClientBrokerError(#[from] MQBrokerErr),
182
183 #[error("{0}")]
184 RequestTimeoutError(#[from] RequestTimeoutErr),
185
186 #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
187 OffsetNotFoundError(i32, String, String),
188
189 #[error("{0}")]
190 IllegalArgumentError(String),
191
192 #[error("{0}")]
193 #[cfg(feature = "with_serde")]
194 SerdeJsonError(#[from] serde_json::Error),
195
196 #[error("{0}")]
197 UnsupportedOperationException(String),
198
199 #[error("{0}")]
200 IpError(String),
201
202 #[error("{0}")]
203 ChannelError(String),
204
205 #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
206 MQBrokerError(i32, String, String),
207
208 #[error("{0}")]
209 NoneError(String),
210
211 #[error("{0}")]
212 TokioHandlerError(String),
213
214 #[error("Config parse error: {0}")]
215 #[cfg(feature = "with_config")]
216 ConfigError(#[from] config::ConfigError),
217
218 #[error("{0} command failed , {1}")]
219 SubCommand(String, String),
220
221 #[error("{0}")]
222 ServiceTaskError(#[from] ServiceError),
223
224 #[error("{0}")]
225 StoreCustomError(String),
226}
227
228#[derive(Error, Debug)]
229#[error("{message}")]
230pub struct MQBrokerErr {
231 response_code: i32,
232 error_message: Option<String>,
233 broker_addr: Option<String>,
234 message: String,
235}
236
237impl MQBrokerErr {
238 pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
239 let error_message = error_message.into();
240 let message = format!("CODE: {response_code} DESC: {error_message}");
241 Self {
242 response_code,
243 error_message: Some(error_message),
244 broker_addr: None,
245 message,
246 }
247 }
248
249 pub fn new_with_broker(
250 response_code: i32,
251 error_message: impl Into<String>,
252 broker_addr: impl Into<String>,
253 ) -> Self {
254 let broker_addr = broker_addr.into();
255 let error_message = error_message.into();
256 let message = format!("CODE: {response_code} DESC: {error_message} BROKER: {broker_addr}");
257 Self {
258 response_code,
259 error_message: Some(error_message),
260 broker_addr: Some(broker_addr),
261 message,
262 }
263 }
264
265 pub fn response_code(&self) -> i32 {
266 self.response_code
267 }
268
269 pub fn error_message(&self) -> Option<&String> {
270 self.error_message.as_ref()
271 }
272
273 pub fn broker_addr(&self) -> Option<&String> {
274 self.broker_addr.as_ref()
275 }
276}
277
278#[macro_export]
279macro_rules! client_broker_err {
280 ($response_code:expr, $error_message:expr, $broker_addr:expr) => {{
282 std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
283 $crate::MQBrokerErr::new_with_broker($response_code as i32, $error_message, $broker_addr),
284 ))
285 }};
286 ($response_code:expr, $error_message:expr) => {{
288 std::result::Result::Err($crate::RocketmqError::MQClientBrokerError($crate::MQBrokerErr::new(
289 $response_code as i32,
290 $error_message,
291 )))
292 }};
293}
294
295#[derive(Error, Debug)]
296#[error("{message}")]
297pub struct ClientErr {
298 response_code: i32,
299 error_message: Option<String>,
300 message: String,
301}
302
303impl ClientErr {
304 pub fn new(error_message: impl Into<String>) -> Self {
305 let error_message = error_message.into();
306 let message = error_message.clone();
307 Self {
308 response_code: -1,
309 error_message: Some(error_message),
310 message,
311 }
312 }
313
314 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
315 let error_message = error_message.into();
316 let message = format!("CODE: {response_code} DESC: {error_message}");
317 Self {
318 response_code,
319 error_message: Some(error_message),
320 message,
321 }
322 }
323
324 pub fn response_code(&self) -> i32 {
325 self.response_code
326 }
327
328 pub fn error_message(&self) -> Option<&String> {
329 self.error_message.as_ref()
330 }
331}
332
333#[deprecated(
335 since = "0.7.0",
336 note = "Use unified error system and macros from rocketmq-client instead"
337)]
338#[macro_export]
339macro_rules! mq_client_err_legacy {
340 ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
342 let formatted_msg = format!($fmt, $($arg),*);
343 std::result::Result::Err($crate::client_error::MQClientError::MQClientErr(
344 $crate::client_error::ClientErr::new_with_code($response_code as i32, formatted_msg),
345 ))
346 }};
347
348 ($response_code:expr, $error_message:expr) => {{
349 std::result::Result::Err($crate::RocketmqError::MQClientErr(
350 $crate::ClientErr::new_with_code(
351 $response_code as i32,
352 $error_message,
353 ),
354 ))
355 }};
356
357 ($error_message:expr) => {{
359 std::result::Result::Err($crate::RocketmqError::MQClientErr(
360 $crate::ClientErr::new($error_message),
361 ))
362 }};
363}
364
365#[derive(Error, Debug)]
366#[error("{message}")]
367pub struct RequestTimeoutErr {
368 response_code: i32,
369 error_message: Option<String>,
370 message: String,
371}
372
373impl RequestTimeoutErr {
374 pub fn new(error_message: impl Into<String>) -> Self {
375 let error_message = error_message.into();
376 let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
378 Self {
379 response_code: -1,
380 error_message: Some(error_message),
381 message: String::from(message),
382 }
383 }
384
385 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
386 let error_message = error_message.into();
387 let message =
391 "FAQUrl::attach_default_url(Some(format!(\"CODE: {} DESC: {}\", response_code, error_message,).as_str()))";
392 Self {
393 response_code,
394 error_message: Some(error_message),
395 message: String::from(message),
396 }
397 }
398
399 pub fn response_code(&self) -> i32 {
400 self.response_code
401 }
402
403 pub fn error_message(&self) -> Option<&String> {
404 self.error_message.as_ref()
405 }
406}
407
408#[macro_export]
409macro_rules! request_timeout_err {
410 ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
412 let formatted_msg = format!($fmt, $($arg),*);
413 std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
414 $crate::RequestTimeoutErr::new_with_code(
415 $response_code as i32,
416 formatted_msg,
417 ),
418 ))
419 }};
420 ($response_code:expr, $error_message:expr) => {{
421 std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
422 $crate::RequestTimeoutErr::new_with_code(
423 $response_code as i32,
424 $error_message,
425 ),
426 ))
427 }};
428 ($error_message:expr) => {{
430 std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
431 $crate::RequestTimeoutErr::new($error_message),
432 ))
433 }};
434}
435
436#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
442#[derive(Debug, thiserror::Error)]
443pub enum LegacyServiceError {
444 #[error("Service is already running")]
445 AlreadyRunning,
446
447 #[error("Service is not running")]
448 NotRunning,
449
450 #[error("Service startup failed: {0}")]
451 StartupFailed(String),
452
453 #[error("Service shutdown failed: {0}")]
454 ShutdownFailed(String),
455
456 #[error("Service operation timeout")]
457 Timeout,
458
459 #[error("Service interrupted")]
460 Interrupted,
461}
462
463impl From<RocketmqError> for unified::RocketMQError {
471 fn from(err: RocketmqError) -> Self {
472 match err {
473 RocketmqError::RemoteError(msg) => Self::network_connection_failed("unknown", msg),
475 RocketmqError::RemotingConnectError(addr) => Self::network_connection_failed(addr, "connection failed"),
476 RocketmqError::RemotingSendRequestError(addr) => {
477 Self::network_connection_failed(addr, "send request failed")
478 }
479 RocketmqError::RemotingTimeoutError(addr, timeout) => {
480 Self::Network(unified::NetworkError::RequestTimeout {
481 addr,
482 timeout_ms: timeout,
483 })
484 }
485 RocketmqError::RemotingTooMuchRequestError(msg) => Self::illegal_argument(msg),
486
487 RocketmqError::DeserializeHeaderError(msg) => {
489 Self::Serialization(unified::SerializationError::DecodeFailed {
490 format: "header",
491 message: msg,
492 })
493 }
494 RocketmqError::RemotingCommandDecoderError(_msg) => Self::Protocol(unified::ProtocolError::DecodeError {
495 ext_fields_len: 0,
496 header_len: 0,
497 }),
498 RocketmqError::DecodingError(required, available) => {
499 Self::Serialization(unified::SerializationError::DecodeFailed {
500 format: "binary",
501 message: format!("required {} bytes, got {}", required, available),
502 })
503 }
504 RocketmqError::NotSupportSerializeType(t) => {
505 Self::Protocol(unified::ProtocolError::UnsupportedSerializationType { serialize_type: t })
506 }
507
508 RocketmqError::MQBrokerError(code, msg, addr) => {
510 Self::broker_operation_failed("BROKER_OPERATION", code, msg).with_broker_addr(addr)
511 }
512 RocketmqError::MQClientBrokerError(err) => {
513 let mut e = Self::broker_operation_failed(
514 "BROKER_OPERATION",
515 err.response_code(),
516 err.error_message().unwrap_or(&String::new()).clone(),
517 );
518 if let Some(addr) = err.broker_addr() {
519 e = e.with_broker_addr(addr.clone());
520 }
521 e
522 }
523 RocketmqError::OffsetNotFoundError(code, addr, msg) => {
524 Self::broker_operation_failed("OFFSET_NOT_FOUND", code, msg).with_broker_addr(addr)
525 }
526
527 RocketmqError::MQClientErr(err) => {
529 Self::illegal_argument(err.error_message().unwrap_or(&String::new()).clone())
530 }
531 RocketmqError::RequestTimeoutError(_) => Self::Timeout {
532 operation: "request",
533 timeout_ms: 3000,
534 },
535
536 RocketmqError::Io(err) => Self::IO(err),
538 RocketmqError::IllegalArgument(msg) | RocketmqError::IllegalArgumentError(msg) => {
539 Self::illegal_argument(msg)
540 }
541 RocketmqError::UnsupportedOperationException(msg) => Self::illegal_argument(msg),
542 RocketmqError::IpError(msg) => Self::illegal_argument(format!("IP error: {}", msg)),
543 RocketmqError::ChannelError(msg) => Self::Internal(format!("Channel error: {}", msg)),
544 RocketmqError::NoneError(msg) => Self::Internal(format!("None error: {}", msg)),
545 RocketmqError::TokioHandlerError(msg) => Self::Internal(format!("Tokio handler error: {}", msg)),
546 RocketmqError::SubCommand(cmd, msg) => Self::Internal(format!("{} command failed: {}", cmd, msg)),
547 RocketmqError::StoreCustomError(msg) => Self::StorageReadFailed {
548 path: "unknown".to_string(),
549 reason: msg,
550 },
551
552 RocketmqError::ServiceTaskError(err) => Self::Service(err),
554
555 #[cfg(feature = "with_serde")]
556 RocketmqError::SerdeJsonError(err) => {
557 Self::Serialization(unified::SerializationError::JsonError(err.to_string()))
558 }
559
560 #[cfg(feature = "with_config")]
561 RocketmqError::ConfigError(err) => Self::ConfigParseFailed {
562 key: "unknown",
563 reason: err.to_string(),
564 },
565
566 RocketmqError::RpcError(code, msg) => Self::broker_operation_failed("RPC", code, msg),
568 RocketmqError::FromStrErr(msg) => Self::illegal_argument(msg),
569 RocketmqError::Utf8Error(err) => Self::Serialization(unified::SerializationError::Utf8Error(err)),
570 RocketmqError::ConnectionInvalid(msg) => Self::network_connection_failed("unknown", msg),
571 RocketmqError::AbortProcessError(code, msg) => {
572 Self::Internal(format!("Abort process error {}: {}", code, msg))
573 }
574 RocketmqError::ChannelSendRequestFailed(msg) => Self::network_connection_failed("channel", msg),
575 RocketmqError::ChannelRecvRequestFailed(msg) => Self::network_connection_failed("channel", msg),
576 RocketmqError::RemotingCommandEncoderError(msg) => {
577 Self::Serialization(unified::SerializationError::EncodeFailed {
578 format: "command",
579 message: msg,
580 })
581 }
582 }
583 }
584}
585
586#[cfg(test)]
587mod tests {
588 use super::*;
589
590 #[test]
591 fn client_err_display_preserves_original_message() {
592 let err = ClientErr::new("Topic of the message does not match its target message queue");
593
594 assert_eq!(
595 err.to_string(),
596 "Topic of the message does not match its target message queue"
597 );
598 assert_eq!(err.response_code(), -1);
599 assert_eq!(
600 err.error_message().map(String::as_str),
601 Some("Topic of the message does not match its target message queue")
602 );
603 }
604
605 #[test]
606 fn client_err_with_code_display_includes_code_and_description() {
607 let err = ClientErr::new_with_code(17, "No route info of this topic");
608
609 assert_eq!(err.to_string(), "CODE: 17 DESC: No route info of this topic");
610 assert_eq!(err.response_code(), 17);
611 assert_eq!(
612 err.error_message().map(String::as_str),
613 Some("No route info of this topic")
614 );
615 }
616
617 #[test]
618 fn broker_err_display_includes_code_description_and_broker_when_present() {
619 let err = MQBrokerErr::new_with_broker(14, "topic does not exist", "127.0.0.1:10911");
620
621 assert_eq!(
622 err.to_string(),
623 "CODE: 14 DESC: topic does not exist BROKER: 127.0.0.1:10911"
624 );
625 assert_eq!(err.response_code(), 14);
626 assert_eq!(err.error_message().map(String::as_str), Some("topic does not exist"));
627 assert_eq!(err.broker_addr().map(String::as_str), Some("127.0.0.1:10911"));
628 }
629
630 #[test]
631 #[allow(deprecated)]
632 fn macro_created_client_error_keeps_message_visible() {
633 let result: LegacyRocketMQResult<()> = crate::mq_client_err_legacy!("send failed");
634 let err = result.expect_err("macro should create a client error");
635
636 assert_eq!(err.to_string(), "send failed");
637 }
638}