Skip to main content

reddb_server/replication/
commit_policy.rs

1//! Primary commit policies (PLAN.md Phase 11.4).
2//!
3//! `Local` — commit returns after the local WAL is durable (default;
4//! current behaviour). No replica involvement at commit time.
5//!
6//! `RemoteWal` — commit returns after the WAL segment containing the
7//! transaction has been archived to the remote backend. Bounds
8//! durability to "survives a single-node loss as long as the remote
9//! is reachable".
10//!
11//! `AckN(n)` — commit returns after `n` replicas have ack'd the
12//! transaction's LSN via `ack_replica_lsn`. `n=0` is equivalent to
13//! `Local`. The primary blocks the commit response until the count
14//! is met or `RED_REPLICATION_ACK_TIMEOUT_MS` elapses.
15//!
16//! `Quorum` — commit returns after the configured `QuorumConfig`
17//! reaches its durable commit watermark.
18//!
19//! `RemoteWal` is parsed for observability but does not block the
20//! write path yet. The default remains `Local`.
21
/// Durability condition a primary waits for before answering a commit.
///
/// `Local` is the default variant.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CommitPolicy {
    /// Commit returns once the local WAL is durable; no replica is involved
    /// at commit time.
    #[default]
    Local,
    /// Commit returns once the WAL segment holding the transaction has been
    /// archived to the remote backend. Parsed for observability; it does not
    /// block the write path yet.
    RemoteWal,
    /// Commit returns once the given number of replicas have ack'd the
    /// transaction's LSN. A count of `0` is equivalent to `Local`.
    AckN(u32),
    /// Commit returns once the configured quorum reaches its durable commit
    /// watermark.
    Quorum,
}
30
31impl CommitPolicy {
32    pub fn label(self) -> &'static str {
33        match self {
34            Self::Local => "local",
35            Self::RemoteWal => "remote_wal",
36            Self::AckN(_) => "ack_n",
37            Self::Quorum => "quorum",
38        }
39    }
40
41    /// Parse from `RED_PRIMARY_COMMIT_POLICY` env var. Accepts:
42    /// `local` (default), `remote_wal`, `ack_n=N` (decimal),
43    /// `quorum`. Unknown values fall back to `Local` with a warning.
44    pub fn from_env() -> Self {
45        match std::env::var("RED_PRIMARY_COMMIT_POLICY").ok() {
46            Some(raw) => Self::parse(raw.trim()),
47            None => Self::Local,
48        }
49    }
50
51    pub fn parse(raw: &str) -> Self {
52        let lower = raw.to_ascii_lowercase();
53        if lower == "local" || lower.is_empty() {
54            return Self::Local;
55        }
56        if lower == "remote_wal" {
57            return Self::RemoteWal;
58        }
59        if lower == "quorum" {
60            return Self::Quorum;
61        }
62        if let Some(n_str) = lower.strip_prefix("ack_n=") {
63            if let Ok(n) = n_str.parse::<u32>() {
64                return Self::AckN(n);
65            }
66        }
67        tracing::warn!(
68            target: "reddb::replication::commit_policy",
69            value = %raw,
70            "unknown RED_PRIMARY_COMMIT_POLICY; falling back to local"
71        );
72        Self::Local
73    }
74}
75
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived `Default` must select the `Local` policy.
    #[test]
    fn default_is_local() {
        let policy = CommitPolicy::default();
        assert_eq!(policy, CommitPolicy::Local);
    }

    /// Each accepted spelling maps to its variant, regardless of ASCII case.
    #[test]
    fn parse_known_values() {
        let cases = [
            ("local", CommitPolicy::Local),
            ("LOCAL", CommitPolicy::Local),
            ("remote_wal", CommitPolicy::RemoteWal),
            ("quorum", CommitPolicy::Quorum),
            ("ack_n=3", CommitPolicy::AckN(3)),
            ("ack_n=0", CommitPolicy::AckN(0)),
        ];
        for (input, expected) in cases {
            assert_eq!(CommitPolicy::parse(input), expected);
        }
    }

    /// Unrecognised, malformed and empty inputs all degrade to `Local`.
    #[test]
    fn parse_unknown_falls_back_to_local() {
        for input in ["nonsense", "ack_n=abc", ""] {
            assert_eq!(CommitPolicy::parse(input), CommitPolicy::Local);
        }
    }

    /// Labels are the lowercase policy names; `AckN` drops its count.
    #[test]
    fn label_round_trips_known_values() {
        let cases = [
            (CommitPolicy::Local, "local"),
            (CommitPolicy::RemoteWal, "remote_wal"),
            (CommitPolicy::AckN(5), "ack_n"),
            (CommitPolicy::Quorum, "quorum"),
        ];
        for (policy, expected) in cases {
            assert_eq!(policy.label(), expected);
        }
    }
}