use std::path::PathBuf;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::common::LogId;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::SnapshotAck;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotMetadata;
use d_engine_proto::server::storage::SnapshotResponse;
use tonic::Status;
use crate::ApplyResult;
use crate::MaybeCloneOneshotSender;
use crate::Result;
use crate::StreamResponseSender;
/// Payload carried by [`RoleEvent::NotifyNewCommitIndex`].
///
/// Plain value type: cloneable, comparable with `==`, and printable via `Debug`.
#[derive(Debug, Clone, PartialEq)]
pub struct NewCommitData {
    /// Commit index reported by this notification.
    pub new_commit_index: u64,
    /// Role encoded as a raw `i32`.
    /// NOTE(review): presumably the numeric code of the node's role — confirm
    /// which constants/enum this maps to.
    pub role: i32,
    /// Term reported alongside the commit index
    /// (presumably the sender's current term — TODO confirm).
    pub current_term: u64,
}
/// Client-originated commands.
///
/// Every variant pairs the request message with a [`MaybeCloneOneshotSender`]
/// typed to carry either a [`ClientResponse`] or a tonic [`Status`].
/// NOTE(review): the sender is presumably the channel on which the handler
/// answers the caller — confirm against the consumer of this enum.
#[derive(Debug)]
pub enum ClientCmd {
    /// A client write request together with its result sender.
    Propose(
        ClientWriteRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    ),

    /// A client read request together with its result sender.
    Read(
        ClientReadRequest,
        MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
    ),
}
/// Role-level events.
///
/// The `Become*` variant names suggest role-transition requests; the remaining
/// variants carry notifications or a boxed [`RaftEvent`].
/// NOTE(review): the exact handling lives outside this file — confirm there.
#[derive(Debug)]
// NOTE(review): `dead_code` is silenced for the whole enum, presumably because
// some variants are not constructed in every build configuration — confirm
// whether the allow is still required.
#[allow(dead_code)]
pub enum RoleEvent {
    /// Transition to follower. The optional `u32` is presumably the id of the
    /// known leader (`None` when unknown) — TODO confirm.
    BecomeFollower(Option<u32>),

    /// Transition to candidate.
    BecomeCandidate,

    /// Transition to leader.
    BecomeLeader,

    /// Transition to learner.
    BecomeLearner,

    /// Notification carrying a [`NewCommitData`] payload.
    NotifyNewCommitIndex(NewCommitData),

    /// A leader was discovered. The fields are presumably
    /// `(leader_id, term)` — TODO confirm the order with the producer.
    LeaderDiscovered(u32, u64),

    /// Wraps a [`RaftEvent`] to be processed again. Boxed because `RaftEvent`
    /// would otherwise be embedded by value in this enum.
    ReprocessEvent(Box<RaftEvent>),
}
/// Events consumed by the Raft node.
///
/// Request-style variants pair an incoming message (or a boxed
/// [`tonic::Streaming`]) with a sender typed for the matching response or a
/// tonic [`Status`]. The remaining variants carry no sender.
/// NOTE(review): the senders are presumably the reply channels for the
/// corresponding RPC handlers — confirm against the call sites.
#[derive(Debug)]
pub enum RaftEvent {
    /// A vote request and the sender for its [`VoteResponse`] result.
    ReceiveVoteRequest(
        VoteRequest,
        MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
    ),

    /// A metadata request and the sender for the [`ClusterMembership`] result.
    ClusterConf(
        MetadataRequest,
        MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
    ),

    /// A cluster configuration change request and the sender for its
    /// [`ClusterConfUpdateResponse`] result.
    ClusterConfUpdate(
        ClusterConfChangeRequest,
        MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
    ),

    /// An append-entries request and the sender for its
    /// [`AppendEntriesResponse`] result.
    AppendEntries(
        AppendEntriesRequest,
        MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
    ),

    /// A boxed stream of [`SnapshotChunk`] messages and the sender for the
    /// [`SnapshotResponse`] result.
    InstallSnapshotChunk(
        Box<tonic::Streaming<SnapshotChunk>>,
        MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
    ),

    /// A boxed stream of [`SnapshotAck`] messages and a [`StreamResponseSender`].
    StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),

    /// A join request and the sender for its [`JoinResponse`] result.
    JoinCluster(
        JoinRequest,
        MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
    ),

    /// A leader discovery request and the sender for its
    /// [`LeaderDiscoveryResponse`] result.
    DiscoverLeader(
        LeaderDiscoveryRequest,
        MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
    ),

    /// Snapshot push trigger identified by `peer_id`.
    // NOTE(review): marked `allow(unused)`, presumably because nothing
    // constructs this variant yet — confirm whether the allow is still needed.
    #[allow(unused)]
    TriggerSnapshotPush { peer_id: u32 },

    /// Payload-free snapshot creation event.
    CreateSnapshotEvent,

    /// Log purge completion carrying a [`LogId`]
    /// (presumably the last purged entry — TODO confirm).
    LogPurgeCompleted(LogId),

    /// Outcome of snapshot creation: on success the [`SnapshotMetadata`] and a
    /// [`PathBuf`] (presumably the snapshot's location on disk), otherwise the
    /// crate error.
    SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),

    /// Payload-free event; by its name, asks for ready learners to be promoted.
    PromoteReadyLearners,

    /// Payload-free event; by its name, signals that this node was removed and
    /// should step down — TODO confirm with the handler.
    StepDownSelfRemoved,

    /// Payload-free event; by its name, signals that a membership change was
    /// applied — TODO confirm with the handler.
    MembershipApplied,

    /// Fatal error report: `source` and `error` are free-form strings
    /// (presumably the failing component and the error text).
    FatalError { source: String, error: String },

    /// Apply completion: the last index covered and the per-entry
    /// [`ApplyResult`] values.
    ApplyCompleted { last_index: u64, results: Vec<ApplyResult> },
}
/// Test-only, cloneable counterpart of the events handled by the node.
///
/// Variants keep the request payloads but none of the senders or streams
/// found on [`RaftEvent`] / [`ClientCmd`], which is what allows the enum to
/// derive `Clone`.
#[cfg(test)]
// The item only exists under `cfg(test)`, so a plain derive is equivalent to
// gating the derive on `test` again.
#[derive(Debug, Clone)]
// NOTE(review): `unused` is silenced, presumably because not every variant is
// constructed by the test suite — confirm whether this is still required.
#[allow(unused)]
pub enum TestEvent {
    /// Payload of [`RaftEvent::ReceiveVoteRequest`].
    ReceiveVoteRequest(VoteRequest),

