mod common;
#[cfg(feature = "recipes-leader-election")]
mod leader_election_tests {
use foundationdb::{
Database, FdbBindingError,
recipes::leader_election::{ElectionConfig, LeaderElection},
tuple::Subspace,
};
use std::time::Duration;
#[test]
fn test_leader_election() {
let _guard = unsafe { foundationdb::boot() };
futures::executor::block_on(test_leader_election_basic_async()).expect("failed to run");
futures::executor::block_on(test_multi_process_leadership_async()).expect("failed to run");
futures::executor::block_on(test_heartbeat_and_lease_async()).expect("failed to run");
futures::executor::block_on(test_leadership_transfer_on_expired_lease_async())
.expect("failed to run");
futures::executor::block_on(test_config_change_async()).expect("failed to run");
futures::executor::block_on(test_resign_leadership_async()).expect("failed to run");
futures::executor::block_on(test_preemption_async()).expect("failed to run");
}
fn current_time() -> Duration {
std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
}
async fn setup_test(db: &Database, test_name: &str) -> Result<LeaderElection, FdbBindingError> {
let subspace = Subspace::all().subspace(&test_name);
let (from, to) = subspace.range();
let from_ref = &from;
let to_ref = &to;
db.run(|txn, _| async move {
txn.clear_range(from_ref, to_ref);
Ok(())
})
.await?;
let election = LeaderElection::new(subspace);
Ok(election)
}
async fn test_leader_election_basic_async() -> Result<(), FdbBindingError> {
let db = crate::common::database().await?;
let election = setup_test(&db, "test_leader_election_basic_async").await?;
let process_id = "test-process-1";
let election_ref = &election;
db.run(|txn, _| async move {
election_ref.initialize(&txn).await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, process_id, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, process_id, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(
became_leader,
"Process should become leader when it's the only one"
);
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, process_id, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(is_leader, "Process should be the leader");
Ok(())
}
async fn test_multi_process_leadership_async() -> Result<(), FdbBindingError> {
let db = crate::common::database().await?;
let election = setup_test(&db, "test_multi_process_leadership_async").await?;
let process_ids = ["process-1", "process-2", "process-3"];
let election_ref = &election;
db.run(|txn, _| async move {
election_ref.initialize(&txn).await?;
Ok(())
})
.await?;
for process_id in &process_ids {
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, process_id, 0, current_time())
.await?;
Ok(())
})
.await?;
}
let mut leaders = Vec::new();
for process_id in process_ids.iter() {
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, process_id, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
if became_leader {
leaders.push(*process_id);
}
}
assert_eq!(leaders.len(), 1, "Exactly one process should become leader");
let leader_id = leaders[0];
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, leader_id, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(
is_leader,
"The elected process should be confirmed as leader"
);
for process_id in &process_ids {
if *process_id != leader_id {
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, process_id, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(!is_leader, "Non-leader process should not be leader");
}
}
Ok(())
}
async fn test_heartbeat_and_lease_async() -> Result<(), FdbBindingError> {
use std::thread::sleep;
let db = crate::common::database().await?;
let election = setup_test(&db, "test_heartbeat_and_lease_async").await?;
let leader_id = "leader-process";
let follower_id = "follower-process";
let config = ElectionConfig::with_lease_duration(Duration::from_secs(5));
let election_ref = &election;
db.run(|txn, _| {
let config = config.clone();
async move {
election_ref.initialize_with_config(&txn, config).await?;
Ok(())
}
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, leader_id, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, follower_id, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, leader_id, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(became_leader, "First process should become leader");
let election_ref = &election;
let refreshed = db
.run(|txn, _| async move {
let result = election_ref
.refresh_lease(&txn, leader_id, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(refreshed, "Leader should be able to refresh lease");
sleep(Duration::from_secs(2));
let election_ref = &election;
let still_leader = db
.run(|txn, _| async move {
let result = election_ref
.refresh_lease(&txn, leader_id, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(still_leader, "Leader should still be able to refresh");
Ok(())
}
async fn test_leadership_transfer_on_expired_lease_async() -> Result<(), FdbBindingError> {
use std::thread::sleep;
let db = crate::common::database().await?;
let election = setup_test(&db, "test_leadership_transfer_on_expired_lease_async").await?;
let initial_leader = "initial-leader";
let new_leader = "new-leader";
let config = ElectionConfig::with_lease_duration(Duration::from_secs(3));
let election_ref = &election;
db.run(|txn, _| {
let config = config.clone();
async move {
election_ref.initialize_with_config(&txn, config).await?;
Ok(())
}
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, initial_leader, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, new_leader, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, initial_leader, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(became_leader, "Initial process should become leader");
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, new_leader, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(
!became_leader,
"New process should not become leader while initial lease is valid"
);
sleep(Duration::from_secs(4));
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.heartbeat_candidate(&txn, new_leader, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, new_leader, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(
became_leader,
"New process should become leader after initial lease expires"
);
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, new_leader, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(is_leader, "New process should be confirmed as leader");
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, initial_leader, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(!is_leader, "Initial process should no longer be leader");
Ok(())
}
async fn test_config_change_async() -> Result<(), FdbBindingError> {
let db = crate::common::database().await?;
let election = setup_test(&db, "test_config_change_async").await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref.initialize(&txn).await?;
Ok(())
})
.await?;
let new_config = ElectionConfig {
election_enabled: false,
..Default::default()
};
let election_ref = &election;
db.run(|txn, _| {
let config = new_config.clone();
async move {
election_ref.write_config(&txn, &config).await?;
Ok(())
}
})
.await?;
let election_ref = &election;
let registration_failed = db
.run(|txn, _| async move {
match election_ref
.register_candidate(&txn, "test-process", 0, current_time())
.await
{
Err(_) => Ok(true), Ok(_) => Ok(false), }
})
.await?;
assert!(
registration_failed,
"Registration should fail when elections are disabled"
);
let enabled_config = ElectionConfig {
election_enabled: true,
..Default::default()
};
let election_ref = &election;
db.run(|txn, _| {
let config = enabled_config.clone();
async move {
election_ref.write_config(&txn, &config).await?;
Ok(())
}
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, "test-process", 0, current_time())
.await?;
Ok(())
})
.await?;
Ok(())
}
async fn test_resign_leadership_async() -> Result<(), FdbBindingError> {
let db = crate::common::database().await?;
let election = setup_test(&db, "test_resign_leadership_async").await?;
let leader_id = "leader-process";
let follower_id = "follower-process";
let election_ref = &election;
db.run(|txn, _| async move {
election_ref.initialize(&txn).await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, leader_id, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, follower_id, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.try_claim_leadership(&txn, leader_id, 0, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
let resigned = db
.run(|txn, _| async move {
let resigned = election_ref.resign_leadership(&txn, leader_id).await?;
Ok(resigned)
})
.await?;
assert!(resigned, "Leader should be able to resign");
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, follower_id, 0, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(
became_leader,
"Follower should become leader after resignation"
);
Ok(())
}
async fn test_preemption_async() -> Result<(), FdbBindingError> {
let db = crate::common::database().await?;
let election = setup_test(&db, "test_preemption_async").await?;
let low_priority = "low-priority";
let high_priority = "high-priority";
let config = ElectionConfig {
allow_preemption: true,
..Default::default()
};
let election_ref = &election;
db.run(|txn, _| {
let config = config.clone();
async move {
election_ref.initialize_with_config(&txn, config).await?;
Ok(())
}
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, low_priority, 1, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
db.run(|txn, _| async move {
election_ref
.register_candidate(&txn, high_priority, 10, current_time())
.await?;
Ok(())
})
.await?;
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, low_priority, 1, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(became_leader, "Low priority should become leader initially");
let election_ref = &election;
let became_leader = db
.run(|txn, _| async move {
let result = election_ref
.try_claim_leadership(&txn, high_priority, 10, current_time())
.await?;
Ok(result.is_some())
})
.await?;
assert!(
became_leader,
"High priority should preempt low priority leader"
);
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, high_priority, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(is_leader, "High priority should be the leader");
let election_ref = &election;
let is_leader = db
.run(|txn, _| async move {
let is_leader = election_ref
.is_leader(&txn, low_priority, current_time())
.await?;
Ok(is_leader)
})
.await?;
assert!(!is_leader, "Low priority should not be the leader anymore");
Ok(())
}
}