d_engine_core/
errors.rs

1//! Raft Consensus Protocol Error Hierarchy
2//!
3//! Defines comprehensive error types for a Raft-based distributed system,
4//! categorized by protocol layer and operational concerns.
5
6use std::path::PathBuf;
7use std::time::Duration;
8
9use config::ConfigError;
10use tokio::task::JoinError;
11
12#[doc(hidden)]
13pub type Result<T> = std::result::Result<T, Error>;
14
15#[derive(Debug, thiserror::Error)]
16pub enum Error {
17    /// Infrastructure-level failures (network, storage, serialization)
18    #[error(transparent)]
19    System(#[from] SystemError),
20
21    /// Cluster configuration validation failures
22    #[error(transparent)]
23    Config(#[from] ConfigError),
24
25    /// Raft consensus protocol violations and failures
26    #[error(transparent)]
27    Consensus(#[from] ConsensusError),
28
29    /// Unrecoverable failures requiring process termination
30    #[error("Fatal error: {0}")]
31    Fatal(String),
32}
33
34#[derive(Debug, thiserror::Error)]
35pub enum ConsensusError {
36    /// Illegal Raft node state transitions
37    #[error(transparent)]
38    StateTransition(#[from] StateTransitionError),
39
40    /// Leader election failures (Section 5.2 Raft paper)
41    #[error(transparent)]
42    Election(#[from] ElectionError),
43
44    /// Log replication failures (Section 5.3 Raft paper)
45    #[error(transparent)]
46    Replication(#[from] ReplicationError),
47
48    /// Cluster membership change failures (Section 6 Raft paper)
49    #[error(transparent)]
50    Membership(#[from] MembershipError),
51
52    /// Snapshot-related errors during installation or restoration
53    #[error(transparent)]
54    Snapshot(#[from] SnapshotError),
55
56    /// Role permission conflict error
57    #[error("Operation requires {required_role} role but current role is {current_role}")]
58    RoleViolation {
59        current_role: &'static str,
60        required_role: &'static str,
61        context: String,
62    },
63}
64
65#[derive(Debug, thiserror::Error)]
66#[doc(hidden)]
67pub enum StateTransitionError {
68    #[error("Not enough votes to transition to leader.")]
69    NotEnoughVotes,
70
71    #[error("Invalid state transition.")]
72    InvalidTransition,
73
74    #[error("Lock error.")]
75    LockError,
76}
77
78#[derive(Debug, thiserror::Error)]
79pub enum NetworkError {
80    /// Endpoint unavailable (HTTP 503 equivalent)
81    #[error("Service unavailable: {0}")]
82    ServiceUnavailable(String),
83
84    /// Peer communication timeout
85    #[error("Connection timeout to {node_id} after {duration:?}")]
86    Timeout { node_id: u32, duration: Duration },
87
88    /// Unreachable node with source context
89    #[error("Network unreachable: {source}")]
90    Unreachable {
91        source: Box<dyn std::error::Error + Send + Sync>,
92    },
93
94    /// Persistent connection failures
95    #[error("Socket connect failed error: {0}")]
96    ConnectError(String),
97
98    /// Retry policy exhaustion
99    #[error("Retry timeout after {0:?}")]
100    RetryTimeoutError(Duration),
101
102    /// TLS negotiation failures
103    #[error("TLS handshake failed")]
104    TlsHandshakeFailure,
105
106    /// Missing peer list for RPC
107    #[error("Request list for {request_type} contains no peers")]
108    EmptyPeerList { request_type: &'static str },
109
110    /// Peer connection not found
111    #[error("Peer({0}) connection not found")]
112    PeerConnectionNotFound(u32),
113
114    /// Peer closed the channel
115    #[error("Peer closed the channel")]
116    ResponseChannelClosed,
117
118    /// Peer address not found
119    #[error("Peer({0}) address not found")]
120    PeerAddressNotFound(u32),
121
122    /// Malformed node addresses
123    #[error("Invalid URI format: {0}")]
124    InvalidURI(String),
125
126    /// RPC transmission failures with context
127    #[error("Failed to send {request_type} request")]
128    RequestSendFailure {
129        request_type: &'static str,
130        #[source]
131        source: Box<tonic::transport::Error>,
132    },
133
134    /// Low-level TCP configuration errors
135    #[error("TCP keepalive configuration error")]
136    TcpKeepaliveError,
137
138    /// HTTP/2 protocol configuration errors
139    #[error("HTTP/2 keepalive configuration error")]
140    Http2KeepaliveError,
141
142    /// gRPC transport layer errors
143    #[error(transparent)]
144    TonicError(#[from] Box<tonic::transport::Error>),
145
146    /// gRPC status code errors
147    #[error(transparent)]
148    TonicStatusError(#[from] Box<tonic::Status>),
149
150    #[error("Failed to send read request: {0}")]
151    ReadSend(#[from] ReadSendError),
152
153    #[error("Failed to send write request: {0}")]
154    WriteSend(#[from] WriteSendError),
155
156    #[error("Background task failed: {0}")]
157    TaskFailed(#[from] JoinError),
158
159    #[error("{0}")]
160    TaskBackoffFailed(String),
161
162    #[error("{0}")]
163    SingalSendFailed(String),
164
165    #[error("{0}")]
166    SingalReceiveFailed(String),
167
168    // #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
169    // SnapshotRejected { last_chunk: u32 },
170
171    // #[error("Install snapshot RPC request failed")]
172    // SnapshotTransferFailed,
173    #[error("New node join cluster failed: {0}")]
174    JoinFailed(String),
175
176    #[error("Network timeout: {0}")]
177    GlobalTimeout(String),
178}
179
180#[derive(Debug, thiserror::Error)]
181pub enum StorageError {
182    /// Disk I/O failures during log/snapshot operations
183    #[error(transparent)]
184    IoError(#[from] std::io::Error),
185
186    /// Custom error with a path as a string slice (`&str`)
187    #[error("Error occurred at path: {path}")]
188    PathError {
189        path: PathBuf, // Use &str for lightweight references
190        source: std::io::Error,
191    },
192
193    /// Serialization failures for persisted data
194    #[error(transparent)]
195    BincodeError(#[from] bincode::Error),
196
197    /// State machine application errors
198    #[error("State Machine error: {0}")]
199    StateMachineError(String),
200
201    /// Log storage subsystem failures
202    #[error("Log storage failure: {0}")]
203    LogStorage(String),
204
205    // /// Snapshot creation/restoration failures
206    // #[error("Snapshot operation failed: {0}")]
207    // Snapshot(String),
208    /// Checksum validation failures
209    #[error("Data corruption detected at {location}")]
210    DataCorruption { location: String },
211
212    /// Configuration storage failures
213    #[error("Configuration storage error: {0}")]
214    ConfigStorage(String),
215
216    /// Embedded database errors
217    #[error("Embedded database error: {0}")]
218    DbError(String),
219
220    /// Error type for value conversion operations
221    #[error("Value convert failed")]
222    Convert(#[from] ConvertError),
223
224    /// File errors
225    #[error("File errors")]
226    File(#[from] FileError),
227
228    /// Serialization error
229    #[error("Serialization error: {0}")]
230    SerializationError(String),
231
232    /// ID allocation errors
233    #[error(transparent)]
234    IdAllocation(#[from] IdAllocationError),
235
236    /// Feature not enabled in configuration
237    ///
238    /// Returned when client requests a feature (e.g., TTL) that is not
239    /// enabled in the server configuration. This prevents silent failures
240    /// and ensures explicit feature activation.
241    #[error("Feature not enabled: {0}")]
242    FeatureNotEnabled(String),
243
244    /// State machine not serving requests
245    ///
246    /// Returned when read operations are attempted during critical operations
247    /// such as snapshot restoration. This ensures reads never access inconsistent
248    /// or temporary state during database transitions.
249    #[error("State machine not serving: {0}")]
250    NotServing(String),
251}
252
253#[derive(Debug, thiserror::Error)]
254pub enum IdAllocationError {
255    /// ID allocation overflow
256    #[error("ID allocation overflow: {start} > {end}")]
257    Overflow { start: u64, end: u64 },
258
259    /// Invalid ID range
260    #[error("Invalid ID range: {start}..={end}")]
261    InvalidRange { start: u64, end: u64 },
262
263    /// No available IDs
264    #[error("No available IDs")]
265    NoIdsAvailable,
266}
267
268#[derive(Debug, thiserror::Error)]
269pub enum FileError {
270    #[error("Path does not exist: {0}")]
271    NotFound(String),
272    #[error("Path is a directory: {0}")]
273    IsDirectory(String),
274    #[error("File is busy: {0}")]
275    Busy(String),
276    #[error("Insufficient permissions: {0}")]
277    PermissionDenied(String),
278    #[error("File is occupied: {0}")]
279    FileBusy(String),
280    #[error("Invalid path: {0}")]
281    InvalidPath(String),
282    #[error("Too small: {0}")]
283    TooSmall(u64),
284    #[error("Invalid extension: {0}")]
285    InvalidExt(String),
286    #[error("Invalid GZIP header: {0}")]
287    InvalidGzipHeader(String),
288    #[error("Unknown IO error: {0}")]
289    UnknownIo(String),
290}
291
292/// Error type for value conversion operations
293#[derive(Debug, thiserror::Error)]
294pub enum ConvertError {
295    /// Invalid input length error
296    ///
297    /// This occurs when the input byte slice length doesn't match the required 8 bytes.
298    #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
299    InvalidLength(usize),
300
301    /// Generic conversion failure with detailed message
302    ///
303    /// Wraps underlying parsing/conversion errors with context information
304    #[error("conversion failure: {0}")]
305    ConversionFailure(String),
306}
307
308#[derive(Debug, thiserror::Error)]
309pub enum ReadSendError {
310    #[error("Network timeout")]
311    Timeout(#[from] tokio::time::error::Elapsed),
312
313    #[error("Connection failed")]
314    Connection(#[from] tonic::transport::Error),
315}
316
317#[derive(Debug, thiserror::Error)]
318pub enum WriteSendError {
319    #[error("Not cluster leader")]
320    NotLeader,
321
322    #[error("Network unreachable")]
323    Unreachable,
324
325    #[error("Payload too large")]
326    PayloadExceeded,
327}
328
329#[derive(Debug, thiserror::Error)]
330pub enum SystemError {
331    // Network layer
332    #[error("Network error: {0}")]
333    Network(#[from] NetworkError),
334
335    // Storage layer
336    #[error("Storage operation failed")]
337    Storage(#[from] StorageError),
338
339    //Serialization
340    #[error("Serialization error")]
341    Serialization(#[from] SerializationError),
342
343    /// Protocol buffer encoding/decoding specific errors
344    #[error("Protobuf operation failed: {0}")]
345    Prost(#[from] ProstError),
346
347    // Basic node operations
348    #[error("Node failed to start: {0}")]
349    NodeStartFailed(String),
350
351    #[error("General server error: {0}")]
352    GeneralServer(String),
353
354    #[error("Internal server error")]
355    ServerUnavailable,
356
357    /// State machine does not support lease-based expiration
358    #[error("State machine does not support lease management")]
359    LeaseNotSupported,
360}
361
362// Serialization is classified separately (across protocol layers and system layers)
363#[derive(Debug, thiserror::Error)]
364pub enum SerializationError {
365    #[error("Bincode serialization failed: {0}")]
366    Bincode(#[from] bincode::Error),
367}
368
369/// Wrapper for prost encoding/decoding errors
370#[derive(Debug, thiserror::Error)]
371pub enum ProstError {
372    #[error("Encoding failed: {0}")]
373    Encode(#[from] prost::EncodeError),
374
375    #[error("Decoding failed: {0}")]
376    Decode(#[from] prost::DecodeError),
377}
378
379#[derive(Debug, thiserror::Error)]
380pub enum ElectionError {
381    /// General election process failure
382    #[error("Election failed: {0}")]
383    Failed(String),
384
385    /// Stale term detection (Section 5.1 Raft paper)
386    #[error("Found higher term(={0}) during election process")]
387    HigherTerm(u64),
388
389    /// Term number inconsistency
390    #[error("Term conflict (current: {current}, received: {received})")]
391    TermConflict { current: u64, received: u64 },
392
393    /// Log inconsistency during vote requests (Section 5.4.1 Raft paper)
394    #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
395    LogConflict {
396        index: u64,
397        expected_term: u64,
398        actual_term: u64,
399    },
400
401    /// Quorum not achieved (Section 5.2 Raft paper)
402    #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
403    QuorumFailure { required: usize, succeed: usize },
404
405    /// Leadership handoff failures
406    #[error("Leadership consensus error: {0}")]
407    LeadershipConsensus(String),
408
409    /// Isolated node scenario
410    #[error("No voting member found for candidate {candidate_id}")]
411    NoVotingMemberFound { candidate_id: u32 },
412}
413
414#[derive(Debug, thiserror::Error)]
415pub enum ReplicationError {
416    /// Stale leader detected during AppendEntries RPC
417    #[error("Found higher term(={0}) during replication process")]
418    HigherTerm(u64),
419
420    /// Failed to achieve majority acknowledgment
421    #[error("Quorum not reached for log replication")]
422    QuorumNotReached,
423
424    /// Timeout to receive majority response
425    #[error("Timeout to receive majority response")]
426    QuorumTimeout,
427
428    /// Target follower node unreachable
429    #[error("Node {node_id} unreachable for replication")]
430    NodeUnreachable { node_id: u32 },
431
432    /// Network timeout during replication RPC
433    #[error("RPC timeout after {duration}ms")]
434    RpcTimeout { duration: u64 },
435
436    /// Missing peer configuration in leader state
437    #[error("No peer mapping for leader {leader_id}")]
438    NoPeerFound { leader_id: u32 },
439
440    /// Log inconsistency detected during replication (ยง5.3)
441    #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
442    LogConflict {
443        index: u64,
444        expected_term: u64,
445        actual_term: u64,
446    },
447
448    /// Node not in leader state for replication requests
449    #[error("Replication requires leader role (known leader: {leader_id:?})")]
450    NotLeader { leader_id: Option<u32> },
451}
452
453/// Errors that can occur during ReadIndex batching for linearizable reads
454#[derive(Debug, thiserror::Error, Clone)]
455pub enum ReadIndexError {
456    /// This node is no longer the leader
457    #[error("Not leader - cannot serve linearizable read")]
458    NotLeader,
459
460    /// Leadership verification timed out (possible network partition)
461    #[error("Verification timeout after {timeout_ms}ms")]
462    Timeout { timeout_ms: u64 },
463
464    /// Leadership verification failed for other reasons
465    #[error("Verification failed: {reason}")]
466    VerificationFailed { reason: String },
467}
468
469#[derive(Debug, thiserror::Error)]
470pub enum MembershipError {
471    /// Failed to reach consensus on configuration change
472    #[error("Membership update consensus failure: {0}")]
473    ConfigChangeUpdateFailed(String),
474
475    /// Non-leader node attempted membership change
476    #[error("Membership changes require leader role")]
477    NotLeader,
478
479    /// No leader information available
480    #[error("No leader information available")]
481    NoLeaderFound,
482
483    /// Non-learner node attempted join cluster
484    #[error("Only Learner can join cluster")]
485    NotLearner,
486
487    /// Cluster not in operational state
488    #[error("Cluster bootstrap not completed")]
489    ClusterIsNotReady,
490
491    /// Connection establishment failure during join
492    #[error("Cluster connection setup failed: {0}")]
493    SetupClusterConnectionFailed(String),
494
495    /// Missing node metadata in configuration
496    #[error("Metadata missing for node {node_id} in cluster config")]
497    NoMetadataFoundForNode { node_id: u32 },
498
499    /// No available peers found for request
500    #[error("No reachable peers found in cluster membership")]
501    NoPeersAvailable,
502
503    /// Node already been added into cluster config
504    #[error("Node({0}) already been added into cluster config.")]
505    NodeAlreadyExists(u32),
506
507    /// To be removed node is leader.
508    #[error("To be removed node({0}) is leader.")]
509    RemoveNodeIsLeader(u32),
510
511    #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
512    InvalidPromotion { node_id: u32, role: i32 },
513
514    #[error("Invalid membership change request")]
515    InvalidChangeRequest,
516
517    #[error("Commit Timeout")]
518    CommitTimeout,
519
520    #[error("Learner({0}) join cluster failed.")]
521    JoinClusterFailed(u32),
522
523    #[error("Join cluster error: {0}")]
524    JoinClusterError(String),
525
526    #[error("Not leader")]
527    NoLeader,
528
529    #[error("Mark leader id failed: {0}")]
530    MarkLeaderIdFailed(String),
531
532    /// Cluster metadata not initialized when required
533    #[error(
534        "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
535    )]
536    ClusterMetadataNotInitialized,
537}
538
539#[derive(Debug, thiserror::Error)]
540pub enum SnapshotError {
541    #[error("Snapshot receiver lagging, dropping chunk")]
542    Backpressure,
543
544    /// Snapshot chunk rejected during installation
545    #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
546    Rejected { last_chunk: u32 },
547
548    #[error("Install snapshot RPC request been rejected")]
549    RemoteRejection,
550
551    /// Snapshot transfer failed due to stream/network issues
552    #[error("Install snapshot RPC request failed")]
553    TransferFailed,
554
555    /// Snapshot transfer timeout due to network issues
556    #[error("Install snapshot RPC request timeout")]
557    TransferTimeout,
558
559    /// Snapshot operation failed with context
560    #[error("Snapshot operation failed: {0}")]
561    OperationFailed(String),
562
563    /// Snapshot is outdated and cannot be used
564    #[error("Snapshot is outdated")]
565    Outdated,
566
567    /// Snapshot file checksum mismatch
568    #[error("Snapshot file checksum mismatch")]
569    ChecksumMismatch,
570
571    /// Invalid snapshot
572    #[error("Invalid snapshot")]
573    InvalidSnapshot,
574
575    /// Invalid chunk sequence
576    #[error("Invalid chunk sequence")]
577    InvalidChunkSequence,
578
579    /// Stream receiver disconnected
580    #[error("Stream receiver disconnected")]
581    ReceiverDisconnected,
582
583    #[error("Invalid first snapshot stream chunk")]
584    InvalidFirstChunk,
585
586    #[error("Empty snapshot stream chunk")]
587    EmptySnapshot,
588
589    #[error("Incomplete snapshot error")]
590    IncompleteSnapshot,
591
592    #[error("Requested chunk {0} out of range (max: {1})")]
593    ChunkOutOfRange(u32, u32),
594
595    #[error("Chunk in stream is out of order")]
596    OutOfOrderChunk,
597
598    #[error("No metadata in chunk")]
599    MissingMetadata,
600
601    #[error("Chunk not cached: {0}")]
602    ChunkNotCached(u32),
603
604    #[error("Background stream push task died")]
605    BackgroundTaskDied,
606}
607
608// ============== Conversion Implementations ============== //
609impl From<NetworkError> for Error {
610    fn from(e: NetworkError) -> Self {
611        Error::System(SystemError::Network(e))
612    }
613}
614
615impl From<StorageError> for Error {
616    fn from(e: StorageError) -> Self {
617        Error::System(SystemError::Storage(e))
618    }
619}
620
621impl From<ConvertError> for Error {
622    fn from(e: ConvertError) -> Self {
623        Error::System(SystemError::Storage(StorageError::Convert(e)))
624    }
625}
626
627impl From<FileError> for Error {
628    fn from(e: FileError) -> Self {
629        Error::System(SystemError::Storage(StorageError::File(e)))
630    }
631}
632
633impl From<SerializationError> for Error {
634    fn from(e: SerializationError) -> Self {
635        Error::System(SystemError::Serialization(e))
636    }
637}
638
639// // These allow direct conversion from prost errors to SystemError
640// impl From<prost::EncodeError> for SystemError {
641//     fn from(error: prost::EncodeError) -> Self {
642//         SystemError::Prost(ProstError::Encode(error))
643//     }
644// }
645
646// impl From<prost::DecodeError> for SystemError {
647//     fn from(error: prost::DecodeError) -> Self {
648//         SystemError::Prost(ProstError::Decode(error))
649//     }
650// }
651
652impl From<ProstError> for Error {
653    fn from(error: ProstError) -> Self {
654        Error::System(SystemError::Prost(error))
655    }
656}
657
658// ===== Consensus Error conversions =====
659
660impl From<StateTransitionError> for Error {
661    fn from(e: StateTransitionError) -> Self {
662        Error::Consensus(ConsensusError::StateTransition(e))
663    }
664}
665
666impl From<ElectionError> for Error {
667    fn from(e: ElectionError) -> Self {
668        Error::Consensus(ConsensusError::Election(e))
669    }
670}
671
672impl From<ReplicationError> for Error {
673    fn from(e: ReplicationError) -> Self {
674        Error::Consensus(ConsensusError::Replication(e))
675    }
676}
677
678impl From<MembershipError> for Error {
679    fn from(e: MembershipError) -> Self {
680        Error::Consensus(ConsensusError::Membership(e))
681    }
682}
683
684// ===== Network sub-error conversions =====
685impl From<ReadSendError> for Error {
686    fn from(e: ReadSendError) -> Self {
687        Error::System(SystemError::Network(NetworkError::ReadSend(e)))
688    }
689}
690
691impl From<WriteSendError> for Error {
692    fn from(e: WriteSendError) -> Self {
693        Error::System(SystemError::Network(NetworkError::WriteSend(e)))
694    }
695}
696
697impl From<tonic::transport::Error> for Error {
698    fn from(err: tonic::transport::Error) -> Self {
699        NetworkError::TonicError(Box::new(err)).into()
700    }
701}
702
703impl From<JoinError> for Error {
704    fn from(err: JoinError) -> Self {
705        NetworkError::TaskFailed(err).into()
706    }
707}
708
709impl From<SnapshotError> for Error {
710    fn from(e: SnapshotError) -> Self {
711        Error::Consensus(ConsensusError::Snapshot(e))
712    }
713}
714
715impl From<ReadIndexError> for Error {
716    fn from(e: ReadIndexError) -> Self {
717        Error::Consensus(ConsensusError::Replication(match e {
718            ReadIndexError::NotLeader => ReplicationError::NotLeader { leader_id: None },
719            ReadIndexError::Timeout { timeout_ms } => ReplicationError::RpcTimeout {
720                duration: timeout_ms,
721            },
722            ReadIndexError::VerificationFailed { reason: _ } => ReplicationError::QuorumNotReached,
723        }))
724    }
725}
726
727impl From<IdAllocationError> for Error {
728    fn from(e: IdAllocationError) -> Self {
729        StorageError::IdAllocation(e).into()
730    }
731}
732
733impl From<std::io::Error> for Error {
734    fn from(e: std::io::Error) -> Self {
735        StorageError::IoError(e).into()
736    }
737}