use std::path::PathBuf;
use std::time::Duration;
use config::ConfigError;
use tokio::task::JoinError;
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
#[doc(hidden)]
pub type Result<T> = std::result::Result<T, Error>;
/// Top-level error type of this module.
///
/// The `System`, `Config` and `Consensus` variants are transparent wrappers:
/// their `Display` output and `source()` are forwarded to the wrapped error.
/// `From` conversions for the wrapped types are generated by `#[from]`.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Wraps a [`SystemError`].
    #[error(transparent)]
    System(#[from] SystemError),

    /// Wraps a `config::ConfigError`.
    #[error(transparent)]
    Config(#[from] ConfigError),

    /// Wraps a [`ConsensusError`].
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// Free-form fatal error carrying only a message.
    #[error("Fatal error: {0}")]
    Fatal(String),
}
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
#[error(transparent)]
StateTransition(#[from] StateTransitionError),
#[error(transparent)]
Election(#[from] ElectionError),
#[error(transparent)]
Replication(#[from] ReplicationError),
#[error(transparent)]
Membership(#[from] MembershipError),
#[error(transparent)]
Snapshot(#[from] SnapshotError),
#[error("Operation requires {required_role} role but current role is {current_role}")]
RoleViolation {
current_role: &'static str,
required_role: &'static str,
context: String,
},
}
/// Errors reported for state transitions.
#[derive(Debug, thiserror::Error)]
#[doc(hidden)]
pub enum StateTransitionError {
    /// Too few votes were gathered to transition to leader.
    #[error("Not enough votes to transition to leader.")]
    NotEnoughVotes,

    /// The requested transition is not valid.
    #[error("Invalid state transition.")]
    InvalidTransition,

    /// A lock-related failure; no further detail is carried.
    #[error("Lock error.")]
    LockError,
}
/// Errors grouped under the network category.
///
/// Variants marked `#[from]` get a generated `From` conversion and expose the
/// wrapped value through `source()`.
#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
    /// A service was unavailable; the payload is a free-form message.
    #[error("Service unavailable: {0}")]
    ServiceUnavailable(String),

    /// Connecting to `node_id` timed out after `duration`.
    #[error("Connection timeout to {node_id} after {duration:?}")]
    Timeout { node_id: u32, duration: Duration },

    /// The network was unreachable; `source` is both printed and chained.
    #[error("Network unreachable: {source}")]
    Unreachable {
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Socket connect failure with a free-form message.
    #[error("Socket connect failed error: {0}")]
    ConnectError(String),

    /// Retrying gave up after the given duration.
    #[error("Retry timeout after {0:?}")]
    RetryTimeoutError(Duration),

    /// TLS handshake failure; no detail is carried.
    #[error("TLS handshake failed")]
    TlsHandshakeFailure,

    /// A request of kind `request_type` had no peers to target.
    #[error("Request list for {request_type} contains no peers")]
    EmptyPeerList { request_type: &'static str },

    /// No connection is known for the given peer id.
    #[error("Peer({0}) connection not found")]
    PeerConnectionNotFound(u32),

    /// The peer closed the channel.
    #[error("Peer closed the channel")]
    ResponseChannelClosed,

    /// No address is known for the given peer id.
    #[error("Peer({0}) address not found")]
    PeerAddressNotFound(u32),

    /// A URI could not be used; the payload is a free-form message.
    #[error("Invalid URI format: {0}")]
    InvalidURI(String),

    /// Sending a request of kind `request_type` failed; the transport error is
    /// chained as the source but not printed in the message.
    #[error("Failed to send {request_type} request")]
    RequestSendFailure {
        request_type: &'static str,
        #[source]
        source: Box<tonic::transport::Error>,
    },

    /// TCP keepalive configuration failure.
    #[error("TCP keepalive configuration error")]
    TcpKeepaliveError,

    /// HTTP/2 keepalive configuration failure.
    #[error("HTTP/2 keepalive configuration error")]
    Http2KeepaliveError,

    /// Transparent wrapper around a boxed `tonic::transport::Error`.
    #[error(transparent)]
    TonicError(#[from] Box<tonic::transport::Error>),

    /// Transparent wrapper around a boxed `tonic::Status`.
    #[error(transparent)]
    TonicStatusError(#[from] Box<tonic::Status>),

    /// Wraps a [`ReadSendError`].
    #[error("Failed to send read request: {0}")]
    ReadSend(#[from] ReadSendError),

    /// Wraps a [`WriteSendError`].
    #[error("Failed to send write request: {0}")]
    WriteSend(#[from] WriteSendError),

    /// Wraps a `tokio::task::JoinError`.
    #[error("Background task failed: {0}")]
    TaskFailed(#[from] JoinError),

    /// Free-form message printed verbatim.
    #[error("{0}")]
    TaskBackoffFailed(String),

    /// Free-form message printed verbatim.
    ///
    /// NOTE(review): the variant name looks like a misspelling of `Signal`; it
    /// is kept as-is because renaming a public variant would break callers.
    #[error("{0}")]
    SingalSendFailed(String),

    /// Free-form message printed verbatim.
    ///
    /// NOTE(review): the variant name looks like a misspelling of `Signal`; it
    /// is kept as-is because renaming a public variant would break callers.
    #[error("{0}")]
    SingalReceiveFailed(String),

    /// A new node failed to join the cluster; the payload is a free-form message.
    #[error("New node join cluster failed: {0}")]
    JoinFailed(String),

    /// A network timeout described by a free-form message.
    #[error("Network timeout: {0}")]
    GlobalTimeout(String),
}
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error("Error occurred at path: {path}")]
PathError {
path: PathBuf, source: std::io::Error,
},
#[error(transparent)]
BincodeError(#[from] bincode::Error),
#[error("State Machine error: {0}")]
StateMachineError(String),
#[error("Log storage failure: {0}")]
LogStorage(String),
#[error("Data corruption detected at {location}")]
DataCorruption { location: String },
#[error("Configuration storage error: {0}")]
ConfigStorage(String),
#[error("Embedded database error: {0}")]
DbError(String),
#[error("Value convert failed")]
Convert(#[from] ConvertError),
#[error("File errors")]
File(#[from] FileError),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error(transparent)]
IdAllocation(#[from] IdAllocationError),
#[error("Feature not enabled: {0}")]
FeatureNotEnabled(String),
#[error("State machine not serving: {0}")]
NotServing(String),
}
/// Errors reported for ID allocation.
#[derive(Debug, thiserror::Error)]
pub enum IdAllocationError {
    /// Allocation overflowed; the message prints `start > end`.
    #[error("ID allocation overflow: {start} > {end}")]
    Overflow { start: u64, end: u64 },

    /// The range is invalid; the message prints it as `start..=end`.
    #[error("Invalid ID range: {start}..={end}")]
    InvalidRange { start: u64, end: u64 },

    /// No IDs are available.
    #[error("No available IDs")]
    NoIdsAvailable,
}
/// Errors reported for file and path handling.
///
/// Every `String` payload is printed verbatim after the variant's prefix.
#[derive(Debug, thiserror::Error)]
pub enum FileError {
    /// The path does not exist.
    #[error("Path does not exist: {0}")]
    NotFound(String),

    /// The path refers to a directory.
    #[error("Path is a directory: {0}")]
    IsDirectory(String),

    /// The file is busy.
    ///
    /// NOTE(review): overlaps with `FileBusy` below; the difference between
    /// the two is not visible in this file.
    #[error("File is busy: {0}")]
    Busy(String),

    /// Permissions are insufficient.
    #[error("Insufficient permissions: {0}")]
    PermissionDenied(String),

    /// The file is occupied.
    #[error("File is occupied: {0}")]
    FileBusy(String),

    /// The path is invalid.
    #[error("Invalid path: {0}")]
    InvalidPath(String),

    /// Something was too small; the payload is the offending number.
    #[error("Too small: {0}")]
    TooSmall(u64),

    /// The file extension is invalid.
    #[error("Invalid extension: {0}")]
    InvalidExt(String),

    /// The GZIP header is invalid.
    #[error("Invalid GZIP header: {0}")]
    InvalidGzipHeader(String),

    /// An otherwise unclassified I/O failure.
    #[error("Unknown IO error: {0}")]
    UnknownIo(String),
}
/// Errors reported for value conversion.
#[derive(Debug, thiserror::Error)]
pub enum ConvertError {
    /// The input did not have the expected length; the payload is the number
    /// of bytes received. The message hard-codes an expectation of 8 bytes.
    #[error("invalid byte length: expected 8 bytes, received {0} bytes")]
    InvalidLength(usize),

    /// Conversion failure described by a free-form message.
    #[error("conversion failure: {0}")]
    ConversionFailure(String),
}
/// Errors reported when sending a read request.
///
/// Both variants chain the wrapped error through `source()` without printing
/// it in the message.
#[derive(Debug, thiserror::Error)]
pub enum ReadSendError {
    /// Wraps `tokio::time::error::Elapsed`.
    #[error("Network timeout")]
    Timeout(#[from] tokio::time::error::Elapsed),

    /// Wraps `tonic::transport::Error`.
    #[error("Connection failed")]
    Connection(#[from] tonic::transport::Error),
}
/// Errors reported when sending a write request.
#[derive(Debug, thiserror::Error)]
pub enum WriteSendError {
    /// The node is not the cluster leader.
    #[error("Not cluster leader")]
    NotLeader,

    /// The network is unreachable.
    #[error("Network unreachable")]
    Unreachable,

    /// The payload is too large.
    #[error("Payload too large")]
    PayloadExceeded,
}
#[derive(Debug, thiserror::Error)]
pub enum SystemError {
#[error("Network error: {0}")]
Network(#[from] NetworkError),
#[error("Storage operation failed")]
Storage(#[from] StorageError),
#[error("Serialization error")]
Serialization(#[from] SerializationError),
#[error("Protobuf operation failed: {0}")]
Prost(#[from] ProstError),
#[error("Node failed to start: {0}")]
NodeStartFailed(String),
#[error("General server error: {0}")]
GeneralServer(String),
#[error("Internal server error")]
ServerUnavailable,
#[error("State machine does not support lease management")]
LeaseNotSupported,
}
/// Errors reported for serialization.
#[derive(Debug, thiserror::Error)]
pub enum SerializationError {
    /// Wraps `bincode::Error`; the inner message is printed after the prefix.
    #[error("Bincode serialization failed: {0}")]
    Bincode(#[from] bincode::Error),
}
/// Errors reported for protobuf encoding and decoding via `prost`.
#[derive(Debug, thiserror::Error)]
pub enum ProstError {
    /// Wraps `prost::EncodeError`.
    #[error("Encoding failed: {0}")]
    Encode(#[from] prost::EncodeError),

    /// Wraps `prost::DecodeError`.
    #[error("Decoding failed: {0}")]
    Decode(#[from] prost::DecodeError),
}
/// Errors reported for elections.
#[derive(Debug, thiserror::Error)]
pub enum ElectionError {
    /// Election failure described by a free-form message.
    #[error("Election failed: {0}")]
    Failed(String),

    /// A higher term was found during the election; the payload is that term.
    #[error("Found higher term(={0}) during election process")]
    HigherTerm(u64),

    /// The current and received terms conflict.
    #[error("Term conflict (current: {current}, received: {received})")]
    TermConflict { current: u64, received: u64 },

    /// The log entry at `index` has a term other than the expected one.
    #[error("Log conflict at index {index} (expected: {expected_term}, actual: {actual_term})")]
    LogConflict {
        index: u64,
        expected_term: u64,
        actual_term: u64,
    },

    /// Only `succeed` of the `required` successes were obtained.
    #[error("Quorum not reached (required: {required}, succeed: {succeed})")]
    QuorumFailure { required: usize, succeed: usize },

    /// Leadership consensus failure described by a free-form message.
    #[error("Leadership consensus error: {0}")]
    LeadershipConsensus(String),

    /// No voting member was found for `candidate_id`.
    #[error("No voting member found for candidate {candidate_id}")]
    NoVotingMemberFound { candidate_id: u32 },
}
/// Errors reported for log replication.
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
    /// A higher term was found during replication; the payload is that term.
    #[error("Found higher term(={0}) during replication process")]
    HigherTerm(u64),

    /// Quorum was not reached.
    #[error("Quorum not reached for log replication")]
    QuorumNotReached,

    /// Timed out waiting for a majority response.
    #[error("Timeout to receive majority response")]
    QuorumTimeout,

    /// `node_id` could not be reached.
    #[error("Node {node_id} unreachable for replication")]
    NodeUnreachable { node_id: u32 },

    /// An RPC timed out; the message prints `duration` with an `ms` suffix.
    #[error("RPC timeout after {duration}ms")]
    RpcTimeout { duration: u64 },

    /// No peer mapping exists for `leader_id`.
    #[error("No peer mapping for leader {leader_id}")]
    NoPeerFound { leader_id: u32 },

    /// The log entry at `index` has a term other than the expected one.
    #[error("Log conflict at index {index} (expected term {expected_term}, actual {actual_term})")]
    LogConflict {
        index: u64,
        expected_term: u64,
        actual_term: u64,
    },

    /// Replication requires the leader role; `leader_id` is the known leader,
    /// if any.
    #[error("Replication requires leader role (known leader: {leader_id:?})")]
    NotLeader { leader_id: Option<u32> },
}
/// Errors reported for linearizable reads. Unlike the other enums in this
/// file, this one also derives `Clone`.
#[derive(Debug, thiserror::Error, Clone)]
pub enum ReadIndexError {
    /// The node is not the leader and cannot serve a linearizable read.
    #[error("Not leader - cannot serve linearizable read")]
    NotLeader,

    /// Verification timed out; the message prints `timeout_ms` with an `ms`
    /// suffix.
    #[error("Verification timeout after {timeout_ms}ms")]
    Timeout { timeout_ms: u64 },

    /// Verification failed for the given reason.
    #[error("Verification failed: {reason}")]
    VerificationFailed { reason: String },
}
/// Errors reported for cluster membership.
#[derive(Debug, thiserror::Error)]
pub enum MembershipError {
    /// Membership update consensus failure described by a free-form message.
    #[error("Membership update consensus failure: {0}")]
    ConfigChangeUpdateFailed(String),

    /// Membership changes require the leader role.
    #[error("Membership changes require leader role")]
    NotLeader,

    /// No leader information is available.
    #[error("No leader information available")]
    NoLeaderFound,

    /// Only a learner can join the cluster.
    #[error("Only Learner can join cluster")]
    NotLearner,

    /// Cluster bootstrap has not completed.
    #[error("Cluster bootstrap not completed")]
    ClusterIsNotReady,

    /// Cluster connection setup failure described by a free-form message.
    #[error("Cluster connection setup failed: {0}")]
    SetupClusterConnectionFailed(String),

    /// The cluster config has no metadata for `node_id`.
    #[error("Metadata missing for node {node_id} in cluster config")]
    NoMetadataFoundForNode { node_id: u32 },

    /// No reachable peers were found in the cluster membership.
    #[error("No reachable peers found in cluster membership")]
    NoPeersAvailable,

    /// The given node is already in the cluster config.
    #[error("Node({0}) already been added into cluster config.")]
    NodeAlreadyExists(u32),

    /// The node to be removed is the leader.
    #[error("To be removed node({0}) is leader.")]
    RemoveNodeIsLeader(u32),

    /// `node_id` cannot be promoted because its role is `role`, not learner.
    #[error("Cannot promote node {node_id}: current role is {role} (expected LEARNER)")]
    InvalidPromotion { node_id: u32, role: i32 },

    /// The membership change request is invalid.
    #[error("Invalid membership change request")]
    InvalidChangeRequest,

    /// A commit timed out.
    #[error("Commit Timeout")]
    CommitTimeout,

    /// The given learner failed to join the cluster.
    #[error("Learner({0}) join cluster failed.")]
    JoinClusterFailed(u32),

    /// Join failure described by a free-form message.
    #[error("Join cluster error: {0}")]
    JoinClusterError(String),

    /// Displayed as "Not leader".
    ///
    /// NOTE(review): the variant name says "no leader" while the message says
    /// "Not leader", and `NotLeader` / `NoLeaderFound` already exist above —
    /// confirm which meaning is intended.
    #[error("Not leader")]
    NoLeader,

    /// Marking the leader id failed; the payload is a free-form message.
    #[error("Mark leader id failed: {0}")]
    MarkLeaderIdFailed(String),

    /// The message flags this as a bug: cluster metadata was not initialized.
    #[error(
        "BUG: cluster_metadata not initialized! Leader must call init_cluster_metadata() after election"
    )]
    ClusterMetadataNotInitialized,
}
/// Errors reported for snapshot handling.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// The receiver is lagging and a chunk is being dropped.
    #[error("Snapshot receiver lagging, dropping chunk")]
    Backpressure,

    /// An install-snapshot request was rejected; `last_chunk` is printed in
    /// the message.
    #[error("Install snapshot RPC request been rejected, last_chunk={last_chunk}")]
    Rejected { last_chunk: u32 },

    /// An install-snapshot request was rejected; no detail is carried.
    #[error("Install snapshot RPC request been rejected")]
    RemoteRejection,

    /// An install-snapshot request failed.
    #[error("Install snapshot RPC request failed")]
    TransferFailed,

    /// An install-snapshot request timed out.
    #[error("Install snapshot RPC request timeout")]
    TransferTimeout,

    /// Snapshot operation failure described by a free-form message.
    #[error("Snapshot operation failed: {0}")]
    OperationFailed(String),

    /// The snapshot is outdated.
    #[error("Snapshot is outdated")]
    Outdated,

    /// The snapshot file checksum does not match.
    #[error("Snapshot file checksum mismatch")]
    ChecksumMismatch,

    /// The snapshot is invalid.
    #[error("Invalid snapshot")]
    InvalidSnapshot,

    /// The chunk sequence is invalid.
    #[error("Invalid chunk sequence")]
    InvalidChunkSequence,

    /// The stream receiver disconnected.
    #[error("Stream receiver disconnected")]
    ReceiverDisconnected,

    /// The first chunk of the snapshot stream is invalid.
    #[error("Invalid first snapshot stream chunk")]
    InvalidFirstChunk,

    /// A snapshot stream chunk was empty.
    #[error("Empty snapshot stream chunk")]
    EmptySnapshot,

    /// The snapshot is incomplete.
    #[error("Incomplete snapshot error")]
    IncompleteSnapshot,

    /// A requested chunk is out of range: the first field is the requested
    /// chunk, the second is the maximum.
    #[error("Requested chunk {0} out of range (max: {1})")]
    ChunkOutOfRange(u32, u32),

    /// A chunk in the stream is out of order.
    #[error("Chunk in stream is out of order")]
    OutOfOrderChunk,

    /// A chunk carries no metadata.
    #[error("No metadata in chunk")]
    MissingMetadata,

    /// The given chunk is not cached.
    #[error("Chunk not cached: {0}")]
    ChunkNotCached(u32),

    /// The background stream push task died.
    #[error("Background stream push task died")]
    BackgroundTaskDied,
}
/// Lifts a [`NetworkError`] into [`Error`] as `System(Network(..))`.
impl From<NetworkError> for Error {
    fn from(err: NetworkError) -> Self {
        let system = SystemError::Network(err);
        Self::System(system)
    }
}
/// Lifts a [`StorageError`] into [`Error`] as `System(Storage(..))`.
impl From<StorageError> for Error {
    fn from(err: StorageError) -> Self {
        let system = SystemError::Storage(err);
        Self::System(system)
    }
}
/// Lifts a [`ConvertError`] into [`Error`] as `System(Storage(Convert(..)))`.
impl From<ConvertError> for Error {
    fn from(err: ConvertError) -> Self {
        let storage = StorageError::Convert(err);
        Self::System(SystemError::Storage(storage))
    }
}
/// Lifts a [`FileError`] into [`Error`] as `System(Storage(File(..)))`.
impl From<FileError> for Error {
    fn from(err: FileError) -> Self {
        let storage = StorageError::File(err);
        Self::System(SystemError::Storage(storage))
    }
}
/// Lifts a [`SerializationError`] into [`Error`] as `System(Serialization(..))`.
impl From<SerializationError> for Error {
    fn from(err: SerializationError) -> Self {
        let system = SystemError::Serialization(err);
        Self::System(system)
    }
}
/// Lifts a [`ProstError`] into [`Error`] as `System(Prost(..))`.
impl From<ProstError> for Error {
    fn from(err: ProstError) -> Self {
        let system = SystemError::Prost(err);
        Self::System(system)
    }
}
/// Lifts a [`StateTransitionError`] into [`Error`] as
/// `Consensus(StateTransition(..))`.
impl From<StateTransitionError> for Error {
    fn from(err: StateTransitionError) -> Self {
        let consensus = ConsensusError::StateTransition(err);
        Self::Consensus(consensus)
    }
}
/// Lifts an [`ElectionError`] into [`Error`] as `Consensus(Election(..))`.
impl From<ElectionError> for Error {
    fn from(err: ElectionError) -> Self {
        let consensus = ConsensusError::Election(err);
        Self::Consensus(consensus)
    }
}
/// Lifts a [`ReplicationError`] into [`Error`] as `Consensus(Replication(..))`.
impl From<ReplicationError> for Error {
    fn from(err: ReplicationError) -> Self {
        let consensus = ConsensusError::Replication(err);
        Self::Consensus(consensus)
    }
}
/// Lifts a [`MembershipError`] into [`Error`] as `Consensus(Membership(..))`.
impl From<MembershipError> for Error {
    fn from(err: MembershipError) -> Self {
        let consensus = ConsensusError::Membership(err);
        Self::Consensus(consensus)
    }
}
/// Lifts a [`ReadSendError`] into [`Error`] as `System(Network(ReadSend(..)))`.
impl From<ReadSendError> for Error {
    fn from(err: ReadSendError) -> Self {
        let network = NetworkError::ReadSend(err);
        Self::System(SystemError::Network(network))
    }
}
/// Lifts a [`WriteSendError`] into [`Error`] as `System(Network(WriteSend(..)))`.
impl From<WriteSendError> for Error {
    fn from(err: WriteSendError) -> Self {
        let network = NetworkError::WriteSend(err);
        Self::System(SystemError::Network(network))
    }
}
impl From<tonic::transport::Error> for Error {
fn from(err: tonic::transport::Error) -> Self {
NetworkError::TonicError(Box::new(err)).into()
}
}
impl From<JoinError> for Error {
fn from(err: JoinError) -> Self {
NetworkError::TaskFailed(err).into()
}
}
/// Lifts a [`SnapshotError`] into [`Error`] as `Consensus(Snapshot(..))`.
impl From<SnapshotError> for Error {
    fn from(err: SnapshotError) -> Self {
        let consensus = ConsensusError::Snapshot(err);
        Self::Consensus(consensus)
    }
}
/// Maps a [`ReadIndexError`] onto the closest [`ReplicationError`] variant and
/// lifts the result into [`Error`] as `Consensus(Replication(..))`.
///
/// The mapping is lossy: `NotLeader` carries no known leader id, and the
/// `reason` of `VerificationFailed` is discarded.
impl From<ReadIndexError> for Error {
    fn from(err: ReadIndexError) -> Self {
        let replication = match err {
            ReadIndexError::NotLeader => ReplicationError::NotLeader { leader_id: None },
            ReadIndexError::Timeout { timeout_ms } => {
                ReplicationError::RpcTimeout { duration: timeout_ms }
            }
            ReadIndexError::VerificationFailed { .. } => ReplicationError::QuorumNotReached,
        };
        Self::Consensus(ConsensusError::Replication(replication))
    }
}
impl From<IdAllocationError> for Error {
fn from(e: IdAllocationError) -> Self {
StorageError::IdAllocation(e).into()
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
StorageError::IoError(e).into()
}
}