1mod network;
25mod protocol;
26mod serialization;
27mod tools;
28
29use std::io;
30
31pub use network::NetworkError;
32pub use protocol::ProtocolError;
33pub use serialization::SerializationError;
34use thiserror::Error;
35pub use tools::ToolsError;
36
37#[allow(deprecated)]
39pub use crate::client_error::*;
40#[allow(deprecated)]
41pub use crate::common_error::*;
42
43#[derive(Debug, Error)]
72pub enum RocketMQError {
73 #[error(transparent)]
78 Network(#[from] NetworkError),
79
80 #[error(transparent)]
85 Serialization(#[from] SerializationError),
86
87 #[error(transparent)]
92 Protocol(#[from] ProtocolError),
93
94 #[error("Broker not found: {name}")]
99 BrokerNotFound { name: String },
100
101 #[error("Broker registration failed for '{name}': {reason}")]
103 BrokerRegistrationFailed { name: String, reason: String },
104
105 #[error("Broker operation '{operation}' failed: code={code}, message={message}")]
107 BrokerOperationFailed {
108 operation: &'static str,
109 code: i32,
110 message: String,
111 broker_addr: Option<String>,
112 },
113
114 #[error("Topic '{topic}' does not exist")]
116 TopicNotExist { topic: String },
117
118 #[error("Queue does not exist: topic='{topic}', queue_id={queue_id}")]
120 QueueNotExist { topic: String, queue_id: i32 },
121
122 #[error("Subscription group '{group}' not found")]
124 SubscriptionGroupNotExist { group: String },
125
126 #[error("Queue {queue_id} out of range (0-{max}) for topic '{topic}'")]
128 QueueIdOutOfRange {
129 topic: String,
130 queue_id: i32,
131 max: i32,
132 },
133
134 #[error("Message body length {actual} bytes exceeds limit {limit} bytes")]
136 MessageTooLarge { actual: usize, limit: usize },
137
138 #[error("Message validation failed: {reason}")]
140 MessageValidationFailed { reason: String },
141
142 #[error("Retry limit {current}/{max} exceeded for group '{group}'")]
144 RetryLimitExceeded {
145 group: String,
146 current: i32,
147 max: i32,
148 },
149
150 #[error("Transaction message rejected by broker policy")]
152 TransactionRejected,
153
154 #[error("Broker permission denied: {operation}")]
156 BrokerPermissionDenied { operation: String },
157
158 #[error("Not master broker, master address: {master_address}")]
160 NotMasterBroker { master_address: String },
161
162 #[error("Message lookup failed at offset {offset}")]
164 MessageLookupFailed { offset: i64 },
165
166 #[error("Sending to topic '{topic}' is forbidden")]
168 TopicSendingForbidden { topic: String },
169
170 #[error("Async task '{task}' failed: {context}")]
172 BrokerAsyncTaskFailed {
173 task: &'static str,
174 context: String,
175 #[source]
176 source: Box<dyn std::error::Error + Send + Sync>,
177 },
178
179 #[error("Request body {operation} failed: {reason}")]
184 RequestBodyInvalid {
185 operation: &'static str,
186 reason: String,
187 },
188
189 #[error("Request header error: {0}")]
191 RequestHeaderError(String),
192
193 #[error("Response {operation} failed: {reason}")]
195 ResponseProcessFailed {
196 operation: &'static str,
197 reason: String,
198 },
199
200 #[error("Route information not found for topic '{topic}'")]
205 RouteNotFound { topic: String },
206
207 #[error("Route data inconsistency detected for topic '{topic}': {reason}")]
209 RouteInconsistent { topic: String, reason: String },
210
211 #[error("Broker registration conflict for '{broker_name}': {reason}")]
213 RouteRegistrationConflict { broker_name: String, reason: String },
214
215 #[error("Route state version conflict: expected={expected}, actual={actual}")]
217 RouteVersionConflict { expected: u64, actual: u64 },
218
219 #[error("Cluster '{cluster}' not found")]
221 ClusterNotFound { cluster: String },
222
223 #[error("Client is not started")]
228 ClientNotStarted,
229
230 #[error("Client is already started")]
232 ClientAlreadyStarted,
233
234 #[error("Client is shutting down")]
236 ClientShuttingDown,
237
238 #[error("Invalid client state: expected {expected}, got {actual}")]
240 ClientInvalidState {
241 expected: &'static str,
242 actual: String,
243 },
244
245 #[error("Producer is not available")]
247 ProducerNotAvailable,
248
249 #[error("Consumer is not available")]
251 ConsumerNotAvailable,
252
253 #[error(transparent)]
258 Tools(#[from] ToolsError),
259
260 #[error("Storage read failed for '{path}': {reason}")]
265 StorageReadFailed { path: String, reason: String },
266
267 #[error("Storage write failed for '{path}': {reason}")]
269 StorageWriteFailed { path: String, reason: String },
270
271 #[error("Corrupted data detected in '{path}'")]
273 StorageCorrupted { path: String },
274
275 #[error("Out of storage space: {path}")]
277 StorageOutOfSpace { path: String },
278
279 #[error("Failed to acquire lock for '{path}'")]
281 StorageLockFailed { path: String },
282
283 #[error("Configuration parse error for '{key}': {reason}")]
288 ConfigParseFailed { key: &'static str, reason: String },
289
290 #[error("Required configuration '{key}' is missing")]
292 ConfigMissing { key: &'static str },
293
294 #[error("Invalid configuration for '{key}': value='{value}', reason={reason}")]
296 ConfigInvalidValue {
297 key: &'static str,
298 value: String,
299 reason: String,
300 },
301
302 #[error("Not leader, current leader is: {}", leader_id.map(|id| id.to_string()).unwrap_or_else(|| "unknown".to_string()))]
307 ControllerNotLeader { leader_id: Option<u64> },
308
309 #[error("Raft consensus error: {reason}")]
311 ControllerRaftError { reason: String },
312
313 #[error("Consensus operation '{operation}' timed out after {timeout_ms}ms")]
315 ControllerConsensusTimeout {
316 operation: &'static str,
317 timeout_ms: u64,
318 },
319
320 #[error("Snapshot operation failed: {reason}")]
322 ControllerSnapshotFailed { reason: String },
323
324 #[error("IO error: {0}")]
329 IO(#[from] io::Error),
330
331 #[error("Illegal argument: {0}")]
333 IllegalArgument(String),
334
335 #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
337 Timeout {
338 operation: &'static str,
339 timeout_ms: u64,
340 },
341
342 #[error("Internal error: {0}")]
344 Internal(String),
345
346 #[error("Service error: {0}")]
348 Service(#[from] ServiceError),
349
350 #[error("Invalid RocketMQ version ordinal: {0}")]
355 InvalidVersionOrdinal(u32),
356
357 #[deprecated(since = "0.7.0", note = "Use specific error types instead")]
362 #[error("{0}")]
363 Legacy(String),
364}
365
366impl RocketMQError {
371 #[inline]
373 pub fn network_connection_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
374 Self::Network(NetworkError::connection_failed(addr, reason))
375 }
376
377 #[inline]
379 pub fn broker_operation_failed(
380 operation: &'static str,
381 code: i32,
382 message: impl Into<String>,
383 ) -> Self {
384 Self::BrokerOperationFailed {
385 operation,
386 code,
387 message: message.into(),
388 broker_addr: None,
389 }
390 }
391
392 #[inline]
394 pub fn storage_read_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
395 Self::StorageReadFailed {
396 path: path.into(),
397 reason: reason.into(),
398 }
399 }
400
401 #[inline]
403 pub fn storage_write_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
404 Self::StorageWriteFailed {
405 path: path.into(),
406 reason: reason.into(),
407 }
408 }
409
410 #[inline]
412 pub fn illegal_argument(message: impl Into<String>) -> Self {
413 Self::IllegalArgument(message.into())
414 }
415
416 #[inline]
418 pub fn route_not_found(topic: impl Into<String>) -> Self {
419 Self::RouteNotFound {
420 topic: topic.into(),
421 }
422 }
423
424 #[inline]
426 pub fn route_registration_conflict(
427 broker_name: impl Into<String>,
428 reason: impl Into<String>,
429 ) -> Self {
430 Self::RouteRegistrationConflict {
431 broker_name: broker_name.into(),
432 reason: reason.into(),
433 }
434 }
435
436 #[inline]
438 pub fn cluster_not_found(cluster: impl Into<String>) -> Self {
439 Self::ClusterNotFound {
440 cluster: cluster.into(),
441 }
442 }
443
444 #[inline]
446 pub fn request_body_invalid(operation: &'static str, reason: impl Into<String>) -> Self {
447 Self::RequestBodyInvalid {
448 operation,
449 reason: reason.into(),
450 }
451 }
452
453 #[inline]
455 pub fn request_header_error(message: impl Into<String>) -> Self {
456 Self::RequestHeaderError(message.into())
457 }
458
459 #[inline]
461 pub fn response_process_failed(operation: &'static str, reason: impl Into<String>) -> Self {
462 Self::ResponseProcessFailed {
463 operation,
464 reason: reason.into(),
465 }
466 }
467
468 pub fn with_broker_addr(self, addr: impl Into<String>) -> Self {
470 match self {
471 Self::BrokerOperationFailed {
472 operation,
473 code,
474 message,
475 broker_addr: _,
476 } => Self::BrokerOperationFailed {
477 operation,
478 code,
479 message,
480 broker_addr: Some(addr.into()),
481 },
482 other => other,
483 }
484 }
485
486 #[inline]
488 pub fn validation_error(field: impl Into<String>, reason: impl Into<String>) -> Self {
489 Self::Tools(ToolsError::validation_error(field, reason))
490 }
491
492 #[inline]
494 pub fn topic_not_found(topic: impl Into<String>) -> Self {
495 Self::Tools(ToolsError::topic_not_found(topic))
496 }
497
498 #[inline]
500 pub fn topic_already_exists(topic: impl Into<String>) -> Self {
501 Self::Tools(ToolsError::topic_already_exists(topic))
502 }
503
504 #[inline]
506 pub fn nameserver_unreachable(addr: impl Into<String>) -> Self {
507 Self::Tools(ToolsError::nameserver_unreachable(addr))
508 }
509
510 #[inline]
512 pub fn nameserver_config_invalid(reason: impl Into<String>) -> Self {
513 Self::Tools(ToolsError::nameserver_config_invalid(reason))
514 }
515}
516
517impl From<std::str::Utf8Error> for RocketMQError {
522 #[inline]
523 fn from(e: std::str::Utf8Error) -> Self {
524 Self::Serialization(SerializationError::from(e))
525 }
526}
527
528#[cfg(feature = "with_serde")]
529impl From<serde_json::Error> for RocketMQError {
530 #[inline]
531 fn from(e: serde_json::Error) -> Self {
532 Self::Serialization(SerializationError::from(e))
533 }
534}
535
536#[cfg(feature = "with_config")]
537impl From<config::ConfigError> for RocketMQError {
538 fn from(e: config::ConfigError) -> Self {
539 Self::ConfigParseFailed {
540 key: "unknown",
541 reason: e.to_string(),
542 }
543 }
544}
545
546#[derive(Debug, Error)]
552pub enum ServiceError {
553 #[error("Service is already running")]
555 AlreadyRunning,
556
557 #[error("Service is not running")]
559 NotRunning,
560
561 #[error("Service startup failed: {0}")]
563 StartupFailed(String),
564
565 #[error("Service shutdown failed: {0}")]
567 ShutdownFailed(String),
568
569 #[error("Service operation timeout")]
571 Timeout,
572
573 #[error("Service interrupted")]
575 Interrupted,
576}
577
578pub type RocketMQResult<T> = std::result::Result<T, RocketMQError>;
597
598pub type Result<T> = anyhow::Result<T>;
603
604#[cfg(test)]
605mod tests {
606 use super::*;
607
608 #[test]
609 fn test_error_creation() {
610 let err = RocketMQError::network_connection_failed("127.0.0.1:9876", "timeout");
611 assert!(err.to_string().contains("Connection failed"));
612 }
613
614 #[test]
615 fn test_error_conversion() {
616 let io_err = io::Error::new(io::ErrorKind::NotFound, "file not found");
617 let rmq_err: RocketMQError = io_err.into();
618 assert!(matches!(rmq_err, RocketMQError::IO(_)));
619 }
620
621 #[test]
622 fn test_broker_operation_with_addr() {
623 let err = RocketMQError::broker_operation_failed("SEND_MESSAGE", 1, "failed")
624 .with_broker_addr("127.0.0.1:10911");
625
626 if let RocketMQError::BrokerOperationFailed { broker_addr, .. } = err {
627 assert_eq!(broker_addr, Some("127.0.0.1:10911".to_string()));
628 } else {
629 panic!("Expected BrokerOperationFailed");
630 }
631 }
632
633 #[test]
634 fn test_topic_not_exist() {
635 let err = RocketMQError::TopicNotExist {
636 topic: "TestTopic".to_string(),
637 };
638 assert_eq!(err.to_string(), "Topic 'TestTopic' does not exist");
639 }
640}