Skip to main content

reddb_server/replication/
quorum.rs

//! Quorum-based commit coordination (Phase 2.6 multi-region PG parity).
//!
//! The existing `PrimaryReplication` module streams WAL records to every
//! connected replica, but the primary acks the client as soon as the
//! record hits its own WAL — replicas are only eventually consistent.
//! For multi-region deployments that is not enough: a datacenter failure
//! after the ack but before replication would drop the write.
//!
//! `QuorumCoordinator` sits between the write path and the client ack.
//! It watches durable replica ACKs on the underlying primary's
//! `CommitWaiter` and blocks the caller until the configured quorum of
//! replicas has durably received the record. Three quorum shapes are
//! supported:
//!
//! * **Async** (default, backwards compatible) — ack immediately, don't
//!   wait for replicas. Same semantics as pre-Phase-2.6 RedDB.
//! * **Sync(n)** — wait for N replicas (any region) before acking.
//! * **Regions(set)** — wait for at least one replica from each listed
//!   region. Survives full-region loss as long as the surviving regions
//!   were in the required set at write time.
//!
//! Crash safety: the primary WAL is already durable before the quorum
//! wait begins, so a coordinator crash doesn't lose the record — it just
//! means the client never got an ack and must retry idempotently.
25
26use std::collections::HashSet;
27use std::sync::Arc;
28use std::time::Duration;
29
30use super::primary::PrimaryReplication;
31
/// The acknowledgement policy a replication config applies to each write.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QuorumMode {
    /// The client is acked right away and replicas are fed in the
    /// background. A write is lost if the primary dies before the
    /// replicas have caught up.
    Async,
    /// Hold the client ack until `min_replicas` replicas, from any
    /// region, have acknowledged the write. With `replicas` replicas
    /// attached this tolerates `replicas - min_replicas` losses.
    Sync { min_replicas: usize },
    /// Hold the client ack until *every* region in `required` has at
    /// least one replica that acknowledged the write. A full-region
    /// loss is survivable as long as the regions that remain were part
    /// of the required set and have acknowledged the write.
    Regions { required: HashSet<String> },
}
46
47/// Quorum configuration stored alongside `ReplicationConfig`.
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct QuorumConfig {
50    pub mode: QuorumMode,
51    /// How long the coordinator waits for acks before giving up.
52    /// `None` = wait forever (use for strong consistency only when
53    /// you can tolerate the writer stalling on a partitioned region).
54    pub timeout: Option<Duration>,
55}
56
57impl QuorumConfig {
58    /// Ack immediately. Loss tolerance = 0 under primary failure.
59    pub fn async_commit() -> Self {
60        Self {
61            mode: QuorumMode::Async,
62            timeout: None,
63        }
64    }
65
66    /// Wait for `n` replicas to ack (any region). Typical PG-like
67    /// "synchronous_commit = on, synchronous_standby_names = 'ANY n'".
68    pub fn sync(min_replicas: usize) -> Self {
69        Self {
70            mode: QuorumMode::Sync { min_replicas },
71            timeout: Some(Duration::from_secs(5)),
72        }
73    }
74
75    /// Wait for at least one replica from each region in the set. Use
76    /// this for disaster-recovery deployments across cloud regions.
77    pub fn regions<I, S>(regions: I) -> Self
78    where
79        I: IntoIterator<Item = S>,
80        S: Into<String>,
81    {
82        Self {
83            mode: QuorumMode::Regions {
84                required: regions.into_iter().map(|r| r.into()).collect(),
85            },
86            timeout: Some(Duration::from_secs(10)),
87        }
88    }
89
90    pub fn with_timeout(mut self, timeout: Duration) -> Self {
91        self.timeout = Some(timeout);
92        self
93    }
94
95    pub fn without_timeout(mut self) -> Self {
96        self.timeout = None;
97        self
98    }
99
100    /// Is this config ack-first (no wait)?
101    pub fn is_async(&self) -> bool {
102        matches!(self.mode, QuorumMode::Async)
103    }
104}
105
106impl Default for QuorumConfig {
107    fn default() -> Self {
108        Self::async_commit()
109    }
110}
111
/// Errors raised by the quorum coordinator. The write itself succeeded
/// on the primary WAL — these errors signal that replica acknowledgement
/// did not reach quorum and the caller must decide whether to surface
/// the failure or continue anyway.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare errors
/// directly instead of destructuring them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumError {
    /// Timed out waiting for enough acks. Includes the set of regions
    /// that had replied (for observability / fallback routing).
    Timeout {
        /// LSN the caller was waiting on.
        target_lsn: u64,
        /// Time spent waiting, in milliseconds.
        elapsed_ms: u128,
        /// Regions with at least one replica that had acked `target_lsn`.
        acked_regions: HashSet<String>,
    },
    /// Not enough replicas are currently connected to ever satisfy the
    /// configured quorum. Returned immediately (no wait) so the caller
    /// can fail fast instead of hanging on a timeout.
    InsufficientReplicas { required: usize, connected: usize },
    /// Required-regions mode is configured but one or more regions have
    /// zero connected replicas. Reported up front so the health-check
    /// layer can alert on "regional partition" before writes stall.
    MissingRegions { missing: Vec<String> },
}

