1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
use {
crate::{
Digest,
GroupKey,
groups::{
Group,
Groups,
InMemoryLogStore,
NoOp,
StateMachine,
Storage,
config::GroupConfig,
worker::Worker,
},
primitives::ShortFmtExt,
tickets::TicketValidator,
},
core::time::Duration,
dashmap::Entry,
derive_builder::Builder,
serde::{Deserialize, Serialize},
};
/// Configures the behavior of a [`Group`] that is being joined.
///
/// `S` is the log storage type and `M` the state machine type. Both start out
/// as `()` and are filled in by the typed setters (`with_state_machine`
/// installs an in-memory log store, which `with_log_storage` may replace).
pub struct GroupBuilder<'g, S = (), M = ()> {
    /// Reference to the parent [`Groups`] instance that tracks all existing
    /// groups.
    groups: &'g Groups,
    /// The group key that carries the authentication credentials for joining
    /// the group and authorizing membership. This value is used when deriving
    /// the group id.
    pub(super) key: GroupKey,
    /// Consensus parameters for the group protocol, such as heartbeat
    /// intervals or election timeouts.
    ///
    /// The heartbeat-related settings contribute to the group id, so all
    /// members of the group must agree on them; a difference renders a
    /// different group id. See [`ConsensusConfig`] for the per-field rules.
    ///
    /// When left as `None`, [`ConsensusConfig::default`] is used at join time.
    pub(super) consensus: Option<ConsensusConfig>,
    /// The storage implementation used to persist the current state of the
    /// replicated raft log. This value does not affect the generated group id.
    pub(super) storage: S,
    /// The application-level state machine implementation.
    ///
    /// This value is used when deriving the group id. All members of the group
    /// must be running the same state machine implementation.
    pub(super) state_machine: M,
    /// Ticket validators for authenticating peers that attempt to join the
    /// group. Each validator checks the tickets presented by peers during the
    /// bonding process; peers must satisfy all configured validators to be
    /// allowed to join the group and form a bond connection.
    ///
    /// The validators (including their order) are used when deriving the
    /// group id.
    pub(super) auth: Vec<Box<dyn TicketValidator>>,
}
/// Setters that are available before a state machine has been chosen, i.e.
/// while both the storage and the state machine slots are still `()`.
impl<'g> GroupBuilder<'g, (), ()> {
    /// Initialize a group builder for the given group key, with no consensus
    /// override and no ticket validators.
    pub(super) const fn new(groups: &'g Groups, key: GroupKey) -> Self {
        Self {
            groups,
            key,
            consensus: None,
            storage: (),
            state_machine: (),
            auth: Vec::new(),
        }
    }

