1use config::ConfigError;
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio::task::JoinError;
10
11#[doc(hidden)]
12pub type Result<T> = std::result::Result<T, Error>;
13
14#[derive(Debug, thiserror::Error)]
15pub enum Error {
16 #[error(transparent)]
18 System(#[from] SystemError),
19
20 #[error(transparent)]
22 Config(#[from] ConfigError),
23
24 #[error(transparent)]
26 Consensus(#[from] ConsensusError),
27
28 #[error("Fatal error: {0}")]
30 Fatal(String),
31}
32
33#[derive(Debug, thiserror::Error)]
34pub enum ConsensusError {
35 #[error(transparent)]
37 StateTransition(#[from] StateTransitionError),
38
39 #[error(transparent)]
41 Election(#[from] ElectionError),
42
43 #[error(transparent)]
45 Replication(#[from] ReplicationError),
46
47 #[error(transparent)]
49 Membership(#[from] MembershipError),
50
51 #[error(transparent)]
53 Snapshot(#[from] SnapshotError),
54
55 #[error("Operation requires {required_role} role but current role is {current_role}")]
57 RoleViolation {
58 current_role: &'static str,
59 required_role: &'static str,
60 context: String,
61 },
62}
63
64#[derive(Debug, thiserror::Error)]
65#[doc(hidden)]
66pub enum StateTransitionError {
67 #[error("Not enough votes to transition to leader.")]
68 NotEnoughVotes,
69
70 #[error("Invalid state transition.")]
71 InvalidTransition,
72
73 #[error("Lock error.")]
74 LockError,
75}
76
77#[derive(Debug, thiserror::Error)]
78pub enum NetworkError {
79 #[error("Service unavailable: {0}")]
81 ServiceUnavailable(String),
82
83 #[error("Connection timeout to {node_id} after {duration:?}")]
85 Timeout { node_id: u32, duration: Duration },
86
87 #[error("Network unreachable: {source}")]
89 Unreachable {
90 source: Box<dyn std::error::Error + Send + Sync>,
91 },
92
93 #[error("Socket connect failed error: {0}")]
95 ConnectError(String),
96
97 #[error("Retry timeout after {0:?}")]
99 RetryTimeoutError(Duration),
100
101 #[error("TLS handshake failed")]
103 TlsHandshakeFailure,
104
105 #[error("Request list for {request_type} contains no peers")]
107 EmptyPeerList { request_type: &'static str },
108
109 #[error("Peer({0}) connection not found")]
111 PeerConnectionNotFound(u32),
112
113 #[error("Peer({0}) address not found")]
115 PeerAddressNotFound(u32),
116
117 #[error("Invalid URI format: {0}")]
119 InvalidURI(String),
120
121 #[error("Failed to send {request_type} request")]
123 RequestSendFailure {
124 request_type: &'static str,
125 #[source]
126 source: Box<tonic::transport::Error>,
127 },
128
129 #[error("TCP keepalive configuration error")]
131 TcpKeepaliveError,
132
133 #[error("HTTP/2 keepalive configuration error")]
135 Http2KeepaliveError,
136
137 #[error(transparent)]
139 TonicError(#[from] Box<tonic::transport::Error>),
140
141 #[error(transparent)]
143 TonicStatusError(#[from] Box<tonic::Status>),
144
145 #[error("Failed to send read request: {0}")]
146 ReadSend(#[from] ReadSendError),
147
148 #[error("Failed to send write request: {0}")]
149 WriteSend(#[from] WriteSendError),
150
151 #[error("Background task failed: {0}")]
152 TaskFailed(#[from] JoinError),
153
154 #[error("{0}")]
155 TaskBackoffFailed(String),
156
157 #[error("{0}")]
158 SingalSendFailed(String),
159
160 #[error("{0}")]
161 SingalReceiveFailed(String),
162
163 #[error("New node join cluster failed: {0}")]
169 JoinFailed(String),
170
171 #[error("Network timeout: {0}")]
172 GlobalTimeout(String),
173}
174
175#[derive(Debug, thiserror::Error)]
176pub enum StorageError {
177 #[error(transparent)]
179 IoError(#[from] std::io::Error),
180
181 #[error("Error occurred at path: {path}")]
183 PathError {
184 path: PathBuf, source: std::io::Error,
186 },
187
188 #[error(transparent)]
190 BincodeError(#[from] bincode::Error),
191
192 #[error("State Machine error: {0}")]
194 StateMachineError(String),
195
196 #[error("Log storage failure: {0}")]
198 LogStorage(String),
199
200 #[error("Data corruption detected at {location}")]
205 DataCorruption { location: String },
206
207 #[error("Configuration storage error: {0}")]
209 ConfigStorage(String),
210
211 #[error("Embedded database error: {0}")]
213 DbError(String),
214
215 #[error("Value convert failed")]
217 Convert(#[from] ConvertError),
218
219 #[error("File errors")]
221 File(#[from] FileError),
222
223 #[error("Serialization error: {0}")]
225 SerializationError(String),
226
227 #[error(transparent)]
229 IdAllocation(#[from] IdAllocationError),
230}
231
232#[derive(Debug, thiserror::Error)]
233pub enum IdAllocationError {
234 #[error("ID allocation overflow: {start} > {end}")]
236 Overflow { start: u64, end: u64 },
237
238 #[error("Invalid ID range: {start}..={end}")]
240 InvalidRange { start: u64, end: u64 },
241
242 #[error("No available IDs")]
244 NoIdsAvailable,
245}
246
247#[derive(Debug, thiserror::Error)]
248pub enum FileError {
249 #[error("Path does not exist: {0}")]
250 NotFound(String),
251 #[error("Path is a directory: {0}")]
252 IsDirectory(String),
253 #[error("File is busy: {0}")]
254 Busy(String),
255 #[error("Insufficient permissions: {0}")]
256 PermissionDenied(String),
257 #[error("File is occupied: {0}")]
258 FileBusy(String),
259 #[error("Invalid path: {0}")]
260 InvalidPath(String),
261 #[error("Too small: {0}")]
262 TooSmall(u64),
263 #[error("Invalid extension: {0}")]
264 InvalidExt(String),
265 #[error("Invalid GZIP header: {0}")]
266 InvalidGzipHeader(String),
267 #[error("Unknown IO error: {0}")]
268 UnknownIo(String),
269}
270
271#[derive(Debug, thiserror::Error)]
273pub enum ConvertError {
274 #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
278 InvalidLength(usize),
279
280 #[error("conversion failure: {0}")]
284 ConversionFailure(String),
285}
286
287#[derive(Debug, thiserror::Error)]
288pub enum ReadSendError {
289 #[error("Network timeout")]
290 Timeout(#[from] tokio::time::error::Elapsed),
291
292 #[error("Connection failed")]
293 Connection(#[from] tonic::transport::Error),
294}
295
296#[derive(Debug, thiserror::Error)]
297pub enum WriteSendError {
298 #[error("Not cluster leader")]
299 NotLeader,
300
301 #[error("Network unreachable")]
302 Unreachable,
303
304 #[error("Payload too large")]
305 PayloadExceeded,
306}
307
308#[derive(Debug, thiserror::Error)]
309pub enum SystemError {
310 #[error("Network error: {0}")]
312 Network(#[from] NetworkError),
313
314 #[error("Storage operation failed")]
316 Storage(#[from] StorageError),
317
318 #[error("Serialization error")]
320 Serialization(#[from] SerializationError),
321
322 #[error("Protobuf operation failed: {0}")]
324 Prost(#[from] ProstError),
325
326 #[error("Node failed to start: {0}")]
328 NodeStartFailed(String),
329
330 #[error("General server error: {0}")]
331 GeneralServer(String),
332
333 #[error("Internal server error")]
334 ServerUnavailable,
335}
336
337#[derive(Debug, thiserror::Error)]
339pub enum SerializationError {
340 #[error("Bincode serialization failed: {0}")]
341 Bincode(#[from] bincode::Error),
342}
343
344#[derive(Debug, thiserror::Error)]
346pub enum ProstError {
347 #[error("Encoding failed: {0}")]
348 Encode(#[from] prost::EncodeError),
349
350 #[error("Decoding failed: {0}")]
351 Decode(#[from] prost::DecodeError),
352}
353
354#[derive(Debug, thiserror::Error)]
355pub enum ElectionError {
356 #[error("Election failed: {0}")]
358 Failed(String),
359
360 #[error("Found higher term(={0}) during election process")]
362 HigherTerm(u64),
363
364 #[error("Term conflict (current: {current}, received: {received})")]
366 TermConflict { current: u64, received: u64 },
367
368 #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
370 LogConflict {
371 index: u64,
372 expected_term: u64,
373 actual_term: u64,
374 },
375
376 #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
378 QuorumFailure { required: usize, succeed: usize },
379
380 #[error("Leadership consensus error: {0}")]
382 LeadershipConsensus(String),
383
384 #[error("No voting member found for candidate {candidate_id}")]
386 NoVotingMemberFound { candidate_id: u32 },
387}
388
389#[derive(Debug, thiserror::Error)]
390pub enum ReplicationError {
391 #[error("Found higher term(={0}) during replication process")]
393 HigherTerm(u64),
394
395 #[error("Quorum not reached for log replication")]
397 QuorumNotReached,
398
399 #[error("Timeout to receive majority response")]
401 QuorumTimeout,
402
403 #[error("Node {node_id} unreachable for replication")]
405 NodeUnreachable { node_id: u32 },
406
407 #[error("RPC timeout after {duration}ms")]
409 RpcTimeout { duration: u64 },
410
411 #[error("No peer mapping for leader {leader_id}")]
413 NoPeerFound { leader_id: u32 },
414
415 #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
417 LogConflict {
418 index: u64,
419 expected_term: u64,
420 actual_term: u64,
421 },
422
423 #[error("Replication requires leader role (known leader: {leader_id:?})")]
425 NotLeader { leader_id: Option<u32> },
426}
427
428#[derive(Debug, thiserror::Error)]
429pub enum MembershipError {
430 #[error("Membership update consensus failure: {0}")]
432 ConfigChangeUpdateFailed(String),
433
434 #[error("Membership changes require leader role")]
436 NotLeader,
437
438 #[error("No leader information available")]
440 NoLeaderFound,
441
442 #[error("Only Learner can join cluster")]
444 NotLearner,
445
446 #[error("Cluster bootstrap not completed")]
448 ClusterIsNotReady,
449
450 #[error("Cluster connection setup failed: {0}")]
452 SetupClusterConnectionFailed(String),
453
454 #[error("Metadata missing for node {node_id} in cluster config")]
456 NoMetadataFoundForNode { node_id: u32 },
457
458 #[error("No reachable peers found in cluster membership")]
460 NoPeersAvailable,
461
462 #[error("Node({0}) already been added into cluster config.")]
464 NodeAlreadyExists(u32),
465
466 #[error("To be removed node({0}) is leader.")]
468 RemoveNodeIsLeader(u32),
469
470 #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
471 InvalidPromotion { node_id: u32, role: i32 },
472
473 #[error("Invalid membership change request")]
474 InvalidChangeRequest,
475
476 #[error("Commit Timeout")]
477 CommitTimeout,
478
479 #[error("Learner({0}) join cluster failed.")]
480 JoinClusterFailed(u32),
481
482 #[error("Join cluster error: {0}")]
483 JoinClusterError(String),
484
485 #[error("Not leader")]
486 NoLeader,
487
488 #[error("Mark leader id failed: {0}")]
489 MarkLeaderIdFailed(String),
490}
491
492#[derive(Debug, thiserror::Error)]
493pub enum SnapshotError {
494 #[error("Snapshot receiver lagging, dropping chunk")]
495 Backpressure,
496
497 #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
499 Rejected { last_chunk: u32 },
500
501 #[error("Install snapshot RPC request been rejected")]
502 RemoteRejection,
503
504 #[error("Install snapshot RPC request failed")]
506 TransferFailed,
507
508 #[error("Install snapshot RPC request timeout")]
510 TransferTimeout,
511
512 #[error("Snapshot operation failed: {0}")]
514 OperationFailed(String),
515
516 #[error("Snapshot is outdated")]
518 Outdated,
519
520 #[error("Snapshot file checksum mismatch")]
522 ChecksumMismatch,
523
524 #[error("Invalid snapshot")]
526 InvalidSnapshot,
527
528 #[error("Invalid chunk sequence")]
530 InvalidChunkSequence,
531
532 #[error("Stream receiver disconnected")]
534 ReceiverDisconnected,
535
536 #[error("Invalid first snapshot stream chunk")]
537 InvalidFirstChunk,
538
539 #[error("Empty snapshot stream chunk")]
540 EmptySnapshot,
541
542 #[error("Incomplete snapshot error")]
543 IncompleteSnapshot,
544
545 #[error("Requested chunk {0} out of range (max: {1})")]
546 ChunkOutOfRange(u32, u32),
547
548 #[error("Chunk in stream is out of order")]
549 OutOfOrderChunk,
550
551 #[error("No metadata in chunk")]
552 MissingMetadata,
553
554 #[error("Chunk not cached: {0}")]
555 ChunkNotCached(u32),
556
557 #[error("Background stream push task died")]
558 BackgroundTaskDied,
559}
560
561impl From<NetworkError> for Error {
563 fn from(e: NetworkError) -> Self {
564 Error::System(SystemError::Network(e))
565 }
566}
567
568impl From<StorageError> for Error {
569 fn from(e: StorageError) -> Self {
570 Error::System(SystemError::Storage(e))
571 }
572}
573
574impl From<ConvertError> for Error {
575 fn from(e: ConvertError) -> Self {
576 Error::System(SystemError::Storage(StorageError::Convert(e)))
577 }
578}
579
580impl From<FileError> for Error {
581 fn from(e: FileError) -> Self {
582 Error::System(SystemError::Storage(StorageError::File(e)))
583 }
584}
585
586impl From<SerializationError> for Error {
587 fn from(e: SerializationError) -> Self {
588 Error::System(SystemError::Serialization(e))
589 }
590}
591
592impl From<ProstError> for Error {
606 fn from(error: ProstError) -> Self {
607 Error::System(SystemError::Prost(error))
608 }
609}
610
611impl From<StateTransitionError> for Error {
614 fn from(e: StateTransitionError) -> Self {
615 Error::Consensus(ConsensusError::StateTransition(e))
616 }
617}
618
619impl From<ElectionError> for Error {
620 fn from(e: ElectionError) -> Self {
621 Error::Consensus(ConsensusError::Election(e))
622 }
623}
624
625impl From<ReplicationError> for Error {
626 fn from(e: ReplicationError) -> Self {
627 Error::Consensus(ConsensusError::Replication(e))
628 }
629}
630
631impl From<MembershipError> for Error {
632 fn from(e: MembershipError) -> Self {
633 Error::Consensus(ConsensusError::Membership(e))
634 }
635}
636
637impl From<ReadSendError> for Error {
639 fn from(e: ReadSendError) -> Self {
640 Error::System(SystemError::Network(NetworkError::ReadSend(e)))
641 }
642}
643
644impl From<WriteSendError> for Error {
645 fn from(e: WriteSendError) -> Self {
646 Error::System(SystemError::Network(NetworkError::WriteSend(e)))
647 }
648}
649
650impl From<tonic::transport::Error> for Error {
651 fn from(err: tonic::transport::Error) -> Self {
652 NetworkError::TonicError(Box::new(err)).into()
653 }
654}
655
656impl From<JoinError> for Error {
657 fn from(err: JoinError) -> Self {
658 NetworkError::TaskFailed(err).into()
659 }
660}
661
662impl From<SnapshotError> for Error {
663 fn from(e: SnapshotError) -> Self {
664 Error::Consensus(ConsensusError::Snapshot(e))
665 }
666}
667
668impl From<IdAllocationError> for Error {
669 fn from(e: IdAllocationError) -> Self {
670 StorageError::IdAllocation(e).into()
671 }
672}
673
674impl From<std::io::Error> for Error {
675 fn from(e: std::io::Error) -> Self {
676 StorageError::IoError(e).into()
677 }
678}