1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
use std::fmt::Debug;
use crate::async_runtime::AsyncOneshotSendExt;
use crate::core::sm;
use crate::engine::CommandKind;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::OneshotSenderOf;
use crate::LeaderId;
use crate::LogId;
use crate::NodeId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::Vote;
/// Commands sent to `RaftRuntime` for execution, to update the application state.
///
/// `Command::kind()` reports which component executes each variant, and
/// `Command::condition()` reports what (if anything) a command must wait for before it runs.
#[derive(Debug)]
pub(crate) enum Command<C>
where C: RaftTypeConfig
{
/// Becomes a leader, i.e., its `vote` is granted by a quorum.
/// The runtime initializes leader data when it receives this command.
BecomeLeader,
/// No longer a leader. Clean up the leader's data.
QuitLeader,
/// Append `entries` to the log, on behalf of the leader identified by `vote`.
AppendInputEntries {
/// The vote of the leader that submits the entries to write.
///
/// The leader could be a local leader that appends entries to the local log store,
/// or a remote leader that replicates entries to this follower.
///
/// The leader id is used to generate a monotonically increasing IO id, such as [`LogIOId`],
/// where [`LogIOId`] is `(leader_id, log_id)`.
///
/// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId
vote: Vote<C::NodeId>,
/// The entries to append to the log.
entries: Vec<C::Entry>,
},
/// Replicate the committed log id to other nodes.
ReplicateCommitted { committed: Option<LogId<C::NodeId>> },
/// Commit log entries that are already persisted in the store, up to `upto`, inclusive.
///
/// To `commit` logs, [`RaftLogStorage::save_committed()`] is called. And then committed logs
/// will be applied to the state machine by calling [`RaftStateMachine::apply()`].
///
/// And if it is leader, send applied result to the client that proposed the entry.
///
/// [`RaftLogStorage::save_committed()`]: crate::storage::RaftLogStorage::save_committed
/// [`RaftStateMachine::apply()`]: crate::storage::RaftStateMachine::apply
Commit {
// TODO: remove it when sm::Command to apply is generated by Engine
seq: sm::CommandSeq,
// TODO: pass the log id list or entries?
already_committed: Option<LogId<C::NodeId>>,
upto: LogId<C::NodeId>,
},
/// Replicate log entries or a snapshot to the node `target`.
Replicate {
target: C::NodeId,
req: Inflight<C::NodeId>,
},
/// Membership config changed, need to update replication streams.
/// The Runtime has to close all old replications and start new ones,
/// because a replication stream should only report state for one membership config.
/// When membership config changes, the membership log id stored in ReplicationCore has to be
/// updated.
RebuildReplicationStreams {
/// Targets to replicate to.
targets: Vec<(C::NodeId, ProgressEntry<C::NodeId>)>,
},
/// Save vote to storage.
SaveVote { vote: Vote<C::NodeId> },
/// Send a vote request to all other members.
SendVote { vote_req: VoteRequest<C::NodeId> },
/// Purge log from the beginning to `upto`, inclusive.
PurgeLog { upto: LogId<C::NodeId> },
/// Delete logs that conflict with the leader from a follower/learner since log id `since`,
/// inclusive.
DeleteConflictLog { since: LogId<C::NodeId> },
// TODO(1): currently it is only used to replace BuildSnapshot, InstallSnapshot, CancelSnapshot.
/// A command sent to the state machine worker [`sm::worker::Worker`].
///
/// The runtime(`RaftCore`) will just forward this command to [`sm::worker::Worker`].
/// The response will be sent back in a `RaftMsg::StateMachine` message to `RaftCore`.
StateMachine { command: sm::Command<C> },
/// Send a result to the caller, optionally only after the condition `when` is satisfied.
Respond {
when: Option<Condition<C::NodeId>>,
resp: Respond<C>,
},
}
impl<C> From<sm::Command<C>> for Command<C>
where C: RaftTypeConfig
{
fn from(cmd: sm::Command<C>) -> Self {
Self::StateMachine { command: cmd }
}
}
/// Structural equality for [`Command`], used by unit tests.
///
/// Two commands are equal when they are the same variant and every field compares equal.
/// Commands of different variants are never equal.
impl<C> PartialEq for Command<C>
where
    C: RaftTypeConfig,
    C::Entry: PartialEq,
{
    #[rustfmt::skip]
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Command::BecomeLeader, Command::BecomeLeader) => true,
            (Command::QuitLeader, Command::QuitLeader) => true,
            (Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b,
            (Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
            (Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
            (Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
            (Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
            (Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
            (Command::SendVote { vote_req }, Command::SendVote { vote_req: b }, ) => vote_req == b,
            (Command::PurgeLog { upto }, Command::PurgeLog { upto: b }) => upto == b,
            (Command::DeleteConflictLog { since }, Command::DeleteConflictLog { since: b }, ) => since == b,
            (Command::Respond { when, resp: send }, Command::Respond { when: b_when, resp: b }) => send == b && when == b_when,
            (Command::StateMachine { command }, Command::StateMachine { command: b }) => command == b,

            // Mismatched variants. Every left-hand variant is spelled out instead of using a `_`
            // wildcard, so adding a new `Command` variant is a compile error here until an
            // equality arm is written for it (otherwise it would silently never equal itself).
            (Command::BecomeLeader, _)
            | (Command::QuitLeader, _)
            | (Command::AppendInputEntries { .. }, _)
            | (Command::ReplicateCommitted { .. }, _)
            | (Command::Commit { .. }, _)
            | (Command::Replicate { .. }, _)
            | (Command::RebuildReplicationStreams { .. }, _)
            | (Command::SaveVote { .. }, _)
            | (Command::SendVote { .. }, _)
            | (Command::PurgeLog { .. }, _)
            | (Command::DeleteConflictLog { .. }, _)
            | (Command::Respond { .. }, _)
            | (Command::StateMachine { .. }, _) => false,
        }
    }
}
impl<C> Command<C>
where C: RaftTypeConfig
{
    /// Classify this command by the [`CommandKind`] that executes it.
    ///
    /// Arms are grouped by kind; the match stays exhaustive so a new variant must be classified.
    #[allow(dead_code)]
    pub(crate) fn kind(&self) -> CommandKind {
        match self {
            Command::BecomeLeader
            | Command::QuitLeader
            | Command::RebuildReplicationStreams { .. }
            | Command::Respond { .. } => CommandKind::Main,

            // Commit is handled by RaftCore first, then forwarded to the state machine worker.
            // TODO: Commit also writes `committed` to the log store; that part should run as
            // CommandKind::Log.
            Command::Commit { .. } => CommandKind::Main,

            Command::AppendInputEntries { .. }
            | Command::SaveVote { .. }
            | Command::PurgeLog { .. }
            | Command::DeleteConflictLog { .. } => CommandKind::Log,

            Command::ReplicateCommitted { .. }
            | Command::Replicate { .. }
            | Command::SendVote { .. } => CommandKind::Network,

            Command::StateMachine { .. } => CommandKind::StateMachine,
        }
    }

    /// Return the condition this command must wait for before it runs, if any.
    ///
    /// Only `Respond` carries a condition (its `when` field); every other variant yields `None`.
    #[allow(dead_code)]
    pub(crate) fn condition(&self) -> Option<&Condition<C::NodeId>> {
        match self {
            Command::Respond { when, .. } => when.as_ref(),

            // TODO(1): `StateMachine` currently never reports a condition.
            Command::BecomeLeader
            | Command::QuitLeader
            | Command::AppendInputEntries { .. }
            | Command::ReplicateCommitted { .. }
            | Command::Commit { .. }
            | Command::Replicate { .. }
            | Command::RebuildReplicationStreams { .. }
            | Command::SaveVote { .. }
            | Command::SendVote { .. }
            | Command::PurgeLog { .. }
            | Command::DeleteConflictLog { .. }
            | Command::StateMachine { .. } => None,
        }
    }
}
/// A condition to wait for before running a command.
///
/// A command exposes its condition through `Command::condition()`.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub(crate) enum Condition<NID>
where NID: NodeId
{
/// Wait until the log is flushed to the disk.
///
/// In raft, a log io is uniquely identified by `(leader_id, log_id)`, not by `log_id` alone:
/// the same log id can be written multiple times by different leaders.
#[allow(dead_code)]
LogFlushed {
/// The leader that issued the log write.
leader: LeaderId<NID>,
/// The log id that must be flushed; `None` when there is no log id to wait for.
log_id: Option<LogId<NID>>,
},
/// Wait until the log is applied to the state machine.
#[allow(dead_code)]
Applied { log_id: Option<LogId<NID>> },
/// Wait until a [`sm::worker::Worker`] command is finished.
#[allow(dead_code)]
StateMachineCommand { command_seq: sm::CommandSeq },
}
/// A return value to send back to the caller via a oneshot sender.
///
/// Each variant pairs an already-computed result with the sender for one kind of request.
/// `derive_more::From` generates a `From<ValueSender<..>>` impl per variant, which
/// `Respond::new()` relies on to pick the variant from the result type.
#[derive(Debug, PartialEq, Eq)]
#[derive(derive_more::From)]
pub(crate) enum Respond<C>
where C: RaftTypeConfig
{
/// Result of handling a vote request.
Vote(ValueSender<C, Result<VoteResponse<C::NodeId>, Infallible>>),
/// Result of handling an append-entries request.
AppendEntries(ValueSender<C, Result<AppendEntriesResponse<C::NodeId>, Infallible>>),
/// Result of receiving one snapshot chunk.
ReceiveSnapshotChunk(ValueSender<C, Result<(), InstallSnapshotError>>),
/// Result of an install-snapshot request.
InstallSnapshot(ValueSender<C, Result<InstallSnapshotResponse<C::NodeId>, InstallSnapshotError>>),
/// Result of installing a complete snapshot.
InstallFullSnapshot(ValueSender<C, Result<SnapshotResponse<C::NodeId>, Infallible>>),
/// Result of initializing the cluster.
Initialize(ValueSender<C, Result<(), InitializeError<C::NodeId, C::Node>>>),
}
impl<C> Respond<C>
where C: RaftTypeConfig
{
    /// Build a `Respond` that will deliver `res` through `tx`.
    ///
    /// The variant is selected by whichever `From<ValueSender<C, T>>` impl matches `T`.
    pub(crate) fn new<T>(res: T, tx: OneshotSenderOf<C, T>) -> Self
    where
        T: Debug + PartialEq + Eq + OptionalSend,
        Self: From<ValueSender<C, T>>,
    {
        let sender = ValueSender::new(res, tx);
        Self::from(sender)
    }

    /// Deliver the stored result to the caller, consuming `self`.
    pub(crate) fn send(self) {
        match self {
            Self::Vote(sender) => sender.send(),
            Self::AppendEntries(sender) => sender.send(),
            Self::ReceiveSnapshotChunk(sender) => sender.send(),
            Self::InstallSnapshot(sender) => sender.send(),
            Self::InstallFullSnapshot(sender) => sender.send(),
            Self::Initialize(sender) => sender.send(),
        }
    }
}
/// A ready-to-send value paired with the oneshot sender that will deliver it.
///
/// Equality (see the `PartialEq` impl below) compares only `value`, not `tx`.
#[derive(Debug)]
pub(crate) struct ValueSender<C, T>
where
T: Debug + PartialEq + Eq + OptionalSend,
C: RaftTypeConfig,
{
/// The value to deliver.
value: T,
/// The sending half of the oneshot channel the caller is waiting on.
tx: OneshotSenderOf<C, T>,
}
/// Two `ValueSender`s are equal when their pending values are equal; the `tx` channel ends are
/// not compared.
impl<C, T> PartialEq for ValueSender<C, T>
where
    C: RaftTypeConfig,
    T: Debug + PartialEq + Eq + OptionalSend,
{
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.value, &other.value);
        lhs == rhs
    }
}

/// `T: Eq` makes the value comparison a full equivalence relation, so `ValueSender` is `Eq` too.
impl<C, T> Eq for ValueSender<C, T>
where
    C: RaftTypeConfig,
    T: Debug + PartialEq + Eq + OptionalSend,
{
}
impl<C, T> ValueSender<C, T>
where
    C: RaftTypeConfig,
    T: Debug + PartialEq + Eq + OptionalSend,
{
    /// Pair the value `res` with the sender `tx` it will be delivered through.
    pub(crate) fn new(res: T, tx: OneshotSenderOf<C, T>) -> Self {
        Self { tx, value: res }
    }

    /// Send the stored value, consuming `self`.
    ///
    /// The outcome of the send is discarded, so a failed delivery is silently ignored.
    pub(crate) fn send(self) {
        let Self { value, tx } = self;
        let _ = tx.send(value);
    }
}