1mod network;
22mod protocol;
23mod rpc;
24mod serialization;
25mod tools;
26
27use std::io;
28
29pub use crate::filter_error::FilterError;
31
32pub use network::NetworkError;
33pub use protocol::ProtocolError;
34pub use rpc::RpcClientError;
35pub use serialization::SerializationError;
36use thiserror::Error;
37pub use tools::ToolsError;
38
39pub use crate::auth_error::AuthError;
41#[allow(deprecated)]
43pub use crate::client_error::*;
44#[allow(deprecated)]
45pub use crate::common_error::*;
46pub use crate::controller_error::ControllerError;
48
49#[derive(Debug, Error)]
86pub enum RocketMQError {
87 #[error(transparent)]
92 Network(#[from] NetworkError),
93
94 #[error(transparent)]
99 Serialization(#[from] SerializationError),
100
101 #[error(transparent)]
106 Protocol(#[from] ProtocolError),
107
108 #[error(transparent)]
113 Rpc(#[from] RpcClientError),
114
115 #[error(transparent)]
120 Authentication(#[from] AuthError),
121
122 #[error(transparent)]
127 Controller(#[from] ControllerError),
128
129 #[error("Invalid message property: {0}")]
134 InvalidProperty(String),
135
136 #[error("Broker not found: {name}")]
141 BrokerNotFound { name: String },
142
143 #[error("Broker registration failed for '{name}': {reason}")]
145 BrokerRegistrationFailed { name: String, reason: String },
146
147 #[error("Broker operation '{operation}' failed: code={code}, message={message}")]
149 BrokerOperationFailed {
150 operation: &'static str,
151 code: i32,
152 message: String,
153 broker_addr: Option<String>,
154 },
155
156 #[error("Topic '{topic}' does not exist")]
158 TopicNotExist { topic: String },
159
160 #[error("Queue does not exist: topic='{topic}', queue_id={queue_id}")]
162 QueueNotExist { topic: String, queue_id: i32 },
163
164 #[error("Subscription group '{group}' not found")]
166 SubscriptionGroupNotExist { group: String },
167
168 #[error("Queue {queue_id} out of range (0-{max}) for topic '{topic}'")]
170 QueueIdOutOfRange { topic: String, queue_id: i32, max: i32 },
171
172 #[error("Message body length {actual} bytes exceeds limit {limit} bytes")]
174 MessageTooLarge { actual: usize, limit: usize },
175
176 #[error("Message validation failed: {reason}")]
178 MessageValidationFailed { reason: String },
179
180 #[error("Retry limit {current}/{max} exceeded for group '{group}'")]
182 RetryLimitExceeded { group: String, current: i32, max: i32 },
183
184 #[error("Transaction message rejected by broker policy")]
186 TransactionRejected,
187
188 #[error("Broker permission denied: {operation}")]
190 BrokerPermissionDenied { operation: String },
191
192 #[error("Not master broker, master address: {master_address}")]
194 NotMasterBroker { master_address: String },
195
196 #[error("Message lookup failed at offset {offset}")]
198 MessageLookupFailed { offset: i64 },
199
200 #[error("Sending to topic '{topic}' is forbidden")]
202 TopicSendingForbidden { topic: String },
203
204 #[error("Async task '{task}' failed: {context}")]
206 BrokerAsyncTaskFailed {
207 task: &'static str,
208 context: String,
209 #[source]
210 source: Box<dyn std::error::Error + Send + Sync>,
211 },
212
213 #[error("Request body {operation} failed: {reason}")]
218 RequestBodyInvalid { operation: &'static str, reason: String },
219
220 #[error("Request header error: {0}")]
222 RequestHeaderError(String),
223
224 #[error("Response {operation} failed: {reason}")]
226 ResponseProcessFailed { operation: &'static str, reason: String },
227
228 #[error("Route information not found for topic '{topic}'")]
233 RouteNotFound { topic: String },
234
235 #[error("Route data inconsistency detected for topic '{topic}': {reason}")]
237 RouteInconsistent { topic: String, reason: String },
238
239 #[error("Broker registration conflict for '{broker_name}': {reason}")]
241 RouteRegistrationConflict { broker_name: String, reason: String },
242
243 #[error("Route state version conflict: expected={expected}, actual={actual}")]
245 RouteVersionConflict { expected: u64, actual: u64 },
246
247 #[error("Cluster '{cluster}' not found")]
249 ClusterNotFound { cluster: String },
250
251 #[error("Client is not started")]
256 ClientNotStarted,
257
258 #[error("Client is already started")]
260 ClientAlreadyStarted,
261
262 #[error("Client is shutting down")]
264 ClientShuttingDown,
265
266 #[error("Invalid client state: expected {expected}, got {actual}")]
268 ClientInvalidState { expected: &'static str, actual: String },
269
270 #[error("Producer is not available")]
272 ProducerNotAvailable,
273
274 #[error("Consumer is not available")]
276 ConsumerNotAvailable,
277
278 #[error(transparent)]
283 Tools(#[from] ToolsError),
284
285 #[error(transparent)]
290 Filter(#[from] FilterError),
291
292 #[error("Storage read failed for '{path}': {reason}")]
297 StorageReadFailed { path: String, reason: String },
298
299 #[error("Storage write failed for '{path}': {reason}")]
301 StorageWriteFailed { path: String, reason: String },
302
303 #[error("Corrupted data detected in '{path}'")]
305 StorageCorrupted { path: String },
306
307 #[error("Out of storage space: {path}")]
309 StorageOutOfSpace { path: String },
310
311 #[error("Failed to acquire lock for '{path}'")]
313 StorageLockFailed { path: String },
314
315 #[error("Configuration parse error for '{key}': {reason}")]
320 ConfigParseFailed { key: &'static str, reason: String },
321
322 #[error("Required configuration '{key}' is missing")]
324 ConfigMissing { key: &'static str },
325
326 #[error("Invalid configuration for '{key}': value='{value}', reason={reason}")]
328 ConfigInvalidValue {
329 key: &'static str,
330 value: String,
331 reason: String,
332 },
333
334 #[error("Not leader, current leader is: {}", leader_id.map(|id| id.to_string()).unwrap_or_else(|| "unknown".to_string()))]
339 ControllerNotLeader { leader_id: Option<u64> },
340
341 #[error("Raft consensus error: {reason}")]
343 ControllerRaftError { reason: String },
344
345 #[error("Consensus operation '{operation}' timed out after {timeout_ms}ms")]
347 ControllerConsensusTimeout { operation: &'static str, timeout_ms: u64 },
348
349 #[error("Snapshot operation failed: {reason}")]
351 ControllerSnapshotFailed { reason: String },
352
353 #[error("IO error: {0}")]
358 IO(#[from] io::Error),
359
360 #[error("Illegal argument: {0}")]
362 IllegalArgument(String),
363
364 #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
366 Timeout { operation: &'static str, timeout_ms: u64 },
367
368 #[error("Internal error: {0}")]
370 Internal(String),
371
372 #[error("Service error: {0}")]
374 Service(#[from] ServiceError),
375
376 #[error("Invalid RocketMQ version ordinal: {0}")]
381 InvalidVersionOrdinal(u32),
382
383 #[deprecated(since = "0.7.0", note = "Use specific error types instead")]
388 #[error("{0}")]
389 Legacy(String),
390
391 #[error("Not initialized: {0}")]
392 NotInitialized(String),
393
394 #[error("Message is missing required property: {property}")]
395 MissingRequiredMessageProperty { property: &'static str },
396}
397
398impl RocketMQError {
403 #[inline]
405 pub fn network_connection_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
406 Self::Network(NetworkError::connection_failed(addr, reason))
407 }
408
409 #[inline]
411 pub fn network_timeout(addr: impl Into<String>, timeout: std::time::Duration) -> Self {
412 Self::Network(NetworkError::request_timeout(addr, timeout.as_millis() as u64))
413 }
414
415 #[inline]
417 pub fn network_request_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
418 Self::Network(NetworkError::send_failed(addr, reason))
419 }
420
421 #[inline]
423 pub fn deserialization_failed(format: &'static str, reason: impl Into<String>) -> Self {
424 Self::Serialization(SerializationError::decode_failed(format, reason))
425 }
426
427 #[inline]
429 pub fn validation_failed(field: impl Into<String>, reason: impl Into<String>) -> Self {
430 Self::Tools(ToolsError::validation_error(field, reason))
431 }
432
433 #[inline]
435 pub fn broker_operation_failed(operation: &'static str, code: i32, message: impl Into<String>) -> Self {
436 Self::BrokerOperationFailed {
437 operation,
438 code,
439 message: message.into(),
440 broker_addr: None,
441 }
442 }
443
444 #[inline]
446 pub fn storage_read_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
447 Self::StorageReadFailed {
448 path: path.into(),
449 reason: reason.into(),
450 }
451 }
452
453 #[inline]
455 pub fn storage_write_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
456 Self::StorageWriteFailed {
457 path: path.into(),
458 reason: reason.into(),
459 }
460 }
461
462 #[inline]
464 pub fn illegal_argument(message: impl Into<String>) -> Self {
465 Self::IllegalArgument(message.into())
466 }
467
468 #[inline]
470 pub fn route_not_found(topic: impl Into<String>) -> Self {
471 Self::RouteNotFound { topic: topic.into() }
472 }
473
474 #[inline]
476 pub fn route_registration_conflict(broker_name: impl Into<String>, reason: impl Into<String>) -> Self {
477 Self::RouteRegistrationConflict {
478 broker_name: broker_name.into(),
479 reason: reason.into(),
480 }
481 }
482
483 #[inline]
485 pub fn cluster_not_found(cluster: impl Into<String>) -> Self {
486 Self::ClusterNotFound {
487 cluster: cluster.into(),
488 }
489 }
490
491 #[inline]
493 pub fn request_body_invalid(operation: &'static str, reason: impl Into<String>) -> Self {
494 Self::RequestBodyInvalid {
495 operation,
496 reason: reason.into(),
497 }
498 }
499
500 #[inline]
502 pub fn request_header_error(message: impl Into<String>) -> Self {
503 Self::RequestHeaderError(message.into())
504 }
505
506 #[inline]
508 pub fn response_process_failed(operation: &'static str, reason: impl Into<String>) -> Self {
509 Self::ResponseProcessFailed {
510 operation,
511 reason: reason.into(),
512 }
513 }
514
515 pub fn with_broker_addr(self, addr: impl Into<String>) -> Self {
517 match self {
518 Self::BrokerOperationFailed {
519 operation,
520 code,
521 message,
522 broker_addr: _,
523 } => Self::BrokerOperationFailed {
524 operation,
525 code,
526 message,
527 broker_addr: Some(addr.into()),
528 },
529 other => other,
530 }
531 }
532
533 #[inline]
535 pub fn validation_error(field: impl Into<String>, reason: impl Into<String>) -> Self {
536 Self::Tools(ToolsError::validation_error(field, reason))
537 }
538
539 #[inline]
541 pub fn topic_not_found(topic: impl Into<String>) -> Self {
542 Self::Tools(ToolsError::topic_not_found(topic))
543 }
544
545 #[inline]
547 pub fn topic_already_exists(topic: impl Into<String>) -> Self {
548 Self::Tools(ToolsError::topic_already_exists(topic))
549 }
550
551 #[inline]
553 pub fn nameserver_unreachable(addr: impl Into<String>) -> Self {
554 Self::Tools(ToolsError::nameserver_unreachable(addr))
555 }
556
557 #[inline]
559 pub fn nameserver_config_invalid(reason: impl Into<String>) -> Self {
560 Self::Tools(ToolsError::nameserver_config_invalid(reason))
561 }
562
563 #[inline]
565 pub fn not_initialized(reason: impl Into<String>) -> Self {
566 Self::NotInitialized(reason.into())
567 }
568
569 #[inline]
575 pub fn authentication_failed(reason: impl Into<String>) -> Self {
576 Self::Authentication(AuthError::AuthenticationFailed(reason.into()))
577 }
578
579 #[inline]
581 pub fn invalid_credential(reason: impl Into<String>) -> Self {
582 Self::Authentication(AuthError::InvalidCredential(reason.into()))
583 }
584
585 #[inline]
587 pub fn user_not_found(username: impl Into<String>) -> Self {
588 Self::Authentication(AuthError::UserNotFound(username.into()))
589 }
590
591 #[inline]
593 pub fn invalid_signature(reason: impl Into<String>) -> Self {
594 Self::Authentication(AuthError::InvalidSignature(reason.into()))
595 }
596
597 #[inline]
603 pub fn controller_not_leader(leader_id: Option<u64>) -> Self {
604 Self::Controller(ControllerError::NotLeader { leader_id })
605 }
606
607 #[inline]
609 pub fn controller_raft_error(reason: impl Into<String>) -> Self {
610 Self::Controller(ControllerError::Raft(reason.into()))
611 }
612
613 #[inline]
615 pub fn controller_metadata_not_found(key: impl Into<String>) -> Self {
616 Self::Controller(ControllerError::MetadataNotFound { key: key.into() })
617 }
618
619 #[inline]
621 pub fn controller_invalid_request(reason: impl Into<String>) -> Self {
622 Self::Controller(ControllerError::InvalidRequest(reason.into()))
623 }
624
625 #[inline]
627 pub fn controller_timeout(timeout_ms: u64) -> Self {
628 Self::Controller(ControllerError::Timeout { timeout_ms })
629 }
630
631 #[inline]
633 pub fn controller_shutdown() -> Self {
634 Self::Controller(ControllerError::Shutdown)
635 }
636
637 #[inline]
643 pub fn filter_empty_bytes() -> Self {
644 Self::Filter(FilterError::empty_bytes())
645 }
646
647 #[inline]
649 pub fn filter_invalid_bit_length() -> Self {
650 Self::Filter(FilterError::invalid_bit_length())
651 }
652
653 #[inline]
655 pub fn filter_bit_length_too_small() -> Self {
656 Self::Filter(FilterError::bit_length_too_small())
657 }
658
659 #[inline]
661 pub fn filter_bit_position_out_of_bounds(pos: usize, max: usize) -> Self {
662 Self::Filter(FilterError::bit_position_out_of_bounds(pos, max))
663 }
664
665 #[inline]
667 pub fn filter_byte_position_out_of_bounds(pos: usize, max: usize) -> Self {
668 Self::Filter(FilterError::byte_position_out_of_bounds(pos, max))
669 }
670
671 #[inline]
673 pub fn filter_uninitialized() -> Self {
674 Self::Filter(FilterError::uninitialized())
675 }
676}
677
678impl From<std::str::Utf8Error> for RocketMQError {
683 #[inline]
684 fn from(e: std::str::Utf8Error) -> Self {
685 Self::Serialization(SerializationError::from(e))
686 }
687}
688
/// Lets `?` lift a `serde_json` error into [`RocketMQError`].
///
/// Only compiled when the `with_serde` feature is enabled.
#[cfg(feature = "with_serde")]
impl From<serde_json::Error> for RocketMQError {
    /// Converts the error into a `SerializationError` first and then wraps it
    /// in the `Serialization` variant.
    #[inline]
    fn from(json_err: serde_json::Error) -> Self {
        let inner: SerializationError = json_err.into();
        Self::Serialization(inner)
    }
}
696
/// Lets `?` lift a `config` crate error into [`RocketMQError`].
///
/// Only compiled when the `with_config` feature is enabled.
#[cfg(feature = "with_config")]
impl From<config::ConfigError> for RocketMQError {
    /// Maps the error to `ConfigParseFailed`.
    ///
    /// The key is reported as `"unknown"`, and the `Display` text of the
    /// original error becomes the reason.
    fn from(config_err: config::ConfigError) -> Self {
        let reason = config_err.to_string();
        Self::ConfigParseFailed {
            key: "unknown",
            reason,
        }
    }
}
706
707#[derive(Debug, Error)]
713pub enum ServiceError {
714 #[error("Service is already running")]
716 AlreadyRunning,
717
718 #[error("Service is not running")]
720 NotRunning,
721
722 #[error("Service startup failed: {0}")]
724 StartupFailed(String),
725
726 #[error("Service shutdown failed: {0}")]
728 ShutdownFailed(String),
729
730 #[error("Service operation timeout")]
732 Timeout,
733
734 #[error("Service interrupted")]
736 Interrupted,
737}
738
739pub type RocketMQResult<T> = std::result::Result<T, RocketMQError>;
758
759pub type Result<T> = anyhow::Result<T>;
764
#[cfg(test)]
mod tests {
    use super::*;

    /// The network constructor surfaces the "Connection failed" text.
    #[test]
    fn test_error_creation() {
        let rendered = RocketMQError::network_connection_failed("127.0.0.1:9876", "timeout").to_string();
        assert!(rendered.contains("Connection failed"));
    }

    /// An `io::Error` converts into the `IO` variant.
    #[test]
    fn test_error_conversion() {
        let source = io::Error::new(io::ErrorKind::NotFound, "file not found");
        let converted = RocketMQError::from(source);
        assert!(matches!(converted, RocketMQError::IO(_)));
    }

    /// `with_broker_addr` stores the address on a `BrokerOperationFailed` error.
    #[test]
    fn test_broker_operation_with_addr() {
        let failure = RocketMQError::broker_operation_failed("SEND_MESSAGE", 1, "failed");
        let tagged = failure.with_broker_addr("127.0.0.1:10911");

        match tagged {
            RocketMQError::BrokerOperationFailed { broker_addr, .. } => {
                assert_eq!(broker_addr.as_deref(), Some("127.0.0.1:10911"));
            }
            _ => panic!("Expected BrokerOperationFailed"),
        }
    }

    /// `TopicNotExist` renders the topic name inside single quotes.
    #[test]
    fn test_topic_not_exist() {
        let topic = String::from("TestTopic");
        let rendered = RocketMQError::TopicNotExist { topic }.to_string();
        assert_eq!(rendered, "Topic 'TestTopic' does not exist");
    }
}