Skip to main content

rocketmq_error/
unified.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unified error system for RocketMQ Rust implementation
16//!
17//! This module provides a centralized, semantic, and performant error handling system
18//! for all RocketMQ operations. All errors are categorized into logical groups for
19//! better debuggability and maintainability.
20
21mod network;
22mod protocol;
23mod rpc;
24mod serialization;
25mod tools;
26
27use std::io;
28
29// Re-export filter error
30pub use crate::filter_error::FilterError;
31
32pub use network::NetworkError;
33pub use protocol::ProtocolError;
34pub use rpc::RpcClientError;
35pub use serialization::SerializationError;
36use thiserror::Error;
37pub use tools::ToolsError;
38
39// Re-export auth error from the auth_error module
40pub use crate::auth_error::AuthError;
41// Re-export legacy error types for backward compatibility (will be deprecated)
42#[allow(deprecated)]
43pub use crate::client_error::*;
44#[allow(deprecated)]
45pub use crate::common_error::*;
46// Re-export controller error from the controller_error module
47pub use crate::controller_error::ControllerError;
48
49/// Main error type for all RocketMQ operations
50///
51/// This enum provides a unified error system across all RocketMQ crates.
52/// Each variant represents a logical category of errors with rich context information.
53///
54/// # Design Principles
55/// - **Semantic**: Each error clearly expresses what went wrong
56/// - **Performance**: Minimal heap allocations, use of &'static str where possible
57/// - **Debuggability**: Rich context for production debugging
58/// - **Ergonomics**: Automatic conversions via From trait
59///
60/// # Examples
61///
62/// ```rust
63/// use rocketmq_error::RocketMQError;
64/// use rocketmq_error::RocketMQResult;
65///
66/// fn send_message(addr: &str) -> RocketMQResult<()> {
67///     // Create a network error
68///     if addr.is_empty() {
69///         return Err(RocketMQError::network_connection_failed(
70///             "localhost:9876",
71///             "empty address",
72///         ));
73///     }
74///     Ok(())
75/// }
76///
77/// fn authenticate_user(username: &str) -> RocketMQResult<()> {
78///     // Create an authentication error
79///     if username.is_empty() {
80///         return Err(RocketMQError::user_not_found(""));
81///     }
82///     Ok(())
83/// }
84/// ```
85#[derive(Debug, Error)]
86pub enum RocketMQError {
87    // ============================================================================
88    // Network Errors
89    // ============================================================================
90    /// Network operation errors (connection, timeout, send/receive failures)
91    #[error(transparent)]
92    Network(#[from] NetworkError),
93
94    // ============================================================================
95    // Serialization Errors
96    // ============================================================================
97    /// Serialization/deserialization errors (encoding, decoding, format validation)
98    #[error(transparent)]
99    Serialization(#[from] SerializationError),
100
101    // ============================================================================
102    // Protocol Errors
103    // ============================================================================
104    /// RocketMQ protocol errors (invalid commands, version mismatch, etc.)
105    #[error(transparent)]
106    Protocol(#[from] ProtocolError),
107
108    // ============================================================================
109    // RPC Client Errors
110    // ============================================================================
111    /// RPC client specific errors (broker lookup, request failures, etc.)
112    #[error(transparent)]
113    Rpc(#[from] RpcClientError),
114
115    // ============================================================================
116    // Authentication Errors
117    // ============================================================================
118    /// Authentication/authorization errors (credential validation, access control, etc.)
119    #[error(transparent)]
120    Authentication(#[from] AuthError),
121
122    // ============================================================================
123    // Controller Errors
124    // ============================================================================
125    /// Controller operation errors (Raft consensus, leader election, broker management, etc.)
126    #[error(transparent)]
127    Controller(#[from] ControllerError),
128
129    // ============================================================================
130    // Message Property Errors
131    // ============================================================================
132    /// Invalid message property
133    #[error("Invalid message property: {0}")]
134    InvalidProperty(String),
135
136    // ============================================================================
137    // Broker Errors
138    // ============================================================================
139    /// Broker not found
140    #[error("Broker not found: {name}")]
141    BrokerNotFound { name: String },
142
143    /// Broker registration failed
144    #[error("Broker registration failed for '{name}': {reason}")]
145    BrokerRegistrationFailed { name: String, reason: String },
146
147    /// Broker operation failed with error code
148    #[error("Broker operation '{operation}' failed: code={code}, message={message}")]
149    BrokerOperationFailed {
150        operation: &'static str,
151        code: i32,
152        message: String,
153        broker_addr: Option<String>,
154    },
155
156    /// Topic does not exist
157    #[error("Topic '{topic}' does not exist")]
158    TopicNotExist { topic: String },
159
160    /// Queue does not exist
161    #[error("Queue does not exist: topic='{topic}', queue_id={queue_id}")]
162    QueueNotExist { topic: String, queue_id: i32 },
163
164    /// Subscription group not found
165    #[error("Subscription group '{group}' not found")]
166    SubscriptionGroupNotExist { group: String },
167
168    /// Queue ID out of range
169    #[error("Queue {queue_id} out of range (0-{max}) for topic '{topic}'")]
170    QueueIdOutOfRange { topic: String, queue_id: i32, max: i32 },
171
172    /// Message body too large
173    #[error("Message body length {actual} bytes exceeds limit {limit} bytes")]
174    MessageTooLarge { actual: usize, limit: usize },
175
176    /// Message validation failed
177    #[error("Message validation failed: {reason}")]
178    MessageValidationFailed { reason: String },
179
180    /// Retry limit exceeded
181    #[error("Retry limit {current}/{max} exceeded for group '{group}'")]
182    RetryLimitExceeded { group: String, current: i32, max: i32 },
183
184    /// Transaction message rejected
185    #[error("Transaction message rejected by broker policy")]
186    TransactionRejected,
187
188    /// Broker permission denied
189    #[error("Broker permission denied: {operation}")]
190    BrokerPermissionDenied { operation: String },
191
192    /// Not master broker
193    #[error("Not master broker, master address: {master_address}")]
194    NotMasterBroker { master_address: String },
195
196    /// Message lookup failed
197    #[error("Message lookup failed at offset {offset}")]
198    MessageLookupFailed { offset: i64 },
199
200    /// Topic sending forbidden
201    #[error("Sending to topic '{topic}' is forbidden")]
202    TopicSendingForbidden { topic: String },
203
204    /// Async task failed
205    #[error("Async task '{task}' failed: {context}")]
206    BrokerAsyncTaskFailed {
207        task: &'static str,
208        context: String,
209        #[source]
210        source: Box<dyn std::error::Error + Send + Sync>,
211    },
212
213    // ============================================================================
214    // Request/Response Errors
215    // ============================================================================
216    /// Request body missing or invalid
217    #[error("Request body {operation} failed: {reason}")]
218    RequestBodyInvalid { operation: &'static str, reason: String },
219
220    /// Request header missing or invalid
221    #[error("Request header error: {0}")]
222    RequestHeaderError(String),
223
224    /// Response encoding/decoding failed
225    #[error("Response {operation} failed: {reason}")]
226    ResponseProcessFailed { operation: &'static str, reason: String },
227
228    // ============================================================================
229    // NameServer/Route Errors
230    // ============================================================================
231    /// Route information not found
232    #[error("Route information not found for topic '{topic}'")]
233    RouteNotFound { topic: String },
234
235    /// Route data inconsistency detected
236    #[error("Route data inconsistency detected for topic '{topic}': {reason}")]
237    RouteInconsistent { topic: String, reason: String },
238
239    /// Broker registration conflict
240    #[error("Broker registration conflict for '{broker_name}': {reason}")]
241    RouteRegistrationConflict { broker_name: String, reason: String },
242
243    /// Route state version conflict
244    #[error("Route state version conflict: expected={expected}, actual={actual}")]
245    RouteVersionConflict { expected: u64, actual: u64 },
246
247    /// Cluster not found
248    #[error("Cluster '{cluster}' not found")]
249    ClusterNotFound { cluster: String },
250
251    // ============================================================================
252    // Client Errors
253    // ============================================================================
254    /// Client not started
255    #[error("Client is not started")]
256    ClientNotStarted,
257
258    /// Client already started
259    #[error("Client is already started")]
260    ClientAlreadyStarted,
261
262    /// Client is shutting down
263    #[error("Client is shutting down")]
264    ClientShuttingDown,
265
266    /// Invalid client state
267    #[error("Invalid client state: expected {expected}, got {actual}")]
268    ClientInvalidState { expected: &'static str, actual: String },
269
270    /// Producer not available
271    #[error("Producer is not available")]
272    ProducerNotAvailable,
273
274    /// Consumer not available
275    #[error("Consumer is not available")]
276    ConsumerNotAvailable,
277
278    // ============================================================================
279    // Tools/Admin Errors
280    // ============================================================================
281    /// Tools and admin operation errors
282    #[error(transparent)]
283    Tools(#[from] ToolsError),
284
285    // ============================================================================
286    // Filter Errors
287    // ============================================================================
288    /// Bloom filter and bit array operation errors
289    #[error(transparent)]
290    Filter(#[from] FilterError),
291
292    // ============================================================================
293    // Storage Errors
294    // ============================================================================
295    /// Storage read failed
296    #[error("Storage read failed for '{path}': {reason}")]
297    StorageReadFailed { path: String, reason: String },
298
299    /// Storage write failed
300    #[error("Storage write failed for '{path}': {reason}")]
301    StorageWriteFailed { path: String, reason: String },
302
303    /// Data corruption detected
304    #[error("Corrupted data detected in '{path}'")]
305    StorageCorrupted { path: String },
306
307    /// Out of storage space
308    #[error("Out of storage space: {path}")]
309    StorageOutOfSpace { path: String },
310
311    /// Storage lock failed
312    #[error("Failed to acquire lock for '{path}'")]
313    StorageLockFailed { path: String },
314
315    // ============================================================================
316    // Configuration Errors
317    // ============================================================================
318    /// Configuration parsing failed
319    #[error("Configuration parse error for '{key}': {reason}")]
320    ConfigParseFailed { key: &'static str, reason: String },
321
322    /// Required configuration missing
323    #[error("Required configuration '{key}' is missing")]
324    ConfigMissing { key: &'static str },
325
326    /// Invalid configuration value
327    #[error("Invalid configuration for '{key}': value='{value}', reason={reason}")]
328    ConfigInvalidValue {
329        key: &'static str,
330        value: String,
331        reason: String,
332    },
333
334    // ============================================================================
335    // Controller/Raft Errors
336    // ============================================================================
337    /// Not the Raft leader
338    #[error("Not leader, current leader is: {}", leader_id.map(|id| id.to_string()).unwrap_or_else(|| "unknown".to_string()))]
339    ControllerNotLeader { leader_id: Option<u64> },
340
341    /// Raft consensus error
342    #[error("Raft consensus error: {reason}")]
343    ControllerRaftError { reason: String },
344
345    /// Consensus operation timeout
346    #[error("Consensus operation '{operation}' timed out after {timeout_ms}ms")]
347    ControllerConsensusTimeout { operation: &'static str, timeout_ms: u64 },
348
349    /// Snapshot operation failed
350    #[error("Snapshot operation failed: {reason}")]
351    ControllerSnapshotFailed { reason: String },
352
353    // ============================================================================
354    // System Errors
355    // ============================================================================
356    /// IO error from std::io
357    #[error("IO error: {0}")]
358    IO(#[from] io::Error),
359
360    /// Illegal argument
361    #[error("Illegal argument: {0}")]
362    IllegalArgument(String),
363
364    /// Operation timeout
365    #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
366    Timeout { operation: &'static str, timeout_ms: u64 },
367
368    /// Internal error (should be rare)
369    #[error("Internal error: {0}")]
370    Internal(String),
371
372    /// Service lifecycle error
373    #[error("Service error: {0}")]
374    Service(#[from] ServiceError),
375
376    // ============================================================================
377    // Version Errors
378    // ============================================================================
379    /// Invalid RocketMQ version ordinal value
380    #[error("Invalid RocketMQ version ordinal: {0}")]
381    InvalidVersionOrdinal(u32),
382
383    // ============================================================================
384    // Legacy Errors (Deprecated - for backward compatibility)
385    // ============================================================================
386    /// Legacy error - use specific error types instead
387    #[deprecated(since = "0.7.0", note = "Use specific error types instead")]
388    #[error("{0}")]
389    Legacy(String),
390
391    #[error("Not initialized: {0}")]
392    NotInitialized(String),
393
394    #[error("Message is missing required property: {property}")]
395    MissingRequiredMessageProperty { property: &'static str },
396}
397
398// ============================================================================
399// Convenience Constructors
400// ============================================================================
401
402impl RocketMQError {
403    /// Create a network connection failed error
404    #[inline]
405    pub fn network_connection_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
406        Self::Network(NetworkError::connection_failed(addr, reason))
407    }
408
409    /// Create a network timeout error
410    #[inline]
411    pub fn network_timeout(addr: impl Into<String>, timeout: std::time::Duration) -> Self {
412        Self::Network(NetworkError::request_timeout(addr, timeout.as_millis() as u64))
413    }
414
415    /// Create a network request failed error
416    #[inline]
417    pub fn network_request_failed(addr: impl Into<String>, reason: impl Into<String>) -> Self {
418        Self::Network(NetworkError::send_failed(addr, reason))
419    }
420
421    /// Create a deserialization failed error
422    #[inline]
423    pub fn deserialization_failed(format: &'static str, reason: impl Into<String>) -> Self {
424        Self::Serialization(SerializationError::decode_failed(format, reason))
425    }
426
427    /// Create a validation failed error
428    #[inline]
429    pub fn validation_failed(field: impl Into<String>, reason: impl Into<String>) -> Self {
430        Self::Tools(ToolsError::validation_error(field, reason))
431    }
432
433    /// Create a broker operation failed error
434    #[inline]
435    pub fn broker_operation_failed(operation: &'static str, code: i32, message: impl Into<String>) -> Self {
436        Self::BrokerOperationFailed {
437            operation,
438            code,
439            message: message.into(),
440            broker_addr: None,
441        }
442    }
443
444    /// Create a storage read failed error
445    #[inline]
446    pub fn storage_read_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
447        Self::StorageReadFailed {
448            path: path.into(),
449            reason: reason.into(),
450        }
451    }
452
453    /// Create a storage write failed error
454    #[inline]
455    pub fn storage_write_failed(path: impl Into<String>, reason: impl Into<String>) -> Self {
456        Self::StorageWriteFailed {
457            path: path.into(),
458            reason: reason.into(),
459        }
460    }
461
462    /// Create an illegal argument error
463    #[inline]
464    pub fn illegal_argument(message: impl Into<String>) -> Self {
465        Self::IllegalArgument(message.into())
466    }
467
468    /// Create a route not found error
469    #[inline]
470    pub fn route_not_found(topic: impl Into<String>) -> Self {
471        Self::RouteNotFound { topic: topic.into() }
472    }
473
474    /// Create a route registration conflict error
475    #[inline]
476    pub fn route_registration_conflict(broker_name: impl Into<String>, reason: impl Into<String>) -> Self {
477        Self::RouteRegistrationConflict {
478            broker_name: broker_name.into(),
479            reason: reason.into(),
480        }
481    }
482
483    /// Create a cluster not found error
484    #[inline]
485    pub fn cluster_not_found(cluster: impl Into<String>) -> Self {
486        Self::ClusterNotFound {
487            cluster: cluster.into(),
488        }
489    }
490
491    /// Create a request body invalid error
492    #[inline]
493    pub fn request_body_invalid(operation: &'static str, reason: impl Into<String>) -> Self {
494        Self::RequestBodyInvalid {
495            operation,
496            reason: reason.into(),
497        }
498    }
499
500    /// Create a request header error
501    #[inline]
502    pub fn request_header_error(message: impl Into<String>) -> Self {
503        Self::RequestHeaderError(message.into())
504    }
505
506    /// Create a response process failed error
507    #[inline]
508    pub fn response_process_failed(operation: &'static str, reason: impl Into<String>) -> Self {
509        Self::ResponseProcessFailed {
510            operation,
511            reason: reason.into(),
512        }
513    }
514
515    /// Add broker address context to broker operation error
516    pub fn with_broker_addr(self, addr: impl Into<String>) -> Self {
517        match self {
518            Self::BrokerOperationFailed {
519                operation,
520                code,
521                message,
522                broker_addr: _,
523            } => Self::BrokerOperationFailed {
524                operation,
525                code,
526                message,
527                broker_addr: Some(addr.into()),
528            },
529            other => other,
530        }
531    }
532
533    /// Create a validation error
534    #[inline]
535    pub fn validation_error(field: impl Into<String>, reason: impl Into<String>) -> Self {
536        Self::Tools(ToolsError::validation_error(field, reason))
537    }
538
539    /// Create a topic not found error (alias for TopicNotExist)
540    #[inline]
541    pub fn topic_not_found(topic: impl Into<String>) -> Self {
542        Self::Tools(ToolsError::topic_not_found(topic))
543    }
544
545    /// Create a topic already exists error
546    #[inline]
547    pub fn topic_already_exists(topic: impl Into<String>) -> Self {
548        Self::Tools(ToolsError::topic_already_exists(topic))
549    }
550
551    /// Create a nameserver unreachable error
552    #[inline]
553    pub fn nameserver_unreachable(addr: impl Into<String>) -> Self {
554        Self::Tools(ToolsError::nameserver_unreachable(addr))
555    }
556
557    /// Create a nameserver config invalid error
558    #[inline]
559    pub fn nameserver_config_invalid(reason: impl Into<String>) -> Self {
560        Self::Tools(ToolsError::nameserver_config_invalid(reason))
561    }
562
563    /// Create a not initialized error
564    #[inline]
565    pub fn not_initialized(reason: impl Into<String>) -> Self {
566        Self::NotInitialized(reason.into())
567    }
568
569    // ============================================================================
570    // Authentication Error Constructors
571    // ============================================================================
572
573    /// Create an authentication failed error
574    #[inline]
575    pub fn authentication_failed(reason: impl Into<String>) -> Self {
576        Self::Authentication(AuthError::AuthenticationFailed(reason.into()))
577    }
578
579    /// Create an invalid credential error
580    #[inline]
581    pub fn invalid_credential(reason: impl Into<String>) -> Self {
582        Self::Authentication(AuthError::InvalidCredential(reason.into()))
583    }
584
585    /// Create a user not found error
586    #[inline]
587    pub fn user_not_found(username: impl Into<String>) -> Self {
588        Self::Authentication(AuthError::UserNotFound(username.into()))
589    }
590
591    /// Create an invalid signature error
592    #[inline]
593    pub fn invalid_signature(reason: impl Into<String>) -> Self {
594        Self::Authentication(AuthError::InvalidSignature(reason.into()))
595    }
596
597    // ============================================================================
598    // Controller Error Constructors
599    // ============================================================================
600
601    /// Create a controller not leader error
602    #[inline]
603    pub fn controller_not_leader(leader_id: Option<u64>) -> Self {
604        Self::Controller(ControllerError::NotLeader { leader_id })
605    }
606
607    /// Create a controller Raft error
608    #[inline]
609    pub fn controller_raft_error(reason: impl Into<String>) -> Self {
610        Self::Controller(ControllerError::Raft(reason.into()))
611    }
612
613    /// Create a controller metadata not found error
614    #[inline]
615    pub fn controller_metadata_not_found(key: impl Into<String>) -> Self {
616        Self::Controller(ControllerError::MetadataNotFound { key: key.into() })
617    }
618
619    /// Create a controller invalid request error
620    #[inline]
621    pub fn controller_invalid_request(reason: impl Into<String>) -> Self {
622        Self::Controller(ControllerError::InvalidRequest(reason.into()))
623    }
624
625    /// Create a controller timeout error
626    #[inline]
627    pub fn controller_timeout(timeout_ms: u64) -> Self {
628        Self::Controller(ControllerError::Timeout { timeout_ms })
629    }
630
631    /// Create a controller shutdown error
632    #[inline]
633    pub fn controller_shutdown() -> Self {
634        Self::Controller(ControllerError::Shutdown)
635    }
636
637    // ============================================================================
638    // Filter Error Constructors
639    // ============================================================================
640
641    /// Create an empty bytes error
642    #[inline]
643    pub fn filter_empty_bytes() -> Self {
644        Self::Filter(FilterError::empty_bytes())
645    }
646
647    /// Create an invalid bit length error
648    #[inline]
649    pub fn filter_invalid_bit_length() -> Self {
650        Self::Filter(FilterError::invalid_bit_length())
651    }
652
653    /// Create a bit length too small error
654    #[inline]
655    pub fn filter_bit_length_too_small() -> Self {
656        Self::Filter(FilterError::bit_length_too_small())
657    }
658
659    /// Create a bit position out of bounds error
660    #[inline]
661    pub fn filter_bit_position_out_of_bounds(pos: usize, max: usize) -> Self {
662        Self::Filter(FilterError::bit_position_out_of_bounds(pos, max))
663    }
664
665    /// Create a byte position out of bounds error
666    #[inline]
667    pub fn filter_byte_position_out_of_bounds(pos: usize, max: usize) -> Self {
668        Self::Filter(FilterError::byte_position_out_of_bounds(pos, max))
669    }
670
671    /// Create an uninitialized error
672    #[inline]
673    pub fn filter_uninitialized() -> Self {
674        Self::Filter(FilterError::uninitialized())
675    }
676}
677
678// ============================================================================
679// Error Conversion Implementations
680// ============================================================================
681
682impl From<std::str::Utf8Error> for RocketMQError {
683    #[inline]
684    fn from(e: std::str::Utf8Error) -> Self {
685        Self::Serialization(SerializationError::from(e))
686    }
687}
688
689#[cfg(feature = "with_serde")]
690impl From<serde_json::Error> for RocketMQError {
691    #[inline]
692    fn from(e: serde_json::Error) -> Self {
693        Self::Serialization(SerializationError::from(e))
694    }
695}
696
697#[cfg(feature = "with_config")]
698impl From<config::ConfigError> for RocketMQError {
699    fn from(e: config::ConfigError) -> Self {
700        Self::ConfigParseFailed {
701            key: "unknown",
702            reason: e.to_string(),
703        }
704    }
705}
706
707// ============================================================================
708// Service Error (moved from ServiceError)
709// ============================================================================
710
711/// Service lifecycle errors
712#[derive(Debug, Error)]
713pub enum ServiceError {
714    /// Service is already running
715    #[error("Service is already running")]
716    AlreadyRunning,
717
718    /// Service is not running
719    #[error("Service is not running")]
720    NotRunning,
721
722    /// Service startup failed
723    #[error("Service startup failed: {0}")]
724    StartupFailed(String),
725
726    /// Service shutdown failed
727    #[error("Service shutdown failed: {0}")]
728    ShutdownFailed(String),
729
730    /// Service operation timeout
731    #[error("Service operation timeout")]
732    Timeout,
733
734    /// Service interrupted
735    #[error("Service interrupted")]
736    Interrupted,
737}
738
739// ============================================================================
740// Type Aliases
741// ============================================================================
742
743/// Result type alias for RocketMQ operations
744///
745/// This is the standard result type used across all RocketMQ crates.
746///
747/// # Examples
748///
749/// ```rust
750/// use rocketmq_error::RocketMQResult;
751///
752/// fn send_message() -> RocketMQResult<()> {
753///     // ... operation
754///     Ok(())
755/// }
756/// ```
757pub type RocketMQResult<T> = std::result::Result<T, RocketMQError>;
758
759/// Alias for anyhow::Result for internal use
760///
761/// Use this for internal error handling where you don't need
762/// to expose specific error types to the public API.
763pub type Result<T> = anyhow::Result<T>;
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768
769    #[test]
770    fn test_error_creation() {
771        let err = RocketMQError::network_connection_failed("127.0.0.1:9876", "timeout");
772        assert!(err.to_string().contains("Connection failed"));
773    }
774
775    #[test]
776    fn test_error_conversion() {
777        let io_err = io::Error::new(io::ErrorKind::NotFound, "file not found");
778        let rmq_err: RocketMQError = io_err.into();
779        assert!(matches!(rmq_err, RocketMQError::IO(_)));
780    }
781
782    #[test]
783    fn test_broker_operation_with_addr() {
784        let err =
785            RocketMQError::broker_operation_failed("SEND_MESSAGE", 1, "failed").with_broker_addr("127.0.0.1:10911");
786
787        if let RocketMQError::BrokerOperationFailed { broker_addr, .. } = err {
788            assert_eq!(broker_addr, Some("127.0.0.1:10911".to_string()));
789        } else {
790            panic!("Expected BrokerOperationFailed");
791        }
792    }
793
794    #[test]
795    fn test_topic_not_exist() {
796        let err = RocketMQError::TopicNotExist {
797            topic: "TestTopic".to_string(),
798        };
799        assert_eq!(err.to_string(), "Topic 'TestTopic' does not exist");
800    }
801}