#![allow(dead_code)]
use std::collections::HashSet;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
/// Write-quorum configuration: how many commits a write needs and how long to wait for them.
///
/// Prefer [`QuorumPolicy::new`] or [`QuorumPolicy::majority`] over a struct literal: the
/// constructors reject `n == 0` and clamp `w`, whereas the public fields (and the derived
/// `Deserialize`) perform no such validation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuorumPolicy {
/// Upper bound for `w`; must be at least 1 when built through [`QuorumPolicy::new`].
/// Presumably the total number of replicas, including the local node (TODO confirm).
pub n: usize,
/// Number of commits required for quorum. [`AckTracker`] counts the local commit as one
/// and each distinct acknowledging peer as one more.
pub w: usize,
/// How long after its start instant an [`AckTracker`] keeps accepting the quorum as live;
/// the tracker's deadline is `now + ack_timeout`.
pub ack_timeout: Duration,
/// Not read anywhere in this file. Presumably the clock-skew threshold above which a
/// warning is raised elsewhere (TODO confirm against the caller).
pub clock_skew_warn: Duration,
}
impl QuorumPolicy {
    /// Builds a policy over `n` nodes that requires `w` commits per write.
    ///
    /// `w` is clamped into `1..=n` rather than rejected: `w == 0` becomes `1` and any
    /// `w > n` becomes `n`. The two durations are stored unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`QuorumError::InvalidPolicy`] when `n` is zero.
    pub fn new(
        n: usize,
        w: usize,
        ack_timeout: Duration,
        clock_skew_warn: Duration,
    ) -> Result<Self, QuorumError> {
        if n == 0 {
            return Err(QuorumError::InvalidPolicy {
                detail: "n must be >= 1".to_string(),
            });
        }
        Ok(Self {
            n,
            w: w.clamp(1, n),
            ack_timeout,
            clock_skew_warn,
        })
    }

    /// Builds a strict-majority policy for `n` nodes with a 2 s ack timeout and a 30 s
    /// clock-skew warning threshold.
    ///
    /// The required commit count is `floor(n / 2) + 1`, i.e. strictly more than half of
    /// `n`, so any two quorums share at least one node. For odd `n` this equals
    /// `ceil(n / 2)`; for even `n` it is one more than half (`n = 4` requires 3), because
    /// exactly half would let two disjoint halves both reach quorum.
    ///
    /// # Errors
    ///
    /// Returns [`QuorumError::InvalidPolicy`] when `n` is zero.
    pub fn majority(n: usize) -> Result<Self, QuorumError> {
        // Integer division floors, so this is > n / 2 for every n and cannot overflow.
        let w = n / 2 + 1;
        Self::new(n, w, Duration::from_secs(2), Duration::from_secs(30))
    }
}
/// Errors produced by quorum policy construction and ack tracking.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumError {
/// Returned by [`AckTracker::finalise`] when the local commit was recorded but fewer than
/// `needed` commits were collected.
QuorumNotMet {
/// Commits counted: the local commit plus distinct peer acks.
got: usize,
/// Commits the policy required (`policy.w`).
needed: usize,
/// Why the quorum fell short.
reason: QuorumFailureReason,
},
/// Returned by [`QuorumPolicy::new`] when the requested policy is invalid (`n == 0`).
InvalidPolicy { detail: String },
/// Returned by [`AckTracker::finalise`] when no local commit had been recorded.
LocalWriteFailed { detail: String },
}
impl std::fmt::Display for QuorumError {
    /// Renders a one-line, human-readable description of the error.
    ///
    /// The failure reason of `QuorumNotMet` is printed with its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidPolicy { detail } => {
                f.write_str("invalid quorum policy: ")?;
                f.write_str(detail)
            }
            Self::LocalWriteFailed { detail } => {
                f.write_str("local write failed: ")?;
                f.write_str(detail)
            }
            Self::QuorumNotMet {
                got,
                needed,
                reason,
            } => f.write_fmt(format_args!(
                "quorum not met (got {}, need {}, reason {:?})",
                got, needed, reason
            )),
        }
    }
}
// Marker impl: nothing is overridden, so `source()` keeps its default and reports no
// underlying cause.
impl std::error::Error for QuorumError {}
/// Why a write failed to reach quorum; serialized in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuorumFailureReason {
/// The ack deadline passed and no peer had acknowledged.
Unreachable,
/// The ack deadline passed with at least one peer ack, but not enough for quorum.
Timeout,
/// Not produced anywhere in this file. Presumably reported when id drift prevented
/// quorum (TODO confirm against the caller).
IdDrift,
/// The ack deadline has not passed yet; more acks may still arrive.
InFlight,
}
/// Collects the local commit and peer acknowledgements for one write and decides whether
/// the policy's quorum was reached.
#[derive(Debug)]
pub struct AckTracker {
// Policy whose `w` is the number of commits required.
policy: QuorumPolicy,
// Start instant plus `policy.ack_timeout`, computed once at construction.
deadline: Instant,
// Set by `record_local`; quorum is never reported without it.
local_committed: bool,
// Ids of peers that acknowledged. A set, so repeated acks from one peer count once.
acks: HashSet<String>,
// Peer ids passed to `record_id_drift`, in call order and with duplicates kept.
// Only counted; never contributes to quorum.
id_drifts: Vec<String>,
}
impl AckTracker {
    /// Starts tracking a write at `now`; the ack deadline is `now + policy.ack_timeout`.
    ///
    /// # Panics
    ///
    /// Panics if `now + policy.ack_timeout` is not representable as an `Instant`.
    #[must_use]
    pub fn new(policy: QuorumPolicy, now: Instant) -> Self {
        Self {
            // Evaluated before `policy` is moved into its field below.
            deadline: now + policy.ack_timeout,
            policy,
            local_committed: false,
            acks: HashSet::new(),
            id_drifts: Vec::new(),
        }
    }

