1use std::path::PathBuf;
2
3use d_engine_proto::client::ClientReadRequest;
4use d_engine_proto::client::ClientResponse;
5use d_engine_proto::client::ClientWriteRequest;
6use d_engine_proto::common::LogId;
7use d_engine_proto::server::cluster::ClusterConfChangeRequest;
8use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
9use d_engine_proto::server::cluster::ClusterMembership;
10use d_engine_proto::server::cluster::JoinRequest;
11use d_engine_proto::server::cluster::JoinResponse;
12use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
13use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
14use d_engine_proto::server::cluster::MetadataRequest;
15use d_engine_proto::server::election::VoteRequest;
16use d_engine_proto::server::election::VoteResponse;
17use d_engine_proto::server::replication::AppendEntriesRequest;
18use d_engine_proto::server::replication::AppendEntriesResponse;
19use d_engine_proto::server::storage::PurgeLogRequest;
20use d_engine_proto::server::storage::PurgeLogResponse;
21use d_engine_proto::server::storage::SnapshotAck;
22use d_engine_proto::server::storage::SnapshotChunk;
23use d_engine_proto::server::storage::SnapshotMetadata;
24use d_engine_proto::server::storage::SnapshotResponse;
25use tonic::Status;
26
27use crate::MaybeCloneOneshotSender;
28use crate::Result;
29use crate::StreamResponseSender;
30
/// Commit-progress payload carried by `RoleEvent::NotifyNewCommitIndex`.
///
/// Every field is a plain integer, so the type derives `PartialEq`/`Eq` in
/// addition to `Debug`/`Clone`; this lets callers compare notifications
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewCommitData {
    /// The commit index being announced.
    pub new_commit_index: u64,
    /// Role of the reporting node as a raw `i32` code.
    /// NOTE(review): presumably a protobuf role enum value — confirm.
    pub role: i32,
    /// Term attached to this notification.
    pub current_term: u64,
}
37
38#[derive(Debug)]
39#[allow(dead_code)]
40pub enum RoleEvent {
41 BecomeFollower(Option<u32>), BecomeCandidate,
43 BecomeLeader,
44 BecomeLearner,
45
46 NotifyNewCommitIndex(NewCommitData),
47
48 LeaderDiscovered(u32, u64), ReprocessEvent(Box<RaftEvent>), }
55
56#[derive(Debug)]
57pub enum RaftEvent {
58 ReceiveVoteRequest(
59 VoteRequest,
60 MaybeCloneOneshotSender<std::result::Result<VoteResponse, Status>>,
61 ),
62
63 ClusterConf(
64 MetadataRequest,
65 MaybeCloneOneshotSender<std::result::Result<ClusterMembership, Status>>,
66 ),
67
68 ClusterConfUpdate(
69 ClusterConfChangeRequest,
70 MaybeCloneOneshotSender<std::result::Result<ClusterConfUpdateResponse, Status>>,
71 ),
72
73 AppendEntries(
74 AppendEntriesRequest,
75 MaybeCloneOneshotSender<std::result::Result<AppendEntriesResponse, Status>>,
76 ),
77
78 ClientPropose(
79 ClientWriteRequest,
80 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
81 ),
82
83 ClientReadRequest(
84 ClientReadRequest,
85 MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>,
86 ),
87
88 InstallSnapshotChunk(
90 Box<tonic::Streaming<SnapshotChunk>>,
91 MaybeCloneOneshotSender<std::result::Result<SnapshotResponse, Status>>,
92 ),
93
94 StreamSnapshot(Box<tonic::Streaming<SnapshotAck>>, StreamResponseSender),
96
97 RaftLogCleanUp(
98 PurgeLogRequest,
99 MaybeCloneOneshotSender<std::result::Result<PurgeLogResponse, Status>>,
100 ),
101
102 JoinCluster(
103 JoinRequest,
104 MaybeCloneOneshotSender<std::result::Result<JoinResponse, Status>>,
105 ),
106
107 DiscoverLeader(
108 LeaderDiscoveryRequest,
109 MaybeCloneOneshotSender<std::result::Result<LeaderDiscoveryResponse, Status>>,
110 ),
111
112 #[allow(unused)]
114 TriggerSnapshotPush {
115 peer_id: u32,
116 },
117
118 CreateSnapshotEvent,
119
120 LogPurgeCompleted(LogId),
121
122 SnapshotCreated(Result<(SnapshotMetadata, PathBuf)>),
123
124 PromoteReadyLearners,
126
127 StepDownSelfRemoved,
130
131 MembershipApplied,
134}
135
/// Cloneable, payload-only counterpart of [`RaftEvent`], compiled only for
/// tests or when the `test-utils` feature is enabled.
///
/// Variants keep the request payload where the corresponding `RaftEvent`
/// variant has one and drop response senders and streams. There are no
/// variants for `RaftEvent::StepDownSelfRemoved` or
/// `RaftEvent::MembershipApplied`.
#[cfg(any(test, feature = "test-utils"))]
// The item only exists under the `cfg` above, so an unconditional derive is
// equivalent to gating the derive on the same predicate.
#[derive(Debug, Clone)]
#[allow(unused)]
pub enum TestEvent {
    /// Payload of `RaftEvent::ReceiveVoteRequest`.
    ReceiveVoteRequest(VoteRequest),
    /// Payload of `RaftEvent::ClusterConf`.
    ClusterConf(MetadataRequest),
    /// Payload of `RaftEvent::ClusterConfUpdate`.
    ClusterConfUpdate(ClusterConfChangeRequest),
    /// Payload of `RaftEvent::AppendEntries`.
    AppendEntries(AppendEntriesRequest),
    /// Payload of `RaftEvent::ClientPropose`.
    ClientPropose(ClientWriteRequest),
    /// Payload of `RaftEvent::ClientReadRequest`.
    ClientReadRequest(ClientReadRequest),