    /// Sets the application-level state machine implementation for the group.
    /// This is used when deriving the group id; all members of the group must
    /// be running the same state machine implementation.
    ///
    /// This must be set before the storage implementation, since the storage
    /// implementation must be compatible with the command type of the state
    /// machine. The returned builder uses an in-memory log store until
    /// `with_log_storage` is called.
    #[must_use]
    pub fn with_state_machine<SM: StateMachine>(
        self,
        state_machine: SM,
    ) -> GroupBuilder<'g, InMemoryLogStore<SM::Command>, SM> {
        GroupBuilder {
            groups: self.groups,
            key: self.key,
            consensus: self.consensus,
            storage: InMemoryLogStore::<SM::Command>::default(),
            state_machine,
            auth: self.auth,
        }
    }

    /// Joins the group using the no-op state machine and the default
    /// in-memory log store.
    ///
    /// Any consensus config or ticket validators already set on this builder
    /// are kept. This is useful when the caller only wants to track the group
    /// leader and members, without running application-level logic or
    /// persisting state.
    pub fn join(self) -> Group<NoOp> {
        self.with_state_machine(NoOp).join()
    }
}
impl<'g, M> GroupBuilder<'g, InMemoryLogStore<M::Command>, M>
where
    M: StateMachine,
{
    /// Sets the storage implementation for the state machine's replicated log.
    ///
    /// The storage persists the current state of the log and must be
    /// compatible with the command type of the state machine. This value does
    /// not affect the generated group id.
    ///
    /// Defaults to an in-memory log store if not set explicitly; calling this
    /// discards that default store.
    #[must_use]
    pub fn with_log_storage<S>(self, storage: S) -> GroupBuilder<'g, S, M>
    where
        S: Storage<M::Command>,
    {
        GroupBuilder {
            groups: self.groups,
            key: self.key,
            consensus: self.consensus,
            state_machine: self.state_machine,
            storage,
            auth: self.auth,
        }
    }
}
impl<S, M> GroupBuilder<'_, S, M> {
    /// Overrides the consensus parameters (heartbeat intervals, election
    /// timeouts, ...) used by the group protocol.
    ///
    /// The heartbeat settings feed into the group id, so every member must use
    /// identical values for them or it will derive a different id. Per-node
    /// preferences such as the election timeout may differ between members.
    ///
    /// When never called, [`ConsensusConfig::default`] is used.
    #[must_use]
    pub const fn with_consensus_config(
        self,
        consensus: ConsensusConfig,
    ) -> Self {
        let mut builder = self;
        builder.consensus = Some(consensus);
        builder
    }

    /// Registers one more ticket validator that peers must satisfy before
    /// they may join the group and form bond connections.
    ///
    /// May be called repeatedly to demand several kinds of tickets; a peer
    /// must pass every registered validator. Validators take part in group id
    /// derivation, so all members need the same validators in the same order.
    /// Otherwise they derive different group ids and cannot bond with each
    /// other.
    #[must_use]
    pub fn require_ticket(mut self, auth: impl TicketValidator) -> Self {
        let validator: Box<dyn TicketValidator> = Box::new(auth);
        self.auth.push(validator);
        self
    }
}
impl<S, M> GroupBuilder<'_, S, M>
where
    S: Storage<M::Command>,
    M: StateMachine,
{
    /// Joins a group with the specified configuration.
    ///
    /// The group id is derived from the group key, the consensus configuration
    /// (falling back to [`ConsensusConfig::default`] when none was set), the
    /// state machine, and the configured ticket validators.
    ///
    /// If a group with the same id is already active, a handle to that
    /// existing group is returned. In that case the storage and state machine
    /// held by this builder are not used and are dropped. Otherwise a new
    /// worker is spawned for the group, and the active-group metrics are
    /// updated.
    pub fn join(self) -> Group<M> {
        let config = GroupConfig::new(
            self.key, //
            self.consensus.unwrap_or_default(),
            &self.state_machine,
            self.auth,
        );
        let group_id = *config.group_id();
        // NOTE(review): the worker is spawned while the `active` map entry is
        // held; presumably `Worker::spawn` must not access `groups.active`
        // synchronously for the same key — confirm.
        match self.groups.active.entry(group_id) {
            Entry::Occupied(existing) => {
                existing.get().public_handle(&self.groups.active)
            }
            Entry::Vacant(place) => {
                let network = self.groups.local.network_id().short().to_string();
                let group = group_id.short().to_string();
                let network_label = [("network", network.clone())];
                let labels = [("network", network), ("group", group)];
                // One more active group on this network; the new group starts
                // with no active bonds.
                metrics::gauge!("mosaik.groups.active", &network_label).increment(1.0);
                metrics::gauge!("mosaik.groups.bonds.active", &labels).set(0.0);
                let worker = Worker::<S, M>::spawn(
                    self.groups,
                    config,
                    self.storage,
                    self.state_machine,
                );
                place.insert(worker).public_handle::<M>(&self.groups.active)
            }
        }
    }
}
/// Consensus parameters for a group's Raft protocol.
///
/// Build one with [`ConsensusConfig::builder`] or use
/// [`ConsensusConfig::default`]; every field has a builder default.
///
/// Only `heartbeat_interval`, `heartbeat_jitter` and `max_missed_heartbeats`
/// are covered by this config's internal digest. The election, bootstrap,
/// forward and query timeouts are not part of it.
// NOTE(review): presumably that digest is what feeds the group id via
// `GroupConfig::new` — confirm there.
#[derive(
Builder, Debug, Clone, Hash, PartialEq, Serialize, Deserialize, Eq,
)]
#[builder(pattern = "owned", setter(prefix = "with"), derive(Debug, Clone))]
#[builder_struct_attr(doc(hidden))]
pub struct ConsensusConfig {
/// The interval at which heartbeat messages are sent over established
/// bonds to peers in the group to ensure liveness of the connection.
///
/// This value is used when deriving the group id and must be identical
/// across all members of the group.
///
/// Defaults to 500 ms.
#[builder(default = "Duration::from_millis(500)")]
pub heartbeat_interval: Duration,
/// The maximum jitter to apply to the heartbeat interval to avoid
/// an avalanche of heartbeats being sent at the same time.
///
/// This value is used when deriving the group id and must be identical
/// across all members of the group.
///
/// Heartbeats are sent at intervals of
/// `heartbeat_interval - rand(0, heartbeat_jitter)`.
///
/// Defaults to 150 ms.
#[builder(default = "Duration::from_millis(150)")]
pub heartbeat_jitter: Duration,
/// The maximum number of consecutive missed heartbeats before considering
/// the bond connection to be dead and closing it.
///
/// This value is used when deriving the group id and must be identical
/// across all members of the group.
///
/// Defaults to 10.
#[builder(default = "10")]
pub max_missed_heartbeats: u32,
/// The election timeout duration for Raft leader elections within the
/// group. This is the duration that a follower will wait without hearing
/// from the leader before starting a new election. See the Raft paper
/// section 5.2 for more details on the role of election timeouts in the Raft
/// algorithm.
///
/// Nodes in the same group might have different preferences for the election
/// timeout duration based on their role in the system. This affects the
/// preference of the node to be a leader or a follower, and can be used to
/// optimize the behavior of the group. For example, a node that prefers to
/// be a follower can set a longer election timeout to reduce the chances of
/// it becoming a leader, while a node that prefers to be a leader can set a
/// shorter election timeout to increase the chances of it becoming a
/// leader.
///
/// This value must be larger than the heartbeat interval. The builder does
/// not validate this.
///
/// Defaults to 2 s.
#[builder(default = "Duration::from_secs(2)")]
pub election_timeout: Duration,
/// The maximum jitter to apply to the election timeout to avoid
/// split votes during leader elections. See the Raft paper section 5.2 for
/// more details on the role of election timeouts and randomization.
///
/// Defaults to 500 ms.
#[builder(default = "Duration::from_millis(500)")]
pub election_timeout_jitter: Duration,
/// The duration to wait during bootstrap before starting elections.
///
/// This is the time given to allow nodes to discover other members of the
/// group on the network before beginning the leader election process and
/// potentially self-nominating.
///
/// Defaults to 3 s.
#[builder(default = "Duration::from_secs(3)")]
pub bootstrap_delay: Duration,
/// The timeout duration for forwarding a command to the current leader and
/// receiving an acknowledgment with the assigned log index.
///
/// Defaults to 2 s.
#[builder(default = "Duration::from_secs(2)")]
pub forward_timeout: Duration,
/// The timeout duration for the leader to respond to state machine queries
/// from a follower querying the state with strong consistency.
///
/// Defaults to 2 s.
#[builder(default = "Duration::from_secs(2)")]
pub query_timeout: Duration,
}
impl Default for ConsensusConfig {
    /// Returns the configuration produced by [`ConsensusConfig::builder`] with
    /// every field left at its `#[builder(default = ...)]` value.
    fn default() -> Self {
        // Cannot fail: every field of `ConsensusConfig` declares a builder
        // default, so `build` never reports an uninitialized field.
        ConsensusConfigBuilder::default()
            .build()
            .expect("every ConsensusConfig field has a builder default")
    }
}
impl ConsensusConfig {
    /// Creates a new consensus config builder pre-populated with default
    /// values.
    pub fn builder() -> ConsensusConfigBuilder {
        ConsensusConfigBuilder::default()
    }

