use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::primary::PrimaryReplication;
/// Policy selecting which replica acknowledgements a quorum wait requires.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumMode {
    /// No acknowledgements are required; waiting returns immediately.
    Async,
    /// At least `min_replicas` replicas must have acknowledged the target LSN.
    Sync { min_replicas: usize },
    /// Every region in `required` must contain a replica that acknowledged the target LSN.
    Regions { required: HashSet<String> },
}
/// A quorum policy together with an optional upper bound on how long to wait for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuorumConfig {
    /// Which acknowledgements must be observed.
    pub mode: QuorumMode,
    /// Maximum time to wait; `None` means wait without a deadline.
    pub timeout: Option<Duration>,
}
impl QuorumConfig {
pub fn async_commit() -> Self {
Self {
mode: QuorumMode::Async,
timeout: None,
}
}
pub fn sync(min_replicas: usize) -> Self {
Self {
mode: QuorumMode::Sync { min_replicas },
timeout: Some(Duration::from_secs(5)),
}
}
pub fn regions<I, S>(regions: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Self {
mode: QuorumMode::Regions {
required: regions.into_iter().map(|r| r.into()).collect(),
},
timeout: Some(Duration::from_secs(10)),
}
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn without_timeout(mut self) -> Self {
self.timeout = None;
self
}
pub fn is_async(&self) -> bool {
matches!(self.mode, QuorumMode::Async)
}
}
impl Default for QuorumConfig {
fn default() -> Self {
Self::async_commit()
}
}
/// Reasons a quorum wait can fail.
#[derive(Debug, Clone)]
pub enum QuorumError {
    /// The configured timeout elapsed before the quorum was reached.
    Timeout {
        /// LSN that was being waited for.
        target_lsn: u64,
        /// Milliseconds spent waiting before giving up.
        elapsed_ms: u128,
        /// Regions that had acknowledged `target_lsn` when the wait gave up.
        acked_regions: HashSet<String>,
    },
    /// Sync mode: the primary reported fewer replicas than the quorum requires.
    InsufficientReplicas { required: usize, connected: usize },
    /// Region mode: required regions for which no connected replica was found up front.
    MissingRegions { missing: Vec<String> },
}
impl std::fmt::Display for QuorumError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
QuorumError::Timeout {
target_lsn,
elapsed_ms,
acked_regions,
} => write!(
f,
"quorum timeout after {elapsed_ms}ms waiting for lsn {target_lsn} \
(acked by regions: {:?})",
acked_regions
),
QuorumError::InsufficientReplicas {
required,
connected,
} => write!(
f,
"quorum requires {required} replicas but only {connected} connected"
),
QuorumError::MissingRegions { missing } => {
write!(
f,
"required regions with no connected replicas: {:?}",
missing
)
}
}
}
}
impl std::error::Error for QuorumError {}
/// Waits for replica acknowledgements according to a [`QuorumConfig`].
pub struct QuorumCoordinator {
    /// Source of the replica list and each replica's last acknowledged LSN.
    primary: Arc<PrimaryReplication>,
    /// Quorum policy and optional wait timeout.
    config: QuorumConfig,
    /// Replica id -> region name bindings used by region-based quorums.
    regions: parking_lot::RwLock<std::collections::HashMap<String, String>>,
}
impl QuorumCoordinator {
    /// Longest single sleep between polls of replica acknowledgement state.
    const POLL_INTERVAL: Duration = Duration::from_millis(25);

    /// Creates a coordinator that evaluates `config` against `primary`'s replicas.
    ///
    /// No replica-to-region bindings exist initially; add them with
    /// [`bind_replica_region`](Self::bind_replica_region).
    pub fn new(primary: Arc<PrimaryReplication>, config: QuorumConfig) -> Self {
        Self {
            primary,
            config,
            regions: parking_lot::RwLock::new(std::collections::HashMap::new()),
        }
    }

    /// Records that `replica_id` lives in `region`, replacing any previous binding.
    pub fn bind_replica_region(&self, replica_id: &str, region: &str) {
        self.regions
            .write()
            .insert(replica_id.to_string(), region.to_string());
    }

    /// Removes the region binding for `replica_id`, if one exists.
    pub fn unbind_replica(&self, replica_id: &str) {
        self.regions.write().remove(replica_id);
    }

