mosaik 0.3.17

A Rust runtime for building self-organizing, leaderless distributed systems.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
use {
	crate::{
		PeerId,
		groups::{
			LeadershipPreference,
			StateMachine,
			Storage,
			Term,
			raft::{
				Message,
				candidate::Candidate,
				follower::Follower,
				leader::Leader,
				protocol::{AppendEntries, RequestVoteResponse, Vote},
				shared::Shared,
			},
		},
		primitives::Short,
	},
	core::{
		ops::ControlFlow,
		task::{Context, Poll},
	},
	derive_more::{Display, From},
};

/// Raft node role - each node is always in one of these states.
///
/// Depending on the currently assumed role, protocol messages are handled
/// differently and certain actions are taken (e.g., starting elections,
/// sending heartbeats, etc.).
///
/// Messages that are common to all roles (e.g., stepping down on higher term,
/// voting for candidates, etc.) are handled at the `Role` level, and messages
/// that are specific to each role are forwarded to the role-specific message
/// handlers.
#[derive(Display, From)]
#[allow(clippy::large_enum_variant)]
pub enum Role<M: StateMachine> {
	/// Passive state: responds to messages from candidates and leaders.
	/// If election timeout elapses without receiving `AppendEntries` from
	/// current leader or granting vote to candidate, converts to candidate.
	///
	/// Followers may serve read-only requests depending on the configured
	/// read consistency level. All log-mutating requests must be forwarded to
	/// the current leader.
	#[display("Follower")]
	Follower(Follower<M>),

	/// Active state during elections: increments term, votes for self,
	/// sends `RequestVote` RPCs to all other servers, and waits for votes.
	///
	/// Nodes go into this state if they have not received `AppendEntries`
	/// messages from a leader within the election timeout.
	#[display("Candidate")]
	Candidate(Candidate<M>),

	/// Active state as leader: handles log-mutating requests from clients,
	/// replicates log entries, and sends periodic heartbeats to followers.
	#[display("Leader")]
	Leader(Leader<M>),
}

impl<M: StateMachine> Role<M> {
	/// Creates the initial role: every node starts as a follower at term zero
	/// with no known leader, per the Raft protocol.
	pub fn new<S: Storage<M::Command>>(shared: &Shared<S, M>) -> Self {
		Self::Follower(Follower::new(Term::zero(), None, shared))
	}

	/// Drives the role-specific periodic actions (e.g., elections, heartbeats).
	///
	/// A role's `poll` may return `ControlFlow::Break(next_role)` to request a
	/// role transition, which is applied in place here. The state sync provider
	/// is also driven on every call, after the role-specific work.
	pub fn poll<S: Storage<M::Command>>(
		&mut self,
		cx: &mut Context<'_>,
		shared: &mut Shared<S, M>,
	) -> Poll<()> {
		// dispatch to the current role's own poll implementation.
		let next_step = match self {
			Self::Follower(follower) => follower.poll(cx, shared),
			Self::Candidate(candidate) => candidate.poll(cx, shared),
			Self::Leader(leader) => leader.poll(cx, shared),
		};

		let readiness = match next_step {
			Poll::Ready(next) => {
				if let ControlFlow::Break(next_role) = next {
					// transition to the next role if the current role's tick indicates a
					// role change (e.g., election timeout elapsed, new leader elected,
					// etc.)
					*self = next_role;
				}
				Poll::Ready(())
			}
			Poll::Pending => {
				// register for wake-up so external events (e.g., incoming messages)
				// can re-schedule this poll.
				shared.add_waker(cx.waker().clone());
				Poll::Pending
			}
		};

		// drive the state sync provider on every tick of the loop
		// at lower priority than the primary role-specific actions.
		if shared.poll_state_sync_provider(cx).is_ready() {
			return Poll::Ready(());
		}

		readiness
	}

