#![cfg(feature = "cluster")]
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use laminar_core::cluster::control::{
AssignmentSnapshot, AssignmentSnapshotStore, BarrierAck, BarrierAnnouncement, QuorumOutcome,
};
use laminar_core::cluster::discovery::NodeId;
use laminar_core::cluster::testing::{FaultyObjectStore, MiniCluster, ObjectStoreFault};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
/// Time budget handed to `wait_for_convergence` after (re)spawning a cluster.
const CONVERGENCE_DEADLINE: Duration = Duration::from_millis(8_000);
/// Polling budget for leadership/membership to settle after a kill, partition,
/// heal, or join.
const FAILOVER_DEADLINE: Duration = Duration::from_millis(5_000);
/// A freshly spawned three-node cluster must reach convergence within
/// `CONVERGENCE_DEADLINE`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn three_node_cluster_converges() {
    let cluster = MiniCluster::spawn(3).await;
    let converged = cluster.wait_for_convergence(CONVERGENCE_DEADLINE).await;
    converged.expect("cluster must converge");
    cluster.shutdown().await;
}
/// After convergence every node must report node 0 (lowest id) as leader, and
/// node 0 must be the only node that considers itself leader.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn leader_is_consistent_across_nodes() {
    let cluster = MiniCluster::spawn(3).await;
    cluster
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("convergence");

    // Snapshot every node's view first, then compare against the expected id.
    let observed: Vec<_> = cluster
        .nodes
        .iter()
        .map(|node| node.controller.current_leader())
        .collect();
    let expected_leader = Some(cluster.nodes[0].instance_id);
    for (idx, seen) in observed.iter().enumerate() {
        assert_eq!(
            *seen,
            expected_leader,
            "node {i} disagrees on leader; got {l:?}",
            i = idx,
            l = seen
        );
    }

    let self_identified = cluster
        .nodes
        .iter()
        .filter(|node| node.controller.is_leader())
        .count();
    assert_eq!(
        self_identified, 1,
        "exactly one node must self-identify as leader"
    );
    assert!(
        cluster.nodes[0].controller.is_leader(),
        "node 0 (lowest id) is leader"
    );
    cluster.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn barrier_announce_then_follower_observes_and_acks() {
let cluster = MiniCluster::spawn(3).await;
cluster
.wait_for_convergence(CONVERGENCE_DEADLINE)
.await
.expect("convergence");
let leader = &cluster.nodes[0];
let followers: Vec<_> = cluster.nodes.iter().skip(1).collect();
let follower_ids: Vec<_> = followers.iter().map(|n| n.instance_id).collect();
let announcement = BarrierAnnouncement {
epoch: 7,
checkpoint_id: 101,
phase: laminar_core::cluster::control::Phase::Prepare,
flags: 0,
min_watermark_ms: None,
};
leader
.controller
.announce_barrier(&announcement)
.await
.expect("announce");
for follower in &followers {
let mut observed = None;
let start = std::time::Instant::now();
while start.elapsed() < Duration::from_secs(4) {
match follower.controller.observe_barrier().await {
Ok(Some(ann)) if ann.epoch == 7 => {
observed = Some(ann);
break;
}
_ => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
observed.unwrap_or_else(|| {
panic!(
"follower {} never observed the barrier",
follower.instance_id.0
)
});
follower
.controller
.ack_barrier(&BarrierAck {
epoch: 7,
ok: true,
error: None,
local_watermark_ms: None,
})
.await
.expect("ack");
}
let outcome = leader
.controller
.wait_for_quorum(7, &follower_ids, Duration::from_secs(6))
.await;
match outcome {
QuorumOutcome::Reached { mut acks, .. } => {
acks.sort_by_key(|n| n.0);
let mut expected = follower_ids.clone();
expected.sort_by_key(|n| n.0);
assert_eq!(acks, expected);
}
other => panic!("expected Reached, got {other:?}"),
}
cluster.shutdown().await;
}
/// Killing the leader (node 0, lowest id) must hand leadership to the
/// next-lowest survivor within `FAILOVER_DEADLINE`. Every survivor must report
/// that successor as leader, the successor must self-identify as leader, and no
/// other survivor may.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn leader_failover_on_kill() {
    let mut cluster = MiniCluster::spawn(3).await;
    cluster
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("convergence");
    assert!(
        cluster.nodes[0].controller.is_leader(),
        "node 0 (lowest id) starts as leader"
    );
    let original_leader = cluster.nodes.remove(0);
    let original_id = original_leader.instance_id;
    original_leader.kill().await;
    let start = Instant::now();
    // Like the assertion above, this assumes `nodes` is ordered by id, so the
    // new index 0 is the expected successor.
    let expected = cluster.nodes[0].instance_id;
    loop {
        let leaders: Vec<_> = cluster
            .nodes
            .iter()
            .map(|n| n.controller.current_leader())
            .collect();
        let self_ids: Vec<bool> = cluster
            .nodes
            .iter()
            .map(|n| n.controller.is_leader())
            .collect();
        let all_agree = leaders.iter().all(|l| *l == Some(expected));
        // Only the successor (index 0) may self-identify. This is checked over
        // every survivor rather than a fixed index, so it holds for any cluster size.
        let correct_self_id = self_ids
            .iter()
            .enumerate()
            .all(|(i, &is_leader)| is_leader == (i == 0));
        if all_agree && correct_self_id {
            break;
        }
        if start.elapsed() > FAILOVER_DEADLINE {
            panic!(
                "leader failover incomplete after {:?}: got leaders {leaders:?}, \
                 is_leader {self_ids:?}, expected {expected:?} (original {original_id:?})",
                FAILOVER_DEADLINE
            );
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    cluster.shutdown().await;
}
/// Partitioning a four-node cluster into {0,1} | {2,3} must produce two leaders,
/// node 0 and node 2. Each must be recognised by its own side and self-identify,
/// while their peers do not. Once the partition heals, node 0 must again be the
/// sole leader and node 2 must step down.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn network_partition_produces_two_leaders() {
    let cluster = MiniCluster::spawn_partitionable(4).await;
    cluster
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("initial convergence");
    let pre = cluster.nodes[0].controller.current_leader();
    assert_eq!(pre, Some(cluster.nodes[0].instance_id));
    let addrs = cluster.addrs();
    let rules = cluster
        .rules
        .as_ref()
        .expect("partitionable cluster has rules");
    rules.partition(&addrs[0..2], &addrs[2..4]);
    let expected_side_a = cluster.nodes[0].instance_id;
    let expected_side_b = cluster.nodes[2].instance_id;
    let start = Instant::now();
    loop {
        let side_a = cluster.nodes[0].controller.current_leader();
        let side_a_peer = cluster.nodes[1].controller.current_leader();
        let side_b = cluster.nodes[2].controller.current_leader();
        let side_b_peer = cluster.nodes[3].controller.current_leader();
        let self_ids: Vec<bool> = cluster
            .nodes
            .iter()
            .map(|n| n.controller.is_leader())
            .collect();
        // "Two leaders" means both side leaders also believe they lead, and
        // neither peer does.
        let converged = side_a == Some(expected_side_a)
            && side_a_peer == Some(expected_side_a)
            && side_b == Some(expected_side_b)
            && side_b_peer == Some(expected_side_b)
            && self_ids == [true, false, true, false];
        if converged {
            break;
        }
        if start.elapsed() > FAILOVER_DEADLINE {
            panic!(
                "partition did not produce split leaders within {:?}: \
                 side_a={side_a:?} side_a_peer={side_a_peer:?} \
                 side_b={side_b:?} side_b_peer={side_b_peer:?} \
                 is_leader={self_ids:?}",
                FAILOVER_DEADLINE
            );
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    rules.heal();
    let start = Instant::now();
    loop {
        let leaders: Vec<_> = cluster
            .nodes
            .iter()
            .map(|n| n.controller.current_leader())
            .collect();
        let self_ids: Vec<bool> = cluster
            .nodes
            .iter()
            .map(|n| n.controller.is_leader())
            .collect();
        // Healing must also demote node 2: only node 0 may still self-identify.
        let sole_leader = self_ids
            .iter()
            .enumerate()
            .all(|(i, &is_leader)| is_leader == (i == 0));
        if leaders.iter().all(|l| *l == Some(expected_side_a)) && sole_leader {
            break;
        }
        if start.elapsed() > FAILOVER_DEADLINE {
            panic!(
                "partition did not heal in {FAILOVER_DEADLINE:?}: {leaders:?} \
                 is_leader={self_ids:?}"
            );
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    cluster.shutdown().await;
}
/// A snapshot rotation attempted while the object store rejects writes must fail
/// without disturbing the stored baseline. Loads under a read fault must not
/// return a snapshot. After the fault is cleared, the same rotation must succeed.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn snapshot_save_fails_under_object_store_fault_and_recovers() {
    // A local-filesystem store wrapped in a toggleable fault injector.
    let tmp = tempfile::tempdir().expect("tempdir");
    let backing: Arc<dyn ObjectStore> =
        Arc::new(LocalFileSystem::new_with_prefix(tmp.path()).unwrap());
    let injector = Arc::new(FaultyObjectStore::new(backing));
    let snapshots = Arc::new(AssignmentSnapshotStore::new(
        Arc::clone(&injector) as Arc<dyn ObjectStore>
    ));

    // Baseline: vnode 0 owned by node 1.
    let baseline = AssignmentSnapshot::empty().next(BTreeMap::from([(0u32, NodeId(1))]));
    snapshots
        .save_if_absent(&baseline)
        .await
        .expect("baseline save");

    // With writes failing, rotating vnode 0 to node 2 must error out.
    injector.set_fault(ObjectStoreFault::FailWrites);
    let rotated = baseline.next(BTreeMap::from([(0u32, NodeId(2))]));
    let write_err = snapshots
        .save_if_version(&rotated, baseline.version)
        .await
        .expect_err("write should fail");
    let mentions_injection = write_err.to_string().to_lowercase().contains("injected");
    assert!(
        mentions_injection
            || matches!(
                write_err,
                laminar_core::cluster::control::SnapshotError::Io(_)
            ),
        "unexpected error shape: {write_err:?}"
    );

    // The failed write must have left the baseline in place.
    let current = snapshots
        .load()
        .await
        .expect("load ok")
        .expect("present");
    assert_eq!(current.version, 1);

    // Under a read fault, either an error or "absent" is acceptable, but never data.
    injector.set_fault(ObjectStoreFault::FailReads);
    if let Ok(Some(s)) = snapshots.load().await {
        panic!("expected read fault, got Ok(Some({s:?}))");
    }

    // Once healed, the identical rotation goes through.
    injector.set_fault(ObjectStoreFault::None);
    let outcome = snapshots
        .save_if_version(&rotated, baseline.version)
        .await
        .expect("rotate after heal");
    assert!(matches!(
        outcome,
        laminar_core::cluster::control::RotateOutcome::Rotated,
    ));

    let reloaded = snapshots.load().await.unwrap().unwrap();
    assert_eq!(reloaded.version, 2);
    assert_eq!(reloaded.vnodes.get(&0), Some(&NodeId(2)));
}
/// An assignment snapshot saved through the leader of one cluster incarnation
/// must be loadable, unchanged, by a second cluster started on the same
/// directory.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn assignment_snapshot_survives_full_cluster_restart() {
    let dir = tempfile::tempdir().expect("tempdir");
    // Each incarnation gets its own store instance over the same on-disk prefix.
    let open_store = || {
        let fs: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        Arc::new(AssignmentSnapshotStore::new(fs))
    };

    // First incarnation: persist a three-vnode assignment via the leader.
    let store_a = open_store();
    let cluster_a = MiniCluster::spawn_with_snapshot(3, Arc::clone(&store_a)).await;
    cluster_a
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("initial convergence");
    let leader = &cluster_a.nodes[0];
    assert!(leader.controller.is_leader());
    let snapshot = AssignmentSnapshot::empty().next(BTreeMap::from([
        (0u32, NodeId(1)),
        (1u32, NodeId(2)),
        (2u32, NodeId(3)),
    ]));
    let installed = leader
        .controller
        .snapshot_store()
        .expect("snapshot store installed");
    installed.save_if_absent(&snapshot).await.expect("save");
    cluster_a.shutdown().await;

    // Second incarnation: a fresh store over the same directory.
    let store_b = open_store();
    let cluster_b = MiniCluster::spawn_with_snapshot(3, Arc::clone(&store_b)).await;
    cluster_b
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("restart convergence");
    let leader_b = &cluster_b.nodes[0];
    assert!(leader_b.controller.is_leader());
    let restored = leader_b
        .controller
        .snapshot_store()
        .expect("snapshot store installed")
        .load()
        .await
        .expect("load")
        .expect("snapshot present");
    assert_eq!(restored.version, snapshot.version);
    assert_eq!(restored.vnodes, snapshot.vnodes);
    cluster_b.shutdown().await;
}
/// A node with a never-before-seen id (99) joins a converged three-node cluster.
/// Within `FAILOVER_DEADLINE`, every node must list four live instances.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn fresh_node_can_join_running_cluster() {
    let mut cluster = MiniCluster::spawn(3).await;
    cluster
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("initial convergence");
    cluster.join_node(NodeId(99)).await;

    let started = Instant::now();
    while !cluster
        .nodes
        .iter()
        .all(|node| node.controller.live_instances().len() == 4)
    {
        if started.elapsed() > FAILOVER_DEADLINE {
            // Dump each node's membership view to make the failure actionable.
            let dumps: Vec<_> = cluster
                .nodes
                .iter()
                .map(|node| (node.instance_id, node.controller.live_instances()))
                .collect();
            panic!("fresh join didn't converge: {dumps:?}");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    cluster.shutdown().await;
}
/// Kills the leader (node 0), waits, then re-joins a node under the same id. All
/// nodes must eventually report the rejoined node as leader. Ignored for now.
#[ignore = "known chitchat same-id rejoin limitation; see fresh_node_can_join_running_cluster for the (a)-path test"]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn killed_node_can_rejoin() {
    let rejoin_deadline = Duration::from_secs(15);
    let mut cluster = MiniCluster::spawn(3).await;
    cluster
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("initial convergence");

    let victim = cluster.nodes.remove(0);
    let killed_id = victim.instance_id;
    victim.kill().await;
    // Fixed pause before rejoining; presumably lets survivors observe the
    // departure first. TODO confirm the intent.
    tokio::time::sleep(Duration::from_secs(5)).await;
    cluster.join_node(killed_id).await;

    // Move the rejoined node back to the front of the list.
    let slot = cluster
        .nodes
        .iter()
        .position(|node| node.instance_id == killed_id)
        .expect("rejoined node present");
    cluster.nodes.swap(0, slot);

    let started = Instant::now();
    loop {
        let leaders: Vec<_> = cluster
            .nodes
            .iter()
            .map(|node| node.controller.current_leader())
            .collect();
        if leaders.iter().all(|l| *l == Some(killed_id)) {
            break;
        } else if started.elapsed() > rejoin_deadline {
            panic!("rejoined leader never surfaced: {leaders:?}");
        } else {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
    cluster.shutdown().await;
}
/// The leader waits for acks from both followers, but only node 2 acks. The
/// quorum wait must time out, reporting node 2 as acked and node 1 as missing.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn quorum_times_out_when_follower_silent() {
    let cluster = MiniCluster::spawn(3).await;
    cluster
        .wait_for_convergence(CONVERGENCE_DEADLINE)
        .await
        .expect("convergence");

    let leader = &cluster.nodes[0];
    let silent = cluster.nodes[1].instance_id;
    let acker = &cluster.nodes[2];
    let expected_acks = vec![silent, acker.instance_id];

    let announcement = BarrierAnnouncement {
        epoch: 42,
        checkpoint_id: 1,
        phase: laminar_core::cluster::control::Phase::Prepare,
        flags: 0,
        min_watermark_ms: None,
    };
    leader
        .controller
        .announce_barrier(&announcement)
        .await
        .unwrap();
    // Short fixed pause between the announcement and the lone ack.
    tokio::time::sleep(Duration::from_millis(300)).await;

    let ack = BarrierAck {
        epoch: 42,
        ok: true,
        error: None,
        local_watermark_ms: None,
    };
    acker.controller.ack_barrier(&ack).await.unwrap();

    let outcome = leader
        .controller
        .wait_for_quorum(42, &expected_acks, Duration::from_secs(2))
        .await;
    let (got, missing) = match outcome {
        QuorumOutcome::TimedOut { got, missing } => (got, missing),
        other => panic!("expected TimedOut, got {other:?}"),
    };
    assert_eq!(got, vec![acker.instance_id]);
    assert_eq!(missing, vec![silent]);
    cluster.shutdown().await;
}