    /// Returns the regions that have at least one bound replica currently listed by the
    /// primary.
    ///
    /// Bindings whose replica id the primary does not list are ignored. Such a replica
    /// can never show up in the acknowledgement scan, so counting its region would let
    /// [`wait_for_quorum`](Self::wait_for_quorum) start a wait that can only end in a
    /// timeout, or never end when no timeout is configured.
    pub fn connected_regions(&self) -> HashSet<String> {
        // Lock order (primary replicas, then region map) matches `acked_regions`.
        let replicas = self
            .primary
            .replicas
            .read()
            .unwrap_or_else(|e| e.into_inner());
        let regions = self.regions.read();
        replicas
            .iter()
            .filter_map(|r| regions.get(&r.id).cloned())
            .collect()
    }

    /// Blocks the calling thread until `target_lsn` satisfies the configured quorum.
    ///
    /// Returns `Ok(())` immediately in async mode. Otherwise the preconditions are
    /// checked once, then acknowledgements are polled every 25 ms until the quorum is
    /// reached or the timeout expires. Polling never sleeps past the deadline. With no
    /// timeout configured, the wait is unbounded.
    ///
    /// # Errors
    ///
    /// * [`QuorumError::InsufficientReplicas`]: sync mode, and the primary reports fewer
    ///   replicas than required.
    /// * [`QuorumError::MissingRegions`]: region mode, and a required region has no
    ///   replica listed by the primary.
    /// * [`QuorumError::Timeout`]: the timeout elapsed first. It carries the regions
    ///   that did acknowledge.
    pub fn wait_for_quorum(&self, target_lsn: u64) -> Result<(), QuorumError> {
        if self.config.is_async() {
            return Ok(());
        }
        self.validate_preconditions()?;
        let start = Instant::now();
        loop {
            if self.has_quorum(target_lsn) {
                return Ok(());
            }
            let nap = match self.config.timeout {
                None => Self::POLL_INTERVAL,
                Some(limit) => {
                    // Read the clock once so the check and the reported value agree.
                    let elapsed = start.elapsed();
                    if elapsed >= limit {
                        return Err(QuorumError::Timeout {
                            target_lsn,
                            elapsed_ms: elapsed.as_millis(),
                            acked_regions: self.acked_regions(target_lsn),
                        });
                    }
                    // Wake no later than the deadline so one final check happens on time.
                    Self::POLL_INTERVAL.min(limit - elapsed)
                }
            };
            std::thread::sleep(nap);
        }
    }

    /// Returns whether `target_lsn` currently satisfies the configured quorum.
    ///
    /// * Async: always `true`.
    /// * Sync: at least `min_replicas` replicas have `last_acked_lsn >= target_lsn`.
    /// * Regions: every required region has a bound replica with
    ///   `last_acked_lsn >= target_lsn`.
    pub fn has_quorum(&self, target_lsn: u64) -> bool {
        match &self.config.mode {
            QuorumMode::Async => true,
            QuorumMode::Sync { min_replicas } => self.count_acked(target_lsn) >= *min_replicas,
            QuorumMode::Regions { required } => {
                let acked = self.acked_regions(target_lsn);
                required.iter().all(|r| acked.contains(r))
            }
        }
    }

    /// Fails fast when the quorum cannot be met by the replicas known right now.
    ///
    /// The `missing` list in [`QuorumError::MissingRegions`] is sorted, so the error is
    /// deterministic even though `required` is a `HashSet`.
    fn validate_preconditions(&self) -> Result<(), QuorumError> {
        match &self.config.mode {
            QuorumMode::Async => Ok(()),
            QuorumMode::Sync { min_replicas } => {
                let connected = self.primary.replica_count();
                if connected < *min_replicas {
                    return Err(QuorumError::InsufficientReplicas {
                        required: *min_replicas,
                        connected,
                    });
                }
                Ok(())
            }
            QuorumMode::Regions { required } => {
                let connected = self.connected_regions();
                let mut missing: Vec<String> = required
                    .iter()
                    .filter(|r| !connected.contains(*r))
                    .cloned()
                    .collect();
                if missing.is_empty() {
                    Ok(())
                } else {
                    missing.sort_unstable();
                    Err(QuorumError::MissingRegions { missing })
                }
            }
        }
    }

    /// Counts the replicas whose last acknowledged LSN is at least `target_lsn`.
    fn count_acked(&self, target_lsn: u64) -> usize {
        // A poisoned lock still holds usable data; read through it rather than panic.
        let replicas = self
            .primary
            .replicas
            .read()
            .unwrap_or_else(|e| e.into_inner());
        replicas
            .iter()
            .filter(|r| r.last_acked_lsn >= target_lsn)
            .count()
    }

