1use std::path::PathBuf;
2
3use d_engine_proto::client::ClientReadRequest;
4use d_engine_proto::client::ClientResponse;
5use d_engine_proto::client::ClientWriteRequest;
6use d_engine_proto::common::LogId;
7use d_engine_proto::server::cluster::ClusterConfChangeRequest;
8use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
9use d_engine_proto::server::cluster::ClusterMembership;
10use d_engine_proto::server::cluster::JoinRequest;
11use d_engine_proto::server::cluster::JoinResponse;
12use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
13use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
14use d_engine_proto::server::cluster::MetadataRequest;
15use d_engine_proto::server::election::VoteRequest;
16use d_engine_proto::server::election::VoteResponse;
17use d_engine_proto::server::replication::AppendEntriesRequest;
18use d_engine_proto::server::replication::AppendEntriesResponse;
19use d_engine_proto::server::storage::SnapshotAck;
20use d_engine_proto::server::storage::SnapshotChunk;
21use d_engine_proto::server::storage::SnapshotMetadata;
22use d_engine_proto::server::storage::SnapshotResponse;
23use tonic::Status;
24
25use crate::ApplyResult;
26use crate::MaybeCloneOneshotSender;
27use crate::Result;
28use crate::StreamResponseSender;
29
30#[derive(Debug, Clone, PartialEq)]
31pub struct NewCommitData {
32 pub new_commit_index: u64,
33 pub role: i32,
34 pub current_term: u64,
35}
36
37#[derive(Debug)]
40pub enum ClientCmd {
41 Propose(
42 ClientWriteRequest,
43 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
44 ),
45 Read(
46 ClientReadRequest,
47 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
48 ),
49}
50
51#[derive(Debug)]
52#[allow(dead_code)]
53pub enum RoleEvent {
54 BecomeFollower(Option<u32>), BecomeCandidate,
56 BecomeLeader,
57 BecomeLearner,
58
59 NotifyNewCommitIndex(NewCommitData),
60
61 LeaderDiscovered(u32, u64), ReprocessEvent(Box<RaftEvent>), }
68
69#[derive(Debug)]
70pub enum RaftEvent {
71 ReceiveVoteRequest(
72 VoteRequest,
73 MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
74 ),
75
76 ClusterConf(
77 MetadataRequest,
78 MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
79 ),
80
81 ClusterConfUpdate(
82 ClusterConfChangeRequest,
83 MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
84 ),
85
86 AppendEntries(
87 AppendEntriesRequest,
88 MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
89 ),
90
91 InstallSnapshotChunk(
93 Box<tonic::Streaming<SnapshotChunk>>,
94 MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
95 ),
96
97 StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
99
100 JoinCluster(
101 JoinRequest,
102 MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
103 ),
104
105 DiscoverLeader(
106 LeaderDiscoveryRequest,
107 MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
108 ),
109
110 #[allow(unused)]
112 TriggerSnapshotPush {
113 peer_id: u32,
114 },
115
116 CreateSnapshotEvent,
117
118 LogPurgeCompleted(LogId),
119
120 SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
121
122 PromoteReadyLearners,
124
125 StepDownSelfRemoved,
128
129 MembershipApplied,
132
133 FatalError {
136 source: String, error: String, },
139
140 ApplyCompleted {
144 last_index: u64,
145 results: Vec<ApplyResult>,
146 },
147}
148
149#[cfg(test)]
150#[cfg_attr(test, derive(Debug, Clone))]
151#[allow(unused)]
152pub enum TestEvent {
153 ReceiveVoteRequest(VoteRequest),
154
155 ClusterConf(MetadataRequest),
156
157 ClusterConfUpdate(ClusterConfChangeRequest),
158
159 AppendEntries(AppendEntriesRequest),
160
161 ClientPropose(ClientWriteRequest),
162
163 ClientReadRequest(ClientReadRequest),
164
165 InstallSnapshotChunk,
166
167 StreamSnapshot,
168
169 JoinCluster(JoinRequest),
170
171 DiscoverLeader(LeaderDiscoveryRequest),
172
173 TriggerSnapshotPush {
174 peer_id: u32,
175 },
176
177 CreateSnapshotEvent,
179
180 SnapshotCreated,
181
182 LogPurgeCompleted(LogId),
183
184 PromoteReadyLearners,
185
186 FatalError {
187 source: String,
188 error: String,
189 },
190
191 ApplyCompleted {
192 last_index: u64,
193 results: Vec<ApplyResult>,
194 },
195}
196
197#[cfg(test)]
198pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
199 match event {
200 RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
201 RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
202 RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
203 RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
204 RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
205 RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,
206 RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
207 RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),
208 RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
209 RaftEvent::SnapshotCreated(_result) => TestEvent::SnapshotCreated,
210 RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
211 RaftEvent::TriggerSnapshotPush { peer_id } => {
212 TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
213 }
214 RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
215 RaftEvent::StepDownSelfRemoved => {
216 TestEvent::CreateSnapshotEvent }
220 RaftEvent::MembershipApplied => {
221 TestEvent::CreateSnapshotEvent }
224 RaftEvent::FatalError { source, error } => TestEvent::FatalError {
225 source: source.clone(),
226 error: error.clone(),
227 },
228 RaftEvent::ApplyCompleted {
229 last_index,
230 results,
231 } => TestEvent::ApplyCompleted {
232 last_index: *last_index,
233 results: results.clone(),
234 },
235 }
236}