    /// Returns a randomized election timeout duration.
    ///
    /// The value is drawn uniformly from
    /// `[election_timeout, election_timeout + election_timeout_jitter]`.
    /// Randomized timeouts are essential for Raft to minimize the chances of
    /// split votes during leader elections (Raft paper, section 5.2).
    ///
    /// A zero jitter yields exactly `election_timeout` instead of panicking on
    /// an empty range.
    pub(crate) fn election_timeout(&self) -> Duration {
        let lower = self.election_timeout;
        // `saturating_add` avoids an overflow panic for extreme durations, and
        // the inclusive upper bound keeps the range non-empty even when the
        // jitter is zero (`random_range` panics on empty ranges).
        let upper = lower.saturating_add(self.election_timeout_jitter);
        rand::random_range(lower..=upper)
    }
}
/// Internal API
impl ConsensusConfig {
    /// Digest of the heartbeat-related settings of this config.
    ///
    /// Covers, in order, `heartbeat_interval` and `heartbeat_jitter` (both at
    /// millisecond resolution, so sub-millisecond differences are not
    /// reflected) and `max_missed_heartbeats`. Each value is encoded as a
    /// little-endian `u128`. The election, bootstrap, forward and query
    /// timeouts are not included.
    pub(crate) fn digest(&self) -> Digest {
        let fields: [u128; 3] = [
            self.heartbeat_interval.as_millis(),
            self.heartbeat_jitter.as_millis(),
            u128::from(self.max_missed_heartbeats),
        ];
        Digest::from_parts(&fields.map(u128::to_le_bytes))
    }
}