Skip to main content

d_engine_core/
errors.rs

1//! Raft Consensus Protocol Error Hierarchy
2//!
3//! Defines comprehensive error types for a Raft-based distributed system,
4//! categorized by protocol layer and operational concerns.
5
6use std::path::PathBuf;
7use std::time::Duration;
8
9use config::ConfigError;
10use tokio::task::JoinError;
11
/// Shorthand for `std::result::Result` with this module's [`Error`] as the
/// error type.
#[doc(hidden)]
pub type Result<T> = std::result::Result<T, Error>;
14
/// Top-level error type; each variant wraps one category of failure.
///
/// `System`, `Config` and `Consensus` are `#[error(transparent)]`, so their
/// `Display` output and `source()` are forwarded to the wrapped error.
/// `Fatal` carries only a message.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Infrastructure-level failures (network, storage, serialization)
    #[error(transparent)]
    System(#[from] SystemError),

    /// Cluster configuration validation failures
    /// (wraps `ConfigError` from the `config` crate)
    #[error(transparent)]
    Config(#[from] ConfigError),

    /// Raft consensus protocol violations and failures
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// Unrecoverable failures requiring process termination
    #[error("Fatal error: {0}")]
    Fatal(String),
}
33
34impl Error {
35    /// Returns true only for errors that require node shutdown.
36    ///
37    /// Fatal: explicit `Fatal` marker, storage data corruption.
38    /// Non-fatal (warn + continue): network, snapshot, election, replication.
39    pub fn is_fatal(&self) -> bool {
40        matches!(
41            self,
42            Error::Fatal(_)
43                | Error::System(SystemError::Storage(StorageError::DataCorruption { .. }))
44        )
45    }
46}
47
/// Errors raised by the consensus layer, grouped by the part of the protocol
/// that failed.
///
/// Every variant except `RoleViolation` is an `#[error(transparent)]` wrapper
/// around a more specific error enum.
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
    /// Illegal Raft node state transitions
    #[error(transparent)]
    StateTransition(#[from] StateTransitionError),

    /// Leader election failures (Section 5.2 Raft paper)
    #[error(transparent)]
    Election(#[from] ElectionError),

    /// Log replication failures (Section 5.3 Raft paper)
    #[error(transparent)]
    Replication(#[from] ReplicationError),

    /// Cluster membership change failures (Section 6 Raft paper)
    #[error(transparent)]
    Membership(#[from] MembershipError),

    /// Snapshot-related errors during installation or restoration
    #[error(transparent)]
    Snapshot(#[from] SnapshotError),

    /// Role permission conflict error
    ///
    /// The `Display` message names only the two roles; `context` is carried
    /// on the value but is not printed.
    #[error("Operation requires {required_role} role but current role is {current_role}")]
    RoleViolation {
        /// Role the node currently holds.
        current_role: &'static str,
        /// Role the operation requires.
        required_role: &'static str,
        /// Free-form description supplied by the caller (not shown by `Display`).
        context: String,
    },
}
78
/// Failures of a node state transition; surfaced through
/// `ConsensusError::StateTransition`.
#[derive(Debug, thiserror::Error)]
#[doc(hidden)]
pub enum StateTransitionError {
    /// The transition to leader was refused because there were not enough votes.
    #[error("Not enough votes to transition to leader.")]
    NotEnoughVotes,

    /// The requested state transition is not valid.
    #[error("Invalid state transition.")]
    InvalidTransition,

    /// A lock-related failure; the variant carries no further detail.
    #[error("Lock error.")]
    LockError,
}
91
/// Network-layer failures: connection setup, timeouts, gRPC transport and
/// status errors, request send failures and background-task failures.
#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
    /// Endpoint unavailable (HTTP 503 equivalent)
    #[error("Service unavailable: {0}")]
    ServiceUnavailable(String),

    /// Peer communication timeout
    #[error("Connection timeout to {node_id} after {duration:?}")]
    Timeout { node_id: u32, duration: Duration },

    /// Unreachable node with source context
    ///
    /// The underlying cause is printed in the message and is also held in the
    /// `source` field.
    #[error("Network unreachable: {source}")]
    Unreachable {
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Persistent connection failures
    #[error("Socket connect failed error: {0}")]
    ConnectError(String),

    /// Retry policy exhaustion
    #[error("Retry timeout after {0:?}")]
    RetryTimeoutError(Duration),

    /// TLS negotiation failures
    #[error("TLS handshake failed")]
    TlsHandshakeFailure,

    /// Missing peer list for RPC
    #[error("Request list for {request_type} contains no peers")]
    EmptyPeerList { request_type: &'static str },

    /// Peer connection not found
    #[error("Peer({0}) connection not found")]
    PeerConnectionNotFound(u32),

    /// Peer closed the channel
    #[error("Peer closed the channel")]
    ResponseChannelClosed,

    /// Peer address not found
    #[error("Peer({0}) address not found")]
    PeerAddressNotFound(u32),

    /// Malformed node addresses
    #[error("Invalid URI format: {0}")]
    InvalidURI(String),

    /// RPC transmission failures with context
    ///
    /// The message names only the request type; the transport error is
    /// attached as the error source.
    #[error("Failed to send {request_type} request")]
    RequestSendFailure {
        request_type: &'static str,
        #[source]
        source: Box<tonic::transport::Error>,
    },

    /// Low-level TCP configuration errors
    #[error("TCP keepalive configuration error")]
    TcpKeepaliveError,

    /// HTTP/2 protocol configuration errors
    #[error("HTTP/2 keepalive configuration error")]
    Http2KeepaliveError,

    /// gRPC transport layer errors
    #[error(transparent)]
    TonicError(#[from] Box<tonic::transport::Error>),

    /// gRPC status code errors
    #[error(transparent)]
    TonicStatusError(#[from] Box<tonic::Status>),

    /// Failure while sending a read request; wraps [`ReadSendError`].
    #[error("Failed to send read request: {0}")]
    ReadSend(#[from] ReadSendError),

    /// Failure while sending a write request; wraps [`WriteSendError`].
    #[error("Failed to send write request: {0}")]
    WriteSend(#[from] WriteSendError),

    /// A background task failed; wraps tokio's `JoinError`.
    #[error("Background task failed: {0}")]
    TaskFailed(#[from] JoinError),

    /// Task backoff failure; the carried message is displayed verbatim.
    #[error("{0}")]
    TaskBackoffFailed(String),

    /// Signal send failure; the carried message is displayed verbatim.
    // NOTE(review): "Singal" is a misspelling of "Signal"; the name is left
    // unchanged because the variant is public.
    #[error("{0}")]
    SingalSendFailed(String),

    /// Signal receive failure; the carried message is displayed verbatim.
    // NOTE(review): same "Singal" misspelling as `SingalSendFailed`.
    #[error("{0}")]
    SingalReceiveFailed(String),

    // Disabled variants. `SnapshotError` defines `Rejected` and
    // `TransferFailed` with these same messages.
    // #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
    // SnapshotRejected { last_chunk: u32 },

    // #[error("Install snapshot RPC request failed")]
    // SnapshotTransferFailed,
    /// A new node failed to join the cluster.
    #[error("New node join cluster failed: {0}")]
    JoinFailed(String),

    /// Network timeout described only by a free-form message.
    #[error("Network timeout: {0}")]
    GlobalTimeout(String),
}
193
/// Storage-layer failures: disk I/O, persisted-data serialization, the state
/// machine, log and configuration storage, and related helper errors.
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    /// Disk I/O failures during log/snapshot operations
    #[error(transparent)]
    IoError(#[from] std::io::Error),

    /// I/O failure annotated with the filesystem path involved.
    ///
    /// The message prints only the path; the underlying `std::io::Error` is
    /// held in the `source` field.
    #[error("Error occurred at path: {path}")]
    PathError {
        path: PathBuf, // owned path (not a `&str`); printed in the message
        source: std::io::Error,
    },

    /// Serialization failures for persisted data
    #[error(transparent)]
    BincodeError(#[from] bincode::Error),

    /// State machine application errors
    #[error("State Machine error: {0}")]
    StateMachineError(String),

    /// Log storage subsystem failures
    #[error("Log storage failure: {0}")]
    LogStorage(String),

    // /// Snapshot creation/restoration failures
    // #[error("Snapshot operation failed: {0}")]
    // Snapshot(String),
    /// Checksum validation failures
    ///
    /// This is the only storage error that `Error::is_fatal` reports as fatal.
    #[error("Data corruption detected at {location}")]
    DataCorruption { location: String },

    /// Configuration storage failures
    #[error("Configuration storage error: {0}")]
    ConfigStorage(String),

    /// Embedded database errors
    #[error("Embedded database error: {0}")]
    DbError(String),

    /// Error type for value conversion operations
    ///
    /// The message is fixed; details are in the wrapped [`ConvertError`].
    #[error("Value convert failed")]
    Convert(#[from] ConvertError),

    /// File errors
    ///
    /// The message is fixed; details are in the wrapped [`FileError`].
    #[error("File errors")]
    File(#[from] FileError),

    /// Serialization error
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// ID allocation errors
    #[error(transparent)]
    IdAllocation(#[from] IdAllocationError),

    /// Feature not enabled in configuration
    ///
    /// Returned when client requests a feature (e.g., TTL) that is not
    /// enabled in the server configuration. This prevents silent failures
    /// and ensures explicit feature activation.
    #[error("Feature not enabled: {0}")]
    FeatureNotEnabled(String),

    /// State machine not serving requests
    ///
    /// Returned when read operations are attempted during critical operations
    /// such as snapshot restoration. This ensures reads never access inconsistent
    /// or temporary state during database transitions.
    #[error("State machine not serving: {0}")]
    NotServing(String),
}
266
/// Errors from ID allocation; surfaced through `StorageError::IdAllocation`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum IdAllocationError {
    /// ID allocation overflow
    ///
    /// The message is rendered as `start > end`.
    #[error("ID allocation overflow: {start} > {end}")]
    Overflow { start: u64, end: u64 },

    /// Invalid ID range
    ///
    /// The message prints the range in inclusive form, `start..=end`.
    #[error("Invalid ID range: {start}..={end}")]
    InvalidRange { start: u64, end: u64 },

    /// No available IDs
    #[error("No available IDs")]
    NoIdsAvailable,
}
282
/// File and path related errors; surfaced through `StorageError::File`.
///
/// Except for `TooSmall`, each variant carries a `String` that is printed in
/// the message.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum FileError {
    /// The path does not exist.
    #[error("Path does not exist: {0}")]
    NotFound(String),
    /// The path refers to a directory.
    #[error("Path is a directory: {0}")]
    IsDirectory(String),
    /// The file is busy.
    // NOTE(review): overlaps with `FileBusy` below; the distinction is not
    // visible from this file.
    #[error("File is busy: {0}")]
    Busy(String),
    /// Permissions are insufficient for the operation.
    #[error("Insufficient permissions: {0}")]
    PermissionDenied(String),
    /// The file is occupied.
    #[error("File is occupied: {0}")]
    FileBusy(String),
    /// The path is invalid.
    #[error("Invalid path: {0}")]
    InvalidPath(String),
    /// Something is too small; the carried `u64` is printed in the message.
    // NOTE(review): presumably a file size in bytes — confirm against callers.
    #[error("Too small: {0}")]
    TooSmall(u64),
    /// The file extension is invalid.
    #[error("Invalid extension: {0}")]
    InvalidExt(String),
    /// The GZIP header is invalid.
    #[error("Invalid GZIP header: {0}")]
    InvalidGzipHeader(String),
    /// An I/O error that did not match any other variant.
    #[error("Unknown IO error: {0}")]
    UnknownIo(String),
}
307
/// Error type for value conversion operations
///
/// Surfaced through `StorageError::Convert`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ConvertError {
    /// Invalid input length error
    ///
    /// This occurs when the input byte slice length doesn't match the required 8 bytes.
    /// The carried `usize` is the length that was actually received.
    #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
    InvalidLength(usize),

    /// Generic conversion failure with detailed message
    ///
    /// Wraps underlying parsing/conversion errors with context information
    #[error("conversion failure: {0}")]
    ConversionFailure(String),
}
324
/// Failures while sending a read request; surfaced through
/// `NetworkError::ReadSend`.
///
/// Both messages are fixed strings; the wrapped error is available as the
/// error source rather than in the message.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ReadSendError {
    /// The operation timed out; wraps tokio's `Elapsed`.
    #[error("Network timeout")]
    Timeout(#[from] tokio::time::error::Elapsed),

    /// The connection failed; wraps tonic's transport error.
    #[error("Connection failed")]
    Connection(#[from] tonic::transport::Error),
}
334
/// Failures while sending a write request; surfaced through
/// `NetworkError::WriteSend`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum WriteSendError {
    /// The write was refused because the node is not the cluster leader.
    #[error("Not cluster leader")]
    NotLeader,

    /// The network is unreachable.
    #[error("Network unreachable")]
    Unreachable,

    /// The payload is too large.
    #[error("Payload too large")]
    PayloadExceeded,
}
347
/// Infrastructure-level errors; surfaced through `Error::System`.
#[derive(Debug, thiserror::Error)]
pub enum SystemError {
    /// Network layer
    #[error("Network error: {0}")]
    Network(#[from] NetworkError),

    /// Storage layer
    ///
    /// The message is fixed; the wrapped [`StorageError`] is the error source.
    #[error("Storage operation failed")]
    Storage(#[from] StorageError),

    /// Serialization
    ///
    /// The message is fixed; the wrapped [`SerializationError`] is the error source.
    #[error("Serialization error")]
    Serialization(#[from] SerializationError),

    /// Protocol buffer encoding/decoding specific errors
    #[error("Protobuf operation failed: {0}")]
    Prost(#[from] ProstError),

    /// Basic node operations: the node failed to start.
    #[error("Node failed to start: {0}")]
    NodeStartFailed(String),

    /// General server error described by a free-form message.
    #[error("General server error: {0}")]
    GeneralServer(String),

    /// Server unavailable.
    // NOTE(review): the message reads "Internal server error" although the
    // variant is named `ServerUnavailable` — confirm the intended wording.
    #[error("Internal server error")]
    ServerUnavailable,

    /// State machine does not support lease-based expiration
    #[error("State machine does not support lease management")]
    LeaseNotSupported,
}
380
// Serialization is classified separately (across protocol layers and system layers)
/// Serialization failures; surfaced through `SystemError::Serialization`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum SerializationError {
    /// A bincode operation failed; wraps `bincode::Error`.
    #[error("Bincode serialization failed: {0}")]
    Bincode(#[from] bincode::Error),
}
388
/// Wrapper for prost encoding/decoding errors
///
/// Surfaced through `SystemError::Prost`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ProstError {
    /// Encoding failed; wraps `prost::EncodeError`.
    #[error("Encoding failed: {0}")]
    Encode(#[from] prost::EncodeError),

    /// Decoding failed; wraps `prost::DecodeError`.
    #[error("Decoding failed: {0}")]
    Decode(#[from] prost::DecodeError),
}
399
/// Leader election failures; surfaced through `ConsensusError::Election`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ElectionError {
    /// General election process failure
    #[error("Election failed: {0}")]
    Failed(String),

    /// Stale term detection (Section 5.1 Raft paper)
    ///
    /// The carried value is the higher term that was found.
    #[error("Found higher term(={0}) during election process")]
    HigherTerm(u64),

    /// Term number inconsistency
    #[error("Term conflict (current: {current}, received: {received})")]
    TermConflict { current: u64, received: u64 },

    /// Log inconsistency during vote requests (Section 5.4.1 Raft paper)
    #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
    LogConflict {
        index: u64,
        expected_term: u64,
        actual_term: u64,
    },

    /// Quorum not achieved (Section 5.2 Raft paper)
    #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
    QuorumFailure { required: usize, succeed: usize },

    /// Leadership handoff failures
    #[error("Leadership consensus error: {0}")]
    LeadershipConsensus(String),

    /// Isolated node scenario
    #[error("No voting member found for candidate {candidate_id}")]
    NoVotingMemberFound { candidate_id: u32 },
}
435
/// Log replication failures; surfaced through `ConsensusError::Replication`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
    /// Stale leader detected during AppendEntries RPC
    ///
    /// The carried value is the higher term that was found.
    #[error("Found higher term(={0}) during replication process")]
    HigherTerm(u64),

    /// Failed to achieve majority acknowledgment
    #[error("Quorum not reached for log replication")]
    QuorumNotReached,

    /// Timeout to receive majority response
    #[error("Timeout to receive majority response")]
    QuorumTimeout,

    /// Target follower node unreachable
    #[error("Node {node_id} unreachable for replication")]
    NodeUnreachable { node_id: u32 },

    /// Network timeout during replication RPC
    ///
    /// `duration` is printed with an `ms` suffix, i.e. it is in milliseconds.
    #[error("RPC timeout after {duration}ms")]
    RpcTimeout { duration: u64 },

    /// Missing peer configuration in leader state
    #[error("No peer mapping for leader {leader_id}")]
    NoPeerFound { leader_id: u32 },

    /// Log inconsistency detected during replication (§5.3)
    #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
    LogConflict {
        index: u64,
        expected_term: u64,
        actual_term: u64,
    },

    /// Node not in leader state for replication requests
    ///
    /// `leader_id` is the known leader, if any; it is printed with `{:?}`.
    #[error("Replication requires leader role (known leader: {leader_id:?})")]
    NotLeader { leader_id: Option<u32> },
}
475
/// Errors that can occur during ReadIndex batching for linearizable reads
///
/// Unlike the other error enums in this file, this one also derives `Clone`.
/// The `From<ReadIndexError> for Error` impl in this file maps each variant
/// onto a `ReplicationError`; the `reason` of `VerificationFailed` is
/// discarded by that conversion.
#[doc(hidden)]
#[derive(Debug, thiserror::Error, Clone)]
pub enum ReadIndexError {
    /// This node is no longer the leader
    #[error("Not leader - cannot serve linearizable read")]
    NotLeader,

    /// Leadership verification timed out (possible network partition)
    #[error("Verification timeout after {timeout_ms}ms")]
    Timeout { timeout_ms: u64 },

    /// Leadership verification failed for other reasons
    #[error("Verification failed: {reason}")]
    VerificationFailed { reason: String },
}
492
/// Cluster membership change failures; surfaced through
/// `ConsensusError::Membership`.
#[derive(Debug, thiserror::Error)]
pub enum MembershipError {
    /// Failed to reach consensus on configuration change
    #[error("Membership update consensus failure: {0}")]
    ConfigChangeUpdateFailed(String),

    /// Non-leader node attempted membership change
    #[error("Membership changes require leader role")]
    NotLeader,

    /// No leader information available
    #[error("No leader information available")]
    NoLeaderFound,

    /// Non-learner node attempted join cluster
    #[error("Only Learner can join cluster")]
    NotLearner,

    /// Cluster not in operational state
    #[error("Cluster bootstrap not completed")]
    ClusterIsNotReady,

    /// Connection establishment failure during join
    #[error("Cluster connection setup failed: {0}")]
    SetupClusterConnectionFailed(String),

    /// Missing node metadata in configuration
    #[error("Metadata missing for node {node_id} in cluster config")]
    NoMetadataFoundForNode { node_id: u32 },

    /// No available peers found for request
    #[error("No reachable peers found in cluster membership")]
    NoPeersAvailable,

    /// Node already been added into cluster config
    #[error("Node({0}) already been added into cluster config.")]
    NodeAlreadyExists(u32),

    /// To be removed node is leader.
    #[error("To be removed node({0}) is leader.")]
    RemoveNodeIsLeader(u32),

    /// The node cannot be promoted because its current role is not LEARNER.
    // NOTE(review): `role` is an `i32`, presumably a numeric role code —
    // confirm against the role definitions.
    #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
    InvalidPromotion { node_id: u32, role: i32 },

    /// The membership change request is invalid.
    #[error("Invalid membership change request")]
    InvalidChangeRequest,

    /// A commit timed out.
    #[error("Commit Timeout")]
    CommitTimeout,

    /// The learner with the given id failed to join the cluster.
    #[error("Learner({0}) join cluster failed.")]
    JoinClusterFailed(u32),

    /// Joining the cluster failed; details are in the message.
    #[error("Join cluster error: {0}")]
    JoinClusterError(String),

    /// No leader.
    // NOTE(review): the message reads "Not leader" although the variant is
    // `NoLeader`; compare `NotLeader` and `NoLeaderFound` above — confirm the
    // intended wording.
    #[error("Not leader")]
    NoLeader,

    /// Marking the leader id failed; details are in the message.
    #[error("Mark leader id failed: {0}")]
    MarkLeaderIdFailed(String),

    /// Cluster metadata not initialized when required
    #[error(
        "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
    )]
    ClusterMetadataNotInitialized,
}
562
/// Snapshot-related failures; surfaced through `ConsensusError::Snapshot`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// The snapshot receiver is lagging, so a chunk is dropped.
    #[error("Snapshot receiver lagging, dropping chunk")]
    Backpressure,

    /// Snapshot chunk rejected during installation
    #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
    Rejected { last_chunk: u32 },

    /// The install-snapshot request was rejected; unlike `Rejected`, no chunk
    /// number is carried.
    #[error("Install snapshot RPC request been rejected")]
    RemoteRejection,

    /// Snapshot transfer failed due to stream/network issues
    #[error("Install snapshot RPC request failed")]
    TransferFailed,

    /// Snapshot transfer timeout due to network issues
    #[error("Install snapshot RPC request timeout")]
    TransferTimeout,

    /// Snapshot operation failed with context
    #[error("Snapshot operation failed: {0}")]
    OperationFailed(String),

    /// Snapshot is outdated and cannot be used
    #[error("Snapshot is outdated")]
    Outdated,

    /// Snapshot file checksum mismatch
    #[error("Snapshot file checksum mismatch")]
    ChecksumMismatch,

    /// Invalid snapshot
    #[error("Invalid snapshot")]
    InvalidSnapshot,

    /// Invalid chunk sequence
    #[error("Invalid chunk sequence")]
    InvalidChunkSequence,

    /// Stream receiver disconnected
    #[error("Stream receiver disconnected")]
    ReceiverDisconnected,

    /// The first chunk of the snapshot stream is invalid.
    #[error("Invalid first snapshot stream chunk")]
    InvalidFirstChunk,

    /// A snapshot stream chunk is empty.
    #[error("Empty snapshot stream chunk")]
    EmptySnapshot,

    /// The snapshot is incomplete.
    #[error("Incomplete snapshot error")]
    IncompleteSnapshot,

    /// A requested chunk is out of range: field 0 is the requested chunk,
    /// field 1 is the maximum.
    #[error("Requested chunk {0} out of range (max: {1})")]
    ChunkOutOfRange(u32, u32),

    /// A chunk arrived out of order in the stream.
    #[error("Chunk in stream is out of order")]
    OutOfOrderChunk,

    /// A chunk carries no metadata.
    #[error("No metadata in chunk")]
    MissingMetadata,

    /// The chunk with the given number is not cached.
    #[error("Chunk not cached: {0}")]
    ChunkNotCached(u32),

    /// The background task that pushes the stream died.
    #[error("Background stream push task died")]
    BackgroundTaskDied,
}
632
633// ============== Conversion Implementations ============== //
634impl From<NetworkError> for Error {
635    fn from(e: NetworkError) -> Self {
636        Error::System(SystemError::Network(e))
637    }
638}
639
640impl From<StorageError> for Error {
641    fn from(e: StorageError) -> Self {
642        Error::System(SystemError::Storage(e))
643    }
644}
645
646impl From<ConvertError> for Error {
647    fn from(e: ConvertError) -> Self {
648        Error::System(SystemError::Storage(StorageError::Convert(e)))
649    }
650}
651
652impl From<FileError> for Error {
653    fn from(e: FileError) -> Self {
654        Error::System(SystemError::Storage(StorageError::File(e)))
655    }
656}
657
658impl From<SerializationError> for Error {
659    fn from(e: SerializationError) -> Self {
660        Error::System(SystemError::Serialization(e))
661    }
662}
663
664// // These allow direct conversion from prost errors to SystemError
665// impl From<prost::EncodeError> for SystemError {
666//     fn from(error: prost::EncodeError) -> Self {
667//         SystemError::Prost(ProstError::Encode(error))
668//     }
669// }
670
671// impl From<prost::DecodeError> for SystemError {
672//     fn from(error: prost::DecodeError) -> Self {
673//         SystemError::Prost(ProstError::Decode(error))
674//     }
675// }
676
677impl From<ProstError> for Error {
678    fn from(error: ProstError) -> Self {
679        Error::System(SystemError::Prost(error))
680    }
681}
682
683// ===== Consensus Error conversions =====
684
685impl From<StateTransitionError> for Error {
686    fn from(e: StateTransitionError) -> Self {
687        Error::Consensus(ConsensusError::StateTransition(e))
688    }
689}
690
691impl From<ElectionError> for Error {
692    fn from(e: ElectionError) -> Self {
693        Error::Consensus(ConsensusError::Election(e))
694    }
695}
696
697impl From<ReplicationError> for Error {
698    fn from(e: ReplicationError) -> Self {
699        Error::Consensus(ConsensusError::Replication(e))
700    }
701}
702
703impl From<MembershipError> for Error {
704    fn from(e: MembershipError) -> Self {
705        Error::Consensus(ConsensusError::Membership(e))
706    }
707}
708
709// ===== Network sub-error conversions =====
710impl From<ReadSendError> for Error {
711    fn from(e: ReadSendError) -> Self {
712        Error::System(SystemError::Network(NetworkError::ReadSend(e)))
713    }
714}
715
716impl From<WriteSendError> for Error {
717    fn from(e: WriteSendError) -> Self {
718        Error::System(SystemError::Network(NetworkError::WriteSend(e)))
719    }
720}
721
722impl From<tonic::transport::Error> for Error {
723    fn from(err: tonic::transport::Error) -> Self {
724        NetworkError::TonicError(Box::new(err)).into()
725    }
726}
727
728impl From<JoinError> for Error {
729    fn from(err: JoinError) -> Self {
730        NetworkError::TaskFailed(err).into()
731    }
732}
733
734impl From<SnapshotError> for Error {
735    fn from(e: SnapshotError) -> Self {
736        Error::Consensus(ConsensusError::Snapshot(e))
737    }
738}
739
740impl From<ReadIndexError> for Error {
741    fn from(e: ReadIndexError) -> Self {
742        Error::Consensus(ConsensusError::Replication(match e {
743            ReadIndexError::NotLeader => ReplicationError::NotLeader { leader_id: None },
744            ReadIndexError::Timeout { timeout_ms } => ReplicationError::RpcTimeout {
745                duration: timeout_ms,
746            },
747            ReadIndexError::VerificationFailed { reason: _ } => ReplicationError::QuorumNotReached,
748        }))
749    }
750}
751
752impl From<IdAllocationError> for Error {
753    fn from(e: IdAllocationError) -> Self {
754        StorageError::IdAllocation(e).into()
755    }
756}
757
758impl From<std::io::Error> for Error {
759    fn from(e: std::io::Error) -> Self {
760        StorageError::IoError(e).into()
761    }
762}