use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use super::primary::ReplicaState;
use super::quorum::{QuorumConfig, QuorumMode};
/// Decision handed back by `FlowController::try_admit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Admission {
    /// The controller is not throttling; the caller may proceed.
    Granted,
    /// The controller is throttling; the caller should hold off.
    Throttled,
}

impl Admission {
    /// Returns `true` for [`Admission::Granted`] and `false` for every other variant.
    pub fn is_granted(self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly whether it
        // counts as granted.
        match self {
            Admission::Granted => true,
            Admission::Throttled => false,
        }
    }
}
/// Reports whether `replica` counts towards the commit quorum described by `quorum`.
///
/// * `QuorumMode::Async` — no replica is in quorum.
/// * `QuorumMode::Sync { .. }` — every replica is in quorum.
/// * `QuorumMode::Regions { required }` — only replicas whose `region` is set and
///   contained in `required` are in quorum; a replica without a region is not.
pub fn is_in_quorum(replica: &ReplicaState, quorum: &QuorumConfig) -> bool {
    match &quorum.mode {
        QuorumMode::Async => false,
        QuorumMode::Sync { .. } => true,
        QuorumMode::Regions { required } => match replica.region.as_deref() {
            Some(region) => required.contains(region),
            // A replica with no declared region can never satisfy a region quorum.
            None => false,
        },
    }
}
/// Returns the largest acknowledgement lag, in LSN units, among in-quorum replicas.
///
/// A replica's lag is `primary_lsn - last_acked_lsn`, clamped to `0` when the
/// replica's acked LSN is ahead of `primary_lsn`. Replicas for which
/// [`is_in_quorum`] returns `false` are ignored. The result is `0` when no
/// replica is in quorum (including an empty `replicas` slice).
pub fn in_quorum_max_lag_lsn(
    replicas: &[ReplicaState],
    primary_lsn: u64,
    quorum: &QuorumConfig,
) -> u64 {
    // Starting from 0 doubles as the "no in-quorum replica" result.
    let mut worst_lag: u64 = 0;
    for replica in replicas {
        if !is_in_quorum(replica, quorum) {
            continue;
        }
        let lag = primary_lsn.saturating_sub(replica.last_acked_lsn);
        worst_lag = worst_lag.max(lag);
    }
    worst_lag
}
/// Lag-based write throttle driven by the acknowledgement lag of in-quorum replicas.
///
/// All mutable state is held in atomics, so every method takes `&self`.
#[derive(Debug)]
pub struct FlowController {
/// Lag threshold in LSN units; throttling engages when the observed lag is
/// strictly greater than this value. `0` means the controller is disabled.
soft_target_lsn: AtomicU64,
/// Throttle decision stored by the most recent `observe` call; also cleared
/// when the soft target is configured to `0`.
throttled: AtomicBool,
/// Maximum in-quorum lag computed by the most recent `observe` call.
observed_lag_lsn: AtomicU64,
/// Quorum configuration used to decide which replicas' lag is considered.
quorum: QuorumConfig,
}
impl FlowController {
    /// Creates a controller with a soft target of `0` (disabled) and an
    /// async-commit quorum configuration.
    pub fn disabled() -> Self {
        Self::new(0, QuorumConfig::async_commit())
    }

    /// Creates a controller that throttles once the in-quorum lag exceeds
    /// `soft_target_lsn`. A target of `0` leaves the controller disabled.
    ///
    /// The controller starts unthrottled with an observed lag of `0`.
    pub fn new(soft_target_lsn: u64, quorum: QuorumConfig) -> Self {
        Self {
            soft_target_lsn: AtomicU64::new(soft_target_lsn),
            throttled: AtomicBool::new(false),
            observed_lag_lsn: AtomicU64::new(0),
            quorum,
        }
    }

    /// Replaces the soft target. Setting it to `0` disables the controller and
    /// clears the throttle flag; any other value takes effect on the next
    /// [`observe`](Self::observe) call.
    pub fn configure_soft_target(&self, soft_target_lsn: u64) {
        self.soft_target_lsn
            .store(soft_target_lsn, Ordering::Release);
        if soft_target_lsn == 0 {
            // Clear eagerly so a later re-enable does not start out throttled
            // on the basis of a decision made before the controller was disabled.
            self.throttled.store(false, Ordering::Release);
        }
    }

    /// Returns the currently configured soft target (`0` when disabled).
    pub fn soft_target_lsn(&self) -> u64 {
        self.soft_target_lsn.load(Ordering::Acquire)
    }

    /// Returns `true` when the soft target is non-zero.
    pub fn is_enabled(&self) -> bool {
        self.soft_target_lsn() > 0
    }

    /// Returns `true` when the controller is enabled and the most recent
    /// observation found the in-quorum lag above the soft target.
    pub fn is_throttled(&self) -> bool {
        // The target and the flag are separate atomics. An `observe` that loaded
        // a non-zero target can store `throttled = true` after a concurrent
        // `configure_soft_target(0)` has already cleared it. Gating on
        // `is_enabled` guarantees a disabled controller never reports
        // throttling, whichever store landed last.
        self.is_enabled() && self.throttled.load(Ordering::Acquire)
    }

    /// Returns the maximum in-quorum lag recorded by the most recent
    /// [`observe`](Self::observe) call (`0` before the first call).
    pub fn observed_lag_lsn(&self) -> u64 {
        self.observed_lag_lsn.load(Ordering::Acquire)
    }

