Skip to main content

reddb_server/replication/
quorum.rs

1//! Quorum-based commit coordination (Phase 2.6 multi-region PG parity).
2//!
3//! The existing `PrimaryReplication` module streams WAL records to every
4//! connected replica but the primary acks the client as soon as the
5//! record hits its own WAL — replicas are eventually-consistent. For
6//! multi-region deployments that's not enough: a datacenter failure
7//! after ack but before replication would drop the write.
8//!
9//! `QuorumCoordinator` sits between the write path and the client ack.
10//! It watches `ReplicaState::last_acked_lsn` on the underlying primary
11//! and blocks the caller until the configured quorum of replicas has
12//! durably received the record. Three quorum shapes are supported:
13//!
14//! * **Async** (default, backwards compatible) — ack immediately, don't
15//!   wait for replicas. Same semantics as pre-Phase-2.6 RedDB.
16//! * **Sync(n)** — wait for N replicas (any region) before acking.
17//! * **Regions(set)** — wait for at least one replica from each listed
18//!   region. Survives full-region loss as long as the surviving regions
19//!   were in the required set at write time.
20//!
21//! Crash safety: the primary WAL is already durable before quorum wait
22//! begins, so a coordinator crash doesn't lose the record — it just
23//! means the client never got an ack and must retry idempotently.
24
25use std::collections::HashSet;
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use super::primary::PrimaryReplication;
30
/// Replica-acknowledgement policy attached to a replication config.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumMode {
    /// Do not wait for any replica: the client is acked right away and
    /// records propagate in the background. A primary failure before
    /// replicas catch up loses the write.
    Async,
    /// Block until `min_replicas` replicas — regardless of region — have
    /// acked. Tolerates `replicas - n` losses.
    Sync { min_replicas: usize },
    /// Block until every region in `required` has at least one replica
    /// that acked. Survives full-region loss as long as the remaining
    /// regions were in the required set and have acknowledged the write.
    Regions { required: HashSet<String> },
}

/// Quorum settings kept next to `ReplicationConfig`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuorumConfig {
    /// Which acknowledgement policy to enforce.
    pub mode: QuorumMode,
    /// Upper bound on how long the coordinator waits for acks.
    /// `None` means wait indefinitely — only appropriate when a writer
    /// stalling on a partitioned region is acceptable.
    pub timeout: Option<Duration>,
}

impl QuorumConfig {
    /// Config that acks immediately and never waits. Loss tolerance is
    /// zero if the primary fails.
    pub fn async_commit() -> Self {
        QuorumConfig {
            mode: QuorumMode::Async,
            timeout: None,
        }
    }

    /// Config that waits for `min_replicas` acks from any region, with a
    /// 5 second timeout. Comparable to PostgreSQL's
    /// "synchronous_commit = on, synchronous_standby_names = 'ANY n'".
    pub fn sync(min_replicas: usize) -> Self {
        let mode = QuorumMode::Sync { min_replicas };
        QuorumConfig {
            mode,
            timeout: Some(Duration::from_secs(5)),
        }
    }

    /// Config that waits for one ack per listed region, with a 10 second
    /// timeout. Duplicate region names collapse into one entry. Intended
    /// for disaster-recovery deployments spanning cloud regions.
    pub fn regions<I, S>(regions: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let required: HashSet<String> = regions.into_iter().map(Into::into).collect();
        QuorumConfig {
            mode: QuorumMode::Regions { required },
            timeout: Some(Duration::from_secs(10)),
        }
    }

    /// Returns the same config with the wait bounded by `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Returns the same config with the wait unbounded.
    pub fn without_timeout(self) -> Self {
        Self {
            timeout: None,
            ..self
        }
    }

    /// True when the mode is [`QuorumMode::Async`], i.e. no wait happens.
    pub fn is_async(&self) -> bool {
        self.mode == QuorumMode::Async
    }
}