impl std::fmt::Display for QuorumError {
    /// Render a human-readable, single-line description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            QuorumError::Timeout {
                target_lsn,
                elapsed_ms,
                acked_regions,
            } => {
                // `HashSet` iteration order is unspecified, so printing it
                // directly yields a different message from run to run.
                // Going through a `BTreeSet` keeps the `{..}` rendering but
                // makes the region order stable.
                let acked_sorted: std::collections::BTreeSet<&str> =
                    acked_regions.iter().map(String::as_str).collect();
                write!(
                    f,
                    "quorum timeout after {elapsed_ms}ms waiting for lsn {target_lsn} \
                     (acked by regions: {acked_sorted:?})"
                )
            }
            QuorumError::InsufficientReplicas {
                required,
                connected,
            } => write!(
                f,
                "quorum requires {required} replicas but only {connected} connected"
            ),
            QuorumError::MissingRegions { missing } => {
                write!(
                    f,
                    "required regions with no connected replicas: {:?}",
                    missing
                )
            }
        }
    }
}

impl std::error::Error for QuorumError {}
167
168/// Tracks per-replica region bindings and pairs them with the primary's
169/// ack map. `PrimaryReplication` owns the WAL buffer + `ReplicaState`
170/// list; this coordinator adds the region dimension and the wait-for-
171/// quorum logic without duplicating the ack table.
172pub struct QuorumCoordinator {
173    primary: Arc<PrimaryReplication>,
174    config: QuorumConfig,
175    /// Map replica_id → region. Populated by `bind_replica_region` when
176    /// a replica connects; queried during quorum evaluation.
177    regions: parking_lot::RwLock<std::collections::HashMap<String, String>>,
178}
179
180impl QuorumCoordinator {
181    pub fn new(primary: Arc<PrimaryReplication>, config: QuorumConfig) -> Self {
182        Self {
183            primary,
184            config,
185            regions: parking_lot::RwLock::new(std::collections::HashMap::new()),
186        }
187    }
188
189    /// Associate a replica with its region. Called by the primary's
190    /// handshake handler when a replica connects — the replica declares
191    /// its region in the handshake payload.
192    pub fn bind_replica_region(&self, replica_id: &str, region: &str) {
193        self.regions
194            .write()
195            .insert(replica_id.to_string(), region.to_string());
196    }
197
198    /// Forget a replica's region binding on disconnect. Safe to call
199    /// repeatedly; no-op if the binding doesn't exist.
200    pub fn unbind_replica(&self, replica_id: &str) {
201        self.regions.write().remove(replica_id);
202    }
203
204    /// Which regions currently have at least one connected replica?
205    pub fn connected_regions(&self) -> HashSet<String> {
206        self.regions.read().values().cloned().collect()
207    }
208
209    /// Wait until the configured quorum has acked `target_lsn`.
210    ///
211    /// Returns `Ok(())` on successful quorum, `Err(QuorumError)` on
212    /// timeout or early-exit validation failures. `Async` mode returns
213    /// immediately — the caller already has the primary WAL confirmation.
214    pub fn wait_for_quorum(&self, target_lsn: u64) -> Result<(), QuorumError> {
215        if self.config.is_async() {
216            return Ok(());
217        }
218
219        // Early validation: can we ever satisfy this quorum?
220        self.validate_preconditions()?;
221
222        let timeout = self.config.timeout;
223        let start = std::time::Instant::now();
224        let reached = match &self.config.mode {
225            QuorumMode::Async => true,
226            QuorumMode::Sync { min_replicas } => self
227                .primary
228                .commit_waiter
229                .wait_for_commit_watermark(target_lsn, *min_replicas as u32, timeout),
230            QuorumMode::Regions { .. } => self
231                .primary
232                .commit_waiter
233                .wait_for_change_until(timeout, || self.has_quorum(target_lsn)),
234        };
235        if reached {
236            return Ok(());
237        }
238        Err(QuorumError::Timeout {
239            target_lsn,
240            elapsed_ms: start.elapsed().as_millis(),
241            acked_regions: self.acked_regions(target_lsn),
242        })
243    }
244
245    /// Fast-check the quorum predicate without waiting. Returns true
246    /// when the current ack map already satisfies the quorum for
247    /// `target_lsn`.
248    pub fn has_quorum(&self, target_lsn: u64) -> bool {
249        match &self.config.mode {
250            QuorumMode::Async => true,
251            QuorumMode::Sync { min_replicas } => {
252                self.primary
253                    .commit_waiter
254                    .commit_watermark(*min_replicas as u32)
255                    >= target_lsn
256            }
257            QuorumMode::Regions { required } => {
258                let acked = self.acked_regions(target_lsn);
259                required.iter().all(|r| acked.contains(r))
260            }
261        }
262    }
263
264    fn validate_preconditions(&self) -> Result<(), QuorumError> {
265        match &self.config.mode {
266            QuorumMode::Async => Ok(()),
267            QuorumMode::Sync { min_replicas } => {
268                let connected = self.primary.replica_count();
269                if connected < *min_replicas {
270                    return Err(QuorumError::InsufficientReplicas {
271                        required: *min_replicas,
272                        connected,
273                    });
274                }
275                Ok(())
276            }
277            QuorumMode::Regions { required } => {
278                let connected = self.connected_regions();
279                let missing: Vec<String> = required
280                    .iter()
281                    .filter(|r| !connected.contains(*r))
282                    .cloned()
283                    .collect();
284                if missing.is_empty() {
285                    Ok(())
286                } else {
287                    Err(QuorumError::MissingRegions { missing })
288                }
289            }
290        }
291    }
292
293    fn acked_regions(&self, target_lsn: u64) -> HashSet<String> {
294        let replicas = self
295            .primary
296            .replicas
297            .read()
298            .unwrap_or_else(|e| e.into_inner());
299        let regions = self.regions.read();
300        replicas
301            .iter()
302            .filter(|r| r.last_durable_lsn >= target_lsn)
303            .filter_map(|r| regions.get(&r.id).cloned())
304            .collect()
305    }
306
307    /// Highest LSN durable on the configured quorum shape.
308    pub fn commit_watermark(&self) -> u64 {
309        match &self.config.mode {
310            QuorumMode::Async => 0,
311            QuorumMode::Sync { min_replicas } => self
312                .primary
313                .commit_waiter
314                .commit_watermark(*min_replicas as u32),
315            QuorumMode::Regions { required } => self.region_commit_watermark(required),
316        }
317    }
318
319    fn region_commit_watermark(&self, required: &HashSet<String>) -> u64 {
320        if required.is_empty() {
321            return 0;
322        }
323        let replicas = self
324            .primary
325            .replicas
326            .read()
327            .unwrap_or_else(|e| e.into_inner());
328        let regions = self.regions.read();
329        let mut watermark = u64::MAX;
330        for required_region in required {
331            let Some(region_lsn) = replicas
332                .iter()
333                .filter(|r| regions.get(&r.id) == Some(required_region))
334                .map(|r| r.last_durable_lsn)
335                .max()
336            else {
337                return 0;
338            };
339            watermark = watermark.min(region_lsn);
340        }
341        watermark
342    }
343
344    /// Minimum LSN across all connected replicas — the "safe replay"
345    /// watermark. Any WAL segment whose records are all `<= this` can
346    /// be pruned from the primary's spool without losing any replica's
347    /// ability to catch up.
348    pub fn safe_replay_lsn(&self) -> Option<u64> {
349        let replicas = self
350            .primary
351            .replicas
352            .read()
353            .unwrap_or_else(|e| e.into_inner());
354        replicas.iter().map(|r| r.last_durable_lsn).min()
355    }
356
357    /// Config accessor.
358    pub fn config(&self) -> &QuorumConfig {
359        &self.config
360    }
361}
362
#[cfg(test)]
mod tests {
    use super::*;

