1use std::path::PathBuf;
7use std::time::Duration;
8
9use config::ConfigError;
10use tokio::task::JoinError;
11
/// Result alias whose error type is fixed to this file's [`Error`].
#[doc(hidden)]
pub type Result<T> = std::result::Result<T, Error>;
14
/// Top-level error type that aggregates the system, configuration and
/// consensus error families, plus a free-form fatal variant.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A [`SystemError`]; its `Display` output and source are forwarded unchanged.
    #[error(transparent)]
    System(#[from] SystemError),

    /// A `config::ConfigError`, forwarded unchanged.
    #[error(transparent)]
    Config(#[from] ConfigError),

    /// A [`ConsensusError`], forwarded unchanged.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// A fatal condition described by a message; rendered as `Fatal error: <message>`.
    #[error("Fatal error: {0}")]
    Fatal(String),
}
33
34impl Error {
35 pub fn is_fatal(&self) -> bool {
40 matches!(
41 self,
42 Error::Fatal(_)
43 | Error::System(SystemError::Storage(StorageError::DataCorruption { .. }))
44 )
45 }
46}
47
/// Errors belonging to the consensus layer. Most variants transparently wrap
/// a more specific error enum defined in this file.
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
    /// A [`StateTransitionError`], forwarded unchanged.
    #[error(transparent)]
    StateTransition(#[from] StateTransitionError),

    /// An [`ElectionError`], forwarded unchanged.
    #[error(transparent)]
    Election(#[from] ElectionError),

    /// A [`ReplicationError`], forwarded unchanged.
    #[error(transparent)]
    Replication(#[from] ReplicationError),

    /// A [`MembershipError`], forwarded unchanged.
    #[error(transparent)]
    Membership(#[from] MembershipError),

    /// A [`SnapshotError`], forwarded unchanged.
    #[error(transparent)]
    Snapshot(#[from] SnapshotError),

    /// An operation was attempted in a role other than the required one.
    ///
    /// Only `required_role` and `current_role` appear in the `Display`
    /// message; `context` is carried on the value (and shown by `Debug`)
    /// but is not rendered by `Display`.
    #[error("Operation requires {required_role} role but current role is {current_role}")]
    RoleViolation {
        current_role: &'static str,
        required_role: &'static str,
        context: String,
    },
}
78
/// Errors reported while changing node state/role.
#[derive(Debug, thiserror::Error)]
#[doc(hidden)]
pub enum StateTransitionError {
    /// Too few votes were collected to transition to leader.
    #[error("Not enough votes to transition to leader.")]
    NotEnoughVotes,

    /// The requested state transition is not valid.
    #[error("Invalid state transition.")]
    InvalidTransition,

    /// A lock-related failure; no further detail is carried.
    #[error("Lock error.")]
    LockError,
}
91
/// Errors raised by the networking layer: connection setup, request
/// dispatch, timeouts and background tasks.
#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
    /// A service is unavailable; the payload is appended to the message.
    #[error("Service unavailable: {0}")]
    ServiceUnavailable(String),

    /// Connecting to `node_id` timed out after `duration`.
    #[error("Connection timeout to {node_id} after {duration:?}")]
    Timeout { node_id: u32, duration: Duration },

    /// The network is unreachable. `source` holds the underlying error and is
    /// also rendered inside the message.
    #[error("Network unreachable: {source}")]
    Unreachable {
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// A socket connect attempt failed; the payload describes the failure.
    #[error("Socket connect failed error: {0}")]
    ConnectError(String),

    /// Retrying gave up after the given duration.
    #[error("Retry timeout after {0:?}")]
    RetryTimeoutError(Duration),

    /// The TLS handshake failed.
    #[error("TLS handshake failed")]
    TlsHandshakeFailure,

    /// A request of kind `request_type` was issued with no peers to send to.
    #[error("Request list for {request_type} contains no peers")]
    EmptyPeerList { request_type: &'static str },

    /// No connection exists for the given peer.
    #[error("Peer({0}) connection not found")]
    PeerConnectionNotFound(u32),

    /// The peer closed the response channel.
    #[error("Peer closed the channel")]
    ResponseChannelClosed,

    /// No address is known for the given peer.
    #[error("Peer({0}) address not found")]
    PeerAddressNotFound(u32),

    /// A URI could not be parsed; the payload is appended to the message.
    #[error("Invalid URI format: {0}")]
    InvalidURI(String),

    /// Sending a request of kind `request_type` failed; the tonic transport
    /// error is exposed as the error source.
    #[error("Failed to send {request_type} request")]
    RequestSendFailure {
        request_type: &'static str,
        #[source]
        source: Box<tonic::transport::Error>,
    },

    /// TCP keepalive could not be configured.
    #[error("TCP keepalive configuration error")]
    TcpKeepaliveError,

    /// HTTP/2 keepalive could not be configured.
    #[error("HTTP/2 keepalive configuration error")]
    Http2KeepaliveError,

    /// A boxed tonic transport error, forwarded unchanged.
    #[error(transparent)]
    TonicError(#[from] Box<tonic::transport::Error>),

    /// A boxed tonic status, forwarded unchanged.
    #[error(transparent)]
    TonicStatusError(#[from] Box<tonic::Status>),

    /// Sending a read request failed; wraps a [`ReadSendError`].
    #[error("Failed to send read request: {0}")]
    ReadSend(#[from] ReadSendError),

    /// Sending a write request failed; wraps a [`WriteSendError`].
    #[error("Failed to send write request: {0}")]
    WriteSend(#[from] WriteSendError),

    /// A background task failed; wraps tokio's `JoinError`.
    #[error("Background task failed: {0}")]
    TaskFailed(#[from] JoinError),

    /// A task backoff failure; the payload is displayed verbatim.
    #[error("{0}")]
    TaskBackoffFailed(String),

    /// A signal send failure; the payload is displayed verbatim.
    /// (The `Singal` spelling is part of the public identifier.)
    #[error("{0}")]
    SingalSendFailed(String),

    /// A signal receive failure; the payload is displayed verbatim.
    /// (The `Singal` spelling is part of the public identifier.)
    #[error("{0}")]
    SingalReceiveFailed(String),

    /// A new node failed to join the cluster.
    #[error("New node join cluster failed: {0}")]
    JoinFailed(String),

    /// A network timeout described by the payload.
    #[error("Network timeout: {0}")]
    GlobalTimeout(String),
}
193
/// Errors raised by the storage layer.
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
    /// A `std::io::Error`, forwarded unchanged.
    #[error(transparent)]
    IoError(#[from] std::io::Error),

    /// An I/O error tied to a specific filesystem path; the I/O error is the source.
    #[error("Error occurred at path: {path}")]
    PathError {
        path: PathBuf,
        source: std::io::Error,
    },

    /// A `bincode::Error`, forwarded unchanged.
    #[error(transparent)]
    BincodeError(#[from] bincode::Error),

    /// A state machine failure described by the payload.
    #[error("State Machine error: {0}")]
    StateMachineError(String),

    /// A log storage failure described by the payload.
    #[error("Log storage failure: {0}")]
    LogStorage(String),

    /// Data corruption was detected at `location`.
    /// `Error::is_fatal` treats this variant as fatal.
    #[error("Data corruption detected at {location}")]
    DataCorruption { location: String },

    /// A configuration storage failure described by the payload.
    #[error("Configuration storage error: {0}")]
    ConfigStorage(String),

    /// An embedded database failure described by the payload.
    #[error("Embedded database error: {0}")]
    DbError(String),

    /// A value conversion failed; wraps a [`ConvertError`] as the source.
    #[error("Value convert failed")]
    Convert(#[from] ConvertError),

    /// A file-level failure; wraps a [`FileError`] as the source.
    #[error("File errors")]
    File(#[from] FileError),

    /// A serialization failure described by the payload.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// An [`IdAllocationError`], forwarded unchanged.
    #[error(transparent)]
    IdAllocation(#[from] IdAllocationError),

    /// The named feature is not enabled.
    #[error("Feature not enabled: {0}")]
    FeatureNotEnabled(String),

    /// The state machine is not serving; the payload gives detail.
    #[error("State machine not serving: {0}")]
    NotServing(String),
}
266
/// Errors raised while allocating IDs from a range.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum IdAllocationError {
    /// Allocation overflowed; rendered as `start > end`.
    #[error("ID allocation overflow: {start} > {end}")]
    Overflow { start: u64, end: u64 },

    /// The inclusive range `start..=end` is not valid.
    #[error("Invalid ID range: {start}..={end}")]
    InvalidRange { start: u64, end: u64 },

    /// No IDs are left to hand out.
    #[error("No available IDs")]
    NoIdsAvailable,
}
282
/// File and path related errors. String payloads are appended to the message.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum FileError {
    /// The path does not exist.
    #[error("Path does not exist: {0}")]
    NotFound(String),
    /// The path refers to a directory.
    #[error("Path is a directory: {0}")]
    IsDirectory(String),
    /// The file is busy.
    #[error("File is busy: {0}")]
    Busy(String),
    /// Permissions are insufficient.
    #[error("Insufficient permissions: {0}")]
    PermissionDenied(String),
    /// The file is occupied. Distinct variant from [`FileError::Busy`],
    /// with a different message.
    #[error("File is occupied: {0}")]
    FileBusy(String),
    /// The path is invalid.
    #[error("Invalid path: {0}")]
    InvalidPath(String),
    /// Something is too small; the payload is the offending value.
    #[error("Too small: {0}")]
    TooSmall(u64),
    /// The file extension is invalid.
    #[error("Invalid extension: {0}")]
    InvalidExt(String),
    /// The GZIP header is invalid.
    #[error("Invalid GZIP header: {0}")]
    InvalidGzipHeader(String),
    /// An I/O error that fits no other variant.
    #[error("Unknown IO error: {0}")]
    UnknownIo(String),
}
307
/// Errors raised while converting values to or from bytes.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ConvertError {
    /// The input was not 8 bytes long; the payload is the received length.
    #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
    InvalidLength(usize),

    /// The conversion failed for the reason given in the payload.
    #[error("conversion failure: {0}")]
    ConversionFailure(String),
}
324
/// Errors raised while sending a read request.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ReadSendError {
    /// The request timed out; wraps tokio's `Elapsed` as the source.
    #[error("Network timeout")]
    Timeout(#[from] tokio::time::error::Elapsed),

    /// The connection failed; wraps a tonic transport error as the source.
    #[error("Connection failed")]
    Connection(#[from] tonic::transport::Error),
}
334
/// Errors raised while sending a write request.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum WriteSendError {
    /// The node is not the cluster leader.
    #[error("Not cluster leader")]
    NotLeader,

    /// The network is unreachable.
    #[error("Network unreachable")]
    Unreachable,

    /// The payload is too large.
    #[error("Payload too large")]
    PayloadExceeded,
}
347
/// System-level errors: networking, storage, serialization, protobuf
/// handling and general server failures.
#[derive(Debug, thiserror::Error)]
pub enum SystemError {
    /// A [`NetworkError`]; its message is appended after `Network error: `.
    #[error("Network error: {0}")]
    Network(#[from] NetworkError),

    /// A [`StorageError`]; available as the source, not rendered in the message.
    #[error("Storage operation failed")]
    Storage(#[from] StorageError),

    /// A [`SerializationError`]; available as the source, not rendered in the message.
    #[error("Serialization error")]
    Serialization(#[from] SerializationError),

    /// A [`ProstError`]; its message is appended to the text.
    #[error("Protobuf operation failed: {0}")]
    Prost(#[from] ProstError),

    /// The node failed to start; the payload gives detail.
    #[error("Node failed to start: {0}")]
    NodeStartFailed(String),

    /// A general server error described by the payload.
    #[error("General server error: {0}")]
    GeneralServer(String),

    /// The server is unavailable; displayed as `Internal server error`.
    #[error("Internal server error")]
    ServerUnavailable,

    /// The state machine does not support lease management.
    #[error("State machine does not support lease management")]
    LeaseNotSupported,
}
380
/// Serialization errors.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum SerializationError {
    /// A bincode failure; the inner error's message is appended.
    #[error("Bincode serialization failed: {0}")]
    Bincode(#[from] bincode::Error),
}
388
/// Protobuf (prost) encode/decode errors.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ProstError {
    /// Encoding failed; wraps `prost::EncodeError`.
    #[error("Encoding failed: {0}")]
    Encode(#[from] prost::EncodeError),

    /// Decoding failed; wraps `prost::DecodeError`.
    #[error("Decoding failed: {0}")]
    Decode(#[from] prost::DecodeError),
}
399
/// Errors raised during leader election.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ElectionError {
    /// The election failed for the reason given in the payload.
    #[error("Election failed: {0}")]
    Failed(String),

    /// A higher term (the payload) was observed during the election.
    #[error("Found higher term(={0}) during election process")]
    HigherTerm(u64),

    /// The received term conflicts with the current term.
    #[error("Term conflict (current: {current}, received: {received})")]
    TermConflict { current: u64, received: u64 },

    /// The log entry at `index` has term `actual_term` instead of `expected_term`.
    #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
    LogConflict {
        index: u64,
        expected_term: u64,
        actual_term: u64,
    },

    /// Only `succeed` of the `required` responses were obtained.
    #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
    QuorumFailure { required: usize, succeed: usize },

    /// A leadership consensus failure described by the payload.
    #[error("Leadership consensus error: {0}")]
    LeadershipConsensus(String),

    /// No voting member was found for the candidate.
    #[error("No voting member found for candidate {candidate_id}")]
    NoVotingMemberFound { candidate_id: u32 },
}
435
/// Errors raised during log replication.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
    /// A higher term (the payload) was observed during replication.
    #[error("Found higher term(={0}) during replication process")]
    HigherTerm(u64),

    /// Quorum was not reached for log replication.
    #[error("Quorum not reached for log replication")]
    QuorumNotReached,

    /// Timed out waiting for a majority of responses.
    #[error("Timeout to receive majority response")]
    QuorumTimeout,

    /// The node `node_id` could not be reached for replication.
    #[error("Node {node_id} unreachable for replication")]
    NodeUnreachable { node_id: u32 },

    /// An RPC timed out; `duration` is rendered with an `ms` suffix.
    #[error("RPC timeout after {duration}ms")]
    RpcTimeout { duration: u64 },

    /// No peer mapping exists for leader `leader_id`.
    #[error("No peer mapping for leader {leader_id}")]
    NoPeerFound { leader_id: u32 },

    /// The log entry at `index` has term `actual_term` instead of `expected_term`.
    #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
    LogConflict {
        index: u64,
        expected_term: u64,
        actual_term: u64,
    },

    /// Replication was attempted without the leader role; `leader_id` is the
    /// known leader, if any.
    #[error("Replication requires leader role (known leader: {leader_id:?})")]
    NotLeader { leader_id: Option<u32> },
}
475
/// Errors raised while verifying a linearizable read.
///
/// Unlike the other error enums in this file, this one also derives `Clone`.
#[doc(hidden)]
#[derive(Debug, thiserror::Error, Clone)]
pub enum ReadIndexError {
    /// The node is not the leader and cannot serve a linearizable read.
    #[error("Not leader - cannot serve linearizable read")]
    NotLeader,

    /// Verification timed out after `timeout_ms` milliseconds.
    #[error("Verification timeout after {timeout_ms}ms")]
    Timeout { timeout_ms: u64 },

    /// Verification failed for the given reason.
    #[error("Verification failed: {reason}")]
    VerificationFailed { reason: String },
}
492
/// Errors raised while managing cluster membership.
#[derive(Debug, thiserror::Error)]
pub enum MembershipError {
    /// A membership update failed to reach consensus; the payload gives detail.
    #[error("Membership update consensus failure: {0}")]
    ConfigChangeUpdateFailed(String),

    /// A membership change was attempted without the leader role.
    #[error("Membership changes require leader role")]
    NotLeader,

    /// No leader information is available.
    #[error("No leader information available")]
    NoLeaderFound,

    /// A join was attempted by a node that is not a learner.
    #[error("Only Learner can join cluster")]
    NotLearner,

    /// Cluster bootstrap has not completed.
    #[error("Cluster bootstrap not completed")]
    ClusterIsNotReady,

    /// Setting up cluster connections failed; the payload gives detail.
    #[error("Cluster connection setup failed: {0}")]
    SetupClusterConnectionFailed(String),

    /// The cluster config holds no metadata for `node_id`.
    #[error("Metadata missing for node {node_id} in cluster config")]
    NoMetadataFoundForNode { node_id: u32 },

    /// No reachable peers were found in the cluster membership.
    #[error("No reachable peers found in cluster membership")]
    NoPeersAvailable,

    /// The node is already present in the cluster config.
    #[error("Node({0}) already been added into cluster config.")]
    NodeAlreadyExists(u32),

    /// The node selected for removal is the leader.
    #[error("To be removed node({0}) is leader.")]
    RemoveNodeIsLeader(u32),

    /// `node_id` cannot be promoted because its current `role` is not learner.
    #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
    InvalidPromotion { node_id: u32, role: i32 },

    /// The membership change request is invalid.
    #[error("Invalid membership change request")]
    InvalidChangeRequest,

    /// A commit timed out.
    #[error("Commit Timeout")]
    CommitTimeout,

    /// The given learner failed to join the cluster.
    #[error("Learner({0}) join cluster failed.")]
    JoinClusterFailed(u32),

    /// A join-cluster error described by the payload.
    #[error("Join cluster error: {0}")]
    JoinClusterError(String),

    /// Displayed as `Not leader`.
    // NOTE(review): the variant name says "no leader" but the message says
    // "Not leader" — confirm which meaning is intended.
    #[error("Not leader")]
    NoLeader,

    /// Marking the leader id failed; the payload gives detail.
    #[error("Mark leader id failed: {0}")]
    MarkLeaderIdFailed(String),

    /// Cluster metadata was used before being initialized; the message flags
    /// this as a bug and names `init_cluster_metadata()` as the missing call.
    #[error(
        "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
    )]
    ClusterMetadataNotInitialized,
}
562
/// Errors raised while creating, transferring or installing snapshots.
#[doc(hidden)]
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// The snapshot receiver is lagging, so a chunk is dropped.
    #[error("Snapshot receiver lagging, dropping chunk")]
    Backpressure,

    /// The install-snapshot RPC was rejected; `last_chunk` is included in the message.
    #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
    Rejected { last_chunk: u32 },

    /// The install-snapshot RPC was rejected (no chunk information carried).
    #[error("Install snapshot RPC request been rejected")]
    RemoteRejection,

    /// The install-snapshot RPC failed.
    #[error("Install snapshot RPC request failed")]
    TransferFailed,

    /// The install-snapshot RPC timed out.
    #[error("Install snapshot RPC request timeout")]
    TransferTimeout,

    /// A snapshot operation failed; the payload gives detail.
    #[error("Snapshot operation failed: {0}")]
    OperationFailed(String),

    /// The snapshot is outdated.
    #[error("Snapshot is outdated")]
    Outdated,

    /// The snapshot file checksum does not match.
    #[error("Snapshot file checksum mismatch")]
    ChecksumMismatch,

    /// The snapshot is invalid.
    #[error("Invalid snapshot")]
    InvalidSnapshot,

    /// The chunk sequence is invalid.
    #[error("Invalid chunk sequence")]
    InvalidChunkSequence,

    /// The stream receiver disconnected.
    #[error("Stream receiver disconnected")]
    ReceiverDisconnected,

    /// The first chunk of the snapshot stream is invalid.
    #[error("Invalid first snapshot stream chunk")]
    InvalidFirstChunk,

    /// A snapshot stream chunk is empty.
    #[error("Empty snapshot stream chunk")]
    EmptySnapshot,

    /// The snapshot is incomplete.
    #[error("Incomplete snapshot error")]
    IncompleteSnapshot,

    /// The requested chunk (first field) exceeds the maximum (second field).
    #[error("Requested chunk {0} out of range (max: {1})")]
    ChunkOutOfRange(u32, u32),

    /// A chunk arrived out of order in the stream.
    #[error("Chunk in stream is out of order")]
    OutOfOrderChunk,

    /// The chunk carries no metadata.
    #[error("No metadata in chunk")]
    MissingMetadata,

    /// The given chunk is not cached.
    #[error("Chunk not cached: {0}")]
    ChunkNotCached(u32),

    /// The background stream push task died.
    #[error("Background stream push task died")]
    BackgroundTaskDied,
}
632
633impl From<NetworkError> for Error {
635 fn from(e: NetworkError) -> Self {
636 Error::System(SystemError::Network(e))
637 }
638}
639
640impl From<StorageError> for Error {
641 fn from(e: StorageError) -> Self {
642 Error::System(SystemError::Storage(e))
643 }
644}
645
646impl From<ConvertError> for Error {
647 fn from(e: ConvertError) -> Self {
648 Error::System(SystemError::Storage(StorageError::Convert(e)))
649 }
650}
651
652impl From<FileError> for Error {
653 fn from(e: FileError) -> Self {
654 Error::System(SystemError::Storage(StorageError::File(e)))
655 }
656}
657
658impl From<SerializationError> for Error {
659 fn from(e: SerializationError) -> Self {
660 Error::System(SystemError::Serialization(e))
661 }
662}
663
664impl From<ProstError> for Error {
678 fn from(error: ProstError) -> Self {
679 Error::System(SystemError::Prost(error))
680 }
681}
682
683impl From<StateTransitionError> for Error {
686 fn from(e: StateTransitionError) -> Self {
687 Error::Consensus(ConsensusError::StateTransition(e))
688 }
689}
690
691impl From<ElectionError> for Error {
692 fn from(e: ElectionError) -> Self {
693 Error::Consensus(ConsensusError::Election(e))
694 }
695}
696
697impl From<ReplicationError> for Error {
698 fn from(e: ReplicationError) -> Self {
699 Error::Consensus(ConsensusError::Replication(e))
700 }
701}
702
703impl From<MembershipError> for Error {
704 fn from(e: MembershipError) -> Self {
705 Error::Consensus(ConsensusError::Membership(e))
706 }
707}
708
709impl From<ReadSendError> for Error {
711 fn from(e: ReadSendError) -> Self {
712 Error::System(SystemError::Network(NetworkError::ReadSend(e)))
713 }
714}
715
716impl From<WriteSendError> for Error {
717 fn from(e: WriteSendError) -> Self {
718 Error::System(SystemError::Network(NetworkError::WriteSend(e)))
719 }
720}
721
722impl From<tonic::transport::Error> for Error {
723 fn from(err: tonic::transport::Error) -> Self {
724 NetworkError::TonicError(Box::new(err)).into()
725 }
726}
727
728impl From<JoinError> for Error {
729 fn from(err: JoinError) -> Self {
730 NetworkError::TaskFailed(err).into()
731 }
732}
733
734impl From<SnapshotError> for Error {
735 fn from(e: SnapshotError) -> Self {
736 Error::Consensus(ConsensusError::Snapshot(e))
737 }
738}
739
740impl From<ReadIndexError> for Error {
741 fn from(e: ReadIndexError) -> Self {
742 Error::Consensus(ConsensusError::Replication(match e {
743 ReadIndexError::NotLeader => ReplicationError::NotLeader { leader_id: None },
744 ReadIndexError::Timeout { timeout_ms } => ReplicationError::RpcTimeout {
745 duration: timeout_ms,
746 },
747 ReadIndexError::VerificationFailed { reason: _ } => ReplicationError::QuorumNotReached,
748 }))
749 }
750}
751
752impl From<IdAllocationError> for Error {
753 fn from(e: IdAllocationError) -> Self {
754 StorageError::IdAllocation(e).into()
755 }
756}
757
758impl From<std::io::Error> for Error {
759 fn from(e: std::io::Error) -> Self {
760 StorageError::IoError(e).into()
761 }
762}