use crate::{
RangeOption, Transaction,
options::MutationType,
tuple::{Subspace, Versionstamp, pack, pack_with_versionstamp, unpack},
};
use futures::StreamExt;
use std::ops::Deref;
use std::time::Duration;
use super::{
errors::{LeaderElectionError, Result},
keys::*,
types::*,
};
/// Stores `config` as the election configuration unless one is already present.
///
/// If a configuration already exists under `subspace`, it is left untouched, so a later
/// call never overwrites an earlier one.
///
/// # Errors
/// Propagates any error from reading the configuration key.
pub async fn initialize<T>(txn: &T, subspace: &Subspace, config: ElectionConfig) -> Result<()>
where
    T: Deref<Target = Transaction>,
{
    let config_location = config_key(subspace);
    let already_present = txn.get(&config_location, false).await?.is_some();
    if !already_present {
        write_config_internal(txn, &config_location, &config);
    }
    Ok(())
}
/// Reads and decodes the election configuration stored under `subspace`.
///
/// The stored value is a tuple of
/// `(lease_ns, heartbeat_ns, candidate_timeout_ns, election_enabled, allow_preemption)`.
///
/// # Errors
/// Returns `LeaderElectionError::NotInitialized` when no configuration has been written.
/// Also propagates transaction and tuple-decoding errors.
pub async fn read_config<T>(txn: &T, subspace: &Subspace) -> Result<ElectionConfig>
where
    T: Deref<Target = Transaction>,
{
    let raw = match txn.get(&config_key(subspace), false).await? {
        Some(bytes) => bytes,
        None => return Err(LeaderElectionError::NotInitialized),
    };
    let (lease_nanos, heartbeat_nanos, candidate_nanos, election_enabled, allow_preemption): (
        u64,
        u64,
        u64,
        bool,
        bool,
    ) = unpack(&raw)?;
    Ok(ElectionConfig {
        lease_duration: Duration::from_nanos(lease_nanos),
        heartbeat_interval: Duration::from_nanos(heartbeat_nanos),
        candidate_timeout: Duration::from_nanos(candidate_nanos),
        election_enabled,
        allow_preemption,
    })
}
/// Unconditionally overwrites the stored election configuration with `config`.
///
/// Unlike `initialize`, this replaces any configuration that is already present.
/// Always returns `Ok(())`.
pub async fn write_config<T>(txn: &T, subspace: &Subspace, config: &ElectionConfig) -> Result<()>
where
    T: Deref<Target = Transaction>,
{
    write_config_internal(txn, &config_key(subspace), config);
    Ok(())
}
fn write_config_internal<T>(txn: &T, key: &[u8], config: &ElectionConfig)
where
T: Deref<Target = Transaction>,
{
let data = (
config.lease_duration.as_nanos() as u64,
config.heartbeat_interval.as_nanos() as u64,
config.candidate_timeout.as_nanos() as u64,
config.election_enabled,
config.allow_preemption,
);
let packed = pack(&data);
txn.set(key, &packed);
}
/// Loads and decodes the leader record stored at `key`.
///
/// Returns `Ok(None)` when no record exists. The stored value is a tuple of
/// `(ballot, leader_id, priority, lease_expiry_nanos, versionstamp_bytes)`.
///
/// # Errors
/// Returns `LeaderElectionError::InvalidState` if the stored versionstamp is not exactly
/// 12 bytes. Also propagates transaction and tuple-decoding errors.
async fn read_leader_state<T>(txn: &T, key: &[u8]) -> Result<Option<LeaderState>>
where
    T: Deref<Target = Transaction>,
{
    if let Some(bytes) = txn.get(key, false).await? {
        let (ballot, leader_id, priority, lease_expiry_nanos, stamp_bytes): (
            u64,
            String,
            i32,
            u64,
            Vec<u8>,
        ) = unpack(&bytes)?;
        let versionstamp: [u8; 12] = stamp_bytes.try_into().map_err(|_| {
            LeaderElectionError::InvalidState("Invalid versionstamp length".to_string())
        })?;
        return Ok(Some(LeaderState {
            ballot,
            leader_id,
            priority,
            lease_expiry_nanos,
            versionstamp,
        }));
    }
    Ok(None)
}
/// Serializes `state` and stores it at `key`.
///
/// The layout, `(ballot, leader_id, priority, lease_expiry_nanos, versionstamp_bytes)`, is
/// the same one `read_leader_state` decodes.
fn write_leader_state<T>(txn: &T, key: &[u8], state: &LeaderState)
where
    T: Deref<Target = Transaction>,
{
    let encoded = pack(&(
        state.ballot,
        state.leader_id.clone(),
        state.priority,
        state.lease_expiry_nanos,
        state.versionstamp.to_vec(),
    ));
    txn.set(key, &encoded);
}
pub async fn try_claim_leadership<T>(
txn: &T,
subspace: &Subspace,
process_id: &str,
my_priority: i32,
current_time: Duration,
) -> Result<Option<LeaderState>>
where
T: Deref<Target = Transaction>,
{
let config = read_config(txn, subspace).await?;
if !config.election_enabled {
return Err(LeaderElectionError::ElectionDisabled);
}
let candidate = get_candidate(txn, subspace, process_id)
.await?
.ok_or(LeaderElectionError::UnregisteredCandidate)?;
let my_versionstamp = candidate.versionstamp;
let key = leader_key(subspace);
let current_leader = read_leader_state(txn, &key).await?;
let can_claim = match ¤t_leader {
None => true, Some(leader) => {
leader.leader_id == process_id || !leader.is_lease_valid(current_time) || (config.allow_preemption && my_priority > leader.priority) }
};
if !can_claim {
return Ok(None);
}
let new_ballot = current_leader.as_ref().map(|l| l.ballot + 1).unwrap_or(1);
let lease_expiry = current_time + config.lease_duration;
let new_state = LeaderState {
ballot: new_ballot,
leader_id: process_id.to_string(),
priority: my_priority,
lease_expiry_nanos: lease_expiry.as_nanos() as u64,
versionstamp: my_versionstamp,
};
write_leader_state(txn, &key, &new_state);
Ok(Some(new_state))
}
/// Extends the lease of `process_id` if it is the recorded leader.
///
/// On success the ballot is incremented and the lease expiry is reset to
/// `current_time + lease_duration`, saturating at `u64::MAX` ns. The leader id, priority
/// and versionstamp are carried over unchanged.
///
/// Returns `Ok(None)` when there is no leader record or it names another process.
/// This function does not check whether the existing lease has already expired, and it
/// does not consult `election_enabled`.
///
/// # Errors
/// Propagates configuration, transaction and decoding errors.
pub async fn refresh_lease<T>(
    txn: &T,
    subspace: &Subspace,
    process_id: &str,
    current_time: Duration,
) -> Result<Option<LeaderState>>
where
    T: Deref<Target = Transaction>,
{
    let config = read_config(txn, subspace).await?;
    let key = leader_key(subspace);
    match read_leader_state(txn, &key).await? {
        Some(leader) if leader.leader_id == process_id => {
            // Saturate rather than truncate the u128 nanosecond count, so a very long lease
            // is never wrapped into one that looks already expired.
            let lease_expiry = current_time.saturating_add(config.lease_duration);
            let new_state = LeaderState {
                ballot: leader.ballot + 1,
                lease_expiry_nanos: u64::try_from(lease_expiry.as_nanos()).unwrap_or(u64::MAX),
                leader_id: leader.leader_id,
                priority: leader.priority,
                versionstamp: leader.versionstamp,
            };
            write_leader_state(txn, &key, &new_state);
            Ok(Some(new_state))
        }
        _ => Ok(None),
    }
}
/// Clears the leader record if, and only if, it currently names `process_id`.
///
/// Returns `true` when the record was cleared. Returns `false` when the record is absent
/// or names another process. Lease validity is not consulted.
///
/// # Errors
/// Propagates transaction and decoding errors from reading the leader record.
pub async fn resign_leadership<T>(txn: &T, subspace: &Subspace, process_id: &str) -> Result<bool>
where
    T: Deref<Target = Transaction>,
{
    let key = leader_key(subspace);
    let holds_record = read_leader_state(txn, &key)
        .await?
        .map_or(false, |leader| leader.leader_id == process_id);
    if holds_record {
        txn.clear(&key);
    }
    Ok(holds_record)
}
/// Reports whether `process_id` holds a lease that is valid at `current_time`.
///
/// # Errors
/// - `ElectionDisabled` if elections are turned off.
/// - Any configuration, transaction or decoding error.
pub async fn is_leader<T>(
    txn: &T,
    subspace: &Subspace,
    process_id: &str,
    current_time: Duration,
) -> Result<bool>
where
    T: Deref<Target = Transaction>,
{
    if !read_config(txn, subspace).await?.election_enabled {
        return Err(LeaderElectionError::ElectionDisabled);
    }
    let active_leader = get_leader(txn, subspace, current_time).await?;
    Ok(active_leader.map_or(false, |leader| leader.leader_id == process_id))
}
/// Returns the leader record only if its lease is valid at `current_time`.
///
/// Returns `Ok(None)` both when no record exists and when the recorded lease is no
/// longer valid.
pub async fn get_leader<T>(
    txn: &T,
    subspace: &Subspace,
    current_time: Duration,
) -> Result<Option<LeaderState>>
where
    T: Deref<Target = Transaction>,
{
    let stored = read_leader_state(txn, &leader_key(subspace)).await?;
    Ok(stored.filter(|leader| leader.is_lease_valid(current_time)))
}
/// Returns the stored leader record as-is, without checking lease validity.
pub async fn get_leader_raw<T>(txn: &T, subspace: &Subspace) -> Result<Option<LeaderState>>
where
    T: Deref<Target = Transaction>,
{
    let leader_location = leader_key(subspace);
    let record = read_leader_state(txn, &leader_location).await?;
    Ok(record)
}
/// Registers `process_id` as a candidate with the given priority.
///
/// The value `(priority, heartbeat_ns, versionstamp)` is written via a
/// `SetVersionstampedValue` atomic op. The versionstamp is filled in at commit time, from
/// an incomplete placeholder with user version 0. Registering again replaces the existing
/// record, including its versionstamp.
///
/// # Errors
/// - `ElectionDisabled` if elections are turned off.
/// - Any configuration error.
pub async fn register_candidate<T>(
    txn: &T,
    subspace: &Subspace,
    process_id: &str,
    priority: i32,
    current_time: Duration,
) -> Result<()>
where
    T: Deref<Target = Transaction>,
{
    if !read_config(txn, subspace).await?.election_enabled {
        return Err(LeaderElectionError::ElectionDisabled);
    }
    let heartbeat_nanos = current_time.as_nanos() as u64;
    let value = pack_with_versionstamp(&(priority, heartbeat_nanos, Versionstamp::incomplete(0)));
    txn.atomic_op(
        &candidate_key(subspace, process_id),
        &value,
        MutationType::SetVersionstampedValue,
    );
    Ok(())
}
/// Refreshes an existing candidate's heartbeat timestamp and priority.
///
/// The versionstamp recorded at registration is preserved, so the candidate keeps its
/// position in the ordering used by `list_candidates`.
///
/// # Errors
/// - `ElectionDisabled` if elections are turned off.
/// - `ProcessNotFound` if `process_id` has no candidate record.
/// - Any transaction or decoding error.
pub async fn heartbeat_candidate<T>(
    txn: &T,
    subspace: &Subspace,
    process_id: &str,
    priority: i32,
    current_time: Duration,
) -> Result<()>
where
    T: Deref<Target = Transaction>,
{
    if !read_config(txn, subspace).await?.election_enabled {
        return Err(LeaderElectionError::ElectionDisabled);
    }
    let key = candidate_key(subspace, process_id);
    let stored = match txn.get(&key, false).await? {
        Some(bytes) => bytes,
        None => return Err(LeaderElectionError::ProcessNotFound(process_id.to_string())),
    };
    let (_old_priority, _old_heartbeat, versionstamp): (i32, u64, Versionstamp) =
        unpack(&stored)?;
    let refreshed = pack(&(priority, current_time.as_nanos() as u64, versionstamp));
    txn.set(&key, &refreshed);
    Ok(())
}
/// Removes the candidate record for `process_id`.
///
/// The key is cleared unconditionally, whether or not it exists. Always returns `Ok(())`.
pub async fn unregister_candidate<T>(txn: &T, subspace: &Subspace, process_id: &str) -> Result<()>
where
    T: Deref<Target = Transaction>,
{
    txn.clear(&candidate_key(subspace, process_id));
    Ok(())
}
/// Loads the candidate record for `process_id`, returning `Ok(None)` if it is absent.
///
/// Liveness is not checked here; see `list_candidates` for timeout filtering.
///
/// # Errors
/// Propagates transaction and tuple-decoding errors.
pub async fn get_candidate<T>(
    txn: &T,
    subspace: &Subspace,
    process_id: &str,
) -> Result<Option<CandidateInfo>>
where
    T: Deref<Target = Transaction>,
{
    let key = candidate_key(subspace, process_id);
    match txn.get(&key, false).await? {
        None => Ok(None),
        Some(bytes) => {
            let (priority, last_heartbeat_nanos, stamp): (i32, u64, Versionstamp) =
                unpack(&bytes)?;
            Ok(Some(CandidateInfo {
                process_id: process_id.to_string(),
                priority,
                last_heartbeat_nanos,
                versionstamp: *stamp.as_bytes(),
            }))
        }
    }
}
/// Lists candidates that are alive at `current_time`, ordered by registration versionstamp.
///
/// Candidates whose heartbeat is older than `candidate_timeout` are skipped but not
/// deleted; see `evict_dead_candidates`. The sort is stable, so candidates with equal
/// versionstamps keep their key order.
///
/// # Errors
/// Propagates configuration, range-read and decoding errors.
pub async fn list_candidates<T>(
    txn: &T,
    subspace: &Subspace,
    current_time: Duration,
) -> Result<Vec<CandidateInfo>>
where
    T: Deref<Target = Transaction>,
{
    let config = read_config(txn, subspace).await?;
    let (range_start, range_end) = candidates_range(subspace);
    let prefix = subspace.subspace(&(CANDIDATES_PREFIX,));
    let mut stream = txn.get_ranges_keyvalues(RangeOption::from((range_start, range_end)), false);
    let mut live: Vec<CandidateInfo> = Vec::new();
    while let Some(entry) = stream.next().await {
        let entry = entry?;
        // Decode the key before the value, matching the original error precedence.
        let (process_id,): (String,) = prefix.unpack(entry.key())?;
        let (priority, last_heartbeat_nanos, stamp): (i32, u64, Versionstamp) =
            unpack(entry.value())?;
        let candidate = CandidateInfo {
            process_id,
            priority,
            last_heartbeat_nanos,
            versionstamp: *stamp.as_bytes(),
        };
        if candidate.is_alive(current_time, config.candidate_timeout) {
            live.push(candidate);
        }
    }
    live.sort_by_key(|c| c.versionstamp);
    Ok(live)
}
/// Clears every candidate record that is not alive at `current_time`.
///
/// Returns the number of records cleared. The clears happen in the same transaction that
/// performs the range read.
///
/// # Errors
/// Propagates configuration, range-read and decoding errors.
pub async fn evict_dead_candidates<T>(
    txn: &T,
    subspace: &Subspace,
    current_time: Duration,
) -> Result<usize>
where
    T: Deref<Target = Transaction>,
{
    let config = read_config(txn, subspace).await?;
    let (range_start, range_end) = candidates_range(subspace);
    let prefix = subspace.subspace(&(CANDIDATES_PREFIX,));
    let mut stream = txn.get_ranges_keyvalues(RangeOption::from((range_start, range_end)), false);
    let mut removed: usize = 0;
    while let Some(entry) = stream.next().await {
        let entry = entry?;
        // Value is decoded before the key, matching the original error precedence.
        let (priority, last_heartbeat_nanos, stamp): (i32, u64, Versionstamp) =
            unpack(entry.value())?;
        let (process_id,): (String,) = prefix.unpack(entry.key())?;
        let candidate = CandidateInfo {
            process_id,
            priority,
            last_heartbeat_nanos,
            versionstamp: *stamp.as_bytes(),
        };
        let is_dead = !candidate.is_alive(current_time, config.candidate_timeout);
        if is_dead {
            txn.clear(entry.key());
            removed += 1;
        }
    }
    Ok(removed)
}
/// Runs one election round for `process_id`.
///
/// The round first heartbeats the candidate record, then tries to claim leadership.
/// Returns `ElectionResult::Leader` with the new record if the claim succeeds. Otherwise
/// returns `ElectionResult::Follower` with the currently valid leader, if any.
///
/// # Errors
/// Propagates errors from `heartbeat_candidate`, `try_claim_leadership` and `get_leader`.
pub async fn run_election_cycle<T>(
    txn: &T,
    subspace: &Subspace,
    process_id: &str,
    my_priority: i32,
    current_time: Duration,
) -> Result<ElectionResult>
where
    T: Deref<Target = Transaction>,
{
    heartbeat_candidate(txn, subspace, process_id, my_priority, current_time).await?;
    let claimed =
        try_claim_leadership(txn, subspace, process_id, my_priority, current_time).await?;
    if let Some(state) = claimed {
        return Ok(ElectionResult::Leader(state));
    }
    let observed_leader = get_leader(txn, subspace, current_time).await?;
    Ok(ElectionResult::Follower(observed_leader))
}