d_engine/
errors.rs

1//! Raft Consensus Protocol Error Hierarchy
2//!
3//! Defines comprehensive error types for a Raft-based distributed system,
4//! categorized by protocol layer and operational concerns.
5
6use config::ConfigError;
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio::task::JoinError;
10
11#[doc(hidden)]
12pub type Result<T> = std::result::Result<T, Error>;
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16    /// Infrastructure-level failures (network, storage, serialization)
17    #[error(transparent)]
18    System(#[from] SystemError),
19
20    /// Cluster configuration validation failures
21    #[error(transparent)]
22    Config(#[from] ConfigError),
23
24    /// Raft consensus protocol violations and failures
25    #[error(transparent)]
26    Consensus(#[from] ConsensusError),
27
28    /// Unrecoverable failures requiring process termination
29    #[error("Fatal error: {0}")]
30    Fatal(String),
31}
32
33#[derive(Debug, thiserror::Error)]
34pub enum ConsensusError {
35    /// Illegal Raft node state transitions
36    #[error(transparent)]
37    StateTransition(#[from] StateTransitionError),
38
39    /// Leader election failures (Section 5.2 Raft paper)
40    #[error(transparent)]
41    Election(#[from] ElectionError),
42
43    /// Log replication failures (Section 5.3 Raft paper)
44    #[error(transparent)]
45    Replication(#[from] ReplicationError),
46
47    /// Cluster membership change failures (Section 6 Raft paper)
48    #[error(transparent)]
49    Membership(#[from] MembershipError),
50
51    /// Snapshot-related errors during installation or restoration
52    #[error(transparent)]
53    Snapshot(#[from] SnapshotError),
54
55    /// Role permission conflict error
56    #[error("Operation requires {required_role} role but current role is {current_role}")]
57    RoleViolation {
58        current_role: &'static str,
59        required_role: &'static str,
60        context: String,
61    },
62}
63
64#[derive(Debug, thiserror::Error)]
65#[doc(hidden)]
66pub enum StateTransitionError {
67    #[error("Not enough votes to transition to leader.")]
68    NotEnoughVotes,
69
70    #[error("Invalid state transition.")]
71    InvalidTransition,
72
73    #[error("Lock error.")]
74    LockError,
75}
76
77#[derive(Debug, thiserror::Error)]
78pub enum NetworkError {
79    /// Endpoint unavailable (HTTP 503 equivalent)
80    #[error("Service unavailable: {0}")]
81    ServiceUnavailable(String),
82
83    /// Peer communication timeout
84    #[error("Connection timeout to {node_id} after {duration:?}")]
85    Timeout { node_id: u32, duration: Duration },
86
87    /// Unreachable node with source context
88    #[error("Network unreachable: {source}")]
89    Unreachable {
90        source: Box<dyn std::error::Error + Send + Sync>,
91    },
92
93    /// Persistent connection failures
94    #[error("Socket connect failed error: {0}")]
95    ConnectError(String),
96
97    /// Retry policy exhaustion
98    #[error("Retry timeout after {0:?}")]
99    RetryTimeoutError(Duration),
100
101    /// TLS negotiation failures
102    #[error("TLS handshake failed")]
103    TlsHandshakeFailure,
104
105    /// Missing peer list for RPC
106    #[error("Request list for {request_type} contains no peers")]
107    EmptyPeerList { request_type: &'static str },
108
109    /// Peer connection not found
110    #[error("Peer({0}) connection not found")]
111    PeerConnectionNotFound(u32),
112
113    /// Peer address not found
114    #[error("Peer({0}) address not found")]
115    PeerAddressNotFound(u32),
116
117    /// Malformed node addresses
118    #[error("Invalid URI format: {0}")]
119    InvalidURI(String),
120
121    /// RPC transmission failures with context
122    #[error("Failed to send {request_type} request")]
123    RequestSendFailure {
124        request_type: &'static str,
125        #[source]
126        source: Box<tonic::transport::Error>,
127    },
128
129    /// Low-level TCP configuration errors
130    #[error("TCP keepalive configuration error")]
131    TcpKeepaliveError,
132
133    /// HTTP/2 protocol configuration errors
134    #[error("HTTP/2 keepalive configuration error")]
135    Http2KeepaliveError,
136
137    /// gRPC transport layer errors
138    #[error(transparent)]
139    TonicError(#[from] Box<tonic::transport::Error>),
140
141    /// gRPC status code errors
142    #[error(transparent)]
143    TonicStatusError(#[from] Box<tonic::Status>),
144
145    #[error("Failed to send read request: {0}")]
146    ReadSend(#[from] ReadSendError),
147
148    #[error("Failed to send write request: {0}")]
149    WriteSend(#[from] WriteSendError),
150
151    #[error("Background task failed: {0}")]
152    TaskFailed(#[from] JoinError),
153
154    #[error("{0}")]
155    TaskBackoffFailed(String),
156
157    #[error("{0}")]
158    SingalSendFailed(String),
159
160    #[error("{0}")]
161    SingalReceiveFailed(String),
162
163    // #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
164    // SnapshotRejected { last_chunk: u32 },
165
166    // #[error("Install snapshot RPC request failed")]
167    // SnapshotTransferFailed,
168    #[error("New node join cluster failed: {0}")]
169    JoinFailed(String),
170
171    #[error("Network timeout: {0}")]
172    GlobalTimeout(String),
173}
174
175#[derive(Debug, thiserror::Error)]
176pub enum StorageError {
177    /// Disk I/O failures during log/snapshot operations
178    #[error(transparent)]
179    IoError(#[from] std::io::Error),
180
181    /// Custom error with a path as a string slice (`&str`)
182    #[error("Error occurred at path: {path}")]
183    PathError {
184        path: PathBuf, // Use &str for lightweight references
185        source: std::io::Error,
186    },
187
188    /// Serialization failures for persisted data
189    #[error(transparent)]
190    BincodeError(#[from] bincode::Error),
191
192    /// State machine application errors
193    #[error("State Machine error: {0}")]
194    StateMachineError(String),
195
196    /// Log storage subsystem failures
197    #[error("Log storage failure: {0}")]
198    LogStorage(String),
199
200    // /// Snapshot creation/restoration failures
201    // #[error("Snapshot operation failed: {0}")]
202    // Snapshot(String),
203    /// Checksum validation failures
204    #[error("Data corruption detected at {location}")]
205    DataCorruption { location: String },
206
207    /// Configuration storage failures
208    #[error("Configuration storage error: {0}")]
209    ConfigStorage(String),
210
211    /// Embedded database errors
212    #[error("Embedded database error: {0}")]
213    DbError(String),
214
215    /// Error type for value conversion operations
216    #[error("Value convert failed")]
217    Convert(#[from] ConvertError),
218
219    /// File errors
220    #[error("File errors")]
221    File(#[from] FileError),
222
223    /// Serialization error
224    #[error("Serialization error: {0}")]
225    SerializationError(String),
226
227    /// ID allocation errors
228    #[error(transparent)]
229    IdAllocation(#[from] IdAllocationError),
230}
231
232#[derive(Debug, thiserror::Error)]
233pub enum IdAllocationError {
234    /// ID allocation overflow
235    #[error("ID allocation overflow: {start} > {end}")]
236    Overflow { start: u64, end: u64 },
237
238    /// Invalid ID range
239    #[error("Invalid ID range: {start}..={end}")]
240    InvalidRange { start: u64, end: u64 },
241
242    /// No available IDs
243    #[error("No available IDs")]
244    NoIdsAvailable,
245}
246
247#[derive(Debug, thiserror::Error)]
248pub enum FileError {
249    #[error("Path does not exist: {0}")]
250    NotFound(String),
251    #[error("Path is a directory: {0}")]
252    IsDirectory(String),
253    #[error("File is busy: {0}")]
254    Busy(String),
255    #[error("Insufficient permissions: {0}")]
256    PermissionDenied(String),
257    #[error("File is occupied: {0}")]
258    FileBusy(String),
259    #[error("Invalid path: {0}")]
260    InvalidPath(String),
261    #[error("Too small: {0}")]
262    TooSmall(u64),
263    #[error("Invalid extension: {0}")]
264    InvalidExt(String),
265    #[error("Invalid GZIP header: {0}")]
266    InvalidGzipHeader(String),
267    #[error("Unknown IO error: {0}")]
268    UnknownIo(String),
269}
270
271/// Error type for value conversion operations
272#[derive(Debug, thiserror::Error)]
273pub enum ConvertError {
274    /// Invalid input length error
275    ///
276    /// This occurs when the input byte slice length doesn't match the required 8 bytes.
277    #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
278    InvalidLength(usize),
279
280    /// Generic conversion failure with detailed message
281    ///
282    /// Wraps underlying parsing/conversion errors with context information
283    #[error("conversion failure: {0}")]
284    ConversionFailure(String),
285}
286
287#[derive(Debug, thiserror::Error)]
288pub enum ReadSendError {
289    #[error("Network timeout")]
290    Timeout(#[from] tokio::time::error::Elapsed),
291
292    #[error("Connection failed")]
293    Connection(#[from] tonic::transport::Error),
294}
295
296#[derive(Debug, thiserror::Error)]
297pub enum WriteSendError {
298    #[error("Not cluster leader")]
299    NotLeader,
300
301    #[error("Network unreachable")]
302    Unreachable,
303
304    #[error("Payload too large")]
305    PayloadExceeded,
306}
307
308#[derive(Debug, thiserror::Error)]
309pub enum SystemError {
310    // Network layer
311    #[error("Network error: {0}")]
312    Network(#[from] NetworkError),
313
314    // Storage layer
315    #[error("Storage operation failed")]
316    Storage(#[from] StorageError),
317
318    //Serialization
319    #[error("Serialization error")]
320    Serialization(#[from] SerializationError),
321
322    /// Protocol buffer encoding/decoding specific errors
323    #[error("Protobuf operation failed: {0}")]
324    Prost(#[from] ProstError),
325
326    // Basic node operations
327    #[error("Node failed to start: {0}")]
328    NodeStartFailed(String),
329
330    #[error("General server error: {0}")]
331    GeneralServer(String),
332
333    #[error("Internal server error")]
334    ServerUnavailable,
335}
336
337// Serialization is classified separately (across protocol layers and system layers)
338#[derive(Debug, thiserror::Error)]
339pub enum SerializationError {
340    #[error("Bincode serialization failed: {0}")]
341    Bincode(#[from] bincode::Error),
342}
343
344/// Wrapper for prost encoding/decoding errors
345#[derive(Debug, thiserror::Error)]
346pub enum ProstError {
347    #[error("Encoding failed: {0}")]
348    Encode(#[from] prost::EncodeError),
349
350    #[error("Decoding failed: {0}")]
351    Decode(#[from] prost::DecodeError),
352}
353
354#[derive(Debug, thiserror::Error)]
355pub enum ElectionError {
356    /// General election process failure
357    #[error("Election failed: {0}")]
358    Failed(String),
359
360    /// Stale term detection (Section 5.1 Raft paper)
361    #[error("Found higher term(={0}) during election process")]
362    HigherTerm(u64),
363
364    /// Term number inconsistency
365    #[error("Term conflict (current: {current}, received: {received})")]
366    TermConflict { current: u64, received: u64 },
367
368    /// Log inconsistency during vote requests (Section 5.4.1 Raft paper)
369    #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
370    LogConflict {
371        index: u64,
372        expected_term: u64,
373        actual_term: u64,
374    },
375
376    /// Quorum not achieved (Section 5.2 Raft paper)
377    #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
378    QuorumFailure { required: usize, succeed: usize },
379
380    /// Leadership handoff failures
381    #[error("Leadership consensus error: {0}")]
382    LeadershipConsensus(String),
383
384    /// Isolated node scenario
385    #[error("No voting member found for candidate {candidate_id}")]
386    NoVotingMemberFound { candidate_id: u32 },
387}
388
389#[derive(Debug, thiserror::Error)]
390pub enum ReplicationError {
391    /// Stale leader detected during AppendEntries RPC
392    #[error("Found higher term(={0}) during replication process")]
393    HigherTerm(u64),
394
395    /// Failed to achieve majority acknowledgment
396    #[error("Quorum not reached for log replication")]
397    QuorumNotReached,
398
399    /// Timeout to receive majority response
400    #[error("Timeout to receive majority response")]
401    QuorumTimeout,
402
403    /// Target follower node unreachable
404    #[error("Node {node_id} unreachable for replication")]
405    NodeUnreachable { node_id: u32 },
406
407    /// Network timeout during replication RPC
408    #[error("RPC timeout after {duration}ms")]
409    RpcTimeout { duration: u64 },
410
411    /// Missing peer configuration in leader state
412    #[error("No peer mapping for leader {leader_id}")]
413    NoPeerFound { leader_id: u32 },
414
415    /// Log inconsistency detected during replication (ยง5.3)
416    #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
417    LogConflict {
418        index: u64,
419        expected_term: u64,
420        actual_term: u64,
421    },
422
423    /// Node not in leader state for replication requests
424    #[error("Replication requires leader role (known leader: {leader_id:?})")]
425    NotLeader { leader_id: Option<u32> },
426}
427
428#[derive(Debug, thiserror::Error)]
429pub enum MembershipError {
430    /// Failed to reach consensus on configuration change
431    #[error("Membership update consensus failure: {0}")]
432    ConfigChangeUpdateFailed(String),
433
434    /// Non-leader node attempted membership change
435    #[error("Membership changes require leader role")]
436    NotLeader,
437
438    /// No leader information available
439    #[error("No leader information available")]
440    NoLeaderFound,
441
442    /// Non-learner node attempted join cluster
443    #[error("Only Learner can join cluster")]
444    NotLearner,
445
446    /// Cluster not in operational state
447    #[error("Cluster bootstrap not completed")]
448    ClusterIsNotReady,
449
450    /// Connection establishment failure during join
451    #[error("Cluster connection setup failed: {0}")]
452    SetupClusterConnectionFailed(String),
453
454    /// Missing node metadata in configuration
455    #[error("Metadata missing for node {node_id} in cluster config")]
456    NoMetadataFoundForNode { node_id: u32 },
457
458    /// No available peers found for request
459    #[error("No reachable peers found in cluster membership")]
460    NoPeersAvailable,
461
462    /// Node already been added into cluster config
463    #[error("Node({0}) already been added into cluster config.")]
464    NodeAlreadyExists(u32),
465
466    /// To be removed node is leader.
467    #[error("To be removed node({0}) is leader.")]
468    RemoveNodeIsLeader(u32),
469
470    #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
471    InvalidPromotion { node_id: u32, role: i32 },
472
473    #[error("Invalid membership change request")]
474    InvalidChangeRequest,
475
476    #[error("Commit Timeout")]
477    CommitTimeout,
478
479    #[error("Learner({0}) join cluster failed.")]
480    JoinClusterFailed(u32),
481
482    #[error("Join cluster error: {0}")]
483    JoinClusterError(String),
484
485    #[error("Not leader")]
486    NoLeader,
487
488    #[error("Mark leader id failed: {0}")]
489    MarkLeaderIdFailed(String),
490}
491
492#[derive(Debug, thiserror::Error)]
493pub enum SnapshotError {
494    #[error("Snapshot receiver lagging, dropping chunk")]
495    Backpressure,
496
497    /// Snapshot chunk rejected during installation
498    #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
499    Rejected { last_chunk: u32 },
500
501    #[error("Install snapshot RPC request been rejected")]
502    RemoteRejection,
503
504    /// Snapshot transfer failed due to stream/network issues
505    #[error("Install snapshot RPC request failed")]
506    TransferFailed,
507
508    /// Snapshot transfer timeout due to network issues
509    #[error("Install snapshot RPC request timeout")]
510    TransferTimeout,
511
512    /// Snapshot operation failed with context
513    #[error("Snapshot operation failed: {0}")]
514    OperationFailed(String),
515
516    /// Snapshot is outdated and cannot be used
517    #[error("Snapshot is outdated")]
518    Outdated,
519
520    /// Snapshot file checksum mismatch
521    #[error("Snapshot file checksum mismatch")]
522    ChecksumMismatch,
523
524    /// Invalid snapshot
525    #[error("Invalid snapshot")]
526    InvalidSnapshot,
527
528    /// Invalid chunk sequence
529    #[error("Invalid chunk sequence")]
530    InvalidChunkSequence,
531
532    /// Stream receiver disconnected
533    #[error("Stream receiver disconnected")]
534    ReceiverDisconnected,
535
536    #[error("Invalid first snapshot stream chunk")]
537    InvalidFirstChunk,
538
539    #[error("Empty snapshot stream chunk")]
540    EmptySnapshot,
541
542    #[error("Incomplete snapshot error")]
543    IncompleteSnapshot,
544
545    #[error("Requested chunk {0} out of range (max: {1})")]
546    ChunkOutOfRange(u32, u32),
547
548    #[error("Chunk in stream is out of order")]
549    OutOfOrderChunk,
550
551    #[error("No metadata in chunk")]
552    MissingMetadata,
553
554    #[error("Chunk not cached: {0}")]
555    ChunkNotCached(u32),
556
557    #[error("Background stream push task died")]
558    BackgroundTaskDied,
559}
560
561// ============== Conversion Implementations ============== //
562impl From<NetworkError> for Error {
563    fn from(e: NetworkError) -> Self {
564        Error::System(SystemError::Network(e))
565    }
566}
567
568impl From<StorageError> for Error {
569    fn from(e: StorageError) -> Self {
570        Error::System(SystemError::Storage(e))
571    }
572}
573
574impl From<ConvertError> for Error {
575    fn from(e: ConvertError) -> Self {
576        Error::System(SystemError::Storage(StorageError::Convert(e)))
577    }
578}
579
580impl From<FileError> for Error {
581    fn from(e: FileError) -> Self {
582        Error::System(SystemError::Storage(StorageError::File(e)))
583    }
584}
585
586impl From<SerializationError> for Error {
587    fn from(e: SerializationError) -> Self {
588        Error::System(SystemError::Serialization(e))
589    }
590}
591
592// // These allow direct conversion from prost errors to SystemError
593// impl From<prost::EncodeError> for SystemError {
594//     fn from(error: prost::EncodeError) -> Self {
595//         SystemError::Prost(ProstError::Encode(error))
596//     }
597// }
598
599// impl From<prost::DecodeError> for SystemError {
600//     fn from(error: prost::DecodeError) -> Self {
601//         SystemError::Prost(ProstError::Decode(error))
602//     }
603// }
604
605impl From<ProstError> for Error {
606    fn from(error: ProstError) -> Self {
607        Error::System(SystemError::Prost(error))
608    }
609}
610
611// ===== Consensus Error conversions =====
612
613impl From<StateTransitionError> for Error {
614    fn from(e: StateTransitionError) -> Self {
615        Error::Consensus(ConsensusError::StateTransition(e))
616    }
617}
618
619impl From<ElectionError> for Error {
620    fn from(e: ElectionError) -> Self {
621        Error::Consensus(ConsensusError::Election(e))
622    }
623}
624
625impl From<ReplicationError> for Error {
626    fn from(e: ReplicationError) -> Self {
627        Error::Consensus(ConsensusError::Replication(e))
628    }
629}
630
631impl From<MembershipError> for Error {
632    fn from(e: MembershipError) -> Self {
633        Error::Consensus(ConsensusError::Membership(e))
634    }
635}
636
637// ===== Network sub-error conversions =====
638impl From<ReadSendError> for Error {
639    fn from(e: ReadSendError) -> Self {
640        Error::System(SystemError::Network(NetworkError::ReadSend(e)))
641    }
642}
643
644impl From<WriteSendError> for Error {
645    fn from(e: WriteSendError) -> Self {
646        Error::System(SystemError::Network(NetworkError::WriteSend(e)))
647    }
648}
649
650impl From<tonic::transport::Error> for Error {
651    fn from(err: tonic::transport::Error) -> Self {
652        NetworkError::TonicError(Box::new(err)).into()
653    }
654}
655
656impl From<JoinError> for Error {
657    fn from(err: JoinError) -> Self {
658        NetworkError::TaskFailed(err).into()
659    }
660}
661
662impl From<SnapshotError> for Error {
663    fn from(e: SnapshotError) -> Self {
664        Error::Consensus(ConsensusError::Snapshot(e))
665    }
666}
667
668impl From<IdAllocationError> for Error {
669    fn from(e: IdAllocationError) -> Self {
670        StorageError::IdAllocation(e).into()
671    }
672}
673
674impl From<std::io::Error> for Error {
675    fn from(e: std::io::Error) -> Self {
676        StorageError::IoError(e).into()
677    }
678}