    /// A primary with no replicas registered.
    fn primary() -> Arc<PrimaryReplication> {
        Arc::new(PrimaryReplication::new(None))
    }

    /// A primary with each id in `ids` registered, in the given order.
    fn primary_with_replicas(ids: &[&str]) -> Arc<PrimaryReplication> {
        let node = primary();
        for id in ids {
            node.register_replica((*id).to_string());
        }
        node
    }

    /// A coordinator requiring the "us" and "eu" regions, with `us_a`
    /// bound to "us" and `eu_a` bound to "eu".
    fn us_eu_coordinator(node: &Arc<PrimaryReplication>, timeout: Duration) -> QuorumCoordinator {
        let coordinator = QuorumCoordinator::new(
            Arc::clone(node),
            QuorumConfig::regions(["us", "eu"]).with_timeout(timeout),
        );
        coordinator.bind_replica_region("us_a", "us");
        coordinator.bind_replica_region("eu_a", "eu");
        coordinator
    }

    #[test]
    fn async_mode_returns_immediately() {
        let node = primary();
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), QuorumConfig::async_commit());
        let outcome = coordinator.wait_for_quorum(42);
        assert!(outcome.is_ok());
    }

    #[test]
    fn sync_mode_fails_when_too_few_replicas() {
        let node = primary();
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), QuorumConfig::sync(2));
        // Nothing is connected, so validation must reject the wait at once.
        let outcome = coordinator.wait_for_quorum(1);
        match outcome {
            Err(QuorumError::InsufficientReplicas {
                required,
                connected,
            }) => {
                assert_eq!(required, 2);
                assert_eq!(connected, 0);
            }
            other => panic!("expected InsufficientReplicas, got {:?}", other),
        }
    }

    #[test]
    fn sync_mode_returns_when_enough_acks() {
        let node = primary_with_replicas(&["r1", "r2"]);
        node.ack_replica("r1", 10);
        node.ack_replica("r2", 10);
        let config = QuorumConfig::sync(2).with_timeout(Duration::from_millis(500));
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), config);
        assert!(coordinator.wait_for_quorum(10).is_ok());
    }

    #[test]
    fn region_mode_needs_all_regions_acked() {
        let node = primary_with_replicas(&["us_a", "eu_a"]);
        let coordinator = us_eu_coordinator(&node, Duration::from_millis(500));

        // One region alone is not a quorum.
        node.ack_replica("us_a", 50);
        assert!(!coordinator.has_quorum(50));

        // Once the second region acks, the quorum holds.
        node.ack_replica("eu_a", 50);
        assert!(coordinator.has_quorum(50));
    }

    #[test]
    fn region_mode_requires_durable_lsn_for_watermark() {
        let node = primary_with_replicas(&["us_a", "eu_a"]);
        let coordinator = us_eu_coordinator(&node, Duration::from_millis(50));

        node.ack_replica_lsn("us_a", 50, 50);
        node.ack_replica_lsn("eu_a", 50, 40);
        assert!(!coordinator.has_quorum(50));
        assert_eq!(coordinator.commit_watermark(), 40);

        node.ack_replica_lsn("eu_a", 50, 50);
        assert!(coordinator.has_quorum(50));
        assert_eq!(coordinator.commit_watermark(), 50);
    }

    #[test]
    fn region_mode_wait_wakes_on_durable_ack() {
        let node = primary_with_replicas(&["us_a", "eu_a"]);
        let coordinator = Arc::new(us_eu_coordinator(&node, Duration::from_secs(1)));

        let waiter = Arc::clone(&coordinator);
        let handle = std::thread::spawn(move || waiter.wait_for_quorum(75));
        // Give the waiter a moment to start before the acks arrive.
        std::thread::sleep(Duration::from_millis(20));
        node.ack_replica_lsn("us_a", 75, 75);
        node.ack_replica_lsn("eu_a", 75, 75);

        let joined = handle.join().expect("waiter thread");
        joined.expect("quorum should release after durable acks");
    }

    #[test]
    fn region_mode_rejects_missing_regions_upfront() {
        let node = primary_with_replicas(&["us_a"]);
        let coordinator = QuorumCoordinator::new(
            Arc::clone(&node),
            QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500)),
        );
        coordinator.bind_replica_region("us_a", "us");
        // "eu" has no bound replica, so validation reports it as missing.
        let outcome = coordinator.wait_for_quorum(1);
        match outcome {
            Err(QuorumError::MissingRegions { missing }) => {
                assert_eq!(missing, vec!["eu".to_string()]);
            }
            other => panic!("expected MissingRegions, got {:?}", other),
        }
    }

    #[test]
    fn safe_replay_lsn_is_min_across_replicas() {
        let node = primary_with_replicas(&["r1", "r2"]);
        node.ack_replica("r1", 100);
        node.ack_replica("r2", 50);
        let coordinator = QuorumCoordinator::new(Arc::clone(&node), QuorumConfig::async_commit());
        assert_eq!(coordinator.safe_replay_lsn(), Some(50));
    }
}