reddb_server/replication/
quorum.rs1use std::collections::HashSet;
27use std::sync::Arc;
28use std::time::Duration;
29
30use super::primary::PrimaryReplication;
31
/// Acknowledgement policy a write must satisfy before it counts as committed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumMode {
    /// Do not wait for any replica.
    Async,
    /// Wait until `min_replicas` replicas have acknowledged.
    Sync { min_replicas: usize },
    /// Wait until every region named in `required` has acknowledged.
    Regions { required: HashSet<String> },
}

/// A [`QuorumMode`] together with the wait budget that goes with it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuorumConfig {
    /// Which acknowledgements are required.
    pub mode: QuorumMode,
    /// Upper bound on a quorum wait. `None` is forwarded as-is to the commit
    /// waiter (presumably "no deadline" -- confirm against the waiter).
    pub timeout: Option<Duration>,
}

impl QuorumConfig {
    /// [`QuorumMode::Async`] with no timeout.
    #[must_use]
    pub fn async_commit() -> Self {
        Self {
            mode: QuorumMode::Async,
            timeout: None,
        }
    }

    /// [`QuorumMode::Sync`] requiring `min_replicas` acknowledgements, with a
    /// default timeout of 5 seconds.
    #[must_use]
    pub fn sync(min_replicas: usize) -> Self {
        Self {
            mode: QuorumMode::Sync { min_replicas },
            timeout: Some(Duration::from_secs(5)),
        }
    }

    /// [`QuorumMode::Regions`] requiring every region yielded by `regions`,
    /// with a default timeout of 10 seconds.
    ///
    /// Duplicate names collapse because the regions are stored in a set.
    #[must_use]
    pub fn regions<I, S>(regions: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            mode: QuorumMode::Regions {
                required: regions.into_iter().map(Into::into).collect(),
            },
            timeout: Some(Duration::from_secs(10)),
        }
    }

    /// Returns the configuration with its timeout replaced by `timeout`.
    #[must_use]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Returns the configuration with its timeout cleared.
    #[must_use]
    pub fn without_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// `true` when the mode is [`QuorumMode::Async`].
    #[must_use]
    pub fn is_async(&self) -> bool {
        matches!(self.mode, QuorumMode::Async)
    }
}

impl Default for QuorumConfig {
    /// Defaults to [`QuorumConfig::async_commit`].
    fn default() -> Self {
        Self::async_commit()
    }
}
111
/// Failures reported by a quorum wait.
#[derive(Debug, Clone)]
pub enum QuorumError {
    /// The wait ended without the quorum covering `target_lsn`.
    Timeout {
        /// LSN the caller was waiting on.
        target_lsn: u64,
        /// Milliseconds elapsed between starting the wait and building this error.
        elapsed_ms: u128,
        /// Regions that had acknowledged `target_lsn` when the error was built.
        acked_regions: HashSet<String>,
    },
    /// Sync mode needs `required` replicas but only `connected` are present.
    InsufficientReplicas { required: usize, connected: usize },
    /// Required regions that have no replica bound to them.
    MissingRegions { missing: Vec<String> },
}

impl std::fmt::Display for QuorumError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            QuorumError::Timeout {
                target_lsn,
                elapsed_ms,
                acked_regions,
            } => {
                // `HashSet` iteration order is not stable between runs, which made
                // this message differ for identical errors. A `BTreeSet` debug-prints
                // with the same `{..}` syntax but in sorted order.
                let acked: std::collections::BTreeSet<&String> = acked_regions.iter().collect();
                write!(
                    f,
                    "quorum timeout after {elapsed_ms}ms waiting for lsn {target_lsn} \
                     (acked by regions: {acked:?})"
                )
            }
            QuorumError::InsufficientReplicas {
                required,
                connected,
            } => write!(
                f,
                "quorum requires {required} replicas but only {connected} connected"
            ),
            QuorumError::MissingRegions { missing } => {
                write!(
                    f,
                    "required regions with no connected replicas: {:?}",
                    missing
                )
            }
        }
    }
}

