1use std::path::PathBuf;
7use std::time::Duration;
8
9use config::ConfigError;
10use tokio::task::JoinError;
11
12#[doc(hidden)]
13pub type Result<T> = std::result::Result<T, Error>;
14
15#[derive(Debug, thiserror::Error)]
16pub enum Error {
17 #[error(transparent)]
19 System(#[from] SystemError),
20
21 #[error(transparent)]
23 Config(#[from] ConfigError),
24
25 #[error(transparent)]
27 Consensus(#[from] ConsensusError),
28
29 #[error("Fatal error: {0}")]
31 Fatal(String),
32}
33
34#[derive(Debug, thiserror::Error)]
35pub enum ConsensusError {
36 #[error(transparent)]
38 StateTransition(#[from] StateTransitionError),
39
40 #[error(transparent)]
42 Election(#[from] ElectionError),
43
44 #[error(transparent)]
46 Replication(#[from] ReplicationError),
47
48 #[error(transparent)]
50 Membership(#[from] MembershipError),
51
52 #[error(transparent)]
54 Snapshot(#[from] SnapshotError),
55
56 #[error("Operation requires {required_role} role but current role is {current_role}")]
58 RoleViolation {
59 current_role: &'static str,
60 required_role: &'static str,
61 context: String,
62 },
63}
64
65#[derive(Debug, thiserror::Error)]
66#[doc(hidden)]
67pub enum StateTransitionError {
68 #[error("Not enough votes to transition to leader.")]
69 NotEnoughVotes,
70
71 #[error("Invalid state transition.")]
72 InvalidTransition,
73
74 #[error("Lock error.")]
75 LockError,
76}
77
78#[derive(Debug, thiserror::Error)]
79pub enum NetworkError {
80 #[error("Service unavailable: {0}")]
82 ServiceUnavailable(String),
83
84 #[error("Connection timeout to {node_id} after {duration:?}")]
86 Timeout { node_id: u32, duration: Duration },
87
88 #[error("Network unreachable: {source}")]
90 Unreachable {
91 source: Box<dyn std::error::Error + Send + Sync>,
92 },
93
94 #[error("Socket connect failed error: {0}")]
96 ConnectError(String),
97
98 #[error("Retry timeout after {0:?}")]
100 RetryTimeoutError(Duration),
101
102 #[error("TLS handshake failed")]
104 TlsHandshakeFailure,
105
106 #[error("Request list for {request_type} contains no peers")]
108 EmptyPeerList { request_type: &'static str },
109
110 #[error("Peer({0}) connection not found")]
112 PeerConnectionNotFound(u32),
113
114 #[error("Peer closed the channel")]
116 ResponseChannelClosed,
117
118 #[error("Peer({0}) address not found")]
120 PeerAddressNotFound(u32),
121
122 #[error("Invalid URI format: {0}")]
124 InvalidURI(String),
125
126 #[error("Failed to send {request_type} request")]
128 RequestSendFailure {
129 request_type: &'static str,
130 #[source]
131 source: Box<tonic::transport::Error>,
132 },
133
134 #[error("TCP keepalive configuration error")]
136 TcpKeepaliveError,
137
138 #[error("HTTP/2 keepalive configuration error")]
140 Http2KeepaliveError,
141
142 #[error(transparent)]
144 TonicError(#[from] Box<tonic::transport::Error>),
145
146 #[error(transparent)]
148 TonicStatusError(#[from] Box<tonic::Status>),
149
150 #[error("Failed to send read request: {0}")]
151 ReadSend(#[from] ReadSendError),
152
153 #[error("Failed to send write request: {0}")]
154 WriteSend(#[from] WriteSendError),
155
156 #[error("Background task failed: {0}")]
157 TaskFailed(#[from] JoinError),
158
159 #[error("{0}")]
160 TaskBackoffFailed(String),
161
162 #[error("{0}")]
163 SingalSendFailed(String),
164
165 #[error("{0}")]
166 SingalReceiveFailed(String),
167
168 #[error("New node join cluster failed: {0}")]
174 JoinFailed(String),
175
176 #[error("Network timeout: {0}")]
177 GlobalTimeout(String),
178}
179
180#[derive(Debug, thiserror::Error)]
181pub enum StorageError {
182 #[error(transparent)]
184 IoError(#[from] std::io::Error),
185
186 #[error("Error occurred at path: {path}")]
188 PathError {
189 path: PathBuf, source: std::io::Error,
191 },
192
193 #[error(transparent)]
195 BincodeError(#[from] bincode::Error),
196
197 #[error("State Machine error: {0}")]
199 StateMachineError(String),
200
201 #[error("Log storage failure: {0}")]
203 LogStorage(String),
204
205 #[error("Data corruption detected at {location}")]
210 DataCorruption { location: String },
211
212 #[error("Configuration storage error: {0}")]
214 ConfigStorage(String),
215
216 #[error("Embedded database error: {0}")]
218 DbError(String),
219
220 #[error("Value convert failed")]
222 Convert(#[from] ConvertError),
223
224 #[error("File errors")]
226 File(#[from] FileError),
227
228 #[error("Serialization error: {0}")]
230 SerializationError(String),
231
232 #[error(transparent)]
234 IdAllocation(#[from] IdAllocationError),
235
236 #[error("Feature not enabled: {0}")]
242 FeatureNotEnabled(String),
243
244 #[error("State machine not serving: {0}")]
250 NotServing(String),
251}
252
253#[derive(Debug, thiserror::Error)]
254pub enum IdAllocationError {
255 #[error("ID allocation overflow: {start} > {end}")]
257 Overflow { start: u64, end: u64 },
258
259 #[error("Invalid ID range: {start}..={end}")]
261 InvalidRange { start: u64, end: u64 },
262
263 #[error("No available IDs")]
265 NoIdsAvailable,
266}
267
268#[derive(Debug, thiserror::Error)]
269pub enum FileError {
270 #[error("Path does not exist: {0}")]
271 NotFound(String),
272 #[error("Path is a directory: {0}")]
273 IsDirectory(String),
274 #[error("File is busy: {0}")]
275 Busy(String),
276 #[error("Insufficient permissions: {0}")]
277 PermissionDenied(String),
278 #[error("File is occupied: {0}")]
279 FileBusy(String),
280 #[error("Invalid path: {0}")]
281 InvalidPath(String),
282 #[error("Too small: {0}")]
283 TooSmall(u64),
284 #[error("Invalid extension: {0}")]
285 InvalidExt(String),
286 #[error("Invalid GZIP header: {0}")]
287 InvalidGzipHeader(String),
288 #[error("Unknown IO error: {0}")]
289 UnknownIo(String),
290}
291
292#[derive(Debug, thiserror::Error)]
294pub enum ConvertError {
295 #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
299 InvalidLength(usize),
300
301 #[error("conversion failure: {0}")]
305 ConversionFailure(String),
306}
307
308#[derive(Debug, thiserror::Error)]
309pub enum ReadSendError {
310 #[error("Network timeout")]
311 Timeout(#[from] tokio::time::error::Elapsed),
312
313 #[error("Connection failed")]
314 Connection(#[from] tonic::transport::Error),
315}
316
317#[derive(Debug, thiserror::Error)]
318pub enum WriteSendError {
319 #[error("Not cluster leader")]
320 NotLeader,
321
322 #[error("Network unreachable")]
323 Unreachable,
324
325 #[error("Payload too large")]
326 PayloadExceeded,
327}
328
329#[derive(Debug, thiserror::Error)]
330pub enum SystemError {
331 #[error("Network error: {0}")]
333 Network(#[from] NetworkError),
334
335 #[error("Storage operation failed")]
337 Storage(#[from] StorageError),
338
339 #[error("Serialization error")]
341 Serialization(#[from] SerializationError),
342
343 #[error("Protobuf operation failed: {0}")]
345 Prost(#[from] ProstError),
346
347 #[error("Node failed to start: {0}")]
349 NodeStartFailed(String),
350
351 #[error("General server error: {0}")]
352 GeneralServer(String),
353
354 #[error("Internal server error")]
355 ServerUnavailable,
356
357 #[error("State machine does not support lease management")]
359 LeaseNotSupported,
360}
361
362#[derive(Debug, thiserror::Error)]
364pub enum SerializationError {
365 #[error("Bincode serialization failed: {0}")]
366 Bincode(#[from] bincode::Error),
367}
368
369#[derive(Debug, thiserror::Error)]
371pub enum ProstError {
372 #[error("Encoding failed: {0}")]
373 Encode(#[from] prost::EncodeError),
374
375 #[error("Decoding failed: {0}")]
376 Decode(#[from] prost::DecodeError),
377}
378
379#[derive(Debug, thiserror::Error)]
380pub enum ElectionError {
381 #[error("Election failed: {0}")]
383 Failed(String),
384
385 #[error("Found higher term(={0}) during election process")]
387 HigherTerm(u64),
388
389 #[error("Term conflict (current: {current}, received: {received})")]
391 TermConflict { current: u64, received: u64 },
392
393 #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
395 LogConflict {
396 index: u64,
397 expected_term: u64,
398 actual_term: u64,
399 },
400
401 #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
403 QuorumFailure { required: usize, succeed: usize },
404
405 #[error("Leadership consensus error: {0}")]
407 LeadershipConsensus(String),
408
409 #[error("No voting member found for candidate {candidate_id}")]
411 NoVotingMemberFound { candidate_id: u32 },
412}
413
414#[derive(Debug, thiserror::Error)]
415pub enum ReplicationError {
416 #[error("Found higher term(={0}) during replication process")]
418 HigherTerm(u64),
419
420 #[error("Quorum not reached for log replication")]
422 QuorumNotReached,
423
424 #[error("Timeout to receive majority response")]
426 QuorumTimeout,
427
428 #[error("Node {node_id} unreachable for replication")]
430 NodeUnreachable { node_id: u32 },
431
432 #[error("RPC timeout after {duration}ms")]
434 RpcTimeout { duration: u64 },
435
436 #[error("No peer mapping for leader {leader_id}")]
438 NoPeerFound { leader_id: u32 },
439
440 #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
442 LogConflict {
443 index: u64,
444 expected_term: u64,
445 actual_term: u64,
446 },
447
448 #[error("Replication requires leader role (known leader: {leader_id:?})")]
450 NotLeader { leader_id: Option<u32> },
451}
452
453#[derive(Debug, thiserror::Error, Clone)]
455pub enum ReadIndexError {
456 #[error("Not leader - cannot serve linearizable read")]
458 NotLeader,
459
460 #[error("Verification timeout after {timeout_ms}ms")]
462 Timeout { timeout_ms: u64 },
463
464 #[error("Verification failed: {reason}")]
466 VerificationFailed { reason: String },
467}
468
469#[derive(Debug, thiserror::Error)]
470pub enum MembershipError {
471 #[error("Membership update consensus failure: {0}")]
473 ConfigChangeUpdateFailed(String),
474
475 #[error("Membership changes require leader role")]
477 NotLeader,
478
479 #[error("No leader information available")]
481 NoLeaderFound,
482
483 #[error("Only Learner can join cluster")]
485 NotLearner,
486
487 #[error("Cluster bootstrap not completed")]
489 ClusterIsNotReady,
490
491 #[error("Cluster connection setup failed: {0}")]
493 SetupClusterConnectionFailed(String),
494
495 #[error("Metadata missing for node {node_id} in cluster config")]
497 NoMetadataFoundForNode { node_id: u32 },
498
499 #[error("No reachable peers found in cluster membership")]
501 NoPeersAvailable,
502
503 #[error("Node({0}) already been added into cluster config.")]
505 NodeAlreadyExists(u32),
506
507 #[error("To be removed node({0}) is leader.")]
509 RemoveNodeIsLeader(u32),
510
511 #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
512 InvalidPromotion { node_id: u32, role: i32 },
513
514 #[error("Invalid membership change request")]
515 InvalidChangeRequest,
516
517 #[error("Commit Timeout")]
518 CommitTimeout,
519
520 #[error("Learner({0}) join cluster failed.")]
521 JoinClusterFailed(u32),
522
523 #[error("Join cluster error: {0}")]
524 JoinClusterError(String),
525
526 #[error("Not leader")]
527 NoLeader,
528
529 #[error("Mark leader id failed: {0}")]
530 MarkLeaderIdFailed(String),
531
532 #[error(
534 "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
535 )]
536 ClusterMetadataNotInitialized,
537}
538
539#[derive(Debug, thiserror::Error)]
540pub enum SnapshotError {
541 #[error("Snapshot receiver lagging, dropping chunk")]
542 Backpressure,
543
544 #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
546 Rejected { last_chunk: u32 },
547
548 #[error("Install snapshot RPC request been rejected")]
549 RemoteRejection,
550
551 #[error("Install snapshot RPC request failed")]
553 TransferFailed,
554
555 #[error("Install snapshot RPC request timeout")]
557 TransferTimeout,
558
559 #[error("Snapshot operation failed: {0}")]
561 OperationFailed(String),
562
563 #[error("Snapshot is outdated")]
565 Outdated,
566
567 #[error("Snapshot file checksum mismatch")]
569 ChecksumMismatch,
570
571 #[error("Invalid snapshot")]
573 InvalidSnapshot,
574
575 #[error("Invalid chunk sequence")]
577 InvalidChunkSequence,
578
579 #[error("Stream receiver disconnected")]
581 ReceiverDisconnected,
582
583 #[error("Invalid first snapshot stream chunk")]
584 InvalidFirstChunk,
585
586 #[error("Empty snapshot stream chunk")]
587 EmptySnapshot,
588
589 #[error("Incomplete snapshot error")]
590 IncompleteSnapshot,
591
592 #[error("Requested chunk {0} out of range (max: {1})")]
593 ChunkOutOfRange(u32, u32),
594
595 #[error("Chunk in stream is out of order")]
596 OutOfOrderChunk,
597
598 #[error("No metadata in chunk")]
599 MissingMetadata,
600
601 #[error("Chunk not cached: {0}")]
602 ChunkNotCached(u32),
603
604 #[error("Background stream push task died")]
605 BackgroundTaskDied,
606}
607
608impl From<NetworkError> for Error {
610 fn from(e: NetworkError) -> Self {
611 Error::System(SystemError::Network(e))
612 }
613}
614
615impl From<StorageError> for Error {
616 fn from(e: StorageError) -> Self {
617 Error::System(SystemError::Storage(e))
618 }
619}
620
621impl From<ConvertError> for Error {
622 fn from(e: ConvertError) -> Self {
623 Error::System(SystemError::Storage(StorageError::Convert(e)))
624 }
625}
626
627impl From<FileError> for Error {
628 fn from(e: FileError) -> Self {
629 Error::System(SystemError::Storage(StorageError::File(e)))
630 }
631}
632
633impl From<SerializationError> for Error {
634 fn from(e: SerializationError) -> Self {
635 Error::System(SystemError::Serialization(e))
636 }
637}
638
639impl From<ProstError> for Error {
653 fn from(error: ProstError) -> Self {
654 Error::System(SystemError::Prost(error))
655 }
656}
657
658impl From<StateTransitionError> for Error {
661 fn from(e: StateTransitionError) -> Self {
662 Error::Consensus(ConsensusError::StateTransition(e))
663 }
664}
665
666impl From<ElectionError> for Error {
667 fn from(e: ElectionError) -> Self {
668 Error::Consensus(ConsensusError::Election(e))
669 }
670}
671
672impl From<ReplicationError> for Error {
673 fn from(e: ReplicationError) -> Self {
674 Error::Consensus(ConsensusError::Replication(e))
675 }
676}
677
678impl From<MembershipError> for Error {
679 fn from(e: MembershipError) -> Self {
680 Error::Consensus(ConsensusError::Membership(e))
681 }
682}
683
684impl From<ReadSendError> for Error {
686 fn from(e: ReadSendError) -> Self {
687 Error::System(SystemError::Network(NetworkError::ReadSend(e)))
688 }
689}
690
691impl From<WriteSendError> for Error {
692 fn from(e: WriteSendError) -> Self {
693 Error::System(SystemError::Network(NetworkError::WriteSend(e)))
694 }
695}
696
697impl From<tonic::transport::Error> for Error {
698 fn from(err: tonic::transport::Error) -> Self {
699 NetworkError::TonicError(Box::new(err)).into()
700 }
701}
702
703impl From<JoinError> for Error {
704 fn from(err: JoinError) -> Self {
705 NetworkError::TaskFailed(err).into()
706 }
707}
708
709impl From<SnapshotError> for Error {
710 fn from(e: SnapshotError) -> Self {
711 Error::Consensus(ConsensusError::Snapshot(e))
712 }
713}
714
715impl From<ReadIndexError> for Error {
716 fn from(e: ReadIndexError) -> Self {
717 Error::Consensus(ConsensusError::Replication(match e {
718 ReadIndexError::NotLeader => ReplicationError::NotLeader { leader_id: None },
719 ReadIndexError::Timeout { timeout_ms } => ReplicationError::RpcTimeout {
720 duration: timeout_ms,
721 },
722 ReadIndexError::VerificationFailed { reason: _ } => ReplicationError::QuorumNotReached,
723 }))
724 }
725}
726
727impl From<IdAllocationError> for Error {
728 fn from(e: IdAllocationError) -> Self {
729 StorageError::IdAllocation(e).into()
730 }
731}
732
733impl From<std::io::Error> for Error {
734 fn from(e: std::io::Error) -> Self {
735 StorageError::IoError(e).into()
736 }
737}