use crate::elections::proposal::Proposal;
use noxu_sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
const DEFAULT_LATCH_TIMEOUT: Duration = Duration::from_millis(5000);
struct FreezeState {
proposal: Option<Proposal>,
freeze_end: Option<Instant>,
generation: u64,
frozen: bool,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct FreezeStats {
pub freeze_count: u64,
pub await_timeout_count: u64,
pub await_election_count: u64,
}
pub struct CommitFreezeLatch {
state: Mutex<FreezeState>,
thaw_signal: Condvar,
timeout: Duration,
stats: Mutex<FreezeStats>,
}
impl Default for CommitFreezeLatch {
fn default() -> Self {
Self::new()
}
}
impl CommitFreezeLatch {
pub fn new() -> Self {
Self {
state: Mutex::new(FreezeState {
proposal: None,
freeze_end: None,
generation: 0,
frozen: false,
}),
thaw_signal: Condvar::new(),
timeout: DEFAULT_LATCH_TIMEOUT,
stats: Mutex::new(FreezeStats::default()),
}
}
pub fn with_timeout(timeout: Duration) -> Self {
let mut l = Self::new();
l.timeout = timeout;
l
}
pub fn stats(&self) -> FreezeStats {
*self.stats.lock()
}
pub fn freeze(&self, freeze_proposal: Proposal) {
let mut st = self.state.lock();
if let Some(ref cur) = st.proposal {
if !freeze_proposal.is_better_than(cur) {
return;
}
st.generation = st.generation.wrapping_add(1);
self.thaw_signal.notify_all();
}
st.proposal = Some(freeze_proposal);
st.freeze_end = Some(Instant::now() + self.timeout);
st.frozen = true;
self.stats.lock().freeze_count += 1;
}
pub fn vlsn_event(&self, listener_proposal: &Proposal) {
let mut st = self.state.lock();
let lift = match st.proposal {
None => return, Some(ref cur) => !cur.is_better_than(listener_proposal),
};
if lift {
st.frozen = false;
st.generation = st.generation.wrapping_add(1);
self.thaw_signal.notify_all();
}
}
pub fn clear_latch(&self) {
let mut st = self.state.lock();
st.frozen = false;
st.proposal = None;
st.freeze_end = None;
st.generation = st.generation.wrapping_add(1);
self.thaw_signal.notify_all();
}
pub fn await_thaw(&self) -> bool {
let mut st = self.state.lock();
if !st.frozen {
return false; }
let my_generation = st.generation;
loop {
if !st.frozen || st.generation != my_generation {
if !st.frozen {
self.stats.lock().await_election_count += 1;
st.proposal = None;
st.freeze_end = None;
return true;
}
return false;
}
let now = Instant::now();
let end = st.freeze_end.unwrap_or(now);
if now >= end {
self.stats.lock().await_timeout_count += 1;
st.frozen = false;
st.proposal = None;
st.freeze_end = None;
return false;
}
let remaining = end - now;
let _ = self.thaw_signal.wait_for(&mut st, remaining);
}
}
pub fn is_frozen(&self) -> bool {
self.state.lock().frozen
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
fn prop(name: &str, vlsn: u64, term: u64) -> Proposal {
Proposal::with_timestamp(name.to_string(), vlsn, 1, term, 0)
}
#[test]
fn await_thaw_no_freeze_returns_false() {
let latch = CommitFreezeLatch::new();
assert!(!latch.await_thaw(), "no freeze in effect");
}
#[test]
fn freeze_then_election_event_thaws() {
let latch =
Arc::new(CommitFreezeLatch::with_timeout(Duration::from_secs(5)));
latch.freeze(prop("n1", 100, 5));
assert!(latch.is_frozen());
let l2 = Arc::clone(&latch);
let waiter = thread::spawn(move || l2.await_thaw());
thread::sleep(Duration::from_millis(20));
latch.vlsn_event(&prop("n1", 100, 5));
let started = Instant::now();
let thawed = waiter.join().unwrap();
assert!(thawed, "election event must thaw the freeze");
assert!(
started.elapsed() < Duration::from_secs(2),
"must wake on the event, not spin to timeout"
);
assert_eq!(latch.stats().await_election_count, 1);
}
#[test]
fn freeze_times_out_without_event() {
let latch = CommitFreezeLatch::with_timeout(Duration::from_millis(40));
latch.freeze(prop("n1", 100, 5));
let thawed = latch.await_thaw();
assert!(!thawed, "timeout returns false");
assert_eq!(latch.stats().await_timeout_count, 1);
assert!(!latch.is_frozen());
}
#[test]
fn older_proposal_does_not_extend_freeze() {
let latch = CommitFreezeLatch::with_timeout(Duration::from_secs(5));
latch.freeze(prop("n1", 200, 5));
let before = latch.stats().freeze_count;
latch.freeze(prop("n1", 100, 3));
assert_eq!(
latch.stats().freeze_count,
before,
"older proposal must not (re)freeze"
);
}
#[test]
fn older_election_event_does_not_thaw() {
let latch = CommitFreezeLatch::with_timeout(Duration::from_millis(80));
latch.freeze(prop("n1", 200, 5));
latch.vlsn_event(&prop("n1", 100, 3));
assert!(latch.is_frozen(), "older event must not thaw");
assert!(!latch.await_thaw());
}
}