Skip to main content

rivven_cluster/
error.rs

1//! Cluster error types
2
3use std::net::SocketAddr;
4use thiserror::Error;
5
6/// Result type for cluster operations
7pub type Result<T> = std::result::Result<T, ClusterError>;
8
9/// Cluster errors
10#[derive(Debug, Error)]
11pub enum ClusterError {
12    // ==================== Configuration Errors ====================
13    #[error("invalid configuration: {0}")]
14    InvalidConfig(String),
15
16    #[error("node ID conflict: {0} already exists")]
17    NodeIdConflict(String),
18
19    // ==================== Membership Errors ====================
20    #[error("node not found: {0}")]
21    NodeNotFound(String),
22
23    #[error("node unreachable: {addr}")]
24    NodeUnreachable { addr: SocketAddr },
25
26    #[error("node failed: {node_id}")]
27    NodeFailed { node_id: String },
28
29    #[error("no seed nodes available")]
30    NoSeedNodes,
31
32    #[error("cluster join failed: {0}")]
33    JoinFailed(String),
34
35    #[error("cluster not ready: need {required} nodes, have {current}")]
36    ClusterNotReady { required: usize, current: usize },
37
38    // ==================== Consensus Errors ====================
39    #[error("not leader: current leader is {leader:?}")]
40    NotLeader { leader: Option<String> },
41
42    #[error("leader election in progress")]
43    LeaderElectionInProgress,
44
45    #[error("raft error: {0}")]
46    Raft(String),
47
48    #[error("raft storage error: {0}")]
49    RaftStorage(String),
50
51    #[error("proposal timeout")]
52    ProposalTimeout,
53
54    #[error("proposal rejected: {0}")]
55    ProposalRejected(String),
56
57    // ==================== Topic/Partition Errors ====================
58    #[error("topic not found: {0}")]
59    TopicNotFound(String),
60
61    #[error("topic already exists: {0}")]
62    TopicAlreadyExists(String),
63
64    #[error("partition not found: {topic}/{partition}")]
65    PartitionNotFound { topic: String, partition: u32 },
66
67    #[error("partition leader not found: {topic}/{partition}")]
68    PartitionLeaderNotFound { topic: String, partition: u32 },
69
70    #[error("invalid partition count: {0}")]
71    InvalidPartitionCount(u32),
72
73    #[error("invalid replication factor: {factor} (have {nodes} nodes)")]
74    InvalidReplicationFactor { factor: u16, nodes: usize },
75
76    // ==================== Replication Errors ====================
77    #[error("not enough ISR: need {required}, have {current}")]
78    NotEnoughIsr { required: u16, current: u16 },
79
80    #[error("replication failed: {0}")]
81    ReplicationFailed(String),
82
83    #[error("replica lag exceeded: {node_id} lag {lag_messages} messages")]
84    ReplicaLagExceeded { node_id: String, lag_messages: u64 },
85
86    #[error("high watermark not advanced")]
87    HighWatermarkStalled,
88
89    // ==================== Protocol Errors ====================
90    #[error("protocol error: {0}")]
91    Protocol(String),
92
93    #[error("invalid message: {0}")]
94    InvalidMessage(String),
95
96    #[error("message too large: {size} bytes (max {max})")]
97    MessageTooLarge { size: usize, max: usize },
98
99    #[error("serialization error: {0}")]
100    Serialization(String),
101
102    #[error("deserialization error: {0}")]
103    Deserialization(String),
104
105    // ==================== Network Errors ====================
106    #[error("connection failed: {0}")]
107    ConnectionFailed(String),
108
109    #[error("connection closed")]
110    ConnectionClosed,
111
112    #[error("request timeout")]
113    Timeout,
114
115    #[error("network error: {0}")]
116    Network(String),
117
118    // ==================== Storage Errors ====================
119    #[error("storage error: {0}")]
120    Storage(String),
121
122    #[error("corrupt data: {0}")]
123    CorruptData(String),
124
125    #[error("io error: {0}")]
126    Io(#[from] std::io::Error),
127
128    // ==================== Crypto/TLS Errors ====================
129    #[error("crypto error: {0}")]
130    CryptoError(String),
131
132    /// Raft RPC authentication failure
133    #[error("unauthorized: {0}")]
134    Unauthorized(String),
135
136    // ==================== Internal Errors ====================
137    #[error("internal error: {0}")]
138    Internal(String),
139
140    #[error("channel closed")]
141    ChannelClosed,
142
143    #[error("shutdown in progress")]
144    ShuttingDown,
145}
146
147impl ClusterError {
148    /// Check if this error is retriable
149    pub fn is_retriable(&self) -> bool {
150        matches!(
151            self,
152            ClusterError::NodeUnreachable { .. }
153                | ClusterError::Timeout
154                | ClusterError::LeaderElectionInProgress
155                | ClusterError::NotLeader { .. }
156                | ClusterError::ClusterNotReady { .. }
157                | ClusterError::NotEnoughIsr { .. }
158                | ClusterError::Network(_)
159        )
160    }
161
162    /// Check if this error indicates the node should redirect to leader
163    pub fn should_redirect(&self) -> bool {
164        matches!(self, ClusterError::NotLeader { leader: Some(_) })
165    }
166
167    /// Get the leader address if this is a NotLeader error
168    pub fn leader(&self) -> Option<&str> {
169        match self {
170            ClusterError::NotLeader { leader } => leader.as_deref(),
171            _ => None,
172        }
173    }
174
175    /// Check if this is a fatal error requiring shutdown
176    pub fn is_fatal(&self) -> bool {
177        matches!(
178            self,
179            ClusterError::CorruptData(_) | ClusterError::RaftStorage(_)
180        )
181    }
182}
183
184// Conversion from channel errors
185impl<T> From<tokio::sync::mpsc::error::SendError<T>> for ClusterError {
186    fn from(_: tokio::sync::mpsc::error::SendError<T>) -> Self {
187        ClusterError::ChannelClosed
188    }
189}
190
191impl From<tokio::sync::oneshot::error::RecvError> for ClusterError {
192    fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
193        ClusterError::ChannelClosed
194    }
195}
196
197// Conversion from postcard for serialization
198impl From<postcard::Error> for ClusterError {
199    fn from(e: postcard::Error) -> Self {
200        ClusterError::Serialization(e.to_string())
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Two retriable errors and two non-retriable ones are classified correctly.
    #[test]
    fn test_retriable_errors() {
        let timeout = ClusterError::Timeout;
        let election = ClusterError::LeaderElectionInProgress;
        assert!(timeout.is_retriable());
        assert!(election.is_retriable());

        let missing_topic = ClusterError::TopicNotFound(String::from("test"));
        let corrupt = ClusterError::CorruptData(String::from("bad"));
        assert!(!missing_topic.is_retriable());
        assert!(!corrupt.is_retriable());
    }

    /// `NotLeader` redirects only when it carries a leader.
    #[test]
    fn test_redirect_to_leader() {
        let with_leader = ClusterError::NotLeader {
            leader: Some(String::from("node-1")),
        };
        assert!(with_leader.should_redirect());
        assert_eq!(with_leader.leader(), Some("node-1"));

        let without_leader = ClusterError::NotLeader { leader: None };
        assert!(!without_leader.should_redirect());
    }

    /// Corrupt data and Raft storage failures are fatal; a timeout is not.
    #[test]
    fn test_fatal_errors() {
        let corrupt = ClusterError::CorruptData(String::from("bad crc"));
        let raft_storage = ClusterError::RaftStorage(String::from("disk full"));
        assert!(corrupt.is_fatal());
        assert!(raft_storage.is_fatal());

        let timeout = ClusterError::Timeout;
        assert!(!timeout.is_fatal());
    }
}