1pub mod unified;
53
54pub mod auth_error;
56
57pub mod controller_error;
59
60pub mod filter_error;
62
63pub mod client_error;
65
66pub use client_error::ClientError;
70pub use controller_error::ControllerError;
71pub use controller_error::ControllerResult;
72pub use filter_error::FilterError;
74pub use unified::AuthError;
75pub use unified::NetworkError;
76pub use unified::ProtocolError;
77pub use unified::RocketMQError;
78pub use unified::RpcClientError;
79pub use unified::SerializationError;
80pub use unified::ServiceError as UnifiedServiceError;
82pub use unified::ToolsError;
83
84#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
86mod cli_error;
87#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
88mod common_error;
89#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
90mod name_srv_error;
91#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
92mod remoting_error;
93#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
94mod store_error;
95#[deprecated(since = "0.7.0", note = "Use unified error system instead")]
96mod tui_error;
97
98use std::io;
99
100use thiserror::Error;
101
102#[deprecated(since = "0.7.0", note = "Use unified::RocketMQResult instead")]
105pub type LegacyRocketMQResult<T> = std::result::Result<T, RocketmqError>;
106
107#[deprecated(since = "0.7.0", note = "Use unified::Result instead")]
108pub type LegacyResult<T> = anyhow::Result<T>;
109
110pub use unified::Result;
112pub use unified::RocketMQResult;
113use unified::ServiceError;
115
116#[derive(Debug, Error)]
117pub enum RocketmqError {
118 #[error("{0}")]
120 RemoteError(String),
121
122 #[error("{0}")]
123 DeserializeHeaderError(String),
124
125 #[error("connect to {0} failed")]
126 RemotingConnectError(String),
127
128 #[error("send request to < {0} > failed")]
129 RemotingSendRequestError(String),
130
131 #[error("wait response on the channel < {0} >, timeout: {1}(ms)")]
132 RemotingTimeoutError(String, u64),
133
134 #[error("RemotingTooMuchRequestException: {0}")]
135 RemotingTooMuchRequestError(String),
136
137 #[error("RpcException: code: {0}, message: {1}")]
138 RpcError(i32, String),
139
140 #[error("{0}")]
141 FromStrErr(String),
142
143 #[error("{0:?}")]
144 Io(#[from] io::Error),
145
146 #[error("RocketMQ protocol decoding failed, extFields length: {0}, but header length: {1}")]
147 DecodingError(usize, usize),
148
149 #[error("UTF-8 decoding error: {0}")]
150 Utf8Error(#[from] std::str::Utf8Error),
151
152 #[error("RemotingCommandDecoderError:{0}")]
153 RemotingCommandDecoderError(String),
154
155 #[error("RemotingCommandEncoderError:{0}")]
156 RemotingCommandEncoderError(String),
157
158 #[error("Not support serialize type: {0}")]
159 NotSupportSerializeType(u8),
160
161 #[error("ConnectionInvalid: {0}")]
162 ConnectionInvalid(String),
163
164 #[error("AbortProcessError: {0}-{1}")]
165 AbortProcessError(i32, String),
166
167 #[error("Channel Send Request failed: {0}")]
168 ChannelSendRequestFailed(String),
169
170 #[error("Channel recv Request failed: {0}")]
171 ChannelRecvRequestFailed(String),
172
173 #[error("{0}")]
174 IllegalArgument(String),
175
176 #[error("{0}")]
178 MQClientErr(#[from] ClientErr),
179
180 #[error("{0}")]
181 MQClientBrokerError(#[from] MQBrokerErr),
182
183 #[error("{0}")]
184 RequestTimeoutError(#[from] RequestTimeoutErr),
185
186 #[error("Client exception occurred: CODE:{0}, broker address:{1}, Message:{2}")]
187 OffsetNotFoundError(i32, String, String),
188
189 #[error("{0}")]
190 IllegalArgumentError(String),
191
192 #[error("{0}")]
193 #[cfg(feature = "with_serde")]
194 SerdeJsonError(#[from] serde_json::Error),
195
196 #[error("{0}")]
197 UnsupportedOperationException(String),
198
199 #[error("{0}")]
200 IpError(String),
201
202 #[error("{0}")]
203 ChannelError(String),
204
205 #[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
206 MQBrokerError(i32, String, String),
207
208 #[error("{0}")]
209 NoneError(String),
210
211 #[error("{0}")]
212 TokioHandlerError(String),
213
214 #[error("Config parse error: {0}")]
215 #[cfg(feature = "with_config")]
216 ConfigError(#[from] config::ConfigError),
217
218 #[error("{0} command failed , {1}")]
219 SubCommand(String, String),
220
221 #[error("{0}")]
222 ServiceTaskError(#[from] ServiceError),
223
224 #[error("{0}")]
225 StoreCustomError(String),
226}
227
228#[derive(Error, Debug)]
229#[error("{message}")]
230pub struct MQBrokerErr {
231 response_code: i32,
232 error_message: Option<String>,
233 broker_addr: Option<String>,
234 message: String,
235}
236
237impl MQBrokerErr {
238 pub fn new(response_code: i32, error_message: impl Into<String>) -> Self {
239 let error_message = error_message.into();
240 let message = "";
241 Self {
242 response_code,
243 error_message: Some(error_message),
244 broker_addr: None,
245 message: String::from(message),
246 }
247 }
248
249 pub fn new_with_broker(
250 response_code: i32,
251 error_message: impl Into<String>,
252 broker_addr: impl Into<String>,
253 ) -> Self {
254 let broker_addr = broker_addr.into();
255 let error_message = error_message.into();
256 let message = "";
257 Self {
258 response_code,
259 error_message: Some(error_message),
260 broker_addr: Some(broker_addr),
261 message: String::from(message),
262 }
263 }
264
265 pub fn response_code(&self) -> i32 {
266 self.response_code
267 }
268
269 pub fn error_message(&self) -> Option<&String> {
270 self.error_message.as_ref()
271 }
272
273 pub fn broker_addr(&self) -> Option<&String> {
274 self.broker_addr.as_ref()
275 }
276}
277
278#[macro_export]
279macro_rules! client_broker_err {
280 ($response_code:expr, $error_message:expr, $broker_addr:expr) => {{
282 std::result::Result::Err($crate::RocketmqError::MQClientBrokerError(
283 $crate::MQBrokerErr::new_with_broker($response_code as i32, $error_message, $broker_addr),
284 ))
285 }};
286 ($response_code:expr, $error_message:expr) => {{
288 std::result::Result::Err($crate::RocketmqError::MQClientBrokerError($crate::MQBrokerErr::new(
289 $response_code as i32,
290 $error_message,
291 )))
292 }};
293}
294
295#[derive(Error, Debug)]
296#[error("{message}")]
297pub struct ClientErr {
298 response_code: i32,
299 error_message: Option<String>,
300 message: String,
301}
302
303impl ClientErr {
304 pub fn new(error_message: impl Into<String>) -> Self {
305 let error_message = error_message.into();
306 let message = "string";
307 Self {
309 response_code: -1,
310 error_message: Some(error_message),
311 message: String::from(message),
312 }
313 }
314
315 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
316 let error_message = error_message.into();
317 let message = "";
321 Self {
322 response_code,
323 error_message: Some(error_message),
324 message: String::from(message),
325 }
326 }
327
328 pub fn response_code(&self) -> i32 {
329 self.response_code
330 }
331
332 pub fn error_message(&self) -> Option<&String> {
333 self.error_message.as_ref()
334 }
335}
336
337#[deprecated(
339 since = "0.7.0",
340 note = "Use unified error system and macros from rocketmq-client instead"
341)]
342#[macro_export]
343macro_rules! mq_client_err_legacy {
344 ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
346 let formatted_msg = format!($fmt, $($arg),*);
347 std::result::Result::Err($crate::client_error::MQClientError::MQClientErr(
348 $crate::client_error::ClientErr::new_with_code($response_code as i32, formatted_msg),
349 ))
350 }};
351
352 ($response_code:expr, $error_message:expr) => {{
353 std::result::Result::Err($crate::RocketmqError::MQClientErr(
354 $crate::ClientErr::new_with_code(
355 $response_code as i32,
356 $error_message,
357 ),
358 ))
359 }};
360
361 ($error_message:expr) => {{
363 std::result::Result::Err($crate::RocketmqError::MQClientErr(
364 $crate::ClientErr::new($error_message),
365 ))
366 }};
367}
368
369#[derive(Error, Debug)]
370#[error("{message}")]
371pub struct RequestTimeoutErr {
372 response_code: i32,
373 error_message: Option<String>,
374 message: String,
375}
376
377impl RequestTimeoutErr {
378 pub fn new(error_message: impl Into<String>) -> Self {
379 let error_message = error_message.into();
380 let message = "FAQUrl::attach_default_url(Some(error_message.as_str()))";
382 Self {
383 response_code: -1,
384 error_message: Some(error_message),
385 message: String::from(message),
386 }
387 }
388
389 pub fn new_with_code(response_code: i32, error_message: impl Into<String>) -> Self {
390 let error_message = error_message.into();
391 let message =
395 "FAQUrl::attach_default_url(Some(format!(\"CODE: {} DESC: {}\", response_code, error_message,).as_str()))";
396 Self {
397 response_code,
398 error_message: Some(error_message),
399 message: String::from(message),
400 }
401 }
402
403 pub fn response_code(&self) -> i32 {
404 self.response_code
405 }
406
407 pub fn error_message(&self) -> Option<&String> {
408 self.error_message.as_ref()
409 }
410}
411
412#[macro_export]
413macro_rules! request_timeout_err {
414 ($response_code:expr, $fmt:expr, $($arg:expr),*) => {{
416 let formatted_msg = format!($fmt, $($arg),*);
417 std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
418 $crate::RequestTimeoutErr::new_with_code(
419 $response_code as i32,
420 formatted_msg,
421 ),
422 ))
423 }};
424 ($response_code:expr, $error_message:expr) => {{
425 std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
426 $crate::RequestTimeoutErr::new_with_code(
427 $response_code as i32,
428 $error_message,
429 ),
430 ))
431 }};
432 ($error_message:expr) => {{
434 std::result::Result::Err($crate::RocketmqError::RequestTimeoutError(
435 $crate::RequestTimeoutErr::new($error_message),
436 ))
437 }};
438}
439
440#[deprecated(since = "0.7.0", note = "Use unified::ServiceError instead")]
446#[derive(Debug, thiserror::Error)]
447pub enum LegacyServiceError {
448 #[error("Service is already running")]
449 AlreadyRunning,
450
451 #[error("Service is not running")]
452 NotRunning,
453
454 #[error("Service startup failed: {0}")]
455 StartupFailed(String),
456
457 #[error("Service shutdown failed: {0}")]
458 ShutdownFailed(String),
459
460 #[error("Service operation timeout")]
461 Timeout,
462
463 #[error("Service interrupted")]
464 Interrupted,
465}
466
467impl From<RocketmqError> for unified::RocketMQError {
475 fn from(err: RocketmqError) -> Self {
476 match err {
477 RocketmqError::RemoteError(msg) => Self::network_connection_failed("unknown", msg),
479 RocketmqError::RemotingConnectError(addr) => Self::network_connection_failed(addr, "connection failed"),
480 RocketmqError::RemotingSendRequestError(addr) => {
481 Self::network_connection_failed(addr, "send request failed")
482 }
483 RocketmqError::RemotingTimeoutError(addr, timeout) => {
484 Self::Network(unified::NetworkError::RequestTimeout {
485 addr,
486 timeout_ms: timeout,
487 })
488 }
489 RocketmqError::RemotingTooMuchRequestError(msg) => Self::illegal_argument(msg),
490
491 RocketmqError::DeserializeHeaderError(msg) => {
493 Self::Serialization(unified::SerializationError::DecodeFailed {
494 format: "header",
495 message: msg,
496 })
497 }
498 RocketmqError::RemotingCommandDecoderError(_msg) => Self::Protocol(unified::ProtocolError::DecodeError {
499 ext_fields_len: 0,
500 header_len: 0,
501 }),
502 RocketmqError::DecodingError(required, available) => {
503 Self::Serialization(unified::SerializationError::DecodeFailed {
504 format: "binary",
505 message: format!("required {} bytes, got {}", required, available),
506 })
507 }
508 RocketmqError::NotSupportSerializeType(t) => {
509 Self::Protocol(unified::ProtocolError::UnsupportedSerializationType { serialize_type: t })
510 }
511
512 RocketmqError::MQBrokerError(code, msg, addr) => {
514 Self::broker_operation_failed("BROKER_OPERATION", code, msg).with_broker_addr(addr)
515 }
516 RocketmqError::MQClientBrokerError(err) => {
517 let mut e = Self::broker_operation_failed(
518 "BROKER_OPERATION",
519 err.response_code(),
520 err.error_message().unwrap_or(&String::new()).clone(),
521 );
522 if let Some(addr) = err.broker_addr() {
523 e = e.with_broker_addr(addr.clone());
524 }
525 e
526 }
527 RocketmqError::OffsetNotFoundError(code, addr, msg) => {
528 Self::broker_operation_failed("OFFSET_NOT_FOUND", code, msg).with_broker_addr(addr)
529 }
530
531 RocketmqError::MQClientErr(err) => {
533 Self::illegal_argument(err.error_message().unwrap_or(&String::new()).clone())
534 }
535 RocketmqError::RequestTimeoutError(_) => Self::Timeout {
536 operation: "request",
537 timeout_ms: 3000,
538 },
539
540 RocketmqError::Io(err) => Self::IO(err),
542 RocketmqError::IllegalArgument(msg) | RocketmqError::IllegalArgumentError(msg) => {
543 Self::illegal_argument(msg)
544 }
545 RocketmqError::UnsupportedOperationException(msg) => Self::illegal_argument(msg),
546 RocketmqError::IpError(msg) => Self::illegal_argument(format!("IP error: {}", msg)),
547 RocketmqError::ChannelError(msg) => Self::Internal(format!("Channel error: {}", msg)),
548 RocketmqError::NoneError(msg) => Self::Internal(format!("None error: {}", msg)),
549 RocketmqError::TokioHandlerError(msg) => Self::Internal(format!("Tokio handler error: {}", msg)),
550 RocketmqError::SubCommand(cmd, msg) => Self::Internal(format!("{} command failed: {}", cmd, msg)),
551 RocketmqError::StoreCustomError(msg) => Self::StorageReadFailed {
552 path: "unknown".to_string(),
553 reason: msg,
554 },
555
556 RocketmqError::ServiceTaskError(err) => Self::Service(err),
558
559 #[cfg(feature = "with_serde")]
560 RocketmqError::SerdeJsonError(err) => {
561 Self::Serialization(unified::SerializationError::JsonError(err.to_string()))
562 }
563
564 #[cfg(feature = "with_config")]
565 RocketmqError::ConfigError(err) => Self::ConfigParseFailed {
566 key: "unknown",
567 reason: err.to_string(),
568 },
569
570 RocketmqError::RpcError(code, msg) => Self::broker_operation_failed("RPC", code, msg),
572 RocketmqError::FromStrErr(msg) => Self::illegal_argument(msg),
573 RocketmqError::Utf8Error(err) => Self::Serialization(unified::SerializationError::Utf8Error(err)),
574 RocketmqError::ConnectionInvalid(msg) => Self::network_connection_failed("unknown", msg),
575 RocketmqError::AbortProcessError(code, msg) => {
576 Self::Internal(format!("Abort process error {}: {}", code, msg))
577 }
578 RocketmqError::ChannelSendRequestFailed(msg) => Self::network_connection_failed("channel", msg),
579 RocketmqError::ChannelRecvRequestFailed(msg) => Self::network_connection_failed("channel", msg),
580 RocketmqError::RemotingCommandEncoderError(msg) => {
581 Self::Serialization(unified::SerializationError::EncodeFailed {
582 format: "command",
583 message: msg,
584 })
585 }
586 }
587 }
588}