    /// Payload of [`RaftEvent::ClusterConf`].
    ClusterConf(MetadataRequest),

    /// Payload of [`RaftEvent::ClusterConfUpdate`].
    ClusterConfUpdate(ClusterConfChangeRequest),

    /// Payload of [`RaftEvent::AppendEntries`].
    AppendEntries(AppendEntriesRequest),

    /// A client write request (same payload type as [`ClientCmd::Propose`]).
    ClientPropose(ClientWriteRequest),

    /// A client read request (same payload type as [`ClientCmd::Read`]).
    ClientReadRequest(ClientReadRequest),

    /// Marker for [`RaftEvent::InstallSnapshotChunk`]; the stream is not kept.
    InstallSnapshotChunk,

    /// Marker for [`RaftEvent::StreamSnapshot`]; the stream is not kept.
    StreamSnapshot,

    /// Payload of [`RaftEvent::JoinCluster`].
    JoinCluster(JoinRequest),

    /// Payload of [`RaftEvent::DiscoverLeader`].
    DiscoverLeader(LeaderDiscoveryRequest),

    /// Counterpart of [`RaftEvent::TriggerSnapshotPush`].
    TriggerSnapshotPush { peer_id: u32 },

    /// Counterpart of [`RaftEvent::CreateSnapshotEvent`].
    CreateSnapshotEvent,

    /// Marker for [`RaftEvent::SnapshotCreated`]; the result is not kept.
    SnapshotCreated,

    /// Counterpart of [`RaftEvent::LogPurgeCompleted`].
    LogPurgeCompleted(LogId),

    /// Counterpart of [`RaftEvent::PromoteReadyLearners`].
    PromoteReadyLearners,

    /// Counterpart of [`RaftEvent::FatalError`].
    FatalError { source: String, error: String },

    /// Counterpart of [`RaftEvent::ApplyCompleted`].
    ApplyCompleted { last_index: u64, results: Vec<ApplyResult> },
}
/// Projects a [`RaftEvent`] onto its [`TestEvent`] counterpart (test builds only).
///
/// Request payloads are copied or cloned out of the borrowed event; senders,
/// streams and the snapshot-creation result are discarded.
///
/// `RaftEvent::StepDownSelfRemoved` and `RaftEvent::MembershipApplied` have no
/// dedicated `TestEvent` variant and are both reported as
/// [`TestEvent::CreateSnapshotEvent`].
/// NOTE(review): this looks like a placeholder — a test observing the result
/// cannot tell these two apart from a genuine `CreateSnapshotEvent`.
#[cfg(test)]
pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
    match event {
        // Events whose payload is copied out by dereference.
        RaftEvent::ReceiveVoteRequest(vote_req, _) => TestEvent::ReceiveVoteRequest(*vote_req),
        RaftEvent::ClusterConf(meta_req, _) => TestEvent::ClusterConf(*meta_req),
        RaftEvent::LogPurgeCompleted(log_id) => TestEvent::LogPurgeCompleted(*log_id),
        RaftEvent::TriggerSnapshotPush { peer_id } => {
            TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
        }

        // Events whose request payload is cloned.
        RaftEvent::ClusterConfUpdate(change_req, _) => {
            TestEvent::ClusterConfUpdate(change_req.clone())
        }
        RaftEvent::AppendEntries(append_req, _) => TestEvent::AppendEntries(append_req.clone()),
        RaftEvent::JoinCluster(join_req, _) => TestEvent::JoinCluster(join_req.clone()),
        RaftEvent::DiscoverLeader(discovery_req, _) => {
            TestEvent::DiscoverLeader(discovery_req.clone())
        }
        RaftEvent::FatalError { source, error } => {
            let (source, error) = (source.clone(), error.clone());
            TestEvent::FatalError { source, error }
        }
        RaftEvent::ApplyCompleted {
            last_index,
            results,
        } => {
            let (last_index, results) = (*last_index, results.clone());
            TestEvent::ApplyCompleted {
                last_index,
                results,
            }
        }

        // Events reduced to a bare marker; their contents are dropped.
        RaftEvent::InstallSnapshotChunk(..) => TestEvent::InstallSnapshotChunk,
        RaftEvent::StreamSnapshot(..) => TestEvent::StreamSnapshot,
        RaftEvent::SnapshotCreated(_) => TestEvent::SnapshotCreated,
        RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,

        // `StepDownSelfRemoved` and `MembershipApplied` have no `TestEvent`
        // variant of their own; they share the `CreateSnapshotEvent` result.
        RaftEvent::CreateSnapshotEvent
        | RaftEvent::StepDownSelfRemoved
        | RaftEvent::MembershipApplied => TestEvent::CreateSnapshotEvent,
    }
}