mosaik 0.3.17

A Rust runtime for building self-organizing, leaderless distributed systems.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
use {
	crate::{
		groups::{Counter, CounterCommand, CounterValueQuery},
		utils::{discover_all, timeout_after, timeout_s},
	},
	mosaik::{
		groups::IndexRange,
		primitives::{Pretty, Short},
		*,
	},
	std::time::Instant,
	tokio::{join, task::JoinSet},
};

/// This test verifies that commands can be executed on leaders and followers,
/// then the state queried with weak consistency when followers do not need
/// to catch up with any log entries. After the original leader is killed it
/// also verifies that the two surviving members agree on a new leader and keep
/// committing commands submitted through either of them.
///
/// A weak query is not guaranteed to reflect the latest commit on every node,
/// so before each weak query the test waits for the queried node to observe
/// the committed index that the assertion depends on.
#[tokio::test]
async fn no_catchup_weak_query() -> anyhow::Result<()> {
	let network_id = NetworkId::random();
	let group_key = GroupKey::random();

	let n0 = Network::new(network_id).await?;
	let g0 = n0
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let timeout = 2
		* (g0.config().consensus().bootstrap_delay
			+ g0.config().consensus().election_timeout
			+ g0.config().consensus().election_timeout_jitter);

	// wait for n0 to become online by electing itself as leader and being ready
	// to accept commands
	timeout_after(timeout, g0.when().online()).await?;
	assert_eq!(g0.leader(), Some(n0.local().id()));
	assert_eq!(g0.committed(), 0);
	tracing::info!("g0 is online");

	// start two new nodes and have them join the group.
	// Since there are no log entries to catch up with, these nodes should be
	// online immediately after joining the group.
	let n1 = Network::new(network_id).await?;
	let n2 = Network::new(network_id).await?;
	discover_all([&n0, &n1, &n2]).await?;

	let g1 = n1
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let g2 = n2
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	// make sure that they are in the same group
	assert_eq!(g0.id(), g1.id());
	assert_eq!(g0.id(), g2.id());

	// wait for g1 to recognize the existing leader and catch up with the log
	timeout_after(timeout, g1.when().online()).await?;
	assert_eq!(g1.leader(), Some(n0.local().id()));
	assert_eq!(g1.committed(), 0);
	tracing::info!("g1 is online and is following g0 as leader");

	// wait for g2 to recognize the existing leader and catch up with the log
	timeout_after(timeout, g2.when().online()).await?;
	assert_eq!(g2.leader(), Some(n0.local().id()));
	assert_eq!(g2.committed(), 0);
	tracing::info!("g2 is online and is following g0 as leader");

	// execute two commands on the leader and wait for them to be committed to the
	// state machine.
	timeout_s(2, g0.execute(CounterCommand::Increment(3))).await??;
	timeout_s(2, g0.execute(CounterCommand::Increment(4))).await??;

	let index = g0.committed();
	tracing::info!("leader committed to index {index}");
	assert_eq!(index, 2);

	// verify that they have been applied correctly
	let value = g0.query(CounterValueQuery, Consistency::Strong).await?;
	tracing::info!("counter value on leader: {value}");
	assert_eq!(value, 7);

	// wait for the follower g1 to learn from the leader about the new committed
	// index
	let index = timeout_s(2, g1.when().committed().reaches(2)).await?;
	tracing::info!("follower g1 knows that index {index} is committed");
	assert_eq!(index, 2);

	// wait for the follower g2 to learn from the leader about the new committed
	// index
	let index = timeout_s(2, g2.when().committed().reaches(2)).await?;
	tracing::info!("follower g2 knows that index {index} is committed");
	assert_eq!(index, 2);

	// verify that follower g1 has the correct state after synchronizing the log
	let value = g1.query(CounterValueQuery, Consistency::Weak).await?;
	tracing::info!("counter value on follower g1 (weak): {value}");
	assert_eq!(value, 7);

	// verify that follower g2 has the correct state after synchronizing the log
	let value = g2.query(CounterValueQuery, Consistency::Weak).await?;
	tracing::info!("counter value on follower g2 (weak): {value}");
	assert_eq!(value, 7);

	// follower executes a command, should resolve after it's replicated and
	// committed to the state machine by the group.
	let index = timeout_s(2, g1.execute(CounterCommand::Decrement(2))).await??;
	tracing::info!("follower g1 command committed at index {index}");
	assert_eq!(index, 3);

	// g1's command resolving does not imply that every other node has already
	// observed the new committed index; wait for g0 and g2 as well so the weak
	// queries below cannot read stale state.
	timeout_s(2, g0.when().committed().reaches(3)).await?;
	timeout_s(2, g2.when().committed().reaches(3)).await?;

	// verify that the command has been applied correctly on all nodes after
	// being replicated to the leader and then applied to the state machine on
	// every node.
	let value_n0 = g0.query(CounterValueQuery, Consistency::Weak).await?;
	let value_n1 = g1.query(CounterValueQuery, Consistency::Weak).await?;
	let value_n2 = g2.query(CounterValueQuery, Consistency::Weak).await?;

	assert_eq!(value_n0, 5);
	assert_eq!(value_n1, 5);
	assert_eq!(value_n2, 5);

	tracing::info!("follower g1 command replicated and committed on all nodes");

	// kill the current leader n0, and wait for followers to reorganize and elect
	// a new leader.
	let g1_elected_fut = g1.when().leader_changed();
	let g2_elected_fut = g2.when().leader_changed();

	drop(n0); // kill the leader
	tracing::info!("killed the leader n0");

	// bound the re-election so that a failure to elect a new leader surfaces as
	// an error instead of hanging the test.
	let (g1_leader, g2_leader) = timeout_after(timeout, async {
		join!(g1_elected_fut, g2_elected_fut)
	})
	.await?;
	assert_eq!(g1_leader, g2_leader); // they should elect the same new leader
	tracing::info!("new leader elected: {}", Short(g1_leader));

	// after new leader is elected, both nodes will execute a series of commands
	let g1_pos = timeout_s(
		2,
		g1.execute_many([
			CounterCommand::Increment(10), //
			CounterCommand::Decrement(2),
			CounterCommand::Increment(3),
		]),
	)
	.await??;
	assert_eq!(g1_pos.end(), 6);
	tracing::info!(
		"g1 executed 3 commands committed at index {}",
		Pretty(&g1_pos)
	);

	let g2_pos = timeout_s(
		2,
		g2.execute_many([
			CounterCommand::Increment(20), //
			CounterCommand::Decrement(4),
			CounterCommand::Decrement(1),
		]),
	)
	.await??;
	assert_eq!(g2_pos.end(), 9);
	assert_eq!(g2_pos, IndexRange::new(7.into(), 9.into()));

	tracing::info!(
		"g2 executed 3 commands committed at range {}",
		Pretty(&g2_pos)
	);

	// wait for both nodes to learn that index 9 is committed
	timeout_s(2, g1.when().committed().reaches(g2_pos)).await?;
	timeout_s(2, g2.when().committed().reaches(9)).await?;

	assert_eq!(g1.committed(), 9);
	assert_eq!(g2.committed(), 9);

	let value_n1 = g1.query(CounterValueQuery, Consistency::Weak).await?;
	let value_n2 = g2.query(CounterValueQuery, Consistency::Weak).await?;

	assert_eq!(value_n1, 31);
	assert_eq!(value_n2, 31);

	tracing::info!(
		"query result is correct on g1 and g2: {value_n1}=={value_n2}"
	);

	Ok(())
}