    /// Marker for `RaftEvent::InstallSnapshotChunk`; the stream is not kept.
    InstallSnapshotChunk,
    /// Marker for `RaftEvent::StreamSnapshot`; the stream is not kept.
    StreamSnapshot,

    /// Payload of `RaftEvent::RaftLogCleanUp`.
    RaftLogCleanUp(PurgeLogRequest),
    /// Payload of `RaftEvent::JoinCluster`.
    JoinCluster(JoinRequest),
    /// Payload of `RaftEvent::DiscoverLeader`.
    DiscoverLeader(LeaderDiscoveryRequest),

    /// Mirror of `RaftEvent::TriggerSnapshotPush`.
    TriggerSnapshotPush {
        /// Id of the peer the push is meant for.
        peer_id: u32,
    },

    /// Mirror of `RaftEvent::CreateSnapshotEvent`.
    CreateSnapshotEvent,
    /// Marker for `RaftEvent::SnapshotCreated`; the result is not kept.
    SnapshotCreated,
    /// Mirror of `RaftEvent::LogPurgeCompleted`.
    LogPurgeCompleted(LogId),
    /// Mirror of `RaftEvent::PromoteReadyLearners`.
    PromoteReadyLearners,
}
173
/// Maps a borrowed [`RaftEvent`] to its [`TestEvent`] counterpart.
///
/// Request payloads are copied or cloned into the result; response senders,
/// snapshot streams and the snapshot-creation result are dropped from the
/// projection. The input event is left untouched.
#[cfg(any(test, feature = "test-utils"))]
pub fn raft_event_to_test_event(event: &RaftEvent) -> TestEvent {
    match event {
        // Request/response events: keep only the request payload.
        RaftEvent::ReceiveVoteRequest(request, _) => TestEvent::ReceiveVoteRequest(*request),
        RaftEvent::ClusterConf(request, _) => TestEvent::ClusterConf(*request),
        RaftEvent::ClusterConfUpdate(request, _) => {
            TestEvent::ClusterConfUpdate(request.clone())
        }
        RaftEvent::AppendEntries(request, _) => TestEvent::AppendEntries(request.clone()),
        RaftEvent::ClientPropose(request, _) => TestEvent::ClientPropose(request.clone()),
        RaftEvent::ClientReadRequest(request, _) => {
            TestEvent::ClientReadRequest(request.clone())
        }
        RaftEvent::RaftLogCleanUp(request, _) => TestEvent::RaftLogCleanUp(request.clone()),
        RaftEvent::JoinCluster(request, _) => TestEvent::JoinCluster(request.clone()),
        RaftEvent::DiscoverLeader(request, _) => TestEvent::DiscoverLeader(request.clone()),

        // Events whose contents are not carried over to the test view.
        RaftEvent::InstallSnapshotChunk(..) => TestEvent::InstallSnapshotChunk,
        RaftEvent::StreamSnapshot(..) => TestEvent::StreamSnapshot,
        RaftEvent::SnapshotCreated(_) => TestEvent::SnapshotCreated,

        // Payload-only events.
        RaftEvent::LogPurgeCompleted(log_id) => TestEvent::LogPurgeCompleted(*log_id),
        RaftEvent::TriggerSnapshotPush { peer_id } => {
            TestEvent::TriggerSnapshotPush { peer_id: *peer_id }
        }
        RaftEvent::PromoteReadyLearners => TestEvent::PromoteReadyLearners,

        // `TestEvent` has no variants for `StepDownSelfRemoved` or
        // `MembershipApplied`, so both collapse onto `CreateSnapshotEvent`.
        // NOTE(review): presumably a placeholder for events that never reach
        // this conversion in practice — confirm, since a caller cannot tell
        // these apart from a real `CreateSnapshotEvent`.
        RaftEvent::CreateSnapshotEvent
        | RaftEvent::StepDownSelfRemoved
        | RaftEvent::MembershipApplied => TestEvent::CreateSnapshotEvent,
    }
}