Skip to main content

reddb_server/replication/
flow_control.rs

1//! Write-admission flow control keyed on in-quorum replica lag (issue #826).
2//!
3//! The primary streams WAL to every connected replica, but only some of
4//! those replicas count toward the configured commit quorum. When a
5//! *quorum member* falls behind, the primary should slow incoming writes
6//! so the lagging member can catch up — otherwise sync/quorum commits
7//! stall and the lag compounds. Replicas that are pure read scale-out
8//! (async, not in the quorum) must never exert this back-pressure: read
9//! fan-out should not be able to throttle write throughput.
10//!
11//! `FlowController` implements that policy as a ticket-based admission
12//! gate. It watches the max lag across *in-quorum* replicas against a
13//! soft target (in LSN records):
14//!
15//! * lag `<=` soft target → tickets flow, writes admitted.
16//! * lag `>`  soft target → throttle engaged, admission tickets denied
17//!   until the quorum member recovers below the target.
18//!
19//! A soft target of `0` disables the feature entirely (the default), so
20//! standalone and async-commit deployments are unaffected. The decision
21//! mirrors the engine-managed graceful-pause precedent in
22//! [`crate::runtime::write_gate`] (issue #519 archive-lag auto-pause):
23//! an independent, automatically-engaging/releasing gate that the
24//! operator's manual read-only pin never stomps.
25//!
26//! In-quorum membership is derived from the active [`QuorumConfig`]:
27//!
28//! * [`QuorumMode::Async`] — no replica is synchronous, so *nothing* is
29//!   in-quorum and the controller never throttles.
30//! * [`QuorumMode::Sync`] — every connected replica is a candidate for
31//!   the synchronous quorum and counts toward the lag signal.
32//! * [`QuorumMode::Regions`] — only replicas whose declared region is in
33//!   the required set count; replicas in other regions (or with no
34//!   region) are async read-replicas and are excluded.
35
36use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37
38use super::primary::ReplicaState;
39use super::quorum::{QuorumConfig, QuorumMode};
40
/// Outcome of a write-admission attempt.
///
/// The type is `#[must_use]`: a caller that drops the outcome without
/// inspecting it would go ahead with a write the controller may have refused.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "ignoring the admission outcome lets a throttled write proceed unchecked"]
pub enum Admission {
    /// A ticket was issued — the write may proceed.
    Granted,
    /// Throttle engaged — an in-quorum replica's lag exceeds the soft
    /// target. The caller must not proceed with the write.
    Throttled,
}