/// This test verifies that commands can be executed on leaders and followers,
/// then the state queried with weak consistency when followers do not need
/// to catch up with any log entries.
#[tokio::test]
async fn no_catchup_strong_query() -> anyhow::Result<()> {
	let network_id = NetworkId::random();
	let group_key = GroupKey::random();

	let n0 = Network::new(network_id).await?;
	let g0 = n0
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let timeout = 2
		* (g0.config().consensus().bootstrap_delay
			+ g0.config().consensus().election_timeout
			+ g0.config().consensus().election_timeout_jitter);

	// wait for n0 to become online by electing itself as leader and being ready
	// to accept commands
	timeout_after(timeout, g0.when().online()).await?;
	assert_eq!(g0.leader(), Some(n0.local().id()));
	assert_eq!(g0.committed(), 0);
	tracing::info!("g0 is online");

	// start a new node and have it join the group.
	// Since there are no log entries to catch up with, this node should be online
	// immediately after joining the group.
	let n1 = Network::new(network_id).await?;
	let n2 = Network::new(network_id).await?;
	discover_all([&n0, &n1, &n2]).await?;

	let g1 = n1
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let g2 = n2
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	// make sure that they are in the same group
	assert_eq!(g0.id(), g1.id());
	assert_eq!(g0.id(), g2.id());

	// wait for g1 to recognize the existing leader and catch up with the log
	timeout_after(timeout, g1.when().online()).await?;
	assert_eq!(g1.leader(), Some(n0.local().id()));
	assert_eq!(g1.committed(), 0);
	tracing::info!("g1 is online and is following g0 as leader");

	// wait for g2 to recognize the existing leader and catch up with the log
	timeout_after(timeout, g2.when().online()).await?;
	assert_eq!(g2.leader(), Some(n0.local().id()));
	assert_eq!(g2.committed(), 0);
	tracing::info!("g2 is online and is following g0 as leader");

	// execute two commands on the leader and wait for them to be committed to the
	// state machine.
	timeout_s(2, g0.execute(CounterCommand::Increment(3))).await??;
	timeout_s(2, g0.execute(CounterCommand::Increment(4))).await??;
	timeout_s(2, g1.when().committed().reaches(2)).await?;

	let index = g0.committed();
	tracing::info!("leader committed to index {index}");
	assert_eq!(index, 2);

	// verify that they have been applied correctly
	let start = Instant::now();
	let value = g0.query(CounterValueQuery, Strong).await?;
	let dur_strong_leader = start.elapsed();

	tracing::info!("counter value on leader: {value}");
	assert_eq!(value, 7);
	assert_eq!(value.state_position(), 2);

	let start = Instant::now();
	let value = g1.query(CounterValueQuery, Weak).await?;
	let dur_weak_follower = start.elapsed();
	tracing::info!("counter value on follower g1 (weak): {value}");
	assert_eq!(value, 7);
	assert_eq!(value.state_position(), 2);

	let start = Instant::now();
	let value = g1.query(CounterValueQuery, Strong).await?;
	let dur_strong_follower = start.elapsed();
	tracing::info!("counter value on follower g1 (strong): {value}");
	assert_eq!(value, 7);
	assert_eq!(value.state_position(), 2);

	// this could be a bit flaky due to timing variations in the test environment,
	// but in general we should expect the strong query on the follower to take
	// longer than the strong query on the leader due to the additional forwarding
	// and waiting for response, and also to take longer than the weak query on
	// the follower.

	assert!(
		dur_strong_follower > dur_strong_leader,
		"strong query on follower should take longer than on leader due to \
		 forwarding and waiting for response {dur_strong_follower:?} vs \
		 {dur_strong_leader:?}"
	);

	assert!(
		dur_strong_follower > dur_weak_follower,
		"strong query on follower should take longer than weak query on follower \
		 due to forwarding and waiting for response {dur_strong_follower:?} vs \
		 {dur_weak_follower:?}"
	);

	Ok(())
}