impl Default for QuorumConfig {
    /// The default is the async (ack-first) configuration with no timeout.
    fn default() -> Self {
        QuorumConfig {
            mode: QuorumMode::Async,
            timeout: None,
        }
    }
}
110
/// Errors raised by the quorum coordinator. The write itself succeeded
/// on the primary WAL — these errors signal that replica acknowledgement
/// did not reach quorum and the caller must decide whether to surface
/// the failure or continue anyway.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can
/// match on a whole error value, not just its variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QuorumError {
    /// Timed out waiting for enough acks. Includes the set of regions
    /// that had replied (for observability / fallback routing).
    Timeout {
        /// LSN the caller was waiting on.
        target_lsn: u64,
        /// Wall-clock time spent waiting, in milliseconds.
        elapsed_ms: u128,
        /// Regions that had acked `target_lsn` when the wait gave up.
        acked_regions: HashSet<String>,
    },
    /// Not enough replicas are currently connected to ever satisfy the
    /// configured quorum. Returned immediately (no wait) so the caller
    /// can fail fast instead of hanging on a timeout.
    InsufficientReplicas { required: usize, connected: usize },
    /// Required-regions mode is configured but one or more regions have
    /// zero connected replicas. Reported up front so the health-check
    /// layer can alert on "regional partition" before writes stall.
    MissingRegions { missing: Vec<String> },
}

impl std::fmt::Display for QuorumError {
    /// Renders a one-line, human-readable description of the error.
    ///
    /// The acked-region set of a `Timeout` is printed in sorted order so
    /// the same error always yields the same message (a `HashSet` has no
    /// stable iteration order, which made log lines differ run to run).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            QuorumError::Timeout {
                target_lsn,
                elapsed_ms,
                acked_regions,
            } => {
                // BTreeSet's Debug output has the same `{"a", "b"}` shape
                // as HashSet's, but iterates in ascending order.
                let ordered: std::collections::BTreeSet<&String> = acked_regions.iter().collect();
                write!(
                    f,
                    "quorum timeout after {elapsed_ms}ms waiting for lsn {target_lsn} \
                     (acked by regions: {ordered:?})"
                )
            }
            QuorumError::InsufficientReplicas {
                required,
                connected,
            } => write!(
                f,
                "quorum requires {required} replicas but only {connected} connected"
            ),
            QuorumError::MissingRegions { missing } => {
                write!(
                    f,
                    "required regions with no connected replicas: {:?}",
                    missing
                )
            }
        }
    }
}

