1use std::path::PathBuf;
2
3use d_engine_proto::client::ClientReadRequest;
4use d_engine_proto::client::ClientResponse;
5use d_engine_proto::client::ClientWriteRequest;
6use d_engine_proto::common::LogId;
7use d_engine_proto::server::cluster::ClusterConfChangeRequest;
8use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
9use d_engine_proto::server::cluster::ClusterMembership;
10use d_engine_proto::server::cluster::JoinRequest;
11use d_engine_proto::server::cluster::JoinResponse;
12use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
13use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
14use d_engine_proto::server::cluster::MetadataRequest;
15use d_engine_proto::server::election::VoteRequest;
16use d_engine_proto::server::election::VoteResponse;
17use d_engine_proto::server::replication::AppendEntriesRequest;
18use d_engine_proto::server::replication::AppendEntriesResponse;
19use d_engine_proto::server::storage::PurgeLogRequest;
20use d_engine_proto::server::storage::PurgeLogResponse;
21use d_engine_proto::server::storage::SnapshotAck;
22use d_engine_proto::server::storage::SnapshotChunk;
23use d_engine_proto::server::storage::SnapshotMetadata;
24use d_engine_proto::server::storage::SnapshotResponse;
25use tonic::Status;
26
27use crate::MaybeCloneOneshotSender;
28use crate::Result;
29use crate::StreamResponseSender;
30
31#[derive(Debug, Clone)]
32pub struct NewCommitData {
33 pub new_commit_index: u64,
34 pub role: i32,
35 pub current_term: u64,
36}
37
38#[derive(Debug)]
39#[allow(dead_code)]
40pub enum RoleEvent {
41 BecomeFollower(Option<u32>), BecomeCandidate,
43 BecomeLeader,
44 BecomeLearner,
45
46 NotifyNewCommitIndex(NewCommitData),
47
48 LeaderDiscovered(u32, u64), ReprocessEvent(Box<RaftEvent>), }
55
56#[derive(Debug)]
57pub enum RaftEvent {
58 ReceiveVoteRequest(
59 VoteRequest,
60 MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
61 ),
62
63 ClusterConf(
64 MetadataRequest,
65 MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
66 ),
67
68 ClusterConfUpdate(
69 ClusterConfChangeRequest,
70 MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
71 ),
72
73 AppendEntries(
74 AppendEntriesRequest,
75 MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
76 ),
77
78 ClientPropose(
79 ClientWriteRequest,
80 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
81 ),
82
83 ClientReadRequest(
84 ClientReadRequest,
85 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
86 ),
87
88 InstallSnapshotChunk(
90 Box<tonic::Streaming<SnapshotChunk>>,
91 MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
92 ),
93
94 StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
96
97 RaftLogCleanUp(
98 PurgeLogRequest,
99 MaybeCloneOneshotSender<std::result::Result<PurgeLogResponse, Status>>,
100 ),
101
102 JoinCluster(
103 JoinRequest,
104 MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
105 ),
106
107 DiscoverLeader(
108 LeaderDiscoveryRequest,
109 MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
110 ),
111
112 #[allow(unused)]
114 TriggerSnapshotPush {
115 peer_id: u32,
116 },
117
118 CreateSnapshotEvent,
119
120 LogPurgeCompleted(LogId),
121
122 SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
123
124 PromoteReadyLearners,
126
127 StepDownSelfRemoved,
130
131 MembershipApplied,
134
135 FlushReadBuffer,
138}
139
140#[cfg(test)]
141#[cfg_attr(test, derive(Debug, Clone))]
142#[allow(unused)]
143pub enum TestEvent {
144 ReceiveVoteRequest(VoteRequest),
145
146 ClusterConf(MetadataRequest),
147
148 ClusterConfUpdate(ClusterConfChangeRequest),
149
150 AppendEntries(AppendEntriesRequest),
151
152 ClientPropose(ClientWriteRequest),
153
154 ClientReadRequest(ClientReadRequest),
155
156 InstallSnapshotChunk,
157
158 StreamSnapshot,
159
160 RaftLogCleanUp(PurgeLogRequest),
161
162 JoinCluster(JoinRequest),
163
164 DiscoverLeader(LeaderDiscoveryRequest),
165
166 TriggerSnapshotPush { peer_id: u32 },
167
168 CreateSnapshotEvent,
170
171 SnapshotCreated,
172
173 LogPurgeCompleted(LogId),
174
175 PromoteReadyLearners,
176
177 FlushReadBuffer,
178}
179
180#[cfg(test)]
181pub(crate) fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
182 match event {
183 RaftEvent::ReceiveVoteRequest(req, _) => TestEvent::ReceiveVoteRequest(*req),
184 RaftEvent::ClusterConf(req, _) => TestEvent::ClusterConf(*req),
185 RaftEvent::ClusterConfUpdate(req, _) => TestEvent::ClusterConfUpdate(req.clone()),
186 RaftEvent::AppendEntries(req, _) => TestEvent::AppendEntries(req.clone()),
187 RaftEvent::ClientPropose(req, _) => TestEvent::ClientPropose(req.clone()),
188 RaftEvent::ClientReadRequest(req, _) => TestEvent::ClientReadRequest(req.clone()),
189 RaftEvent::InstallSnapshotChunk(_, _) => TestEvent::InstallSnapshotChunk,
190 RaftEvent::StreamSnapshot(_, _) => TestEvent::StreamSnapshot,
191 RaftEvent::RaftLogCleanUp(req, _) => TestEvent::RaftLogCleanUp(req.clone()),
192 RaftEvent::JoinCluster(req, _) => TestEvent::JoinCluster(req.clone()),
193 RaftEvent::DiscoverLeader(req, _) => TestEvent::DiscoverLeader(req.clone()),
194 RaftEvent::CreateSnapshotEvent => TestEvent::CreateSnapshotEvent,
195 RaftEvent::SnapshotCreated(_result) => TestEvent::SnapshotCreated,
196 RaftEvent::LogPurgeCompleted(id) => TestEvent::LogPurgeCompleted(*id),
197 RaftEvent::TriggerSnapshotPush { peer_id } => {
198 TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
199 }
200 RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,
201 RaftEvent::StepDownSelfRemoved => {
202 TestEvent::CreateSnapshotEvent }
206 RaftEvent::MembershipApplied => {
207 TestEvent::CreateSnapshotEvent }
210 RaftEvent::FlushReadBuffer => TestEvent::FlushReadBuffer,
211 }
212}