	/// Handles incoming consensus protocol messages based on the current role.
	/// Implements behaviors common to all roles, such as stepping down on
	/// receiving messages with higher terms, dropping stale messages, casting
	/// votes, and routing state sync traffic, before delegating anything left
	/// to the role-specific handler.
	pub fn receive_protocol_message<S: Storage<M::Command>>(
		&mut self,
		message: Message<M>,
		sender: PeerId,
		shared: &mut Shared<S, M>,
	) {
		// Raft rule: messages carrying a term older than ours are stale and
		// must be ignored. Messages without a term bypass this filter.
		if let Some(message_term) = message.term()
			&& message_term < self.term()
		{
			tracing::trace!(
				local_term = %self.term(),
				message_term = %message_term,
				group = %Short(shared.group_id()),
				network = %Short(shared.network_id()),
				sender = %Short(sender),
				message = %message,
				local_role = %self,
				"ignoring stale raft message"
			);
			return;
		}

		// Offer the message to the state sync provider first. `maybe_state_sync`
		// returns `Err(message)` when the message was NOT consumed there (either
		// it is not a state sync message, or the provider handed it back), so the
		// regular role handling below proceeds with the returned message.
		let Err(message) = Self::maybe_state_sync(message, sender, shared) else {
			// the state sync provider consumed the message; nothing further to
			// do at the role level.
			return;
		};

		// Any message with a higher term should trigger an immediate step down to
		// follower state with the new term.
		self.maybe_step_down(&message, shared);

		// Handle `RequestVote` messages and cast votes if applicable. This is
		// common to all roles, as followers, candidates, and leaders can all
		// receive `RequestVote` messages. There is no more role-specific handling
		// for this message type.
		if self.maybe_cast_vote(&message, sender, shared) {
			// if the message was a `RequestVote` and we handled it by casting a
			// vote, then we don't need to forward it to the role-specific message
			// handlers.
			return;
		}

		// forward the message to the role-specific message handler for any messages
		// that are not handled at the role level.
		let result = match self {
			Self::Follower(follower) => {
				follower.receive_protocol_message(message, sender, shared)
			}
			Self::Candidate(candidate) => {
				candidate.receive_protocol_message(message, sender, shared)
			}
			Self::Leader(leader) => {
				leader.receive_protocol_message(message, sender, shared)
			}
		};

		match result {
			Ok(()) => {}
			Err(RoleHandlerError::Unexpected(message)) => {
				// the message type has no meaning in the current role; log and drop.
				tracing::trace!(
					local_term = %self.term(),
					message_term = ?message.term(),
					group = %Short(shared.group_id()),
					network = %Short(shared.network_id()),
					sender = %Short(sender),
					message = %message,
					"unexpected message type received as {self}",
				);
			}
			Err(RoleHandlerError::StepDown(request)) => {
				// this happens when a candidate receives an `AppendEntries` message
				// from a leader with the same or higher term, step down to follower and
				// process the `AppendEntries` message as a follower after stepping
				// down.
				*self = Follower::<M>::new(
					request.term, //
					Some(request.leader),
					shared,
				)
				.into();
				shared.update_leader(Some(request.leader));

				tracing::debug!(
					leader = %Short(request.leader),
					term = %self.term(),
					group = %Short(shared.group_id()),
					network = %Short(shared.network_id()),
					"stepping down and following",
				);

				// process the incoming `AppendEntries` message as a follower after
				// stepping down. Recursion terminates: we are now a follower and a
				// follower handler does not return `StepDown` for this message.
				self.receive_protocol_message(
					Message::AppendEntries(request),
					sender,
					shared,
				);
			}
			Err(RoleHandlerError::RivalLeader(request)) => {
				// this happens when a leader receives an `AppendEntries` message from
				// another leader with the same term, which indicates a network
				// partition with two rival leaders. Trigger new elections with a higher
				// term.
				tracing::warn!(
					term = %request.term,
					other_leader = %Short(request.leader),
					other_leader_log = %request.prev_log_position,
					local_log = %shared.storage.last(),
					group = %Short(shared.group_id()),
					network = %Short(shared.network_id()),
					"rival group leader detected",
				);

				*self = Candidate::<M>::new(self.term().next(), shared).into();
				shared.update_leader(None);
				shared.wake_all();
			}
		}
	}

