1mod network;
22mod protocol;
23mod rpc;
24mod serialization;
25mod tools;
26
27use std::io;
28
29pub use crate::filter_error::FilterError;
31
32pub use network::NetworkError;
33pub use protocol::ProtocolError;
34pub use rpc::RpcClientError;
35pub use serialization::SerializationError;
36use thiserror::Error;
37pub use tools::ToolsError;
38
39pub use crate::auth_error::AuthError;
41#[allow(deprecated)]
43pub use crate::client_error::*;
44#[allow(deprecated)]
45pub use crate::common_error::*;
46pub use crate::controller_error::ControllerError;
48
49#[derive(Debug, Error)]
86pub enum RocketMQError {
87 #[error(transparent)]
92 Network(#[from] NetworkError),
93
94 #[error(transparent)]
99 Serialization(#[from] SerializationError),
100
101 #[error(transparent)]
106 Protocol(#[from] ProtocolError),
107
108 #[error(transparent)]
113 Rpc(#[from] RpcClientError),
114
115 #[error(transparent)]
120 Authentication(#[from] AuthError),
121
122 #[error(transparent)]
127 Controller(#[from] ControllerError),
128
129 #[error("Invalid message property: {0}")]
134 InvalidProperty(String),
135
136 #[error("Broker not found: {name}")]
141 BrokerNotFound { name: String },
142
143 #[error("Broker registration failed for '{name}': {reason}")]
145 BrokerRegistrationFailed { name: String, reason: String },
146
147 #[error("Broker operation '{operation}' failed: code={code}, message={message}")]
149 BrokerOperationFailed {
150 operation: &'static str,
151 code: i32,
152 message: String,
153 broker_addr: Option<String>,
154 },
155
156 #[error("Topic '{topic}' does not exist")]
158 TopicNotExist { topic: String },
159
160 #[error("Queue does not exist: topic='{topic}', queue_id={queue_id}")]
162 QueueNotExist { topic: String, queue_id: i32 },
163
164 #[error("Subscription group '{group}' not found")]
166 SubscriptionGroupNotExist { group: String },
167
168 #[error("Queue {queue_id} out of range (0-{max}) for topic '{topic}'")]
170 QueueIdOutOfRange { topic: String, queue_id: i32, max: i32 },
171
172 #[error("Message body length {actual} bytes exceeds limit {limit} bytes")]
174 MessageTooLarge { actual: usize, limit: usize },
175
176 #[error("Message validation failed: {reason}")]
178 MessageValidationFailed { reason: String },
179
180 #[error("Retry limit {current}/{max} exceeded for group '{group}'")]
182 RetryLimitExceeded { group: String, current: i32, max: i32 },
183
184 #[error("Transaction message rejected by broker policy")]
186 TransactionRejected,
187
188 #[error("Broker permission denied: {operation}")]
190 BrokerPermissionDenied { operation: String },
191
192 #[error("Not master broker, master address: {master_address}")]
194 NotMasterBroker { master_address: String },
195
196 #[error("Message lookup failed at offset {offset}")]
198 MessageLookupFailed { offset: i64 },
199
200 #[error("Sending to topic '{topic}' is forbidden")]
202 TopicSendingForbidden { topic: String },
203
204 #[error("Async task '{task}' failed: {context}")]
206 BrokerAsyncTaskFailed {
207 task: &'static str,
208 context: String,
209 #[source]
210 source: Box<dyn std::error::Error + Send + Sync>,
211 },
212
213 #[error("Request body {operation} failed: {reason}")]
218 RequestBodyInvalid { operation: &'static str, reason: String },
219
220 #[error("Request header error: {0}")]
222 RequestHeaderError(String),
223
224 #[error("Response {operation} failed: {reason}")]
226 ResponseProcessFailed { operation: &'static str, reason: String },
227
228 #[error("Route information not found for topic '{topic}'")]
233 RouteNotFound { topic: String },
234
235 #[error("Route data inconsistency detected for topic '{topic}': {reason}")]
237 RouteInconsistent { topic: String, reason: String },
238
239 #[error("Broker registration conflict for '{broker_name}': {reason}")]
241 RouteRegistrationConflict { broker_name: String, reason: String },
242
243 #[error("Route state version conflict: expected={expected}, actual={actual}")]
245 RouteVersionConflict { expected: u64, actual: u64 },
246
247 #[error("Cluster '{cluster}' not found")]
249 ClusterNotFound { cluster: String },
250
251 #[error("Client is not started")]
256 ClientNotStarted,
257
258 #[error("Client is already started")]
260 ClientAlreadyStarted,
261
262 #[error("Client is shutting down")]
264 ClientShuttingDown,
265
266 #[error("Invalid client state: expected {expected}, got {actual}")]
268 ClientInvalidState { expected: &'static str, actual: String },
269
270 #[error("Producer is not available")]
272 ProducerNotAvailable,
273
274 #[error("Consumer is not available")]
276 ConsumerNotAvailable,
277
278 #[error(transparent)]
283 Tools(#[from] ToolsError),
284
285 #[error(transparent)]
290 Filter(#[from] FilterError),
291
292 #[error("Storage read failed for '{path}': {reason}")]
297 StorageReadFailed { path: String, reason: String },
298
299 #[error("Storage write failed for '{path}': {reason}")]
301 StorageWriteFailed { path: String, reason: String },
302
303 #[error("Corrupted data detected in '{path}'")]
305 StorageCorrupted { path: String },
306
307 #[error("Out of storage space: {path}")]
309 StorageOutOfSpace { path: String },
310
311 #[error("Failed to acquire lock for '{path}'")]
313 StorageLockFailed { path: String },
314
315 #[error("Configuration parse error for '{key}': {reason}")]
320 ConfigParseFailed { key: &'static str, reason: String },
321
322 #[error("Required configuration '{key}' is missing")]
324 ConfigMissing { key: &'static str },
325
326 #[error("Invalid configuration for '{key}': value='{value}', reason={reason}")]
328 ConfigInvalidValue {
329 key: &'static str,
330 value: String,
331 reason: String,
332 },
333
334 #[error("Invalid auth configuration for '{key}': {reason}")]
336 AuthConfigInvalid { key: &'static str, reason: String },
337
338 #[error("Auth hot reload failed for '{path}': {reason}")]
340 AuthHotReloadFailed { path: String, reason: String },
341
342 #[error("Not leader, current leader is: {}", leader_id.map(|id| id.to_string()).unwrap_or_else(|| "unknown".to_string()))]
347 ControllerNotLeader { leader_id: Option<u64> },
348
349 #[error("Raft consensus error: {reason}")]
351 ControllerRaftError { reason: String },
352
353 #[error("Consensus operation '{operation}' timed out after {timeout_ms}ms")]
355 ControllerConsensusTimeout { operation: &'static str, timeout_ms: u64 },
356
357 #[error("Snapshot operation failed: {reason}")]
359 ControllerSnapshotFailed { reason: String },
360
361 #[error("IO error: {0}")]
366 IO(#[from] io::Error),
367
368 #[error("Illegal argument: {0}")]
370 IllegalArgument(String),
371
372 #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
374 Timeout { operation: &'static str, timeout_ms: u64 },
375
376 #[error("Internal error: {0}")]
378 Internal(String),
379
380 #[error("Service error: {0}")]
382 Service(#[from] ServiceError),
383
384 #[error("Invalid RocketMQ version ordinal: {0}")]
389 InvalidVersionOrdinal(u32),
390
391 #[deprecated(since = "0.7.0", note = "Use specific error types instead")]
396 #[error("{0}")]
397 Legacy(String),
398
399 #[error("Not initialized: {0}")]
400 NotInitialized(String),
401
402 #[error("Message is missing required property: {property}")]
403 MissingRequiredMessageProperty { property: &'static str },
404}
405
406impl RocketMQError {
411 #[inline]
413 pub fn network_connection_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
414 Self::Network(NetworkError::connection_failed(addr, reason))
415 }
416
417 #[inline]
419 pub fn network_timeout(addr: impl Into<String>, timeout: std::time::Duration) -> Self {
420 Self::Network(NetworkError::request_timeout(addr, timeout.as_millis() as u64))
421 }
422
423 #[inline]
425 pub fn network_request_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
426 Self::Network(NetworkError::send_failed(addr, reason))
427 }
428
429 #[inline]
431 pub fn deserialization_failed(format: &'static str, reason: impl Into<String>) -> Self {
432 Self::Serialization(SerializationError::decode_failed(format, reason))
433 }
434
435 #[inline]
437 pub fn validation_failed(field: impl Into<String>, reason: impl Into<String>) -> Self {
438 Self::Tools(ToolsError::validation_error(field, reason))
439 }
440
441 #[inline]
443 pub fn broker_operation_failed(operation: &'static str, code: i32, message: impl Into<String>) -> Self {
444 Self::BrokerOperationFailed {
445 operation,
446 code,
447 message: message.into(),
448 broker_addr: None,
449 }
450 }
451
452 #[inline]
454 pub fn storage_read_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
455 Self::StorageReadFailed {
456 path: path.into(),
457 reason: reason.into(),
458 }
459 }
460
461 #[inline]
463 pub fn storage_write_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
464 Self::StorageWriteFailed {
465 path: path.into(),
466 reason: reason.into(),
467 }
468 }
469
470 #[inline]
472 pub fn illegal_argument(message: impl Into<String>) -> Self {
473 Self::IllegalArgument(message.into())
474 }
475
476 #[inline]
478 pub fn route_not_found(topic: impl Into<String>) -> Self {
479 Self::RouteNotFound { topic: topic.into() }
480 }
481
482 #[inline]
484 pub fn route_registration_conflict(broker_name: impl Into<String>, reason: impl Into<String>) -> Self {
485 Self::RouteRegistrationConflict {
486 broker_name: broker_name.into(),
487 reason: reason.into(),
488 }
489 }
490
491 #[inline]
493 pub fn cluster_not_found(cluster: impl Into<String>) -> Self {
494 Self::ClusterNotFound {
495 cluster: cluster.into(),
496 }
497 }
498
499 #[inline]
501 pub fn request_body_invalid(operation: &'static str, reason: impl Into<String>) -> Self {
502 Self::RequestBodyInvalid {
503 operation,
504 reason: reason.into(),
505 }
506 }
507
508 #[inline]
510 pub fn request_header_error(message: impl Into<String>) -> Self {
511 Self::RequestHeaderError(message.into())
512 }
513
514 #[inline]
516 pub fn response_process_failed(operation: &'static str, reason: impl Into<String>) -> Self {
517 Self::ResponseProcessFailed {
518 operation,
519 reason: reason.into(),
520 }
521 }
522
523 pub fn with_broker_addr(self, addr: impl Into<String>) -> Self {
525 match self {
526 Self::BrokerOperationFailed {
527 operation,
528 code,
529 message,
530 broker_addr: _,
531 } => Self::BrokerOperationFailed {
532 operation,
533 code,
534 message,
535 broker_addr: Some(addr.into()),
536 },
537 other => other,
538 }
539 }
540
541 #[inline]
543 pub fn validation_error(field: impl Into<String>, reason: impl Into<String>) -> Self {
544 Self::Tools(ToolsError::validation_error(field, reason))
545 }
546
547 #[inline]
549 pub fn topic_not_found(topic: impl Into<String>) -> Self {
550 Self::Tools(ToolsError::topic_not_found(topic))
551 }
552
553 #[inline]
555 pub fn topic_already_exists(topic: impl Into<String>) -> Self {
556 Self::Tools(ToolsError::topic_already_exists(topic))
557 }
558
559 #[inline]
561 pub fn nameserver_unreachable(addr: impl Into<String>) -> Self {
562 Self::Tools(ToolsError::nameserver_unreachable(addr))
563 }
564
565 #[inline]
567 pub fn nameserver_config_invalid(reason: impl Into<String>) -> Self {
568 Self::Tools(ToolsError::nameserver_config_invalid(reason))
569 }
570
571 #[inline]
573 pub fn not_initialized(reason: impl Into<String>) -> Self {
574 Self::NotInitialized(reason.into())
575 }
576
577 #[inline]
583 pub fn authentication_failed(reason: impl Into<String>) -> Self {
584 Self::Authentication(AuthError::AuthenticationFailed(reason.into()))
585 }
586
587 #[inline]
589 pub fn invalid_credential(reason: impl Into<String>) -> Self {
590 Self::Authentication(AuthError::InvalidCredential(reason.into()))
591 }
592
593 #[inline]
595 pub fn user_not_found(username: impl Into<String>) -> Self {
596 Self::Authentication(AuthError::UserNotFound(username.into()))
597 }
598
599 #[inline]
601 pub fn invalid_signature(reason: impl Into<String>) -> Self {
602 Self::Authentication(AuthError::InvalidSignature(reason.into()))
603 }
604
605 #[inline]
607 pub fn auth_config_invalid(key: &'static str, reason: impl Into<String>) -> Self {
608 Self::AuthConfigInvalid {
609 key,
610 reason: reason.into(),
611 }
612 }
613
614 #[inline]
616 pub fn auth_hot_reload_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
617 Self::AuthHotReloadFailed {
618 path: path.into(),
619 reason: reason.into(),
620 }
621 }
622
623 #[inline]
629 pub fn controller_not_leader(leader_id: Option<u64>) -> Self {
630 Self::Controller(ControllerError::NotLeader { leader_id })
631 }
632
633 #[inline]
635 pub fn controller_raft_error(reason: impl Into<String>) -> Self {
636 Self::Controller(ControllerError::Raft(reason.into()))
637 }
638
639 #[inline]
641 pub fn controller_metadata_not_found(key: impl Into<String>) -> Self {
642 Self::Controller(ControllerError::MetadataNotFound { key: key.into() })
643 }
644
645 #[inline]
647 pub fn controller_invalid_request(reason: impl Into<String>) -> Self {
648 Self::Controller(ControllerError::InvalidRequest(reason.into()))
649 }
650
651 #[inline]
653 pub fn controller_timeout(timeout_ms: u64) -> Self {
654 Self::Controller(ControllerError::Timeout { timeout_ms })
655 }
656
657 #[inline]
659 pub fn controller_shutdown() -> Self {
660 Self::Controller(ControllerError::Shutdown)
661 }
662
663 #[inline]
669 pub fn filter_empty_bytes() -> Self {
670 Self::Filter(FilterError::empty_bytes())
671 }
672
673 #[inline]
675 pub fn filter_invalid_bit_length() -> Self {
676 Self::Filter(FilterError::invalid_bit_length())
677 }
678
679 #[inline]
681 pub fn filter_bit_length_too_small() -> Self {
682 Self::Filter(FilterError::bit_length_too_small())
683 }
684
685 #[inline]
687 pub fn filter_bit_position_out_of_bounds(pos: usize, max: usize) -> Self {
688 Self::Filter(FilterError::bit_position_out_of_bounds(pos, max))
689 }
690
691 #[inline]
693 pub fn filter_byte_position_out_of_bounds(pos: usize, max: usize) -> Self {
694 Self::Filter(FilterError::byte_position_out_of_bounds(pos, max))
695 }
696
697 #[inline]
699 pub fn filter_uninitialized() -> Self {
700 Self::Filter(FilterError::uninitialized())
701 }
702}
703
704impl From<std::str::Utf8Error> for RocketMQError {
709 #[inline]
710 fn from(e: std::str::Utf8Error) -> Self {
711 Self::Serialization(SerializationError::from(e))
712 }
713}
714
715#[cfg(feature = "with_serde")]
716impl From<serde_json::Error> for RocketMQError {
717 #[inline]
718 fn from(e: serde_json::Error) -> Self {
719 Self::Serialization(SerializationError::from(e))
720 }
721}
722
723#[cfg(feature = "with_config")]
724impl From<config::ConfigError> for RocketMQError {
725 fn from(e: config::ConfigError) -> Self {
726 Self::ConfigParseFailed {
727 key: "unknown",
728 reason: e.to_string(),
729 }
730 }
731}
732
733#[derive(Debug, Error)]
739pub enum ServiceError {
740 #[error("Service is already running")]
742 AlreadyRunning,
743
744 #[error("Service is not running")]
746 NotRunning,
747
748 #[error("Service startup failed: {0}")]
750 StartupFailed(String),
751
752 #[error("Service shutdown failed: {0}")]
754 ShutdownFailed(String),
755
756 #[error("Service operation timeout")]
758 Timeout,
759
760 #[error("Service interrupted")]
762 Interrupted,
763}
764
765pub type RocketMQResult<T> = std::result::Result<T, RocketMQError>;
784
785pub type Result<T> = anyhow::Result<T>;
790
791#[cfg(test)]
792mod tests {
793 use super::*;
794
795 #[test]
796 fn test_error_creation() {
797 let err = RocketMQError::network_connection_failed("127.0.0.1:9876", "timeout");
798 assert!(err.to_string().contains("Connection failed"));
799 }
800
801 #[test]
802 fn test_error_conversion() {
803 let io_err = io::Error::new(io::ErrorKind::NotFound, "file not found");
804 let rmq_err: RocketMQError = io_err.into();
805 assert!(matches!(rmq_err, RocketMQError::IO(_)));
806 }
807
808 #[test]
809 fn test_broker_operation_with_addr() {
810 let err =
811 RocketMQError::broker_operation_failed("SEND_MESSAGE", 1, "failed").with_broker_addr("127.0.0.1:10911");
812
813 if let RocketMQError::BrokerOperationFailed { broker_addr, .. } = err {
814 assert_eq!(broker_addr, Some("127.0.0.1:10911".to_string()));
815 } else {
816 panic!("Expected BrokerOperationFailed");
817 }
818 }
819
820 #[test]
821 fn test_topic_not_exist() {
822 let err = RocketMQError::TopicNotExist {
823 topic: "TestTopic".to_string(),
824 };
825 assert_eq!(err.to_string(), "Topic 'TestTopic' does not exist");
826 }
827
828 #[test]
829 fn auth_config_and_hot_reload_errors_are_distinct() {
830 let config = RocketMQError::auth_config_invalid("auth.authorization", "provider not ready");
831 assert!(matches!(
832 config,
833 RocketMQError::AuthConfigInvalid {
834 key: "auth.authorization",
835 ..
836 }
837 ));
838 assert!(config.to_string().contains("Invalid auth configuration"));
839
840 let reload = RocketMQError::auth_hot_reload_failed("conf/plain_acl.yml", "parse failed");
841 assert!(matches!(
842 reload,
843 RocketMQError::AuthHotReloadFailed {
844 ref path,
845 ref reason
846 } if path == "conf/plain_acl.yml" && reason == "parse failed"
847 ));
848 assert!(reload.to_string().contains("Auth hot reload failed"));
849 }
850}