1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
use crate::{
messages::{ballot_leader_election::BLEMessage, sequence_paxos::PaxosMessage},
storage::Entry,
util::NodeId,
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Internal component for log replication
pub mod sequence_paxos {
use crate::{
ballot_leader_election::Ballot,
storage::{Entry, SnapshotType, StopSign},
util::{NodeId, SequenceNumber},
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
/// Message sent by a follower on crash-recovery or dropped messages to request its leader to re-prepare them.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PrepareReq {
/// The current round.
pub n: Ballot,
}
/// Prepare message sent by a newly-elected leader to initiate the Prepare phase.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Prepare {
/// The current round.
pub n: Ballot,
/// The decided index of this leader.
pub decided_idx: u64,
/// The latest round in which an entry was accepted.
pub n_accepted: Ballot,
/// The log length of this leader.
pub accepted_idx: u64,
}
/// Promise message sent by a follower in response to a [`Prepare`] sent by the leader.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Promise<T>
where
T: Entry,
{
/// The current round.
pub n: Ballot,
/// The latest round in which an entry was accepted.
pub n_accepted: Ballot,
/// The decided snapshot.
pub decided_snapshot: Option<SnapshotType<T>>,
/// The log suffix.
pub suffix: Vec<T>,
/// The decided index of this follower.
pub decided_idx: u64,
/// The log length of this follower.
pub accepted_idx: u64,
/// The StopSign accepted by this follower
pub stopsign: Option<StopSign>,
}
/// AcceptSync message sent by the leader to synchronize the logs of all replicas in the prepare phase.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AcceptSync<T>
where
T: Entry,
{
/// The current round.
pub n: Ballot,
/// The sequence number of this message in the leader-to-follower accept sequence
pub seq_num: SequenceNumber,
/// The decided snapshot.
pub decided_snapshot: Option<SnapshotType<T>>,
/// The log suffix.
pub suffix: Vec<T>,
/// The index of the log where the entries from `suffix` should be applied at (also the compacted idx of `decided_snapshot` if it exists)
pub sync_idx: u64,
/// The decided index
pub decided_idx: u64,
/// StopSign to be accepted
pub stopsign: Option<StopSign>,
#[cfg(feature = "unicache")]
/// The UniCache of the leader
pub unicache: T::UniCache,
}
/// Message with entries to be replicated and the latest decided index sent by the leader in the accept phase.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AcceptDecide<T>
where
T: Entry,
{
/// The current round.
pub n: Ballot,
/// The sequence number of this message in the leader-to-follower accept sequence
pub seq_num: SequenceNumber,
/// The decided index.
pub decided_idx: u64,
/// Entries to be replicated.
pub entries: Vec<T>,
}
/// TODO
#[derive(Clone, Debug)]
#[cfg(feature = "unicache")]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct EncodedAcceptDecide<T>
where
T: Entry,
{
/// The current round.
pub n: Ballot,
/// The sequence number of this message in the leader-to-follower accept sequence
pub seq_num: SequenceNumber,
/// The decided index.
pub decided_idx: u64,
/// Entries to be replicated.
pub entries: Vec<T::EncodeResult>,
}
/// Message sent by follower to leader when entries has been accepted.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Accepted {
/// The current round.
pub n: Ballot,
/// The accepted index.
pub accepted_idx: u64,
}
/// Message sent by leader to followers to decide up to a certain index in the log.
#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Decide {
/// The current round.
pub n: Ballot,
/// The sequence number of this message in the leader-to-follower accept sequence
pub seq_num: SequenceNumber,
/// The decided index.
pub decided_idx: u64,
}
/// Message sent by leader to followers to accept a StopSign
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AcceptStopSign {
/// The current round.
pub n: Ballot,
/// The sequence number of this message in the leader-to-follower accept sequence
pub seq_num: SequenceNumber,
/// The decided index.
pub ss: StopSign,
}
/// Message sent by follower to leader when accepting an entry is rejected.
/// This happens when the follower is promised to a greater leader.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NotAccepted {
/// The follower's current ballot
pub n: Ballot,
}
/// Compaction Request
#[allow(missing_docs)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Compaction {
Trim(u64),
Snapshot(Option<u64>),
}
/// An enum for all the different message types.
#[allow(missing_docs)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PaxosMsg<T>
where
T: Entry,
{
/// Request a [`Prepare`] to be sent from the leader. Used for fail-recovery.
PrepareReq(PrepareReq),
#[allow(missing_docs)]
Prepare(Prepare),
Promise(Promise<T>),
AcceptSync(AcceptSync<T>),
AcceptDecide(AcceptDecide<T>),
Accepted(Accepted),
NotAccepted(NotAccepted),
Decide(Decide),
/// Forward client proposals to the leader.
ProposalForward(Vec<T>),
Compaction(Compaction),
AcceptStopSign(AcceptStopSign),
ForwardStopSign(StopSign),
#[cfg(feature = "unicache")]
EncodedAcceptDecide(EncodedAcceptDecide<T>),
}
/// A struct for a Paxos message that also includes sender and receiver.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PaxosMessage<T>
where
T: Entry,
{
/// Sender of `msg`.
pub from: NodeId,
/// Receiver of `msg`.
pub to: NodeId,
/// The message content.
pub msg: PaxosMsg<T>,
}
}
/// The different messages BLE uses to communicate with other servers.
pub mod ballot_leader_election {
use crate::{ballot_leader_election::Ballot, util::NodeId};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// An enum for all the different BLE message types.
#[allow(missing_docs)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum HeartbeatMsg {
Request(HeartbeatRequest),
Reply(HeartbeatReply),
}
/// Requests a reply from all the other servers.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct HeartbeatRequest {
/// Number of the current round.
pub round: u32,
}
/// Replies
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct HeartbeatReply {
/// Number of the current heartbeat round.
pub round: u32,
/// Ballot of replying server.
pub ballot: Ballot,
/// Leader this server is following
pub leader: Ballot,
/// Whether the replying server sees a need for a new leader
pub happy: bool,
}
/// A struct for a Paxos message that also includes sender and receiver.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BLEMessage {
/// Sender of `msg`.
pub from: NodeId,
/// Receiver of `msg`.
pub to: NodeId,
/// The message content.
pub msg: HeartbeatMsg,
}
}
#[allow(missing_docs)]
/// Message in OmniPaxos. Can be either a `SequencePaxos` message (for log replication) or `BLE` message (for leader election)
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Message<T>
where
T: Entry,
{
SequencePaxos(PaxosMessage<T>),
BLE(BLEMessage),
}
impl<T> Message<T>
where
T: Entry,
{
/// Get the sender id of the message
pub fn get_sender(&self) -> NodeId {
match self {
Message::SequencePaxos(p) => p.from,
Message::BLE(b) => b.from,
}
}
/// Get the receiver id of the message
pub fn get_receiver(&self) -> NodeId {
match self {
Message::SequencePaxos(p) => p.to,
Message::BLE(b) => b.to,
}
}
}