impl std::error::Error for QuorumError {}
167
168pub struct QuorumCoordinator {
173 primary: Arc<PrimaryReplication>,
174 config: QuorumConfig,
175 regions: parking_lot::RwLock<std::collections::HashMap<String, String>>,
178}
179
180impl QuorumCoordinator {
181 pub fn new(primary: Arc<PrimaryReplication>, config: QuorumConfig) -> Self {
182 Self {
183 primary,
184 config,
185 regions: parking_lot::RwLock::new(std::collections::HashMap::new()),
186 }
187 }
188
189 pub fn bind_replica_region(&self, replica_id: &str, region: &str) {
193 self.regions
194 .write()
195 .insert(replica_id.to_string(), region.to_string());
196 }
197
198 pub fn unbind_replica(&self, replica_id: &str) {
201 self.regions.write().remove(replica_id);
202 }
203
204 pub fn connected_regions(&self) -> HashSet<String> {
206 self.regions.read().values().cloned().collect()
207 }
208
209 pub fn wait_for_quorum(&self, target_lsn: u64) -> Result<(), QuorumError> {
215 if self.config.is_async() {
216 return Ok(());
217 }
218
219 self.validate_preconditions()?;
221
222 let timeout = self.config.timeout;
223 let start = std::time::Instant::now();
224 let reached = match &self.config.mode {
225 QuorumMode::Async => true,
226 QuorumMode::Sync { min_replicas } => self
227 .primary
228 .commit_waiter
229 .wait_for_commit_watermark(target_lsn, *min_replicas as u32, timeout),
230 QuorumMode::Regions { .. } => self
231 .primary
232 .commit_waiter
233 .wait_for_change_until(timeout, || self.has_quorum(target_lsn)),
234 };
235 if reached {
236 return Ok(());
237 }
238 Err(QuorumError::Timeout {
239 target_lsn,
240 elapsed_ms: start.elapsed().as_millis(),
241 acked_regions: self.acked_regions(target_lsn),
242 })
243 }
244
245 pub fn has_quorum(&self, target_lsn: u64) -> bool {
249 match &self.config.mode {
250 QuorumMode::Async => true,
251 QuorumMode::Sync { min_replicas } => {
252 self.primary
253 .commit_waiter
254 .commit_watermark(*min_replicas as u32)
255 >= target_lsn
256 }
257 QuorumMode::Regions { required } => {
258 let acked = self.acked_regions(target_lsn);
259 required.iter().all(|r| acked.contains(r))
260 }
261 }
262 }
263
264 fn validate_preconditions(&self) -> Result<(), QuorumError> {
265 match &self.config.mode {
266 QuorumMode::Async => Ok(()),
267 QuorumMode::Sync { min_replicas } => {
268 let connected = self.primary.replica_count();
269 if connected < *min_replicas {
270 return Err(QuorumError::InsufficientReplicas {
271 required: *min_replicas,
272 connected,
273 });
274 }
275 Ok(())
276 }
277 QuorumMode::Regions { required } => {
278 let connected = self.connected_regions();
279 let missing: Vec<String> = required
280 .iter()
281 .filter(|r| !connected.contains(*r))
282 .cloned()
283 .collect();
284 if missing.is_empty() {
285 Ok(())
286 } else {
287 Err(QuorumError::MissingRegions { missing })
288 }
289 }
290 }
291 }
292
293 fn acked_regions(&self, target_lsn: u64) -> HashSet<String> {
294 let replicas = self
295 .primary
296 .replicas
297 .read()
298 .unwrap_or_else(|e| e.into_inner());
299 let regions = self.regions.read();
300 replicas
301 .iter()
302 .filter(|r| r.last_durable_lsn >= target_lsn)
303 .filter_map(|r| regions.get(&r.id).cloned())
304 .collect()
305 }
306
307 pub fn commit_watermark(&self) -> u64 {
309 match &self.config.mode {
310 QuorumMode::Async => 0,
311 QuorumMode::Sync { min_replicas } => self
312 .primary
313 .commit_waiter
314 .commit_watermark(*min_replicas as u32),
315 QuorumMode::Regions { required } => self.region_commit_watermark(required),
316 }
317 }
318
319 fn region_commit_watermark(&self, required: &HashSet<String>) -> u64 {
320 if required.is_empty() {
321 return 0;
322 }
323 let replicas = self
324 .primary
325 .replicas
326 .read()
327 .unwrap_or_else(|e| e.into_inner());
328 let regions = self.regions.read();
329 let mut watermark = u64::MAX;
330 for required_region in required {
331 let Some(region_lsn) = replicas
332 .iter()
333 .filter(|r| regions.get(&r.id) == Some(required_region))
334 .map(|r| r.last_durable_lsn)
335 .max()
336 else {
337 return 0;
338 };
339 watermark = watermark.min(region_lsn);
340 }
341 watermark
342 }
343
344 pub fn safe_replay_lsn(&self) -> Option<u64> {
349 let replicas = self
350 .primary
351 .replicas
352 .read()
353 .unwrap_or_else(|e| e.into_inner());
354 replicas.iter().map(|r| r.last_durable_lsn).min()
355 }
356
357 pub fn config(&self) -> &QuorumConfig {
359 &self.config
360 }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh primary with no replicas registered.
    fn primary() -> Arc<PrimaryReplication> {
        Arc::new(PrimaryReplication::new(None))
    }

    /// Primary with replicas `us_a` and `eu_a`, plus a region-mode coordinator
    /// requiring `us` and `eu` with each replica bound to its region.
    fn us_eu_cluster(timeout: Duration) -> (Arc<PrimaryReplication>, QuorumCoordinator) {
        let node = primary();
        node.register_replica("us_a".to_string());
        node.register_replica("eu_a".to_string());
        let coordinator = QuorumCoordinator::new(
            Arc::clone(&node),
            QuorumConfig::regions(["us", "eu"]).with_timeout(timeout),
        );
        coordinator.bind_replica_region("us_a", "us");
        coordinator.bind_replica_region("eu_a", "eu");
        (node, coordinator)
    }

    #[test]
    fn async_mode_returns_immediately() {
        let node = primary();
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), QuorumConfig::async_commit());
        assert!(coordinator.wait_for_quorum(42).is_ok());
    }

    #[test]
    fn sync_mode_fails_when_too_few_replicas() {
        let node = primary();
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), QuorumConfig::sync(2));
        let outcome = coordinator.wait_for_quorum(1);
        // No replica is registered, so the precondition check must reject the wait.
        match outcome {
            Err(QuorumError::InsufficientReplicas {
                required,
                connected,
            }) => {
                assert_eq!(required, 2);
                assert_eq!(connected, 0);
            }
            other => panic!("expected InsufficientReplicas, got {:?}", other),
        }
    }

    #[test]
    fn sync_mode_returns_when_enough_acks() {
        let node = primary();
        for id in ["r1", "r2"] {
            node.register_replica(id.to_string());
        }
        for id in ["r1", "r2"] {
            node.ack_replica(id, 10);
        }
        let config = QuorumConfig::sync(2).with_timeout(Duration::from_millis(500));
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), config);
        assert!(coordinator.wait_for_quorum(10).is_ok());
    }

    #[test]
    fn region_mode_needs_all_regions_acked() {
        let (node, coordinator) = us_eu_cluster(Duration::from_millis(500));

        // Only one of the two required regions has acknowledged.
        node.ack_replica("us_a", 50);
        assert!(!coordinator.has_quorum(50));

        // Both regions have acknowledged.
        node.ack_replica("eu_a", 50);
        assert!(coordinator.has_quorum(50));
    }

    #[test]
    fn region_mode_requires_durable_lsn_for_watermark() {
        let (node, coordinator) = us_eu_cluster(Duration::from_millis(50));

        // NOTE(review): the last argument appears to be the durable LSN -- confirm
        // against `ack_replica_lsn`. Here `eu_a` lags at 40.
        node.ack_replica_lsn("us_a", 50, 50);
        node.ack_replica_lsn("eu_a", 50, 40);
        assert!(!coordinator.has_quorum(50));
        assert_eq!(coordinator.commit_watermark(), 40);

        node.ack_replica_lsn("eu_a", 50, 50);
        assert!(coordinator.has_quorum(50));
        assert_eq!(coordinator.commit_watermark(), 50);
    }

    #[test]
    fn region_mode_wait_wakes_on_durable_ack() {
        let (node, coordinator) = us_eu_cluster(Duration::from_secs(1));
        let coordinator = Arc::new(coordinator);

        let waiter = Arc::clone(&coordinator);
        let blocked = std::thread::spawn(move || waiter.wait_for_quorum(75));
        // Give the waiter a moment to start blocking before the acks arrive.
        std::thread::sleep(Duration::from_millis(20));
        node.ack_replica_lsn("us_a", 75, 75);
        node.ack_replica_lsn("eu_a", 75, 75);

        let outcome = blocked.join().expect("waiter thread");
        outcome.expect("quorum should release after durable acks");
    }

    #[test]
    fn region_mode_rejects_missing_regions_upfront() {
        let node = primary();
        node.register_replica("us_a".to_string());
        let config = QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500));
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), config);
        coordinator.bind_replica_region("us_a", "us");
        let outcome = coordinator.wait_for_quorum(1);
        // `eu` has no bound replica, so the wait is refused before blocking.
        match outcome {
            Err(QuorumError::MissingRegions { missing }) => {
                assert_eq!(missing, vec!["eu".to_string()]);
            }
            other => panic!("expected MissingRegions, got {:?}", other),
        }
    }

    #[test]
    fn safe_replay_lsn_is_min_across_replicas() {
        let node = primary();
        node.register_replica("r1".to_string());
        node.register_replica("r2".to_string());
        node.ack_replica("r1", 100);
        node.ack_replica("r2", 50);
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), QuorumConfig::async_commit());
        assert_eq!(coordinator.safe_replay_lsn(), Some(50));
    }
}