    /// Returns the regions of bound replicas whose last acknowledged LSN is at least
    /// `target_lsn`. Unbound replicas contribute no region.
    fn acked_regions(&self, target_lsn: u64) -> HashSet<String> {
        let replicas = self
            .primary
            .replicas
            .read()
            .unwrap_or_else(|e| e.into_inner());
        let regions = self.regions.read();
        replicas
            .iter()
            .filter(|r| r.last_acked_lsn >= target_lsn)
            .filter_map(|r| regions.get(&r.id).cloned())
            .collect()
    }

    /// Returns the smallest acknowledged LSN across all replicas, or `None` when the
    /// primary lists no replicas.
    pub fn safe_replay_lsn(&self) -> Option<u64> {
        let replicas = self
            .primary
            .replicas
            .read()
            .unwrap_or_else(|e| e.into_inner());
        replicas.iter().map(|r| r.last_acked_lsn).min()
    }

    /// Returns the quorum configuration this coordinator enforces.
    pub fn config(&self) -> &QuorumConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a primary with no replicas registered.
    fn new_primary() -> Arc<PrimaryReplication> {
        Arc::new(PrimaryReplication::new(None))
    }

    #[test]
    fn async_mode_returns_immediately() {
        let primary = new_primary();
        let coord = QuorumCoordinator::new(Arc::clone(&primary), QuorumConfig::async_commit());
        assert!(coord.wait_for_quorum(42).is_ok());
    }

    #[test]
    fn sync_mode_fails_when_too_few_replicas() {
        let primary = new_primary();
        let coord = QuorumCoordinator::new(Arc::clone(&primary), QuorumConfig::sync(2));
        let outcome = coord.wait_for_quorum(1);
        match &outcome {
            Err(QuorumError::InsufficientReplicas {
                required,
                connected,
            }) => {
                assert_eq!(*required, 2);
                assert_eq!(*connected, 0);
            }
            other => panic!("expected InsufficientReplicas, got {:?}", other),
        }
    }

    #[test]
    fn sync_mode_returns_when_enough_acks() {
        let primary = new_primary();
        let ids = ["r1", "r2"];
        for &id in &ids {
            primary.register_replica(id.to_string());
        }
        for &id in &ids {
            primary.ack_replica(id, 10);
        }
        let coord = QuorumCoordinator::new(
            Arc::clone(&primary),
            QuorumConfig::sync(2).with_timeout(Duration::from_millis(500)),
        );
        assert!(coord.wait_for_quorum(10).is_ok());
    }

    #[test]
    fn region_mode_needs_all_regions_acked() {
        let primary = new_primary();
        primary.register_replica("us_a".to_string());
        primary.register_replica("eu_a".to_string());
        let coord = QuorumCoordinator::new(
            Arc::clone(&primary),
            QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500)),
        );
        coord.bind_replica_region("us_a", "us");
        coord.bind_replica_region("eu_a", "eu");

        // Only one of the two required regions has acknowledged so far.
        primary.ack_replica("us_a", 50);
        assert!(!coord.has_quorum(50));

        primary.ack_replica("eu_a", 50);
        assert!(coord.has_quorum(50));
    }

    #[test]
    fn region_mode_rejects_missing_regions_upfront() {
        let primary = new_primary();
        primary.register_replica("us_a".to_string());
        let coord = QuorumCoordinator::new(
            Arc::clone(&primary),
            QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500)),
        );
        coord.bind_replica_region("us_a", "us");
        let outcome = coord.wait_for_quorum(1);
        match &outcome {
            Err(QuorumError::MissingRegions { missing }) => {
                assert_eq!(missing, &vec!["eu".to_string()]);
            }
            other => panic!("expected MissingRegions, got {:?}", other),
        }
    }

    #[test]
    fn safe_replay_lsn_is_min_across_replicas() {
        let primary = new_primary();
        primary.register_replica("r1".to_string());
        primary.register_replica("r2".to_string());
        primary.ack_replica("r1", 100);
        primary.ack_replica("r2", 50);
        let coord = QuorumCoordinator::new(Arc::clone(&primary), QuorumConfig::async_commit());
        assert_eq!(coord.safe_replay_lsn(), Some(50));
    }
}