/// Verifies that the future returned by `execute` can be spawned onto a
/// `JoinSet`, i.e. that it is `Send + 'static` (the bounds `JoinSet::spawn`
/// requires; `Sync` is not exercised here). It does so by submitting multiple
/// concurrent commands from both the leader and a follower.
///
/// `g0` is brought online alone before `g1` joins, so `g0` is deterministically
/// the leader and commands submitted through `g1` must be forwarded.
#[tokio::test]
async fn execute_is_send_sync_via_joinset() -> anyhow::Result<()> {
	let network_id = NetworkId::random();
	let group_key = GroupKey::random();

	let n0 = Network::new(network_id).await?;
	let g0 = n0
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let timeout = 2
		* (g0.config().consensus().bootstrap_delay
			+ g0.config().consensus().election_timeout
			+ g0.config().consensus().election_timeout_jitter);

	// g0 elects itself as leader before any other member joins the group.
	timeout_after(timeout, g0.when().online()).await?;
	assert_eq!(g0.leader(), Some(n0.local().id()));

	let n1 = Network::new(network_id).await?;
	discover_all([&n0, &n1]).await?;

	let g1 = n1
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	// g1 joins the existing group and follows g0.
	timeout_after(timeout, g1.when().online()).await?;
	assert_eq!(g1.leader(), Some(n0.local().id()));

	// Spawn multiple execute() calls into a JoinSet — this only compiles if
	// the returned future is Send + 'static.
	let mut set = JoinSet::new();
	for i in 0..5 {
		set.spawn(g0.execute(CounterCommand::Increment(i + 1)));
	}
	// Also spawn from the follower to exercise forwarding.
	for i in 0..5 {
		set.spawn(g1.execute(CounterCommand::Increment(i + 10)));
	}

	// Collect all results — every command should succeed.
	while let Some(result) = set.join_next().await {
		let index = result??;
		tracing::info!("command committed at index {index}");
	}

	// All 10 commands should be committed: 1+2+3+4+5 + 10+11+12+13+14 = 75
	timeout_s(2, g0.when().committed().reaches(10)).await?;
	timeout_s(2, g1.when().committed().reaches(10)).await?;

	let value = g0.query(CounterValueQuery, Consistency::Weak).await?;
	tracing::info!("final counter value: {value}");
	assert_eq!(value, 75);

	Ok(())
}