	/// Checks all incoming messages for a higher term and steps down to
	/// follower if necessary, adopting the message's term and following the
	/// advertised leader (if any). Messages without a term, or with a term
	/// equal to ours, leave the role unchanged.
	fn maybe_step_down<S: Storage<M::Command>>(
		&mut self,
		message: &Message<M>,
		shared: &Shared<S, M>,
	) {
		let Some(message_term) = message.term() else {
			// If the message does not carry a term, it cannot trigger a step down.
			return;
		};

		// stale terms were already filtered out in `receive_protocol_message`.
		assert!(message_term >= self.term());

		if message_term > self.term() {
			if let Some(leader) = message.leader() {
				tracing::debug!(
					leader = %Short(leader),
					old_term = %self.term(),
					new_term = %message_term,
					group = %Short(shared.group_id()),
					network = %Short(shared.network_id()),
					"following",
				);
			} else {
				tracing::debug!(
					group = %Short(shared.group_id()),
					network = %Short(shared.network_id()),
					old_term = %self.term(),
					new_term = %message_term,
					"stepping down to follower",
				);
			}

			// If the incoming message has a higher term, we must step down to
			// follower state and follow the new leader (if provided), and process
			// the incoming message as a follower.
			*self = Follower::<M>::new::<S>(
				message_term, //
				message.leader(),
				shared,
			)
			.into();

			// notify status listeners that we have a new leader.
			shared.update_leader(message.leader());
		}
	}

	/// Handles incoming `RequestVote` messages by deciding whether to cast a vote
	/// for the candidate based on the Raft voting rules. This behavior is common
	/// to all roles, as followers, candidates, and leaders can all receive
	/// `RequestVote` messages and may need to cast votes for candidates with
	/// higher terms.
	///
	/// returns true if the message was handled (i.e., it was a `RequestVote`
	/// message and should not be forwarded to other roles), otherwise false and
	/// the message will be forwarded to the role-specific message handler.
	fn maybe_cast_vote<S: Storage<M::Command>>(
		&mut self,
		message: &Message<M>,
		sender: PeerId,
		shared: &mut Shared<S, M>,
	) -> bool {
		let Message::RequestVote(request) = message else {
			return false;
		};

		// this should always hold because messages with lower terms are filtered
		// out in the `receive` method before reaching this point.
		assert!(request.term >= self.term());

		let local_cursor = shared.storage.last();

		tracing::debug!(
			candidate = %Short(request.candidate),
			term = %request.term,
			candidate_log = %request.log_position,
			local_log = %local_cursor,
			group = %Short(shared.group_id()),
			network = %Short(shared.network_id()),
			"new leader elections started by",
		);

		// closure that sends a `RequestVoteResponse` carrying the given vote
		// back to the sender of the `RequestVote`.
		let bonds = shared.group.bonds.clone();
		let vote_with = |vote: Vote| {
			bonds
				.send_raft_to(
					Message::RequestVoteResponse(RequestVoteResponse {
						vote,
						term: request.term,
					}),
					sender,
				)
				.expect("infallible serialization");
		};

		if !shared.can_vote(request.term, request.candidate) {
			// We have already voted for another candidate in the same term
			vote_with(Vote::Denied);

			tracing::debug!(
				candidate = %Short(request.candidate),
				term = %request.term,
				group = %Short(shared.group_id()),
				network = %Short(shared.network_id()),
				"denying vote, already voted in this term",
			);
			return true;
		}

		if request.log_position.is_behind(&local_cursor) {
			// The candidate's log is not as up-to-date as ours, deny.
			vote_with(Vote::Denied);

			tracing::debug!(
				candidate = %Short(request.candidate),
				term = %request.term,
				our_log = %local_cursor,
				candidate_log = %request.log_position,
				group = %Short(shared.group_id()),
				network = %Short(shared.network_id()),
				"denying vote because our log is ahead",
			);
			return true;
		}

		// check if this node is behind the candidate's log
		if local_cursor.is_behind(&request.log_position) {
			// We are behind the candidate — abstain rather than grant or deny,
			// so we don't inflate the voting committee with lagging nodes but also
			// don't object to the candidate winning the election and becoming leader.
			vote_with(Vote::Abstained);

			tracing::debug!(
				candidate = %Short(request.candidate),
				term = %request.term,
				candidate_log = %request.log_position,
				local_log = %local_cursor,
				group = %Short(shared.group_id()),
				network = %Short(shared.network_id()),
				"abstained from voting because we are behind their log",
			);
		} else {
			// If we reach this point, we can vote for the candidate. We record our
			// vote to prevent us from voting for multiple candidates in the same
			// term and we send a positive `RequestVoteResponse` back to the
			// candidate. Note the vote is recorded even when an observer abstains
			// below, so an observer cannot later grant in the same term.
			//
			// NOTE(review): `can_vote` above checks `request.candidate` but the
			// vote is saved for `sender` — presumably a candidate always sends
			// its own `RequestVote`, making the two the same peer; confirm.
			shared.save_vote(request.term, sender);

			// If we are fully caught up with the candidate's log we can
			// participate in this election. Observers abstain so they don't
			// inflate the election quorum — only nodes that can become
			// leader (and thus hold committed entries) should count.
			let is_observer = shared.state_machine.leadership_preference()
				== LeadershipPreference::Observer;

			if is_observer {
				vote_with(Vote::Abstained);
			} else {
				vote_with(Vote::Granted);
			}

			tracing::debug!(
				candidate = %Short(request.candidate),
				term = %request.term,
				candidate_log = %request.log_position,
				local_log = %local_cursor,
				group = %Short(shared.group_id()),
				network = %Short(shared.network_id()),
				"{}",
				if is_observer {
					"abstained from voting as observer"
				} else {
					"granting vote to candidate"
				},
			);
		}

		if let Self::Follower(follower) = self {
			// Raft 5.2: If election timeout elapses without receiving AppendEntries
			// RPC from current leader or granting vote to candidate.
			follower.reset_election_timeout(shared);
		}

		true
	}

