// reddb_server/replication/flow_control.rs

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

use super::primary::ReplicaState;
use super::quorum::{QuorumConfig, QuorumMode};

/// Outcome of asking the flow controller whether a write may proceed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Admission {
    /// The write may proceed.
    Granted,
    /// The write is being held back because the controller is throttled.
    Throttled,
}

impl Admission {
    /// Returns `true` for [`Admission::Granted`] and `false` for every other variant.
    pub fn is_granted(self) -> bool {
        matches!(self, Admission::Granted)
    }
}
56
57pub fn is_in_quorum(replica: &ReplicaState, quorum: &QuorumConfig) -> bool {
62 match &quorum.mode {
63 QuorumMode::Async => false,
65 QuorumMode::Sync { .. } => true,
67 QuorumMode::Regions { required } => replica
69 .region
70 .as_deref()
71 .map(|region| required.contains(region))
72 .unwrap_or(false),
73 }
74}
75
76pub fn in_quorum_max_lag_lsn(
81 replicas: &[ReplicaState],
82 primary_lsn: u64,
83 quorum: &QuorumConfig,
84) -> u64 {
85 replicas
86 .iter()
87 .filter(|replica| is_in_quorum(replica, quorum))
88 .map(|replica| primary_lsn.saturating_sub(replica.last_acked_lsn))
89 .max()
90 .unwrap_or(0)
91}
92
93#[derive(Debug)]
100pub struct FlowController {
101 soft_target_lsn: AtomicU64,
103 throttled: AtomicBool,
105 observed_lag_lsn: AtomicU64,
107 quorum: QuorumConfig,
109}
110
111impl FlowController {
112 pub fn disabled() -> Self {
114 Self::new(0, QuorumConfig::async_commit())
115 }
116
117 pub fn new(soft_target_lsn: u64, quorum: QuorumConfig) -> Self {
119 Self {
120 soft_target_lsn: AtomicU64::new(soft_target_lsn),
121 throttled: AtomicBool::new(false),
122 observed_lag_lsn: AtomicU64::new(0),
123 quorum,
124 }
125 }
126
127 pub fn configure_soft_target(&self, soft_target_lsn: u64) {
130 self.soft_target_lsn
131 .store(soft_target_lsn, Ordering::Release);
132 if soft_target_lsn == 0 {
133 self.throttled.store(false, Ordering::Release);
134 }
135 }
136
137 pub fn soft_target_lsn(&self) -> u64 {
139 self.soft_target_lsn.load(Ordering::Acquire)
140 }
141
142 pub fn is_enabled(&self) -> bool {
144 self.soft_target_lsn() > 0
145 }
146
147 pub fn is_throttled(&self) -> bool {
149 self.throttled.load(Ordering::Acquire)
150 }
151
152 pub fn observed_lag_lsn(&self) -> u64 {
154 self.observed_lag_lsn.load(Ordering::Acquire)
155 }
156
157 pub fn observe(&self, replicas: &[ReplicaState], primary_lsn: u64) -> bool {
164 let soft_target = self.soft_target_lsn();
165 let lag = in_quorum_max_lag_lsn(replicas, primary_lsn, &self.quorum);
166 self.observed_lag_lsn.store(lag, Ordering::Release);
167 let throttled = soft_target > 0 && lag > soft_target;
169 self.throttled.store(throttled, Ordering::Release);
170 throttled
171 }
172
173 pub fn try_admit(&self) -> Admission {
176 if self.is_throttled() {
177 Admission::Throttled
178 } else {
179 Admission::Granted
180 }
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ReplicaState` whose sent, acked and durable LSNs all equal
    /// `last_acked_lsn`; every other counter and timestamp is zeroed.
    fn replica(id: &str, region: Option<&str>, last_acked_lsn: u64) -> ReplicaState {
        ReplicaState {
            id: id.to_string(),
            last_acked_lsn,
            last_sent_lsn: last_acked_lsn,
            last_durable_lsn: last_acked_lsn,
            apply_error_count: 0,
            divergence_count: 0,
            connected_at_unix_ms: 0,
            last_seen_at_unix_ms: 0,
            region: region.map(String::from),
            rebootstrapping: false,
        }
    }

    #[test]
    fn async_mode_classifies_no_replica_in_quorum() {
        let q = QuorumConfig::async_commit();
        assert!(!is_in_quorum(&replica("r1", Some("us"), 0), &q));
    }

    #[test]
    fn sync_mode_classifies_every_replica_in_quorum() {
        let q = QuorumConfig::sync(2);
        assert!(is_in_quorum(&replica("r1", None, 0), &q));
        assert!(is_in_quorum(&replica("r2", Some("eu"), 0), &q));
    }

    #[test]
    fn regions_mode_classifies_only_required_regions_in_quorum() {
        let q = QuorumConfig::regions(["us", "eu"]);
        assert!(is_in_quorum(&replica("r1", Some("us"), 0), &q));
        assert!(is_in_quorum(&replica("r2", Some("eu"), 0), &q));
        // A region outside the required set is not in quorum.
        assert!(!is_in_quorum(&replica("r3", Some("ap"), 0), &q));
        // A replica with no region is not in quorum either.
        assert!(!is_in_quorum(&replica("r4", None, 0), &q));
    }

    #[test]
    fn disabled_controller_never_throttles() {
        let fc = FlowController::disabled();
        let replicas = vec![replica("r1", Some("us"), 0)];
        // Even a huge lag is ignored when the controller is disabled.
        assert!(!fc.observe(&replicas, 1_000_000));
        assert!(!fc.is_throttled());
        assert_eq!(fc.try_admit(), Admission::Granted);
    }

    #[test]
    fn engages_when_in_quorum_replica_exceeds_soft_target() {
        let fc = FlowController::new(100, QuorumConfig::sync(1));
        let replicas = vec![replica("r1", Some("us"), 350)];
        // Lag is 500 - 350 = 150, which is above the soft target of 100.
        assert!(fc.observe(&replicas, 500));
        assert!(fc.is_throttled());
        assert_eq!(fc.observed_lag_lsn(), 150);
        assert_eq!(fc.try_admit(), Admission::Throttled);
    }

    #[test]
    fn releases_when_in_quorum_replica_recovers() {
        let fc = FlowController::new(100, QuorumConfig::sync(1));
        let lagging = vec![replica("r1", Some("us"), 350)];
        assert!(fc.observe(&lagging, 500));
        assert_eq!(fc.try_admit(), Admission::Throttled);

        let recovered = vec![replica("r1", Some("us"), 450)];
        // Lag drops to 50, below the soft target, so the throttle releases.
        assert!(!fc.observe(&recovered, 500));
        assert!(!fc.is_throttled());
        assert_eq!(fc.observed_lag_lsn(), 50);
        assert_eq!(fc.try_admit(), Admission::Granted);
    }

    #[test]
    fn at_soft_target_boundary_does_not_throttle() {
        let fc = FlowController::new(100, QuorumConfig::sync(1));
        let replicas = vec![replica("r1", Some("us"), 400)];
        // Lag is exactly 100; only a lag strictly above the target throttles.
        assert!(!fc.observe(&replicas, 500));
        assert!(!fc.is_throttled());
    }

    #[test]
    fn async_read_replica_lag_never_engages_throttling() {
        let fc = FlowController::new(100, QuorumConfig::regions(["us"]));
        let replicas = vec![
            // In quorum and fully caught up.
            replica("in-quorum-us", Some("us"), 500),
            // Lagging by 500 but outside the required regions.
            replica("async-ap", Some("ap"), 0),
        ];
        assert!(!fc.observe(&replicas, 500));
        assert!(!fc.is_throttled());
        // Only the in-quorum replica contributes to the observed lag.
        assert_eq!(fc.observed_lag_lsn(), 0);
        assert_eq!(fc.try_admit(), Admission::Granted);
    }

    #[test]
    fn in_quorum_replica_still_throttles_with_async_replica_present() {
        let fc = FlowController::new(100, QuorumConfig::regions(["us"]));
        let replicas = vec![
            // In quorum and lagging by 200.
            replica("in-quorum-us", Some("us"), 300),
            // Outside the required regions and fully caught up.
            replica("async-ap", Some("ap"), 500),
        ];
        assert!(fc.observe(&replicas, 500));
        assert!(fc.is_throttled());
        assert_eq!(fc.observed_lag_lsn(), 200);
    }

    #[test]
    fn configure_soft_target_zero_releases_throttle() {
        let fc = FlowController::new(100, QuorumConfig::sync(1));
        assert!(fc.observe(&[replica("r1", Some("us"), 0)], 500));
        assert!(fc.is_throttled());
        fc.configure_soft_target(0);
        // Disabling takes effect immediately, without waiting for another observe.
        assert!(!fc.is_enabled());
        assert!(!fc.is_throttled());
        assert_eq!(fc.try_admit(), Admission::Granted);
    }

    #[test]
    fn no_in_quorum_replicas_never_throttles() {
        let fc = FlowController::new(10, QuorumConfig::regions(["us"]));
        // The only replica is outside the required regions, so the in-quorum
        // set is empty and the observed lag falls back to zero.
        let replicas = vec![replica("ap-only", Some("ap"), 0)];
        assert!(!fc.observe(&replicas, 1_000));
        assert_eq!(fc.observed_lag_lsn(), 0);
        assert!(!fc.is_throttled());
    }
}