    /// Marks the local write as committed. Calling it again has no further effect.
    pub fn record_local(&mut self) {
        self.local_committed = true;
    }

    /// Records an acknowledgement from `peer_id`; repeated acks from the same id count once.
    pub fn record_peer_ack(&mut self, peer_id: impl Into<String>) {
        let peer: String = peer_id.into();
        self.acks.insert(peer);
    }

    /// Records that `peer_id` reported id drift. Every call is kept, duplicates included,
    /// and drift reports never count towards quorum.
    pub fn record_id_drift(&mut self, peer_id: impl Into<String>) {
        let peer: String = peer_id.into();
        self.id_drifts.push(peer);
    }

    /// Returns `true` when the local commit is recorded, `now` is not past the deadline,
    /// and the local commit plus distinct peer acks reach `policy.w`.
    #[must_use]
    pub fn is_quorum_met(&self, now: Instant) -> bool {
        let within_deadline = now <= self.deadline;
        // The local commit counts as one towards the quorum.
        let commits = 1 + self.acks.len();
        self.local_committed && within_deadline && commits >= self.policy.w
    }

    /// Resolves the write, returning the number of commits on success.
    ///
    /// Unlike [`AckTracker::is_quorum_met`], success here does not depend on `now`: if
    /// enough acks were recorded the result is `Ok` even after the deadline. `now` only
    /// selects the failure reason.
    ///
    /// # Errors
    ///
    /// * [`QuorumError::LocalWriteFailed`] if the local commit was never recorded.
    /// * [`QuorumError::QuorumNotMet`] if too few commits were collected, with reason
    ///   `InFlight` before the deadline, `Unreachable` after it with no peer acks, and
    ///   `Timeout` after it with some peer acks.
    pub fn finalise(&self, now: Instant) -> Result<usize, QuorumError> {
        if !self.local_committed {
            return Err(QuorumError::LocalWriteFailed {
                detail: String::from("local commit not recorded before finalise"),
            });
        }

        let needed = self.policy.w;
        let got = 1 + self.acks.len();
        if got >= needed {
            return Ok(got);
        }

        let expired = now > self.deadline;
        let reason = match (expired, self.acks.is_empty()) {
            (false, _) => QuorumFailureReason::InFlight,
            (true, true) => QuorumFailureReason::Unreachable,
            (true, false) => QuorumFailureReason::Timeout,
        };
        Err(QuorumError::QuorumNotMet {
            got,
            needed,
            reason,
        })
    }

