// reddb_server/replication/quorum.rs
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use super::primary::PrimaryReplication;
30
/// Acknowledgement policy a commit has to satisfy before the quorum counts
/// as reached.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumMode {
    /// No acknowledgement is awaited; the quorum is always considered met.
    Async,
    /// At least `min_replicas` replicas must have acked the target LSN.
    Sync { min_replicas: usize },
    /// Every region named in `required` must contain a replica that acked
    /// the target LSN.
    Regions { required: HashSet<String> },
}
45
46#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct QuorumConfig {
49 pub mode: QuorumMode,
50 pub timeout: Option<Duration>,
54}
55
56impl QuorumConfig {
57 pub fn async_commit() -> Self {
59 Self {
60 mode: QuorumMode::Async,
61 timeout: None,
62 }
63 }
64
65 pub fn sync(min_replicas: usize) -> Self {
68 Self {
69 mode: QuorumMode::Sync { min_replicas },
70 timeout: Some(Duration::from_secs(5)),
71 }
72 }
73
74 pub fn regions<I, S>(regions: I) -> Self
77 where
78 I: IntoIterator<Item = S>,
79 S: Into<String>,
80 {
81 Self {
82 mode: QuorumMode::Regions {
83 required: regions.into_iter().map(|r| r.into()).collect(),
84 },
85 timeout: Some(Duration::from_secs(10)),
86 }
87 }
88
89 pub fn with_timeout(mut self, timeout: Duration) -> Self {
90 self.timeout = Some(timeout);
91 self
92 }
93
94 pub fn without_timeout(mut self) -> Self {
95 self.timeout = None;
96 self
97 }
98
99 pub fn is_async(&self) -> bool {
101 matches!(self.mode, QuorumMode::Async)
102 }
103}
104
105impl Default for QuorumConfig {
106 fn default() -> Self {
107 Self::async_commit()
108 }
109}
110
/// Reasons a quorum wait can fail.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare errors directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumError {
    /// The configured timeout elapsed before the quorum was reached.
    Timeout {
        /// LSN the caller was waiting for.
        target_lsn: u64,
        /// Milliseconds spent waiting before giving up.
        elapsed_ms: u128,
        /// Regions that had acked `target_lsn` when the wait was abandoned.
        acked_regions: HashSet<String>,
    },
    /// Fewer replicas are connected than the sync quorum requires.
    InsufficientReplicas { required: usize, connected: usize },
    /// Required regions that have no replica bound to them.
    MissingRegions { missing: Vec<String> },
}
133
134impl std::fmt::Display for QuorumError {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 QuorumError::Timeout {
138 target_lsn,
139 elapsed_ms,
140 acked_regions,
141 } => write!(
142 f,
143 "quorum timeout after {elapsed_ms}ms waiting for lsn {target_lsn} \
144 (acked by regions: {:?})",
145 acked_regions
146 ),
147 QuorumError::InsufficientReplicas {
148 required,
149 connected,
150 } => write!(
151 f,
152 "quorum requires {required} replicas but only {connected} connected"
153 ),
154 QuorumError::MissingRegions { missing } => {
155 write!(
156 f,
157 "required regions with no connected replicas: {:?}",
158 missing
159 )
160 }
161 }
162 }
163}
164
165impl std::error::Error for QuorumError {}
166
167pub struct QuorumCoordinator {
172 primary: Arc<PrimaryReplication>,
173 config: QuorumConfig,
174 regions: parking_lot::RwLock<std::collections::HashMap<String, String>>,
177}
178
179impl QuorumCoordinator {
180 pub fn new(primary: Arc<PrimaryReplication>, config: QuorumConfig) -> Self {
181 Self {
182 primary,
183 config,
184 regions: parking_lot::RwLock::new(std::collections::HashMap::new()),
185 }
186 }
187
188 pub fn bind_replica_region(&self, replica_id: &str, region: &str) {
192 self.regions
193 .write()
194 .insert(replica_id.to_string(), region.to_string());
195 }
196
197 pub fn unbind_replica(&self, replica_id: &str) {
200 self.regions.write().remove(replica_id);
201 }
202
203 pub fn connected_regions(&self) -> HashSet<String> {
205 self.regions.read().values().cloned().collect()
206 }
207
208 pub fn wait_for_quorum(&self, target_lsn: u64) -> Result<(), QuorumError> {
214 if self.config.is_async() {
215 return Ok(());
216 }
217
218 self.validate_preconditions()?;
220
221 let start = Instant::now();
222 let timeout = self.config.timeout;
223 loop {
224 if self.has_quorum(target_lsn) {
225 return Ok(());
226 }
227 if let Some(limit) = timeout {
228 if start.elapsed() >= limit {
229 return Err(QuorumError::Timeout {
230 target_lsn,
231 elapsed_ms: start.elapsed().as_millis(),
232 acked_regions: self.acked_regions(target_lsn),
233 });
234 }
235 }
236 std::thread::sleep(Duration::from_millis(25));
241 }
242 }
243
244 pub fn has_quorum(&self, target_lsn: u64) -> bool {
248 match &self.config.mode {
249 QuorumMode::Async => true,
250 QuorumMode::Sync { min_replicas } => self.count_acked(target_lsn) >= *min_replicas,
251 QuorumMode::Regions { required } => {
252 let acked = self.acked_regions(target_lsn);
253 required.iter().all(|r| acked.contains(r))
254 }
255 }
256 }
257
258 fn validate_preconditions(&self) -> Result<(), QuorumError> {
259 match &self.config.mode {
260 QuorumMode::Async => Ok(()),
261 QuorumMode::Sync { min_replicas } => {
262 let connected = self.primary.replica_count();
263 if connected < *min_replicas {
264 return Err(QuorumError::InsufficientReplicas {
265 required: *min_replicas,
266 connected,
267 });
268 }
269 Ok(())
270 }
271 QuorumMode::Regions { required } => {
272 let connected = self.connected_regions();
273 let missing: Vec<String> = required
274 .iter()
275 .filter(|r| !connected.contains(*r))
276 .cloned()
277 .collect();
278 if missing.is_empty() {
279 Ok(())
280 } else {
281 Err(QuorumError::MissingRegions { missing })
282 }
283 }
284 }
285 }
286
287 fn count_acked(&self, target_lsn: u64) -> usize {
288 let replicas = self
289 .primary
290 .replicas
291 .read()
292 .unwrap_or_else(|e| e.into_inner());
293 replicas
294 .iter()
295 .filter(|r| r.last_acked_lsn >= target_lsn)
296 .count()
297 }
298
299 fn acked_regions(&self, target_lsn: u64) -> HashSet<String> {
300 let replicas = self
301 .primary
302 .replicas
303 .read()
304 .unwrap_or_else(|e| e.into_inner());
305 let regions = self.regions.read();
306 replicas
307 .iter()
308 .filter(|r| r.last_acked_lsn >= target_lsn)
309 .filter_map(|r| regions.get(&r.id).cloned())
310 .collect()
311 }
312
313 pub fn safe_replay_lsn(&self) -> Option<u64> {
318 let replicas = self
319 .primary
320 .replicas
321 .read()
322 .unwrap_or_else(|e| e.into_inner());
323 replicas.iter().map(|r| r.last_acked_lsn).min()
324 }
325
326 pub fn config(&self) -> &QuorumConfig {
328 &self.config
329 }
330}
331
/// Unit tests for the quorum coordinator, driven through a real
/// `PrimaryReplication` built with `PrimaryReplication::new(None)`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh primary with no replicas registered.
    fn primary() -> Arc<PrimaryReplication> {
        Arc::new(PrimaryReplication::new(None))
    }

    #[test]
    fn async_mode_returns_immediately() {
        let p = primary();
        let q = QuorumCoordinator::new(Arc::clone(&p), QuorumConfig::async_commit());
        assert!(q.wait_for_quorum(42).is_ok());
    }

    #[test]
    fn sync_mode_fails_when_too_few_replicas() {
        let p = primary();
        let q = QuorumCoordinator::new(Arc::clone(&p), QuorumConfig::sync(2));
        match q.wait_for_quorum(1) {
            Err(QuorumError::InsufficientReplicas {
                required,
                connected,
            }) => {
                assert_eq!(required, 2);
                assert_eq!(connected, 0);
            }
            other => panic!("expected InsufficientReplicas, got {:?}", other),
        }
    }

    #[test]
    fn sync_mode_returns_when_enough_acks() {
        let p = primary();
        p.register_replica("r1".to_string());
        p.register_replica("r2".to_string());
        p.ack_replica("r1", 10);
        p.ack_replica("r2", 10);
        let q = QuorumCoordinator::new(
            Arc::clone(&p),
            QuorumConfig::sync(2).with_timeout(Duration::from_millis(500)),
        );
        assert!(q.wait_for_quorum(10).is_ok());
    }

    /// Covers the timeout path: one replica is registered but never acks
    /// the target LSN, so the wait must end in `QuorumError::Timeout`.
    #[test]
    fn sync_mode_times_out_without_acks() {
        let p = primary();
        p.register_replica("r1".to_string());
        let q = QuorumCoordinator::new(
            Arc::clone(&p),
            QuorumConfig::sync(1).with_timeout(Duration::from_millis(20)),
        );
        match q.wait_for_quorum(50) {
            Err(QuorumError::Timeout {
                target_lsn,
                elapsed_ms,
                acked_regions,
            }) => {
                assert_eq!(target_lsn, 50);
                assert!(elapsed_ms >= 20);
                assert!(acked_regions.is_empty());
            }
            other => panic!("expected Timeout, got {:?}", other),
        }
    }

    #[test]
    fn region_mode_needs_all_regions_acked() {
        let p = primary();
        p.register_replica("us_a".to_string());
        p.register_replica("eu_a".to_string());
        let q = QuorumCoordinator::new(
            Arc::clone(&p),
            QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500)),
        );
        q.bind_replica_region("us_a", "us");
        q.bind_replica_region("eu_a", "eu");

        // Only one of the two required regions has acked so far.
        p.ack_replica("us_a", 50);
        assert!(!q.has_quorum(50));

        // The second region catching up completes the quorum.
        p.ack_replica("eu_a", 50);
        assert!(q.has_quorum(50));
    }

    #[test]
    fn region_mode_rejects_missing_regions_upfront() {
        let p = primary();
        p.register_replica("us_a".to_string());
        let q = QuorumCoordinator::new(
            Arc::clone(&p),
            QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500)),
        );
        q.bind_replica_region("us_a", "us");
        match q.wait_for_quorum(1) {
            Err(QuorumError::MissingRegions { missing }) => {
                assert_eq!(missing, vec!["eu".to_string()]);
            }
            other => panic!("expected MissingRegions, got {:?}", other),
        }
    }

    #[test]
    fn safe_replay_lsn_is_min_across_replicas() {
        let p = primary();
        p.register_replica("r1".to_string());
        p.register_replica("r2".to_string());
        p.ack_replica("r1", 100);
        p.ack_replica("r2", 50);
        let q = QuorumCoordinator::new(Arc::clone(&p), QuorumConfig::async_commit());
        assert_eq!(q.safe_replay_lsn(), Some(50));
    }
}