	/// Handles incoming state sync messages that are part of the follower
	/// catch-up process. If the message is consumed at the state sync provider
	/// level (`Ok(())`), it will not be forwarded to the role-specific message
	/// handlers; otherwise the message is handed back as `Err` for further
	/// processing by the caller.
	fn maybe_state_sync<S: Storage<M::Command>>(
		message: Message<M>,
		sender: PeerId,
		shared: &mut Shared<S, M>,
	) -> Result<(), Message<M>> {
		let Message::StateSync(message) = message else {
			// not a state sync message — give it back untouched.
			return Err(message);
		};

		// re-wrap the message into its `Message` form if the provider declines it.
		shared
			.sync_provider_receive(message, sender)
			.map_err(Message::StateSync)
	}
}

impl<M: StateMachine> Role<M> {
	/// Returns the current Raft term of whichever role state this node is in,
	/// by delegating to the inner role's own `term` accessor.
	pub const fn term(&self) -> Term {
		match self {
			Self::Leader(inner) => inner.term(),
			Self::Candidate(inner) => inner.term(),
			Self::Follower(inner) => inner.term(),
		}
	}
}

/// Errors that occur when role-specific message handler can't handle a message
pub(super) enum RoleHandlerError<M: StateMachine> {
	/// Received a message that is not expected in the current role
	Unexpected(Message<M>),

	/// Received a message that made the current role step down to follower state,
	/// e.g. when a candidate receives an `AppendEntries` message from a leader
	/// with the same or higher term.
	StepDown(AppendEntries<M::Command>),

	/// If we're a leader and we're receiving a message with the same term from
	/// another leader, this means that the group has two rival leaders, this
	/// indicates a network partition. Trigger new elections with a higher term.
	RivalLeader(AppendEntries<M::Command>),
}