    /// Number of id-drift reports recorded so far (one per `record_id_drift` call).
    #[must_use]
    pub fn id_drift_count(&self) -> usize {
        self.id_drifts.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Samples the monotonic clock.
    ///
    /// Each test calls this at most once and reuses the value for both the tracker's start
    /// and every later query, so no assertion depends on how much wall-clock time passes
    /// between statements.
    fn instant_base() -> Instant {
        Instant::now()
    }

    #[test]
    fn policy_rejects_zero_n() {
        let err = QuorumPolicy::new(0, 1, Duration::from_millis(500), Duration::from_secs(30))
            .unwrap_err();
        assert!(matches!(err, QuorumError::InvalidPolicy { .. }));
    }

    #[test]
    fn policy_clamps_w_to_n() {
        let p =
            QuorumPolicy::new(3, 9, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        assert_eq!(p.n, 3);
        assert_eq!(p.w, 3);
    }

    #[test]
    fn majority_default_matches_adr() {
        assert_eq!(QuorumPolicy::majority(1).unwrap().w, 1);
        assert_eq!(QuorumPolicy::majority(3).unwrap().w, 2);
        assert_eq!(QuorumPolicy::majority(5).unwrap().w, 3);
        assert_eq!(QuorumPolicy::majority(7).unwrap().w, 4);
    }

    #[test]
    fn quorum_met_with_local_plus_peers() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        assert!(tracker.is_quorum_met(t0));
    }

    #[test]
    fn quorum_dedupes_duplicate_peer() {
        let policy =
            QuorumPolicy::new(5, 3, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-1");
        assert!(!tracker.is_quorum_met(t0));
        tracker.record_peer_ack("peer-2");
        assert!(tracker.is_quorum_met(t0));
    }

    #[test]
    fn quorum_not_met_without_local() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_peer_ack("peer-1");
        tracker.record_peer_ack("peer-2");
        assert!(!tracker.is_quorum_met(t0));
    }

    #[test]
    fn quorum_expired_after_deadline() {
        let policy =
            QuorumPolicy::new(3, 2, Duration::from_millis(1), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        let later = t0 + Duration::from_millis(50);
        assert!(!tracker.is_quorum_met(later));
        let err = tracker.finalise(later).unwrap_err();
        match err {
            QuorumError::QuorumNotMet {
                got,
                needed,
                reason,
            } => {
                assert_eq!(got, 1);
                assert_eq!(needed, 2);
                assert_eq!(reason, QuorumFailureReason::Unreachable);
            }
            other => panic!("expected QuorumNotMet, got {other:?}"),
        }
    }

    #[test]
    fn quorum_finalise_reports_timeout_when_partial_acks() {
        let policy =
            QuorumPolicy::new(5, 3, Duration::from_millis(1), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_peer_ack("peer-1");
        let err = tracker
            .finalise(t0 + Duration::from_millis(50))
            .unwrap_err();
        match err {
            QuorumError::QuorumNotMet { reason, .. } => {
                assert_eq!(reason, QuorumFailureReason::Timeout);
            }
            other => panic!("expected QuorumNotMet/Timeout, got {other:?}"),
        }
    }

    #[test]
    fn id_drift_counted_but_does_not_satisfy_quorum() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        tracker.record_id_drift("peer-1");
        tracker.record_id_drift("peer-2");
        assert_eq!(tracker.id_drift_count(), 2);
        assert!(!tracker.is_quorum_met(t0));
    }

    #[test]
    fn finalise_without_local_commit_errors_local_write_failed() {
        let policy = QuorumPolicy::majority(3).unwrap();
        let t0 = instant_base();
        let tracker = AckTracker::new(policy, t0);
        let err = tracker.finalise(t0).unwrap_err();
        assert!(matches!(err, QuorumError::LocalWriteFailed { .. }));
    }

    #[test]
    fn quorum_error_is_displayable_and_is_an_error() {
        let e = QuorumError::QuorumNotMet {
            got: 1,
            needed: 3,
            reason: QuorumFailureReason::Timeout,
        };
        let display = format!("{e}");
        assert!(display.contains("quorum not met"));
        // Compile-time check that the type coerces to a trait object.
        let _: &dyn std::error::Error = &e;
    }

    #[test]
    fn single_node_quorum_is_trivially_met() {
        let policy =
            QuorumPolicy::new(1, 1, Duration::from_millis(500), Duration::from_secs(30)).unwrap();
        let t0 = instant_base();
        let mut tracker = AckTracker::new(policy, t0);
        tracker.record_local();
        assert!(tracker.is_quorum_met(t0));
    }
}