use {
crate::{
groups::{Counter, CounterCommand, CounterValueQuery},
utils::{discover_all, timeout_after, timeout_s},
},
mosaik::{
groups::IndexRange,
primitives::{Pretty, Short},
*,
},
std::time::Instant,
tokio::{join, task::JoinSet},
};
/// Exercises command execution and weak-consistency queries on a three-node
/// group whose followers join before any command has been committed.
///
/// Flow of the test:
/// 1. `n0` starts alone and is asserted to be the leader.
/// 2. `n1` and `n2` join and are asserted to follow `n0`.
/// 3. Commands issued on the leader and on a follower are checked through
///    weak queries on every node.
/// 4. `n0` is dropped, the two survivors must agree on a new leader, and
///    further batches issued on both survivors must land at the expected log
///    positions and produce the expected counter value.
///
/// Command execution and the leader election are wrapped in timeouts so a
/// stalled cluster fails the test instead of hanging the whole test run.
#[tokio::test]
async fn no_catchup_weak_query() -> anyhow::Result<()> {
    let network_id = NetworkId::random();
    let group_key = GroupKey::random();

    let n0 = Network::new(network_id).await?;
    let g0 = n0
        .groups()
        .with_key(group_key)
        .with_state_machine(Counter::default())
        .join();

    // Upper bound used for "come online" and "elect a leader" waits.
    let timeout = 2
        * (g0.config().consensus().bootstrap_delay
            + g0.config().consensus().election_timeout
            + g0.config().consensus().election_timeout_jitter);

    timeout_after(timeout, g0.when().online()).await?;
    assert_eq!(g0.leader(), Some(n0.local().id()));
    assert_eq!(g0.committed(), 0);
    tracing::info!("g0 is online");

    let n1 = Network::new(network_id).await?;
    let n2 = Network::new(network_id).await?;
    discover_all([&n0, &n1, &n2]).await?;

    let g1 = n1
        .groups()
        .with_key(group_key)
        .with_state_machine(Counter::default())
        .join();
    let g2 = n2
        .groups()
        .with_key(group_key)
        .with_state_machine(Counter::default())
        .join();

    // Same key and state machine must yield the same group id on every node.
    assert_eq!(g0.id(), g1.id());
    assert_eq!(g0.id(), g2.id());

    timeout_after(timeout, g1.when().online()).await?;
    assert_eq!(g1.leader(), Some(n0.local().id()));
    assert_eq!(g1.committed(), 0);
    tracing::info!("g1 is online and is following g0 as leader");

    timeout_after(timeout, g2.when().online()).await?;
    assert_eq!(g2.leader(), Some(n0.local().id()));
    assert_eq!(g2.committed(), 0);
    tracing::info!("g2 is online and is following g0 as leader");

    // Two commands issued directly on the leader: 3 + 4 = 7.
    timeout_s(2, g0.execute(CounterCommand::Increment(3))).await??;
    timeout_s(2, g0.execute(CounterCommand::Increment(4))).await??;

    let index = g0.committed();
    tracing::info!("leader committed to index {index}");
    assert_eq!(index, 2);

    let value = g0.query(CounterValueQuery, Consistency::Strong).await?;
    tracing::info!("counter value on leader: {value}");
    assert_eq!(value, 7);

    // Followers must observe the commit before their weak reads are checked.
    let index = timeout_s(2, g1.when().committed().reaches(2)).await?;
    tracing::info!("follower g1 knows that index {index} is committed");
    assert_eq!(index, 2);

    let index = timeout_s(2, g2.when().committed().reaches(2)).await?;
    tracing::info!("follower g2 knows that index {index} is committed");
    assert_eq!(index, 2);

    let value = g1.query(CounterValueQuery, Consistency::Weak).await?;
    tracing::info!("counter value on follower g1 (weak): {value}");
    assert_eq!(value, 7);

    let value = g2.query(CounterValueQuery, Consistency::Weak).await?;
    tracing::info!("counter value on follower g2 (weak): {value}");
    assert_eq!(value, 7);

    // A command issued on a follower: 7 - 2 = 5. Bounded like the leader
    // commands above so a lost command cannot hang the test.
    let index = timeout_s(2, g1.execute(CounterCommand::Decrement(2))).await??;
    tracing::info!("follower g1 command committed at index {index}");
    assert_eq!(index, 3);

    // Mirror the wait done before the first round of weak reads: without it
    // a follower that has not yet committed index 3 could still answer 7.
    // NOTE(review): assumes weak queries are answered from the node's own
    // committed state - confirm against the query implementation.
    timeout_s(2, g1.when().committed().reaches(3)).await?;
    timeout_s(2, g2.when().committed().reaches(3)).await?;

    let value_n0 = g0.query(CounterValueQuery, Consistency::Weak).await?;
    let value_n1 = g1.query(CounterValueQuery, Consistency::Weak).await?;
    let value_n2 = g2.query(CounterValueQuery, Consistency::Weak).await?;
    assert_eq!(value_n0, 5);
    assert_eq!(value_n1, 5);
    assert_eq!(value_n2, 5);
    tracing::info!("follower g1 command replicated and committed on all nodes");

    // Subscribe to the leader change before killing the leader so the
    // notification cannot be missed.
    let g1_elected_fut = g1.when().leader_changed();
    let g2_elected_fut = g2.when().leader_changed();

    drop(n0);
    tracing::info!("killed the leader n0");

    // Bound the election: if the survivors never elect a leader the test
    // must fail rather than block forever.
    let (g1_leader, g2_leader) =
        timeout_after(timeout, async { join!(g1_elected_fut, g2_elected_fut) })
            .await?;
    assert_eq!(g1_leader, g2_leader);
    tracing::info!("new leader elected: {}", Short(g1_leader));

    // 5 + 10 - 2 + 3 = 16, occupying log positions 4..=6.
    let g1_pos = timeout_after(
        timeout,
        g1.execute_many([
            CounterCommand::Increment(10),
            CounterCommand::Decrement(2),
            CounterCommand::Increment(3),
        ]),
    )
    .await??;
    assert_eq!(g1_pos.end(), 6);
    tracing::info!(
        "g1 executed 3 commands committed at range {}",
        Pretty(&g1_pos)
    );

    // 16 + 20 - 4 - 1 = 31, occupying log positions 7..=9.
    let g2_pos = timeout_after(
        timeout,
        g2.execute_many([
            CounterCommand::Increment(20),
            CounterCommand::Decrement(4),
            CounterCommand::Decrement(1),
        ]),
    )
    .await??;
    assert_eq!(g2_pos.end(), 9);
    assert_eq!(g2_pos, IndexRange::new(7.into(), 9.into()));
    tracing::info!(
        "g2 executed 3 commands committed at range {}",
        Pretty(&g2_pos)
    );

    // Both survivors must have committed the last batch before their commit
    // index and local state are asserted.
    timeout_s(2, g2.when().committed().reaches(9)).await?;
    timeout_s(2, g1.when().committed().reaches(g2_pos)).await?;
    assert_eq!(g1.committed(), 9);
    assert_eq!(g2.committed(), 9);

    let value_n1 = g1.query(CounterValueQuery, Consistency::Weak).await?;
    let value_n2 = g2.query(CounterValueQuery, Consistency::Weak).await?;
    assert_eq!(value_n1, 31);
    assert_eq!(value_n2, 31);
    tracing::info!(
        "query result is correct on g1 and g2: {value_n1}=={value_n2}"
    );

    Ok(())
}
#[tokio::test]
async fn no_catchup_strong_query() -> anyhow::Result<()> {
let network_id = NetworkId::random();
let group_key = GroupKey::random();
let n0 = Network::new(network_id).await?;
let g0 = n0
.groups()
.with_key(group_key)
.with_state_machine(Counter::default())
.join();
let timeout = 2
* (g0.config().consensus().bootstrap_delay
+ g0.config().consensus().election_timeout
+ g0.config().consensus().election_timeout_jitter);
timeout_after(timeout, g0.when().online()).await?;
assert_eq!(g0.leader(), Some(n0.local().id()));
assert_eq!(g0.committed(), 0);
tracing::info!("g0 is online");
let n1 = Network::new(network_id).await?;
let n2 = Network::new(network_id).await?;
discover_all([&n0, &n1, &n2]).await?;
let g1 = n1
.groups()
.with_key(group_key)
.with_state_machine(Counter::default())
.join();
let g2 = n2
.groups()
.with_key(group_key)
.with_state_machine(Counter::default())
.join();
assert_eq!(g0.id(), g1.id());
assert_eq!(g0.id(), g2.id());
timeout_after(timeout, g1.when().online()).await?;
assert_eq!(g1.leader(), Some(n0.local().id()));
assert_eq!(g1.committed(), 0);
tracing::info!("g1 is online and is following g0 as leader");
timeout_after(timeout, g2.when().online()).await?;
assert_eq!(g2.leader(), Some(n0.local().id()));
assert_eq!(g2.committed(), 0);
tracing::info!("g2 is online and is following g0 as leader");
timeout_s(2, g0.execute(CounterCommand::Increment(3))).await??;
timeout_s(2, g0.execute(CounterCommand::Increment(4))).await??;
timeout_s(2, g1.when().committed().reaches(2)).await?;
let index = g0.committed();
tracing::info!("leader committed to index {index}");
assert_eq!(index, 2);
let start = Instant::now();
let value = g0.query(CounterValueQuery, Strong).await?;
let dur_strong_leader = start.elapsed();
tracing::info!("counter value on leader: {value}");
assert_eq!(value, 7);
assert_eq!(value.state_position(), 2);
let start = Instant::now();
let value = g1.query(CounterValueQuery, Weak).await?;
let dur_weak_follower = start.elapsed();
tracing::info!("counter value on follower g1 (weak): {value}");
assert_eq!(value, 7);
assert_eq!(value.state_position(), 2);
let start = Instant::now();
let value = g1.query(CounterValueQuery, Strong).await?;
let dur_strong_follower = start.elapsed();
tracing::info!("counter value on follower g1 (strong): {value}");
assert_eq!(value, 7);
assert_eq!(value.state_position(), 2);
assert!(
dur_strong_follower > dur_strong_leader,
"strong query on follower should take longer than on leader due to \
forwarding and waiting for response {dur_strong_follower:?} vs \
{dur_strong_leader:?}"
);
assert!(
dur_strong_follower > dur_weak_follower,
"strong query on follower should take longer than weak query on follower \
due to forwarding and waiting for response {dur_strong_follower:?} vs \
{dur_weak_follower:?}"
);
Ok(())
}
/// Checks that the future returned by `execute` can be spawned onto a
/// `JoinSet`, from both members of a two-node group.
///
/// Five increments (1..=5) are spawned through one group handle and five
/// more (10..=14) through the other. Once all ten are committed on both
/// nodes the counter must read 15 + 60 = 75.
#[tokio::test]
async fn execute_is_send_sync_via_joinset() -> anyhow::Result<()> {
    let net = NetworkId::random();
    let key = GroupKey::random();

    let node_a = Network::new(net).await?;
    let node_b = Network::new(net).await?;
    discover_all([&node_a, &node_b]).await?;

    let group_a = node_a
        .groups()
        .with_key(key)
        .with_state_machine(Counter::default())
        .join();
    let group_b = node_b
        .groups()
        .with_key(key)
        .with_state_machine(Counter::default())
        .join();

    // Twice the sum of the consensus timings bounds the wait for each
    // handle to come online.
    let bootstrap_delay = group_a.config().consensus().bootstrap_delay;
    let election_timeout = group_a.config().consensus().election_timeout;
    let election_jitter = group_a.config().consensus().election_timeout_jitter;
    let online_deadline = 2 * (bootstrap_delay + election_timeout + election_jitter);

    timeout_after(online_deadline, group_a.when().online()).await?;
    timeout_after(online_deadline, group_b.when().online()).await?;

    let mut pending = JoinSet::new();
    for amount in 1..=5 {
        pending.spawn(group_a.execute(CounterCommand::Increment(amount)));
    }
    for amount in 10..15 {
        pending.spawn(group_b.execute(CounterCommand::Increment(amount)));
    }

    // Drain the set; a panicked task or a failed command aborts the test.
    loop {
        let Some(joined) = pending.join_next().await else {
            break;
        };
        let index = joined??;
        tracing::info!("command committed at index {index}");
    }

    // All ten commands must be committed on both nodes before reading.
    timeout_s(2, group_a.when().committed().reaches(10)).await?;
    timeout_s(2, group_b.when().committed().reaches(10)).await?;

    let value = group_a
        .query(CounterValueQuery, Consistency::Weak)
        .await?;
    tracing::info!("final counter value: {value}");
    assert_eq!(value, 75);

    Ok(())
}
/// Checks that the future returned by `execute_many` can be spawned onto a
/// `JoinSet`, from both members of a two-node group.
///
/// Four batches totalling nine commands are spawned, two per group handle.
/// Once all nine are committed on both nodes the counter must read
/// 6 + 4 + 30 + 10 = 50.
#[tokio::test]
async fn execute_many_is_send_sync_via_joinset() -> anyhow::Result<()> {
    let net = NetworkId::random();
    let key = GroupKey::random();

    let node_a = Network::new(net).await?;
    let node_b = Network::new(net).await?;
    discover_all([&node_a, &node_b]).await?;

    let group_a = node_a
        .groups()
        .with_key(key)
        .with_state_machine(Counter::default())
        .join();
    let group_b = node_b
        .groups()
        .with_key(key)
        .with_state_machine(Counter::default())
        .join();

    // Twice the sum of the consensus timings bounds the wait for each
    // handle to come online.
    let bootstrap_delay = group_a.config().consensus().bootstrap_delay;
    let election_timeout = group_a.config().consensus().election_timeout;
    let election_jitter = group_a.config().consensus().election_timeout_jitter;
    let online_deadline = 2 * (bootstrap_delay + election_timeout + election_jitter);

    timeout_after(online_deadline, group_a.when().online()).await?;
    timeout_after(online_deadline, group_b.when().online()).await?;

    // Batches submitted through the first handle: +6 and +4.
    let a_first = [
        CounterCommand::Increment(1),
        CounterCommand::Increment(2),
        CounterCommand::Increment(3),
    ];
    let a_second = [CounterCommand::Decrement(1), CounterCommand::Increment(5)];

    // Batches submitted through the second handle: +30 and +10.
    let b_first = [CounterCommand::Increment(10), CounterCommand::Increment(20)];
    let b_second = [CounterCommand::Decrement(5), CounterCommand::Increment(15)];

    let mut pending = JoinSet::new();
    pending.spawn(group_a.execute_many(a_first));
    pending.spawn(group_a.execute_many(a_second));
    pending.spawn(group_b.execute_many(b_first));
    pending.spawn(group_b.execute_many(b_second));

    // Drain the set; a panicked task or a failed batch aborts the test.
    loop {
        let Some(joined) = pending.join_next().await else {
            break;
        };
        let committed_range = joined??;
        tracing::info!("batch committed at range {}", Pretty(&committed_range));
    }

    // All nine commands must be committed on both nodes before reading.
    timeout_s(2, group_a.when().committed().reaches(9)).await?;
    timeout_s(2, group_b.when().committed().reaches(9)).await?;

    let value = group_a
        .query(CounterValueQuery, Consistency::Weak)
        .await?;
    tracing::info!("final counter value: {value}");
    assert_eq!(value, 50);

    Ok(())
}
/// Checks that the future returned by `query` can be spawned onto a
/// `JoinSet`, for both consistency levels and from both members of a
/// two-node group.
///
/// Two increments (10 + 20) are committed first. Every one of the twelve
/// spawned queries must then return 30.
#[tokio::test]
async fn query_is_send_sync_via_joinset() -> anyhow::Result<()> {
    let net = NetworkId::random();
    let key = GroupKey::random();

    let node_a = Network::new(net).await?;
    let node_b = Network::new(net).await?;
    discover_all([&node_a, &node_b]).await?;

    let group_a = node_a
        .groups()
        .with_key(key)
        .with_state_machine(Counter::default())
        .join();
    let group_b = node_b
        .groups()
        .with_key(key)
        .with_state_machine(Counter::default())
        .join();

    // Twice the sum of the consensus timings bounds the wait for each
    // handle to come online.
    let bootstrap_delay = group_a.config().consensus().bootstrap_delay;
    let election_timeout = group_a.config().consensus().election_timeout;
    let election_jitter = group_a.config().consensus().election_timeout_jitter;
    let online_deadline = 2 * (bootstrap_delay + election_timeout + election_jitter);

    timeout_after(online_deadline, group_a.when().online()).await?;
    timeout_after(online_deadline, group_b.when().online()).await?;

    // Seed the counter and make sure the second node has committed both
    // commands before any query is issued.
    timeout_s(2, group_a.execute(CounterCommand::Increment(10))).await??;
    timeout_s(2, group_a.execute(CounterCommand::Increment(20))).await??;
    timeout_s(2, group_b.when().committed().reaches(2)).await?;

    let mut pending = JoinSet::new();

    // Three weak queries per node, alternating between the two handles.
    for _round in 0..3 {
        pending.spawn(group_a.query(CounterValueQuery, Consistency::Weak));
        pending.spawn(group_b.query(CounterValueQuery, Consistency::Weak));
    }

    // Three strong queries per node, alternating between the two handles.
    for _round in 0..3 {
        pending.spawn(group_a.query(CounterValueQuery, Consistency::Strong));
        pending.spawn(group_b.query(CounterValueQuery, Consistency::Strong));
    }

    // Drain the set; a panicked task or a failed query aborts the test.
    loop {
        let Some(joined) = pending.join_next().await else {
            break;
        };
        let value = joined??;
        tracing::info!("query result: {value}");
        assert_eq!(value, 30);
    }

    Ok(())
}