Skip to main content

reddb_server/replication/
commit_policy.rs

//! Primary commit policies (PLAN.md Phase 11.4).
//!
//! `Local` — commit returns after the local WAL is durable (default;
//! current behaviour). No replica involvement at commit time.
//!
//! `RemoteWal` — commit returns after the WAL segment containing the
//! transaction has been archived to the remote backend. Bounds
//! durability to "survives a single-node loss as long as the remote
//! is reachable".
//!
//! `AckN(n)` — commit returns after `n` replicas have ack'd the
//! transaction's LSN via `ack_replica_lsn`. `n=0` is equivalent to
//! `Local`. The primary blocks the commit response until the count
//! is met or `RED_REPLICATION_ACK_TIMEOUT_MS` elapses.
//!
//! `Quorum` — future policy backed by `QuorumConfig` once quorum
//! coordination is wired into the commit path. For now this is a
//! marker enum value; the runtime falls back to `Local` semantics
//! and emits a warning at boot when set.
//!
//! In this sprint only the enum + parsing + observability are wired.
//! Actually blocking commits on `RemoteWal` / `AckN` / `Quorum` is
//! out of scope; the write path still returns after local durability
//! regardless of the configured policy. See PLAN.md 11.4 "default v1
//! behavior remains `local`".
26
/// Durability requirement the primary is configured to meet before it
/// acknowledges a commit.
///
/// Per the module documentation, only the value itself, its parsing and its
/// observability label are wired today: the write path still returns after
/// local durability whichever variant is configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum CommitPolicy {
    /// Commit returns once the local WAL is durable; no replica is involved
    /// at commit time. This is the default.
    #[default]
    Local,
    /// Commit returns after the WAL segment containing the transaction has
    /// been archived to the remote backend.
    RemoteWal,
    /// Commit returns after the given number of replicas have acknowledged
    /// the transaction's LSN. `AckN(0)` is documented as equivalent to
    /// [`CommitPolicy::Local`].
    AckN(u32),
    /// Marker for a future quorum-backed policy; documented to fall back to
    /// [`CommitPolicy::Local`] semantics for now.
    Quorum,
}
35
36impl CommitPolicy {
37    pub fn label(self) -> &'static str {
38        match self {
39            Self::Local => "local",
40            Self::RemoteWal => "remote_wal",
41            Self::AckN(_) => "ack_n",
42            Self::Quorum => "quorum",
43        }
44    }
45
46    /// Parse from `RED_PRIMARY_COMMIT_POLICY` env var. Accepts:
47    /// `local` (default), `remote_wal`, `ack_n=N` (decimal),
48    /// `quorum`. Unknown values fall back to `Local` with a warning.
49    pub fn from_env() -> Self {
50        match std::env::var("RED_PRIMARY_COMMIT_POLICY").ok() {
51            Some(raw) => Self::parse(raw.trim()),
52            None => Self::Local,
53        }
54    }
55
56    pub fn parse(raw: &str) -> Self {
57        let lower = raw.to_ascii_lowercase();
58        if lower == "local" || lower.is_empty() {
59            return Self::Local;
60        }
61        if lower == "remote_wal" {
62            return Self::RemoteWal;
63        }
64        if lower == "quorum" {
65            return Self::Quorum;
66        }
67        if let Some(n_str) = lower.strip_prefix("ack_n=") {
68            if let Ok(n) = n_str.parse::<u32>() {
69                return Self::AckN(n);
70            }
71        }
72        tracing::warn!(
73            target: "reddb::replication::commit_policy",
74            value = %raw,
75            "unknown RED_PRIMARY_COMMIT_POLICY; falling back to local"
76        );
77        Self::Local
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must stay `Local`.
    #[test]
    fn default_is_local() {
        assert_eq!(CommitPolicy::default(), CommitPolicy::Local);
    }

    /// Every documented spelling maps to its variant.
    #[test]
    fn parse_known_values() {
        assert_eq!(CommitPolicy::parse("local"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse("LOCAL"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse("remote_wal"), CommitPolicy::RemoteWal);
        assert_eq!(CommitPolicy::parse("quorum"), CommitPolicy::Quorum);
        assert_eq!(CommitPolicy::parse("ack_n=3"), CommitPolicy::AckN(3));
        assert_eq!(CommitPolicy::parse("ack_n=0"), CommitPolicy::AckN(0));
    }

    /// Matching ignores ASCII case, including in the `ack_n=` prefix.
    #[test]
    fn parse_is_case_insensitive() {
        assert_eq!(CommitPolicy::parse("Remote_WAL"), CommitPolicy::RemoteWal);
        assert_eq!(CommitPolicy::parse("QUORUM"), CommitPolicy::Quorum);
        assert_eq!(CommitPolicy::parse("ACK_N=7"), CommitPolicy::AckN(7));
    }

    /// Unrecognised input never fails; it degrades to `Local`.
    #[test]
    fn parse_unknown_falls_back_to_local() {
        assert_eq!(CommitPolicy::parse("nonsense"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse("ack_n=abc"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse(""), CommitPolicy::Local);
    }

    /// The replica count must be a decimal that fits in `u32`.
    #[test]
    fn parse_ack_n_count_boundaries() {
        assert_eq!(
            CommitPolicy::parse("ack_n=4294967295"),
            CommitPolicy::AckN(u32::MAX)
        );
        assert_eq!(CommitPolicy::parse("ack_n=4294967296"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse("ack_n=-1"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse("ack_n=1.5"), CommitPolicy::Local);
        assert_eq!(CommitPolicy::parse("ack_n="), CommitPolicy::Local);
    }

    /// Labels are fixed strings, and for the payload-free variants feeding
    /// the label back into `parse` yields the same policy.
    #[test]
    fn label_round_trips_known_values() {
        assert_eq!(CommitPolicy::Local.label(), "local");
        assert_eq!(CommitPolicy::RemoteWal.label(), "remote_wal");
        assert_eq!(CommitPolicy::AckN(5).label(), "ack_n");
        assert_eq!(CommitPolicy::Quorum.label(), "quorum");

        // `AckN` is left out on purpose: its label drops the replica count,
        // so it cannot be reconstructed from the label alone.
        for policy in [
            CommitPolicy::Local,
            CommitPolicy::RemoteWal,
            CommitPolicy::Quorum,
        ] {
            assert_eq!(CommitPolicy::parse(policy.label()), policy);
        }
    }
}