    /// Recomputes the throttle decision from the current replica states.
    ///
    /// Records the maximum in-quorum lag, then engages throttling when the
    /// controller is enabled and that lag is strictly greater than the soft
    /// target. Returns the decision that was stored.
    pub fn observe(&self, replicas: &[ReplicaState], primary_lsn: u64) -> bool {
        let soft_target = self.soft_target_lsn();
        let lag = in_quorum_max_lag_lsn(replicas, primary_lsn, &self.quorum);
        self.observed_lag_lsn.store(lag, Ordering::Release);
        // Strict comparison: lag exactly at the target does not throttle.
        let throttled = soft_target > 0 && lag > soft_target;
        self.throttled.store(throttled, Ordering::Release);
        throttled
    }

    /// Returns [`Admission::Throttled`] while [`is_throttled`](Self::is_throttled)
    /// is `true`, and [`Admission::Granted`] otherwise.
    pub fn try_admit(&self) -> Admission {
        if self.is_throttled() {
            Admission::Throttled
        } else {
            Admission::Granted
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a replica whose sent, acked and durable LSNs all equal
    /// `last_acked_lsn`, with every counter and timestamp zeroed.
    fn replica(id: &str, region: Option<&str>, last_acked_lsn: u64) -> ReplicaState {
        let lsn = last_acked_lsn;
        ReplicaState {
            id: String::from(id),
            last_acked_lsn: lsn,
            last_sent_lsn: lsn,
            last_durable_lsn: lsn,
            apply_error_count: 0,
            divergence_count: 0,
            connected_at_unix_ms: 0,
            last_seen_at_unix_ms: 0,
            region: region.map(|name| name.to_string()),
            rebootstrapping: false,
        }
    }

    #[test]
    fn async_mode_classifies_no_replica_in_quorum() {
        let quorum = QuorumConfig::async_commit();
        let candidate = replica("r1", Some("us"), 0);
        assert!(!is_in_quorum(&candidate, &quorum));
    }

    #[test]
    fn sync_mode_classifies_every_replica_in_quorum() {
        let quorum = QuorumConfig::sync(2);
        let without_region = replica("r1", None, 0);
        let with_region = replica("r2", Some("eu"), 0);
        assert!(is_in_quorum(&without_region, &quorum));
        assert!(is_in_quorum(&with_region, &quorum));
    }

    #[test]
    fn regions_mode_classifies_only_required_regions_in_quorum() {
        let quorum = QuorumConfig::regions(["us", "eu"]);
        let us = replica("r1", Some("us"), 0);
        let eu = replica("r2", Some("eu"), 0);
        let ap = replica("r3", Some("ap"), 0);
        let unlabelled = replica("r4", None, 0);
        assert!(is_in_quorum(&us, &quorum));
        assert!(is_in_quorum(&eu, &quorum));
        assert!(!is_in_quorum(&ap, &quorum));
        assert!(!is_in_quorum(&unlabelled, &quorum));
    }

    #[test]
    fn disabled_controller_never_throttles() {
        let controller = FlowController::disabled();
        let fleet = [replica("r1", Some("us"), 0)];
        let engaged = controller.observe(&fleet, 1_000_000);
        assert!(!engaged);
        assert!(!controller.is_throttled());
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn engages_when_in_quorum_replica_exceeds_soft_target() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));
        let fleet = [replica("r1", Some("us"), 350)];
        let engaged = controller.observe(&fleet, 500);
        assert!(engaged);
        assert!(controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 150);
        assert_eq!(controller.try_admit(), Admission::Throttled);
    }

    #[test]
    fn releases_when_in_quorum_replica_recovers() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));

        // Lag of 150 against a target of 100 engages the throttle.
        let behind = [replica("r1", Some("us"), 350)];
        let engaged = controller.observe(&behind, 500);
        assert!(engaged);
        assert_eq!(controller.try_admit(), Admission::Throttled);

        // Lag drops to 50, which releases it again.
        let caught_up = [replica("r1", Some("us"), 450)];
        let still_engaged = controller.observe(&caught_up, 500);
        assert!(!still_engaged);
        assert!(!controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 50);
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn at_soft_target_boundary_does_not_throttle() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));
        // Lag is exactly 100, equal to the soft target.
        let fleet = [replica("r1", Some("us"), 400)];
        let engaged = controller.observe(&fleet, 500);
        assert!(!engaged);
        assert!(!controller.is_throttled());
    }

    #[test]
    fn async_read_replica_lag_never_engages_throttling() {
        let controller = FlowController::new(100, QuorumConfig::regions(["us"]));
        let fleet = [
            replica("in-quorum-us", Some("us"), 500),
            replica("async-ap", Some("ap"), 0),
        ];
        let engaged = controller.observe(&fleet, 500);
        assert!(!engaged);
        assert!(!controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 0);
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn in_quorum_replica_still_throttles_with_async_replica_present() {
        let controller = FlowController::new(100, QuorumConfig::regions(["us"]));
        let fleet = [
            replica("in-quorum-us", Some("us"), 300),
            replica("async-ap", Some("ap"), 500),
        ];
        let engaged = controller.observe(&fleet, 500);
        assert!(engaged);
        assert!(controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 200);
    }

    #[test]
    fn configure_soft_target_zero_releases_throttle() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));
        let engaged = controller.observe(&[replica("r1", Some("us"), 0)], 500);
        assert!(engaged);
        assert!(controller.is_throttled());

        controller.configure_soft_target(0);
        assert!(!controller.is_enabled());
        assert!(!controller.is_throttled());
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn no_in_quorum_replicas_never_throttles() {
        let controller = FlowController::new(10, QuorumConfig::regions(["us"]));
        let fleet = [replica("ap-only", Some("ap"), 0)];
        let engaged = controller.observe(&fleet, 1_000);
        assert!(!engaged);
        assert_eq!(controller.observed_lag_lsn(), 0);
        assert!(!controller.is_throttled());
    }
}