impl Admission {
    /// Returns `true` for [`Admission::Granted`] and `false` for
    /// [`Admission::Throttled`].
    ///
    /// Declared `const` so the predicate can also be evaluated in constant
    /// contexts.
    #[must_use]
    pub const fn is_granted(self) -> bool {
        matches!(self, Admission::Granted)
    }
}
56
57/// Is `replica` a member of the commit quorum under `quorum`?
58///
59/// Async read-replicas (not in the quorum) return `false` and are
60/// therefore excluded from the flow-control lag signal.
61pub fn is_in_quorum(replica: &ReplicaState, quorum: &QuorumConfig) -> bool {
62    match &quorum.mode {
63        // Async-commit: nobody is synchronous, so no replica gates writes.
64        QuorumMode::Async => false,
65        // Sync(n): every connected replica is a quorum candidate.
66        QuorumMode::Sync { .. } => true,
67        // Regions: only replicas in a required region are in-quorum.
68        QuorumMode::Regions { required } => replica
69            .region
70            .as_deref()
71            .map(|region| required.contains(region))
72            .unwrap_or(false),
73    }
74}
75
76/// Max lag in LSN records across the in-quorum replicas, measured as the
77/// distance from the primary's current LSN to each replica's last acked
78/// LSN. Async read-replicas are excluded. Returns `0` when no replica is
79/// in-quorum (so the controller never throttles on read scale-out alone).
80pub fn in_quorum_max_lag_lsn(
81    replicas: &[ReplicaState],
82    primary_lsn: u64,
83    quorum: &QuorumConfig,
84) -> u64 {
85    replicas
86        .iter()
87        .filter(|replica| is_in_quorum(replica, quorum))
88        .map(|replica| primary_lsn.saturating_sub(replica.last_acked_lsn))
89        .max()
90        .unwrap_or(0)
91}
92
93/// Ticket-based write-admission flow controller.
94///
95/// Holds the soft-target policy and the live throttle state. Cheap to
96/// share: every field is a single atomic plus the immutable quorum
97/// config, so [`Self::try_admit`] on the write hot path is a couple of
98/// relaxed loads.
99#[derive(Debug)]
100pub struct FlowController {
101    /// Soft target lag in LSN records. `0` disables throttling.
102    soft_target_lsn: AtomicU64,
103    /// Whether the throttle is currently engaged.
104    throttled: AtomicBool,
105    /// Most recent observed max in-quorum lag (for metrics).
106    observed_lag_lsn: AtomicU64,
107    /// Active quorum shape — decides which replicas count as in-quorum.
108    quorum: QuorumConfig,
109}
110
111impl FlowController {
112    /// A disabled controller (soft target `0`): never throttles.
113    pub fn disabled() -> Self {
114        Self::new(0, QuorumConfig::async_commit())
115    }
116
117    /// Build a controller with an explicit soft target and quorum shape.
118    pub fn new(soft_target_lsn: u64, quorum: QuorumConfig) -> Self {
119        Self {
120            soft_target_lsn: AtomicU64::new(soft_target_lsn),
121            throttled: AtomicBool::new(false),
122            observed_lag_lsn: AtomicU64::new(0),
123            quorum,
124        }
125    }
126
127    /// Install (or change) the soft target at runtime. `0` disables the
128    /// feature and immediately releases any active throttle.
129    pub fn configure_soft_target(&self, soft_target_lsn: u64) {
130        self.soft_target_lsn
131            .store(soft_target_lsn, Ordering::Release);
132        if soft_target_lsn == 0 {
133            self.throttled.store(false, Ordering::Release);
134        }
135    }
136
137    /// Soft target in LSN records. `0` means disabled.
138    pub fn soft_target_lsn(&self) -> u64 {
139        self.soft_target_lsn.load(Ordering::Acquire)
140    }
141
142    /// Whether flow control is enabled (soft target `> 0`).
143    pub fn is_enabled(&self) -> bool {
144        self.soft_target_lsn() > 0
145    }
146
147    /// Whether the throttle is currently engaged.
148    pub fn is_throttled(&self) -> bool {
149        self.throttled.load(Ordering::Acquire)
150    }
151
152    /// Most recent observed max in-quorum replica lag in LSN records.
153    pub fn observed_lag_lsn(&self) -> u64 {
154        self.observed_lag_lsn.load(Ordering::Acquire)
155    }
156
157    /// Re-evaluate the throttle from a replica snapshot.
158    ///
159    /// Computes the max lag across in-quorum replicas (async read-replicas
160    /// excluded) and engages the throttle when it exceeds the soft target,
161    /// releasing it when the quorum member recovers to/below the target.
162    /// Returns the resulting `throttled` state.
163    pub fn observe(&self, replicas: &[ReplicaState], primary_lsn: u64) -> bool {
164        let soft_target = self.soft_target_lsn();
165        let lag = in_quorum_max_lag_lsn(replicas, primary_lsn, &self.quorum);
166        self.observed_lag_lsn.store(lag, Ordering::Release);
167        // Disabled (soft target 0) can never throttle.
168        let throttled = soft_target > 0 && lag > soft_target;
169        self.throttled.store(throttled, Ordering::Release);
170        throttled
171    }
172
173    /// Request a write-admission ticket. `Granted` unless the throttle is
174    /// engaged. The check is a single relaxed load on the hot path.
175    pub fn try_admit(&self) -> Admission {
176        if self.is_throttled() {
177            Admission::Throttled
178        } else {
179            Admission::Granted
180        }
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ReplicaState` fixture: sent and durable LSNs mirror
    /// `acked`, counters and timestamps are zero, not rebootstrapping.
    fn make_replica(name: &str, region: Option<&str>, acked: u64) -> ReplicaState {
        ReplicaState {
            id: name.to_owned(),
            last_acked_lsn: acked,
            last_sent_lsn: acked,
            last_durable_lsn: acked,
            apply_error_count: 0,
            divergence_count: 0,
            connected_at_unix_ms: 0,
            last_seen_at_unix_ms: 0,
            region: region.map(str::to_owned),
            rebootstrapping: false,
        }
    }

    #[test]
    fn async_mode_classifies_no_replica_in_quorum() {
        let quorum = QuorumConfig::async_commit();
        let candidate = make_replica("r1", Some("us"), 0);
        assert!(!is_in_quorum(&candidate, &quorum));
    }

    #[test]
    fn sync_mode_classifies_every_replica_in_quorum() {
        let quorum = QuorumConfig::sync(2);
        let without_region = make_replica("r1", None, 0);
        let with_region = make_replica("r2", Some("eu"), 0);
        assert!(is_in_quorum(&without_region, &quorum));
        assert!(is_in_quorum(&with_region, &quorum));
    }

    #[test]
    fn regions_mode_classifies_only_required_regions_in_quorum() {
        let quorum = QuorumConfig::regions(["us", "eu"]);
        // (replica id, declared region, expected membership)
        let cases = [
            ("r1", Some("us"), true),
            ("r2", Some("eu"), true),
            // Async read-replica in a non-required region — excluded.
            ("r3", Some("ap"), false),
            // No declared region — excluded.
            ("r4", None, false),
        ];
        for &(name, region, expected) in cases.iter() {
            let candidate = make_replica(name, region, 0);
            assert_eq!(
                is_in_quorum(&candidate, &quorum),
                expected,
                "replica {} with region {:?}",
                name,
                region
            );
        }
    }

    #[test]
    fn disabled_controller_never_throttles() {
        let controller = FlowController::disabled();
        let snapshot = [make_replica("r1", Some("us"), 0)];
        // The lag is enormous, yet a soft target of 0 means no throttle.
        let engaged = controller.observe(&snapshot, 1_000_000);
        assert!(!engaged);
        assert!(!controller.is_throttled());
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn engages_when_in_quorum_replica_exceeds_soft_target() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));
        // Primary at 500 and replica acked at 350: lag 150 exceeds 100.
        let snapshot = [make_replica("r1", Some("us"), 350)];
        let engaged = controller.observe(&snapshot, 500);
        assert!(engaged);
        assert!(controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 150);
        assert_eq!(controller.try_admit(), Admission::Throttled);
    }

    #[test]
    fn releases_when_in_quorum_replica_recovers() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));

        let behind = [make_replica("r1", Some("us"), 350)];
        assert!(controller.observe(&behind, 500));
        assert_eq!(controller.try_admit(), Admission::Throttled);

        // The replica closes the gap to 50 records, inside the target of 100.
        let caught_up = [make_replica("r1", Some("us"), 450)];
        let engaged = controller.observe(&caught_up, 500);
        assert!(!engaged);
        assert!(!controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 50);
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn at_soft_target_boundary_does_not_throttle() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));
        // Lag equals the soft target exactly; only a strictly greater lag
        // engages the throttle.
        let snapshot = [make_replica("r1", Some("us"), 400)];
        let engaged = controller.observe(&snapshot, 500);
        assert!(!engaged);
        assert!(!controller.is_throttled());
    }

    #[test]
    fn async_read_replica_lag_never_engages_throttling() {
        // The regions quorum requires "us". The "ap" replica is an async
        // read-replica that is far behind, while the "us" member is current.
        let controller = FlowController::new(100, QuorumConfig::regions(["us"]));
        let snapshot = [
            make_replica("in-quorum-us", Some("us"), 500), // no lag
            make_replica("async-ap", Some("ap"), 0),       // 500 behind, ignored
        ];
        let engaged = controller.observe(&snapshot, 500);
        assert!(!engaged);
        assert!(!controller.is_throttled());
        // Only the in-quorum replica feeds the lag signal, so it reads 0
        // rather than the read-replica's 500.
        assert_eq!(controller.observed_lag_lsn(), 0);
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn in_quorum_replica_still_throttles_with_async_replica_present() {
        // Mirror image of the previous case: the "us" quorum member is past
        // the target and the "ap" read-replica is current. The throttle must
        // still engage.
        let controller = FlowController::new(100, QuorumConfig::regions(["us"]));
        let snapshot = [
            make_replica("in-quorum-us", Some("us"), 300), // 200 behind, over target
            make_replica("async-ap", Some("ap"), 500),     // current, ignored
        ];
        let engaged = controller.observe(&snapshot, 500);
        assert!(engaged);
        assert!(controller.is_throttled());
        assert_eq!(controller.observed_lag_lsn(), 200);
    }

    #[test]
    fn configure_soft_target_zero_releases_throttle() {
        let controller = FlowController::new(100, QuorumConfig::sync(1));
        let snapshot = [make_replica("r1", Some("us"), 0)];
        assert!(controller.observe(&snapshot, 500));
        assert!(controller.is_throttled());

        // Disabling flow control must release the throttle right away.
        controller.configure_soft_target(0);
        assert!(!controller.is_enabled());
        assert!(!controller.is_throttled());
        assert_eq!(controller.try_admit(), Admission::Granted);
    }

    #[test]
    fn no_in_quorum_replicas_never_throttles() {
        // A Sync quorum would count every replica, so this uses a regions
        // quorum that no connected replica satisfies: with zero in-quorum
        // members the lag signal must be 0 and the throttle must stay off.
        let controller = FlowController::new(10, QuorumConfig::regions(["us"]));
        let snapshot = [make_replica("ap-only", Some("ap"), 0)];
        let engaged = controller.observe(&snapshot, 1_000);
        assert!(!engaged);
        assert_eq!(controller.observed_lag_lsn(), 0);
        assert!(!controller.is_throttled());
    }
}