impl std::error::Error for QuorumError {}
166
167/// Tracks per-replica region bindings and pairs them with the primary's
168/// ack map. `PrimaryReplication` owns the WAL buffer + `ReplicaState`
169/// list; this coordinator adds the region dimension and the wait-for-
170/// quorum logic without duplicating the ack table.
171pub struct QuorumCoordinator {
172    primary: Arc<PrimaryReplication>,
173    config: QuorumConfig,
174    /// Map replica_id → region. Populated by `bind_replica_region` when
175    /// a replica connects; queried during quorum evaluation.
176    regions: parking_lot::RwLock<std::collections::HashMap<String, String>>,
177}
178
179impl QuorumCoordinator {
180    pub fn new(primary: Arc<PrimaryReplication>, config: QuorumConfig) -> Self {
181        Self {
182            primary,
183            config,
184            regions: parking_lot::RwLock::new(std::collections::HashMap::new()),
185        }
186    }
187
188    /// Associate a replica with its region. Called by the primary's
189    /// handshake handler when a replica connects — the replica declares
190    /// its region in the handshake payload.
191    pub fn bind_replica_region(&self, replica_id: &str, region: &str) {
192        self.regions
193            .write()
194            .insert(replica_id.to_string(), region.to_string());
195    }
196
197    /// Forget a replica's region binding on disconnect. Safe to call
198    /// repeatedly; no-op if the binding doesn't exist.
199    pub fn unbind_replica(&self, replica_id: &str) {
200        self.regions.write().remove(replica_id);
201    }
202
203    /// Which regions currently have at least one connected replica?
204    pub fn connected_regions(&self) -> HashSet<String> {
205        self.regions.read().values().cloned().collect()
206    }
207
208    /// Wait until the configured quorum has acked `target_lsn`.
209    ///
210    /// Returns `Ok(())` on successful quorum, `Err(QuorumError)` on
211    /// timeout or early-exit validation failures. `Async` mode returns
212    /// immediately — the caller already has the primary WAL confirmation.
213    pub fn wait_for_quorum(&self, target_lsn: u64) -> Result<(), QuorumError> {
214        if self.config.is_async() {
215            return Ok(());
216        }
217
218        // Early validation: can we ever satisfy this quorum?
219        self.validate_preconditions()?;
220
221        let start = Instant::now();
222        let timeout = self.config.timeout;
223        loop {
224            if self.has_quorum(target_lsn) {
225                return Ok(());
226            }
227            if let Some(limit) = timeout {
228                if start.elapsed() >= limit {
229                    return Err(QuorumError::Timeout {
230                        target_lsn,
231                        elapsed_ms: start.elapsed().as_millis(),
232                        acked_regions: self.acked_regions(target_lsn),
233                    });
234                }
235            }
236            // Poll interval matches ReplicationConfig::poll_interval_ms
237            // default (100ms). The coordinator doesn't get woken by
238            // ack_replica directly today — a future revision adds a
239            // condvar on ReplicaState. For Phase 2.6 polling is fine.
240            std::thread::sleep(Duration::from_millis(25));
241        }
242    }
243
244    /// Fast-check the quorum predicate without waiting. Returns true
245    /// when the current ack map already satisfies the quorum for
246    /// `target_lsn`.
247    pub fn has_quorum(&self, target_lsn: u64) -> bool {
248        match &self.config.mode {
249            QuorumMode::Async => true,
250            QuorumMode::Sync { min_replicas } => self.count_acked(target_lsn) >= *min_replicas,
251            QuorumMode::Regions { required } => {
252                let acked = self.acked_regions(target_lsn);
253                required.iter().all(|r| acked.contains(r))
254            }
255        }
256    }
257
258    fn validate_preconditions(&self) -> Result<(), QuorumError> {
259        match &self.config.mode {
260            QuorumMode::Async => Ok(()),
261            QuorumMode::Sync { min_replicas } => {
262                let connected = self.primary.replica_count();
263                if connected < *min_replicas {
264                    return Err(QuorumError::InsufficientReplicas {
265                        required: *min_replicas,
266                        connected,
267                    });
268                }
269                Ok(())
270            }
271            QuorumMode::Regions { required } => {
272                let connected = self.connected_regions();
273                let missing: Vec<String> = required
274                    .iter()
275                    .filter(|r| !connected.contains(*r))
276                    .cloned()
277                    .collect();
278                if missing.is_empty() {
279                    Ok(())
280                } else {
281                    Err(QuorumError::MissingRegions { missing })
282                }
283            }
284        }
285    }
286
287    fn count_acked(&self, target_lsn: u64) -> usize {
288        let replicas = self
289            .primary
290            .replicas
291            .read()
292            .unwrap_or_else(|e| e.into_inner());
293        replicas
294            .iter()
295            .filter(|r| r.last_acked_lsn >= target_lsn)
296            .count()
297    }
298
299    fn acked_regions(&self, target_lsn: u64) -> HashSet<String> {
300        let replicas = self
301            .primary
302            .replicas
303            .read()
304            .unwrap_or_else(|e| e.into_inner());
305        let regions = self.regions.read();
306        replicas
307            .iter()
308            .filter(|r| r.last_acked_lsn >= target_lsn)
309            .filter_map(|r| regions.get(&r.id).cloned())
310            .collect()
311    }
312
313    /// Minimum LSN across all connected replicas — the "safe replay"
314    /// watermark. Any WAL segment whose records are all `<= this` can
315    /// be pruned from the primary's spool without losing any replica's
316    /// ability to catch up.
317    pub fn safe_replay_lsn(&self) -> Option<u64> {
318        let replicas = self
319            .primary
320            .replicas
321            .read()
322            .unwrap_or_else(|e| e.into_inner());
323        replicas.iter().map(|r| r.last_acked_lsn).min()
324    }
325
326    /// Config accessor.
327    pub fn config(&self) -> &QuorumConfig {
328        &self.config
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh primary with no replicas registered.
    fn primary() -> Arc<PrimaryReplication> {
        let replication = PrimaryReplication::new(None);
        Arc::new(replication)
    }

    /// Async mode never waits, whatever LSN is asked for.
    #[test]
    fn async_mode_returns_immediately() {
        let node = primary();
        let coordinator = QuorumCoordinator::new(node.clone(), QuorumConfig::async_commit());
        let outcome = coordinator.wait_for_quorum(42);
        assert!(outcome.is_ok());
    }

    /// With zero replicas registered, Sync(2) must fail fast instead of
    /// waiting out the timeout.
    #[test]
    fn sync_mode_fails_when_too_few_replicas() {
        let node = primary();
        let coordinator = QuorumCoordinator::new(node.clone(), QuorumConfig::sync(2));
        let outcome = coordinator.wait_for_quorum(1);
        if let Err(QuorumError::InsufficientReplicas {
            required,
            connected,
        }) = &outcome
        {
            assert_eq!(*required, 2);
            assert_eq!(*connected, 0);
        } else {
            panic!("expected InsufficientReplicas, got {:?}", outcome);
        }
    }

    /// Two replicas that already acked the target LSN satisfy Sync(2).
    #[test]
    fn sync_mode_returns_when_enough_acks() {
        let node = primary();
        node.register_replica(String::from("r1"));
        node.register_replica(String::from("r2"));
        node.ack_replica("r1", 10);
        node.ack_replica("r2", 10);

        let config = QuorumConfig::sync(2).with_timeout(Duration::from_millis(500));
        let coordinator = QuorumCoordinator::new(node.clone(), config);
        assert!(coordinator.wait_for_quorum(10).is_ok());
    }

    /// Regions mode is only satisfied once every required region acked.
    #[test]
    fn region_mode_needs_all_regions_acked() {
        let node = primary();
        node.register_replica(String::from("us_a"));
        node.register_replica(String::from("eu_a"));

        let config = QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500));
        let coordinator = QuorumCoordinator::new(node.clone(), config);
        coordinator.bind_replica_region("us_a", "us");
        coordinator.bind_replica_region("eu_a", "eu");

        // One region down, one to go.
        node.ack_replica("us_a", 50);
        assert!(!coordinator.has_quorum(50));

        // The second region's ack completes the quorum.
        node.ack_replica("eu_a", 50);
        assert!(coordinator.has_quorum(50));
    }

    /// A required region with no bound replica is reported before any
    /// waiting happens.
    #[test]
    fn region_mode_rejects_missing_regions_upfront() {
        let node = primary();
        node.register_replica(String::from("us_a"));

        let config = QuorumConfig::regions(["us", "eu"]).with_timeout(Duration::from_millis(500));
        let coordinator = QuorumCoordinator::new(node.clone(), config);
        coordinator.bind_replica_region("us_a", "us");

        // Nothing is bound to "eu", so validation must name it.
        let outcome = coordinator.wait_for_quorum(1);
        if let Err(QuorumError::MissingRegions { missing }) = &outcome {
            assert_eq!(*missing, vec![String::from("eu")]);
        } else {
            panic!("expected MissingRegions, got {:?}", outcome);
        }
    }

    /// The safe-replay watermark is the slowest replica's acked LSN.
    #[test]
    fn safe_replay_lsn_is_min_across_replicas() {
        let node = primary();
        node.register_replica(String::from("r1"));
        node.register_replica(String::from("r2"));
        node.ack_replica("r1", 100);
        node.ack_replica("r2", 50);

        let coordinator = QuorumCoordinator::new(node.clone(), QuorumConfig::async_commit());
        assert_eq!(coordinator.safe_replay_lsn(), Some(50));
    }
}