1use std::fmt::Debug;
2
3use crate::async_runtime::AsyncOneshotSendExt;
4use crate::core::sm;
5use crate::engine::CommandKind;
6use crate::error::Infallible;
7use crate::error::InitializeError;
8use crate::error::InstallSnapshotError;
9use crate::progress::entry::ProgressEntry;
10use crate::progress::Inflight;
11use crate::raft::AppendEntriesResponse;
12use crate::raft::InstallSnapshotResponse;
13use crate::raft::SnapshotResponse;
14use crate::raft::VoteRequest;
15use crate::raft::VoteResponse;
16use crate::type_config::alias::OneshotSenderOf;
17use crate::LeaderId;
18use crate::LogId;
19use crate::NodeId;
20use crate::OptionalSend;
21use crate::RaftTypeConfig;
22use crate::Vote;
23
24#[derive(Debug)]
26pub(crate) enum Command<C>
27where C: RaftTypeConfig
28{
29 BecomeLeader,
32
33 QuitLeader,
35
36 AppendInputEntries {
38 vote: Vote<C::NodeId>,
48
49 entries: Vec<C::Entry>,
50 },
51
52 ReplicateCommitted { committed: Option<LogId<C::NodeId>> },
54
55 Commit {
65 seq: sm::CommandSeq,
67 already_committed: Option<LogId<C::NodeId>>,
69 upto: LogId<C::NodeId>,
70 },
71
72 Replicate {
74 target: C::NodeId,
75 req: Inflight<C::NodeId>,
76 },
77
78 RebuildReplicationStreams {
84 targets: Vec<(C::NodeId, ProgressEntry<C::NodeId>)>,
86 },
87
88 SaveVote { vote: Vote<C::NodeId> },
90
91 SendVote { vote_req: VoteRequest<C::NodeId> },
93
94 PurgeLog { upto: LogId<C::NodeId> },
96
97 DeleteConflictLog { since: LogId<C::NodeId> },
100
101 StateMachine { command: sm::Command<C> },
107
108 Respond {
110 when: Option<Condition<C::NodeId>>,
111 resp: Respond<C>,
112 },
113}
114
115impl<C> From<sm::Command<C>> for Command<C>
116where C: RaftTypeConfig
117{
118 fn from(cmd: sm::Command<C>) -> Self {
119 Self::StateMachine { command: cmd }
120 }
121}
122
123impl<C> PartialEq for Command<C>
125where
126 C: RaftTypeConfig,
127 C::Entry: PartialEq,
128{
129 #[rustfmt::skip]
130 fn eq(&self, other: &Self) -> bool {
131 match (self, other) {
132 (Command::BecomeLeader, Command::BecomeLeader) => true,
133 (Command::QuitLeader, Command::QuitLeader) => true,
134 (Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b,
135 (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
136 (Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
137 (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
138 (Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
139 (Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
140 (Command::SendVote { vote_req }, Command::SendVote { vote_req: b }, ) => vote_req == b,
141 (Command::PurgeLog { upto }, Command::PurgeLog { upto: b }) => upto == b,
142 (Command::DeleteConflictLog { since }, Command::DeleteConflictLog { since: b }, ) => since == b,
143 (Command::Respond { when, resp: send }, Command::Respond { when: b_when, resp: b }) => send == b && when == b_when,
144 (Command::StateMachine { command }, Command::StateMachine { command: b }) => command == b,
145 _ => false,
146 }
147 }
148}
149
150impl<C> Command<C>
151where C: RaftTypeConfig
152{
153 #[allow(dead_code)]
154 #[rustfmt::skip]
155 pub(crate) fn kind(&self) -> CommandKind {
156 match self {
157 Command::BecomeLeader => CommandKind::Main,
158 Command::QuitLeader => CommandKind::Main,
159 Command::RebuildReplicationStreams { .. } => CommandKind::Main,
160 Command::Respond { .. } => CommandKind::Main,
161
162 Command::AppendInputEntries { .. } => CommandKind::Log,
163 Command::SaveVote { .. } => CommandKind::Log,
164 Command::PurgeLog { .. } => CommandKind::Log,
165 Command::DeleteConflictLog { .. } => CommandKind::Log,
166
167 Command::ReplicateCommitted { .. } => CommandKind::Network,
168 Command::Replicate { .. } => CommandKind::Network,
169 Command::SendVote { .. } => CommandKind::Network,
170
171 Command::StateMachine { .. } => CommandKind::StateMachine,
172 Command::Commit { .. } => CommandKind::Main,
175 }
176 }
177
178 #[allow(dead_code)]
180 #[rustfmt::skip]
181 pub(crate) fn condition(&self) -> Option<&Condition<C::NodeId>> {
182 match self {
183 Command::BecomeLeader => None,
184 Command::QuitLeader => None,
185 Command::AppendInputEntries { .. } => None,
186 Command::ReplicateCommitted { .. } => None,
187 Command::Commit { .. } => None,
189 Command::Replicate { .. } => None,
190 Command::RebuildReplicationStreams { .. } => None,
191 Command::SaveVote { .. } => None,
192 Command::SendVote { .. } => None,
193 Command::PurgeLog { .. } => None,
194 Command::DeleteConflictLog { .. } => None,
195 Command::Respond { when, .. } => when.as_ref(),
196 Command::StateMachine { .. } => None,
198 }
199 }
200}
201
202#[derive(Debug, Clone, Copy)]
204#[derive(PartialEq, Eq)]
205pub(crate) enum Condition<NID>
206where NID: NodeId
207{
208 #[allow(dead_code)]
213 LogFlushed {
214 leader: LeaderId<NID>,
215 log_id: Option<LogId<NID>>,
216 },
217
218 #[allow(dead_code)]
220 Applied { log_id: Option<LogId<NID>> },
221
222 #[allow(dead_code)]
224 StateMachineCommand { command_seq: sm::CommandSeq },
225}
226
227#[derive(Debug, PartialEq, Eq)]
229#[derive(derive_more::From)]
230pub(crate) enum Respond<C>
231where C: RaftTypeConfig
232{
233 Vote(ValueSender<C, Result<VoteResponse<C::NodeId>, Infallible>>),
234 AppendEntries(ValueSender<C, Result<AppendEntriesResponse<C::NodeId>, Infallible>>),
235 ReceiveSnapshotChunk(ValueSender<C, Result<(), InstallSnapshotError>>),
236 InstallSnapshot(ValueSender<C, Result<InstallSnapshotResponse<C::NodeId>, InstallSnapshotError>>),
237 InstallFullSnapshot(ValueSender<C, Result<SnapshotResponse<C::NodeId>, Infallible>>),
238 Initialize(ValueSender<C, Result<(), InitializeError<C::NodeId, C::Node>>>),
239}
240
241impl<C> Respond<C>
242where C: RaftTypeConfig
243{
244 pub(crate) fn new<T>(res: T, tx: OneshotSenderOf<C, T>) -> Self
245 where
246 T: Debug + PartialEq + Eq + OptionalSend,
247 Self: From<ValueSender<C, T>>,
248 {
249 Respond::from(ValueSender::new(res, tx))
250 }
251
252 pub(crate) fn send(self) {
253 match self {
254 Respond::Vote(x) => x.send(),
255 Respond::AppendEntries(x) => x.send(),
256 Respond::ReceiveSnapshotChunk(x) => x.send(),
257 Respond::InstallSnapshot(x) => x.send(),
258 Respond::InstallFullSnapshot(x) => x.send(),
259 Respond::Initialize(x) => x.send(),
260 }
261 }
262}
263
264#[derive(Debug)]
265pub(crate) struct ValueSender<C, T>
266where
267 T: Debug + PartialEq + Eq + OptionalSend,
268 C: RaftTypeConfig,
269{
270 value: T,
271 tx: OneshotSenderOf<C, T>,
272}
273
274impl<C, T> PartialEq for ValueSender<C, T>
275where
276 T: Debug + PartialEq + Eq + OptionalSend,
277 C: RaftTypeConfig,
278{
279 fn eq(&self, other: &Self) -> bool {
280 self.value == other.value
281 }
282}
283
284impl<C, T> Eq for ValueSender<C, T>
285where
286 T: Debug + PartialEq + Eq + OptionalSend,
287 C: RaftTypeConfig,
288{
289}
290
291impl<C, T> ValueSender<C, T>
292where
293 T: Debug + PartialEq + Eq + OptionalSend,
294 C: RaftTypeConfig,
295{
296 pub(crate) fn new(res: T, tx: OneshotSenderOf<C, T>) -> Self {
297 Self { value: res, tx }
298 }
299
300 pub(crate) fn send(self) {
301 let _ = self.tx.send(self.value);
302 }
303}