rocketmq_error/unified/
mod.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! Unified error system for RocketMQ Rust implementation
19//!
20//! This module provides a centralized, semantic, and performant error handling system
21//! for all RocketMQ operations. All errors are categorized into logical groups for
22//! better debuggability and maintainability.
23
24mod network;
25mod protocol;
26mod serialization;
27mod tools;
28
29use std::io;
30
31pub use network::NetworkError;
32pub use protocol::ProtocolError;
33pub use serialization::SerializationError;
34use thiserror::Error;
35pub use tools::ToolsError;
36
37// Re-export legacy error types for backward compatibility (will be deprecated)
38#[allow(deprecated)]
39pub use crate::client_error::*;
40#[allow(deprecated)]
41pub use crate::common_error::*;
42
/// Main error type for all RocketMQ operations
///
/// This enum provides a unified error system across all RocketMQ crates.
/// Each variant represents a logical category of errors with rich context information.
///
/// # Design Principles
/// - **Semantic**: Each error clearly expresses what went wrong
/// - **Performance**: Minimal heap allocations, use of &'static str where possible
/// - **Debuggability**: Rich context for production debugging
/// - **Ergonomics**: Automatic conversions via From trait
///
/// # Conversions
///
/// Variants whose payload is marked `#[from]` ([`NetworkError`], [`SerializationError`],
/// [`ProtocolError`], [`ToolsError`], [`std::io::Error`] and [`ServiceError`]) can be
/// produced with `?` or `.into()`.
///
/// The `transparent` variants (`Network`, `Serialization`, `Protocol`, `Tools`) add no
/// text of their own: their `Display` output and `source()` are those of the wrapped error.
///
/// # Examples
///
/// ```rust
/// use rocketmq_error::RocketMQError;
/// use rocketmq_error::RocketMQResult;
///
/// fn send_message(addr: &str) -> RocketMQResult<()> {
///     // Create a network error that reports the address actually supplied
///     if addr.is_empty() {
///         return Err(RocketMQError::network_connection_failed(
///             addr,
///             "empty address",
///         ));
///     }
///     Ok(())
/// }
/// ```
#[derive(Debug, Error)]
pub enum RocketMQError {
    // ============================================================================
    // Network Errors
    // ============================================================================
    /// Network operation errors (connection, timeout, send/receive failures)
    ///
    /// Transparent wrapper: message and source come from the inner [`NetworkError`].
    #[error(transparent)]
    Network(#[from] NetworkError),

    // ============================================================================
    // Serialization Errors
    // ============================================================================
    /// Serialization/deserialization errors (encoding, decoding, format validation)
    ///
    /// Transparent wrapper: message and source come from the inner [`SerializationError`].
    #[error(transparent)]
    Serialization(#[from] SerializationError),

    // ============================================================================
    // Protocol Errors
    // ============================================================================
    /// RocketMQ protocol errors (invalid commands, version mismatch, etc.)
    ///
    /// Transparent wrapper: message and source come from the inner [`ProtocolError`].
    #[error(transparent)]
    Protocol(#[from] ProtocolError),

    // ============================================================================
    // Broker Errors
    // ============================================================================
    /// Broker not found
    #[error("Broker not found: {name}")]
    BrokerNotFound { name: String },

    /// Broker registration failed
    #[error("Broker registration failed for '{name}': {reason}")]
    BrokerRegistrationFailed { name: String, reason: String },

    /// Broker operation failed with error code
    ///
    /// Note that `broker_addr` is carried as structured context only: the `Display`
    /// message renders `operation`, `code` and `message`, but not the address.
    #[error("Broker operation '{operation}' failed: code={code}, message={message}")]
    BrokerOperationFailed {
        /// Name of the operation that failed
        operation: &'static str,
        /// Error code reported for the operation
        code: i32,
        /// Human-readable failure description
        message: String,
        /// Optional broker address; `None` until set (see `with_broker_addr`)
        broker_addr: Option<String>,
    },

    /// Topic does not exist
    #[error("Topic '{topic}' does not exist")]
    TopicNotExist { topic: String },

    /// Queue does not exist
    #[error("Queue does not exist: topic='{topic}', queue_id={queue_id}")]
    QueueNotExist { topic: String, queue_id: i32 },

    /// Subscription group not found
    #[error("Subscription group '{group}' not found")]
    SubscriptionGroupNotExist { group: String },

    /// Queue ID out of range
    ///
    /// The message renders the valid range as `0-{max}`.
    #[error("Queue {queue_id} out of range (0-{max}) for topic '{topic}'")]
    QueueIdOutOfRange {
        topic: String,
        queue_id: i32,
        max: i32,
    },

    /// Message body too large
    ///
    /// Both sizes are rendered in bytes.
    #[error("Message body length {actual} bytes exceeds limit {limit} bytes")]
    MessageTooLarge { actual: usize, limit: usize },

    /// Message validation failed
    #[error("Message validation failed: {reason}")]
    MessageValidationFailed { reason: String },

    /// Retry limit exceeded
    ///
    /// The message renders the counters as `{current}/{max}`.
    #[error("Retry limit {current}/{max} exceeded for group '{group}'")]
    RetryLimitExceeded {
        group: String,
        current: i32,
        max: i32,
    },

    /// Transaction message rejected
    #[error("Transaction message rejected by broker policy")]
    TransactionRejected,

    /// Broker permission denied
    #[error("Broker permission denied: {operation}")]
    BrokerPermissionDenied { operation: String },

    /// Not master broker
    #[error("Not master broker, master address: {master_address}")]
    NotMasterBroker { master_address: String },

    /// Message lookup failed
    #[error("Message lookup failed at offset {offset}")]
    MessageLookupFailed { offset: i64 },

    /// Topic sending forbidden
    #[error("Sending to topic '{topic}' is forbidden")]
    TopicSendingForbidden { topic: String },

    /// Async task failed
    ///
    /// The underlying cause is reachable through `std::error::Error::source`; it is
    /// not part of the `Display` message, which shows only `task` and `context`.
    #[error("Async task '{task}' failed: {context}")]
    BrokerAsyncTaskFailed {
        /// Name of the task that failed
        task: &'static str,
        /// Free-form description of what the task was doing
        context: String,
        /// Boxed underlying error, exposed as the error source
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    // ============================================================================
    // Request/Response Errors
    // ============================================================================
    /// Request body missing or invalid
    ///
    /// `operation` is interpolated into the message as `Request body {operation} failed`.
    #[error("Request body {operation} failed: {reason}")]
    RequestBodyInvalid {
        operation: &'static str,
        reason: String,
    },

    /// Request header missing or invalid
    #[error("Request header error: {0}")]
    RequestHeaderError(String),

    /// Response encoding/decoding failed
    ///
    /// `operation` is interpolated into the message as `Response {operation} failed`.
    #[error("Response {operation} failed: {reason}")]
    ResponseProcessFailed {
        operation: &'static str,
        reason: String,
    },

    // ============================================================================
    // NameServer/Route Errors
    // ============================================================================
    /// Route information not found
    #[error("Route information not found for topic '{topic}'")]
    RouteNotFound { topic: String },

    /// Route data inconsistency detected
    #[error("Route data inconsistency detected for topic '{topic}': {reason}")]
    RouteInconsistent { topic: String, reason: String },

    /// Broker registration conflict
    #[error("Broker registration conflict for '{broker_name}': {reason}")]
    RouteRegistrationConflict { broker_name: String, reason: String },

    /// Route state version conflict
    #[error("Route state version conflict: expected={expected}, actual={actual}")]
    RouteVersionConflict { expected: u64, actual: u64 },

    /// Cluster not found
    #[error("Cluster '{cluster}' not found")]
    ClusterNotFound { cluster: String },

    // ============================================================================
    // Client Errors
    // ============================================================================
    /// Client not started
    #[error("Client is not started")]
    ClientNotStarted,

    /// Client already started
    #[error("Client is already started")]
    ClientAlreadyStarted,

    /// Client is shutting down
    #[error("Client is shutting down")]
    ClientShuttingDown,

    /// Invalid client state
    #[error("Invalid client state: expected {expected}, got {actual}")]
    ClientInvalidState {
        expected: &'static str,
        actual: String,
    },

    /// Producer not available
    #[error("Producer is not available")]
    ProducerNotAvailable,

    /// Consumer not available
    #[error("Consumer is not available")]
    ConsumerNotAvailable,

    // ============================================================================
    // Tools/Admin Errors
    // ============================================================================
    /// Tools and admin operation errors
    ///
    /// Transparent wrapper: message and source come from the inner [`ToolsError`].
    #[error(transparent)]
    Tools(#[from] ToolsError),

    // ============================================================================
    // Storage Errors
    // ============================================================================
    /// Storage read failed
    #[error("Storage read failed for '{path}': {reason}")]
    StorageReadFailed { path: String, reason: String },

    /// Storage write failed
    #[error("Storage write failed for '{path}': {reason}")]
    StorageWriteFailed { path: String, reason: String },

    /// Data corruption detected
    #[error("Corrupted data detected in '{path}'")]
    StorageCorrupted { path: String },

    /// Out of storage space
    #[error("Out of storage space: {path}")]
    StorageOutOfSpace { path: String },

    /// Storage lock failed
    #[error("Failed to acquire lock for '{path}'")]
    StorageLockFailed { path: String },

    // ============================================================================
    // Configuration Errors
    // ============================================================================
    /// Configuration parsing failed
    #[error("Configuration parse error for '{key}': {reason}")]
    ConfigParseFailed { key: &'static str, reason: String },

    /// Required configuration missing
    #[error("Required configuration '{key}' is missing")]
    ConfigMissing { key: &'static str },

    /// Invalid configuration value
    #[error("Invalid configuration for '{key}': value='{value}', reason={reason}")]
    ConfigInvalidValue {
        key: &'static str,
        value: String,
        reason: String,
    },

    // ============================================================================
    // Controller/Raft Errors
    // ============================================================================
    /// Not the Raft leader
    ///
    /// The message shows the leader id when it is `Some`, and the literal text
    /// `unknown` when it is `None`.
    #[error("Not leader, current leader is: {}", leader_id.map(|id| id.to_string()).unwrap_or_else(|| "unknown".to_string()))]
    ControllerNotLeader { leader_id: Option<u64> },

    /// Raft consensus error
    #[error("Raft consensus error: {reason}")]
    ControllerRaftError { reason: String },

    /// Consensus operation timeout
    ///
    /// `timeout_ms` is rendered with an `ms` suffix.
    #[error("Consensus operation '{operation}' timed out after {timeout_ms}ms")]
    ControllerConsensusTimeout {
        operation: &'static str,
        timeout_ms: u64,
    },

    /// Snapshot operation failed
    #[error("Snapshot operation failed: {reason}")]
    ControllerSnapshotFailed { reason: String },

    // ============================================================================
    // System Errors
    // ============================================================================
    /// IO error from std::io
    ///
    /// Created automatically from [`std::io::Error`] via `#[from]`; the wrapped
    /// error's own message is appended after the `IO error: ` prefix.
    #[error("IO error: {0}")]
    IO(#[from] io::Error),

    /// Illegal argument
    #[error("Illegal argument: {0}")]
    IllegalArgument(String),

    /// Operation timeout
    ///
    /// `timeout_ms` is rendered with an `ms` suffix.
    #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
    Timeout {
        operation: &'static str,
        timeout_ms: u64,
    },

    /// Internal error (should be rare)
    #[error("Internal error: {0}")]
    Internal(String),

    /// Service lifecycle error
    ///
    /// Created automatically from [`ServiceError`] via `#[from]`; the wrapped
    /// error's own message is appended after the `Service error: ` prefix.
    #[error("Service error: {0}")]
    Service(#[from] ServiceError),

    // ============================================================================
    // Version Errors
    // ============================================================================
    /// Invalid RocketMQ version ordinal value
    #[error("Invalid RocketMQ version ordinal: {0}")]
    InvalidVersionOrdinal(u32),

    // ============================================================================
    // Legacy Errors (Deprecated - for backward compatibility)
    // ============================================================================
    /// Legacy error - use specific error types instead
    ///
    /// The stored string is rendered verbatim with no prefix. Marked deprecated
    /// since 0.7.0, so constructing or matching it emits a deprecation warning.
    #[deprecated(since = "0.7.0", note = "Use specific error types instead")]
    #[error("{0}")]
    Legacy(String),
}
365
366// ============================================================================
367// Convenience Constructors
368// ============================================================================
369
370impl RocketMQError {
371    /// Create a network connection failed error
372    #[inline]
373    pub fn network_connection_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
374        Self::Network(NetworkError::connection_failed(addr, reason))
375    }
376
377    /// Create a broker operation failed error
378    #[inline]
379    pub fn broker_operation_failed(
380        operation: &'static str,
381        code: i32,
382        message: impl Into<String>,
383    ) -> Self {
384        Self::BrokerOperationFailed {
385            operation,
386            code,
387            message: message.into(),
388            broker_addr: None,
389        }
390    }
391
392    /// Create a storage read failed error
393    #[inline]
394    pub fn storage_read_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
395        Self::StorageReadFailed {
396            path: path.into(),
397            reason: reason.into(),
398        }
399    }
400
401    /// Create a storage write failed error
402    #[inline]
403    pub fn storage_write_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
404        Self::StorageWriteFailed {
405            path: path.into(),
406            reason: reason.into(),
407        }
408    }
409
410    /// Create an illegal argument error
411    #[inline]
412    pub fn illegal_argument(message: impl Into<String>) -> Self {
413        Self::IllegalArgument(message.into())
414    }
415
416    /// Create a route not found error
417    #[inline]
418    pub fn route_not_found(topic: impl Into<String>) -> Self {
419        Self::RouteNotFound {
420            topic: topic.into(),
421        }
422    }
423
424    /// Create a route registration conflict error
425    #[inline]
426    pub fn route_registration_conflict(
427        broker_name: impl Into<String>,
428        reason: impl Into<String>,
429    ) -> Self {
430        Self::RouteRegistrationConflict {
431            broker_name: broker_name.into(),
432            reason: reason.into(),
433        }
434    }
435
436    /// Create a cluster not found error
437    #[inline]
438    pub fn cluster_not_found(cluster: impl Into<String>) -> Self {
439        Self::ClusterNotFound {
440            cluster: cluster.into(),
441        }
442    }
443
444    /// Create a request body invalid error
445    #[inline]
446    pub fn request_body_invalid(operation: &'static str, reason: impl Into<String>) -> Self {
447        Self::RequestBodyInvalid {
448            operation,
449            reason: reason.into(),
450        }
451    }
452
453    /// Create a request header error
454    #[inline]
455    pub fn request_header_error(message: impl Into<String>) -> Self {
456        Self::RequestHeaderError(message.into())
457    }
458
459    /// Create a response process failed error
460    #[inline]
461    pub fn response_process_failed(operation: &'static str, reason: impl Into<String>) -> Self {
462        Self::ResponseProcessFailed {
463            operation,
464            reason: reason.into(),
465        }
466    }
467
468    /// Add broker address context to broker operation error
469    pub fn with_broker_addr(self, addr: impl Into<String>) -> Self {
470        match self {
471            Self::BrokerOperationFailed {
472                operation,
473                code,
474                message,
475                broker_addr: _,
476            } => Self::BrokerOperationFailed {
477                operation,
478                code,
479                message,
480                broker_addr: Some(addr.into()),
481            },
482            other => other,
483        }
484    }
485
486    /// Create a validation error
487    #[inline]
488    pub fn validation_error(field: impl Into<String>, reason: impl Into<String>) -> Self {
489        Self::Tools(ToolsError::validation_error(field, reason))
490    }
491
492    /// Create a topic not found error (alias for TopicNotExist)
493    #[inline]
494    pub fn topic_not_found(topic: impl Into<String>) -> Self {
495        Self::Tools(ToolsError::topic_not_found(topic))
496    }
497
498    /// Create a topic already exists error
499    #[inline]
500    pub fn topic_already_exists(topic: impl Into<String>) -> Self {
501        Self::Tools(ToolsError::topic_already_exists(topic))
502    }
503
504    /// Create a nameserver unreachable error
505    #[inline]
506    pub fn nameserver_unreachable(addr: impl Into<String>) -> Self {
507        Self::Tools(ToolsError::nameserver_unreachable(addr))
508    }
509
510    /// Create a nameserver config invalid error
511    #[inline]
512    pub fn nameserver_config_invalid(reason: impl Into<String>) -> Self {
513        Self::Tools(ToolsError::nameserver_config_invalid(reason))
514    }
515}
516
517// ============================================================================
518// Error Conversion Implementations
519// ============================================================================
520
521impl From<std::str::Utf8Error> for RocketMQError {
522    #[inline]
523    fn from(e: std::str::Utf8Error) -> Self {
524        Self::Serialization(SerializationError::from(e))
525    }
526}
527
/// Converts a `serde_json` failure into [`RocketMQError::Serialization`].
///
/// Only compiled when the `with_serde` feature is enabled.
#[cfg(feature = "with_serde")]
impl From<serde_json::Error> for RocketMQError {
    #[inline]
    fn from(e: serde_json::Error) -> Self {
        let inner: SerializationError = e.into();
        Self::Serialization(inner)
    }
}
535
/// Converts a `config` crate failure into [`RocketMQError::ConfigParseFailed`].
///
/// The key is always reported as `"unknown"`; the original error is kept only
/// as its rendered text in `reason`. Only compiled with the `with_config` feature.
#[cfg(feature = "with_config")]
impl From<config::ConfigError> for RocketMQError {
    fn from(e: config::ConfigError) -> Self {
        let reason = e.to_string();
        Self::ConfigParseFailed {
            key: "unknown",
            reason,
        }
    }
}
545
546// ============================================================================
547// Service Error (moved from ServiceError)
548// ============================================================================
549
/// Service lifecycle errors
///
/// Converts into [`RocketMQError::Service`] through that variant's `#[from]`
/// payload, where the message gains a `Service error: ` prefix.
#[derive(Debug, Error)]
pub enum ServiceError {
    /// Service is already running
    #[error("Service is already running")]
    AlreadyRunning,

    /// Service is not running
    #[error("Service is not running")]
    NotRunning,

    /// Service startup failed
    ///
    /// The string payload is appended to the message as the failure detail.
    #[error("Service startup failed: {0}")]
    StartupFailed(String),

    /// Service shutdown failed
    ///
    /// The string payload is appended to the message as the failure detail.
    #[error("Service shutdown failed: {0}")]
    ShutdownFailed(String),

    /// Service operation timeout
    ///
    /// Carries no operation name or duration, unlike [`RocketMQError::Timeout`].
    #[error("Service operation timeout")]
    Timeout,

    /// Service interrupted
    #[error("Service interrupted")]
    Interrupted,
}
577
578// ============================================================================
579// Type Aliases
580// ============================================================================
581
/// Result type alias for RocketMQ operations
///
/// This is the standard result type used across all RocketMQ crates.
/// The error type is fixed to [`RocketMQError`].
///
/// # Examples
///
/// ```rust
/// use rocketmq_error::RocketMQResult;
///
/// fn send_message() -> RocketMQResult<()> {
///     // ... operation
///     Ok(())
/// }
/// ```
// `std::result::Result` is spelled out in full because this module defines its
// own one-parameter `Result` alias, which takes over the bare name here.
pub type RocketMQResult<T> = std::result::Result<T, RocketMQError>;
597
/// Alias for anyhow::Result for internal use
///
/// Use this for internal error handling where you don't need
/// to expose specific error types to the public API.
///
/// The error type here is `anyhow::Error`, not [`RocketMQError`]; use
/// [`RocketMQResult`] when callers need to match on specific variants.
/// Although intended for internal use, the alias itself is declared `pub`.
pub type Result<T> = anyhow::Result<T>;
603
#[cfg(test)]
mod tests {
    use super::*;

    /// The network constructor must surface the inner error's message.
    #[test]
    fn test_error_creation() {
        let rendered =
            RocketMQError::network_connection_failed("127.0.0.1:9876", "timeout").to_string();
        assert!(rendered.contains("Connection failed"));
    }

    /// `io::Error` must convert into the `IO` variant through `From`.
    #[test]
    fn test_error_conversion() {
        let source = io::Error::new(io::ErrorKind::NotFound, "file not found");
        let converted = RocketMQError::from(source);
        assert!(matches!(converted, RocketMQError::IO(_)));
    }

    /// `with_broker_addr` must store the address on a broker operation error.
    #[test]
    fn test_broker_operation_with_addr() {
        let base = RocketMQError::broker_operation_failed("SEND_MESSAGE", 1, "failed");
        let enriched = base.with_broker_addr("127.0.0.1:10911");

        match enriched {
            RocketMQError::BrokerOperationFailed { broker_addr, .. } => {
                assert_eq!(broker_addr, Some("127.0.0.1:10911".to_string()));
            }
            _ => panic!("Expected BrokerOperationFailed"),
        }
    }

    /// The `TopicNotExist` message must quote the topic name.
    #[test]
    fn test_topic_not_exist() {
        let topic = "TestTopic".to_string();
        let rendered = RocketMQError::TopicNotExist { topic }.to_string();
        assert_eq!(rendered, "Topic 'TestTopic' does not exist");
    }
}