1use std::path::PathBuf;
2
3use crate::client::{ClientReadRequest, ClientResponse, ClientWriteRequest};
4use d_engine_proto::common::LogId;
5use d_engine_proto::server::cluster::ClusterConfChangeRequest;
6use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
7use d_engine_proto::server::cluster::ClusterMembership;
8use d_engine_proto::server::cluster::JoinRequest;
9use d_engine_proto::server::cluster::JoinResponse;
10use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
11use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
12use d_engine_proto::server::cluster::MetadataRequest;
13use d_engine_proto::server::election::VoteRequest;
14use d_engine_proto::server::election::VoteResponse;
15use d_engine_proto::server::replication::AppendEntriesRequest;
16use d_engine_proto::server::replication::AppendEntriesResponse;
17use d_engine_proto::server::storage::SnapshotAck;
18use d_engine_proto::server::storage::SnapshotChunk;
19use d_engine_proto::server::storage::SnapshotMetadata;
20use d_engine_proto::server::storage::SnapshotResponse;
21use tonic::Status;
22
23use crate::ApplyResult;
24use crate::MaybeCloneOneshotSender;
25use crate::Result;
26use crate::ScanResult;
27use crate::StreamResponseSender;
28
29#[derive(Debug, Clone, PartialEq)]
30pub struct NewCommitData {
31 pub new_commit_index: u64,
32 pub role: i32,
33 pub current_term: u64,
34}
35
36#[derive(Debug)]
39pub enum ClientCmd {
40 Propose(
41 ClientWriteRequest,
42 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
43 ),
44 Read(
45 ClientReadRequest,
46 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
47 ),
48 Scan(
49 bytes::Bytes,
50 MaybeCloneOneshotSender<std::result::Result<ScanResult, Status>>,
51 ),
52}
53
54#[derive(Debug)]
55#[allow(dead_code)]
56pub enum RoleEvent {
57 BecomeFollower(Option<u32>), BecomeCandidate,
59 BecomeLeader,
60 BecomeLearner,
61
62 NotifyNewCommitIndex(NewCommitData),
63
64 LeaderDiscovered(u32, u64), ReprocessEvent(Box<RaftEvent>), LogFlushed {
75 durable_index: u64,
76 },
77
78 AppendResult {
82 follower_id: u32,
83 result: Result<AppendEntriesResponse>,
84 },
85
86 SnapshotPushCompleted {
90 peer_id: u32,
91 success: bool,
92 },
93
94 NoopCommitted {
98 term: u64,
99 },
100
101 ApplyCompleted {
105 last_index: u64,
106 results: Vec<ApplyResult>,
107 },
108
109 PeerStreamError {
114 peer_id: u32,
115 },
116
117 ZombieDetected(u32),
121
122 FatalError {
125 source: String,
126 error: String,
127 },
128}
129
130#[derive(Debug)]
131pub enum RaftEvent {
132 ReceiveVoteRequest(
133 VoteRequest,
134 MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
135 ),
136
137 ClusterConf(
138 MetadataRequest,
139 MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
140 ),
141
142 ClusterConfUpdate(
143 ClusterConfChangeRequest,
144 MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
145 ),
146
147 AppendEntries(
148 AppendEntriesRequest,
149 MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
150 ),
151
152 InstallSnapshotChunk(
154 Box<tonic::Streaming<SnapshotChunk>>,
155 MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
156 ),
157
158 StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
160
161 JoinCluster(
162 JoinRequest,
163 MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
164 ),
165
166 DiscoverLeader(
167 LeaderDiscoveryRequest,
168 MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
169 ),
170
171 CreateSnapshotEvent,
172
173 LogPurgeCompleted(LogId),
174
175 SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
176
177 PromoteReadyLearners,
179
180 StepDownSelfRemoved,
183
184 MembershipApplied,
187
188 FatalError {
191 source: String, error: String, },
194}
195
196#[cfg(test)]
197#[cfg_attr(test, derive(Debug, Clone))]
198#[allow(unused)]
199pub enum TestEvent {
200 ReceiveVoteRequest(VoteRequest),
201
202 ClusterConf(MetadataRequest),
203
204 ClusterConfUpdate(ClusterConfChangeRequest),
205
206 AppendEntries(AppendEntriesRequest),
207
208 ClientPropose(ClientWriteRequest),
209
210 ClientReadRequest(ClientReadRequest),
211
212 InstallSnapshotChunk,
213
214 StreamSnapshot,
215
216 JoinCluster(JoinRequest),
217
218 DiscoverLeader(LeaderDiscoveryRequest),
219
220 CreateSnapshotEvent,
222
223 SnapshotCreated,
224
225 LogPurgeCompleted(LogId),
226
227 PromoteReadyLearners,
228
229 FatalError {
230 source: String,
231 error: String,
232 },
233
234 ApplyCompleted {
235 last_index: u64,
236 results: Vec<ApplyResult>,
237 },
238}
239
240#[cfg(test)]
241pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
242 match event {
243 RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
244 RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
245 RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
246 RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
247 RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
248 RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,
249 RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
250 RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),
251 RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
252 RaftEvent::SnapshotCreated(_result) => TestEvent::SnapshotCreated,
253 RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
254 RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
255 RaftEvent::StepDownSelfRemoved => {
256 TestEvent::CreateSnapshotEvent }
260 RaftEvent::MembershipApplied => {
261 TestEvent::CreateSnapshotEvent }
264 RaftEvent::FatalError { source, error } => TestEvent::FatalError {
265 source: source.clone(),
266 error: error.clone(),
267 },
268 }
269}