/// Verifies that the future returned by `execute_many` can be spawned onto a
/// `JoinSet`, i.e. that it is `Send + 'static` (the bounds `JoinSet::spawn`
/// requires; `Sync` is not exercised here). It does so by submitting concurrent
/// batches from both the leader and a follower.
///
/// `g0` is brought online alone before `g1` joins, so `g0` is deterministically
/// the leader and batches submitted through `g1` must be forwarded.
#[tokio::test]
async fn execute_many_is_send_sync_via_joinset() -> anyhow::Result<()> {
	let network_id = NetworkId::random();
	let group_key = GroupKey::random();

	let n0 = Network::new(network_id).await?;
	let g0 = n0
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let timeout = 2
		* (g0.config().consensus().bootstrap_delay
			+ g0.config().consensus().election_timeout
			+ g0.config().consensus().election_timeout_jitter);

	// g0 elects itself as leader before any other member joins the group.
	timeout_after(timeout, g0.when().online()).await?;
	assert_eq!(g0.leader(), Some(n0.local().id()));

	let n1 = Network::new(network_id).await?;
	discover_all([&n0, &n1]).await?;

	let g1 = n1
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	// g1 joins the existing group and follows g0.
	timeout_after(timeout, g1.when().online()).await?;
	assert_eq!(g1.leader(), Some(n0.local().id()));

	// Spawn batched execute_many() calls into a JoinSet from both nodes.
	let mut set = JoinSet::new();

	// Leader spawns two batches
	set.spawn(g0.execute_many([
		CounterCommand::Increment(1),
		CounterCommand::Increment(2),
		CounterCommand::Increment(3),
	]));
	set.spawn(g0.execute_many([
		CounterCommand::Decrement(1),
		CounterCommand::Increment(5),
	]));

	// Follower spawns two batches (exercising forwarding)
	set.spawn(g1.execute_many([
		CounterCommand::Increment(10),
		CounterCommand::Increment(20),
	]));
	set.spawn(g1.execute_many([
		CounterCommand::Decrement(5),
		CounterCommand::Increment(15),
	]));

	// Collect all results — every batch should succeed.
	while let Some(result) = set.join_next().await {
		let range = result??;
		tracing::info!("batch committed at range {}", Pretty(&range));
	}

	// Wait for both nodes to converge on all 9 commands (3 + 2 + 2 + 2)
	timeout_s(2, g0.when().committed().reaches(9)).await?;
	timeout_s(2, g1.when().committed().reaches(9)).await?;

	// Expected: 1+2+3 -1+5 +10+20 -5+15 = 50
	let value = g0.query(CounterValueQuery, Consistency::Weak).await?;
	tracing::info!("final counter value: {value}");
	assert_eq!(value, 50);

	Ok(())
}

/// Verifies that the future returned by `query` can be spawned onto a
/// `JoinSet`, i.e. that it is `Send + 'static` (the bounds `JoinSet::spawn`
/// requires; `Sync` is not exercised here). It does so by issuing concurrent
/// weak and strong queries from both the leader and a follower.
///
/// `g0` is brought online alone before `g1` joins, so `g0` is deterministically
/// the leader and `g1`'s strong queries must be forwarded.
#[tokio::test]
async fn query_is_send_sync_via_joinset() -> anyhow::Result<()> {
	let network_id = NetworkId::random();
	let group_key = GroupKey::random();

	let n0 = Network::new(network_id).await?;
	let g0 = n0
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	let timeout = 2
		* (g0.config().consensus().bootstrap_delay
			+ g0.config().consensus().election_timeout
			+ g0.config().consensus().election_timeout_jitter);

	// g0 elects itself as leader before any other member joins the group.
	timeout_after(timeout, g0.when().online()).await?;
	assert_eq!(g0.leader(), Some(n0.local().id()));

	let n1 = Network::new(network_id).await?;
	discover_all([&n0, &n1]).await?;

	let g1 = n1
		.groups()
		.with_key(group_key)
		.with_state_machine(Counter::default())
		.join();

	// g1 joins the existing group and follows g0.
	timeout_after(timeout, g1.when().online()).await?;
	assert_eq!(g1.leader(), Some(n0.local().id()));

	// Execute some commands so the state machine has a non-zero value, and make
	// sure the follower has observed them before it serves weak queries.
	timeout_s(2, g0.execute(CounterCommand::Increment(10))).await??;
	timeout_s(2, g0.execute(CounterCommand::Increment(20))).await??;
	timeout_s(2, g1.when().committed().reaches(2)).await?;

	// Spawn concurrent queries onto a JoinSet — this only compiles if the
	// returned future is Send + 'static.
	let mut set = JoinSet::new();

	// Weak queries from leader and follower
	for _ in 0..3 {
		set.spawn(g0.query(CounterValueQuery, Consistency::Weak));
		set.spawn(g1.query(CounterValueQuery, Consistency::Weak));
	}

	// Strong queries from leader and follower
	for _ in 0..3 {
		set.spawn(g0.query(CounterValueQuery, Consistency::Strong));
		set.spawn(g1.query(CounterValueQuery, Consistency::Strong));
	}

	// All queries should return the same value.
	while let Some(result) = set.join_next().await {
		let value = result??;
		tracing::info!("query result: {value}");
		assert_eq!(value, 30);
	}

	Ok(())
}