Skip to main content

melin_server_runtime/
durability_policy.rs

1//! Operator-facing durability mode.
2//!
3//! The generic policy types — [`Level`], [`Clause`], [`Policy`],
4//! [`CursorView`], [`EvalStatus`], [`PolicyError`], [`MAX_CLUSTER_SIZE`]
5//! — live in `melin_transport_core::durability_policy`. They're
6//! re-exported here so existing call sites (`crate::durability_policy::*`)
7//! keep working, and the response stage's ack gate is built on them.
8//!
9//! What this module owns is the *operator surface*: a small enum that
10//! exposes three named modes (`local`, `hybrid`, `durably-replicated`)
11//! via `--durability-mode`, plus the mapping from each mode to the
12//! underlying clause list. The set of modes is exchange-server policy,
13//! not a transport-core concern, so it lives here.
14
15use std::fmt;
16
17pub use melin_transport_core::durability_policy::{
18    Clause, CursorView, EvalStatus, Level, MAX_CLUSTER_SIZE, Policy, PolicyError,
19};
20
21/// Operator-facing durability mode. Each variant maps to one of three
22/// named policies that compose the underlying [`Clause`] list directly
23/// in code, replacing the legacy `--durability-policy <STRING>` DSL.
24/// See `docs/replication.md` for the three-tier menu in operational
25/// terms.
26///
27/// `clap::ValueEnum` derives `--durability-mode <local|hybrid|durably-replicated>`.
28#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
29pub enum DurabilityMode {
30    /// `persisted>=1`. Single-node durability — the primary's
31    /// PLP-backed NVMe write is the only confirmation needed.
32    /// Required when running with `--standalone`; appropriate for
33    /// dev/staging deployments without a replica.
34    Local,
35
36    /// `persisted>=1 && in_memory>=2`. One durable copy on the
37    /// primary's disk plus an in-memory ack from a second node.
38    /// Single-failure-safe with a brief RAM-only window (~80 µs on
39    /// PLP-backed NVMe) for the secondary copy. The default — typical
40    /// live trading deployments. Saves ~50–80 µs per fill vs
41    /// [`DurablyReplicated`](Self::DurablyReplicated). Fails closed
42    /// when no replica is connected.
43    Hybrid,
44
45    /// `persisted>=2`. Two durable copies before client ack. Zero
46    /// RAM-only window; the gate stalls if no replica is currently
47    /// connected. Compliance-driven venues.
48    DurablyReplicated,
49}
50
51impl DurabilityMode {
52    /// Build the underlying [`Policy`] for this mode. Every variant's
53    /// clause list is hand-constructed from in-range counts, so
54    /// [`Policy::new`] cannot fail — any regression would surface in
55    /// the unit tests below.
56    pub fn to_policy(self) -> Policy {
57        let clauses = match self {
58            DurabilityMode::Local => vec![Clause {
59                count: 1,
60                level: Level::Persisted,
61            }],
62            DurabilityMode::Hybrid => vec![
63                Clause {
64                    count: 1,
65                    level: Level::Persisted,
66                },
67                Clause {
68                    count: 2,
69                    level: Level::InMemory,
70                },
71            ],
72            DurabilityMode::DurablyReplicated => vec![Clause {
73                count: 2,
74                level: Level::Persisted,
75            }],
76        };
77        Policy::new(clauses)
78            .expect("DurabilityMode::to_policy: hand-constructed clauses must validate")
79    }
80
81    /// CLI / log-friendly name. Matches the `clap::ValueEnum`
82    /// kebab-cased spelling.
83    pub fn as_str(self) -> &'static str {
84        match self {
85            DurabilityMode::Local => "local",
86            DurabilityMode::Hybrid => "hybrid",
87            DurabilityMode::DurablyReplicated => "durably-replicated",
88        }
89    }
90
91    /// Parse the admin-channel / CLI wire spelling. Accepts the same
92    /// kebab-cased strings [`as_str`](Self::as_str) emits so operators
93    /// only have to learn one vocabulary across `--durability-mode`
94    /// and the admin `DURABILITY` command.
95    pub fn parse(s: &str) -> Option<Self> {
96        match s {
97            "local" => Some(DurabilityMode::Local),
98            "hybrid" => Some(DurabilityMode::Hybrid),
99            "durably-replicated" => Some(DurabilityMode::DurablyReplicated),
100            _ => None,
101        }
102    }
103
104    /// Stable u8 discriminant. The response stage publishes the
105    /// operator-selected mode through an [`AtomicU8`] so it can detect
106    /// a runtime swap (via the admin `DURABILITY` command) with a
107    /// relaxed load on every gate iteration — cheaper than crossing a
108    /// `Mutex` or carrying a refcounted `Arc<Policy>` snapshot.
109    /// Values are part of the in-process ABI between admin and
110    /// response, not a wire format; they must remain stable so the
111    /// round-trip `from_u8(as_u8(x)) == Some(x)` always holds.
112    pub fn as_u8(self) -> u8 {
113        match self {
114            DurabilityMode::Local => 0,
115            DurabilityMode::Hybrid => 1,
116            DurabilityMode::DurablyReplicated => 2,
117        }
118    }
119
120    /// Inverse of [`as_u8`]. Returns `None` for an unknown byte —
121    /// callers initialise the atomic from a valid mode and the admin
122    /// path only writes `as_u8(parse(s)?)`, so an unknown byte
123    /// indicates memory corruption or a programmer bug. The response
124    /// stage logs and retains the prior mode in that case rather than
125    /// silently falling back.
126    pub fn from_u8(b: u8) -> Option<Self> {
127        match b {
128            0 => Some(DurabilityMode::Local),
129            1 => Some(DurabilityMode::Hybrid),
130            2 => Some(DurabilityMode::DurablyReplicated),
131            _ => None,
132        }
133    }
134}
135
136impl fmt::Display for DurabilityMode {
137    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
138        f.write_str(self.as_str())
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, so the round-trip tests cover the whole enum.
    const ALL_MODES: [DurabilityMode; 3] = [
        DurabilityMode::Local,
        DurabilityMode::Hybrid,
        DurabilityMode::DurablyReplicated,
    ];

    #[test]
    fn durability_mode_u8_round_trip() {
        for mode in ALL_MODES {
            let byte = mode.as_u8();
            assert_eq!(DurabilityMode::from_u8(byte), Some(mode));
        }
        // A byte outside the known encoding must decode to `None`: the
        // response stage depends on that to spot a corrupted atomic and
        // keep the prior mode.
        for unknown in [3, 4, 255] {
            assert_eq!(DurabilityMode::from_u8(unknown), None);
        }
    }

    #[test]
    fn durability_mode_parse_matches_as_str() {
        for mode in ALL_MODES {
            let spelling = mode.as_str();
            assert_eq!(DurabilityMode::parse(spelling), Some(mode));
        }
        // Near misses: empty, wrong case, truncated, unrelated word,
        // underscore instead of hyphen.
        let rejected = ["", "LOCAL", "hyb", "fast", "durably_replicated"];
        for bad in rejected {
            assert_eq!(DurabilityMode::parse(bad), None, "{bad:?} should not parse");
        }
    }

    #[test]
    fn mode_local_builds_persisted_ge_1() {
        let policy = DurabilityMode::Local.to_policy();
        let clauses = policy.clauses();
        assert_eq!(clauses.len(), 1);
        let only = clauses[0];
        assert_eq!((only.level, only.count), (Level::Persisted, 1));
    }

    #[test]
    fn mode_hybrid_builds_persisted_ge_1_and_in_memory_ge_2() {
        let policy = DurabilityMode::Hybrid.to_policy();
        let clauses = policy.clauses();
        assert_eq!(clauses.len(), 2);

        // Look clauses up by level so the test does not depend on the
        // order in which they are listed.
        let count_for = |level: Level| {
            clauses
                .iter()
                .find(|clause| clause.level == level)
                .map(|clause| clause.count)
        };
        assert_eq!(count_for(Level::Persisted), Some(1), "persisted clause");
        assert_eq!(count_for(Level::InMemory), Some(2), "in_memory clause");
    }

    #[test]
    fn mode_durably_replicated_builds_persisted_ge_2() {
        let policy = DurabilityMode::DurablyReplicated.to_policy();
        let clauses = policy.clauses();
        assert_eq!(clauses.len(), 1);
        let only = clauses[0];
        assert_eq!((only.level, only.count), (Level::Persisted, 2));
    }

    #[test]
    fn mode_hybrid_fails_closed_on_single_node() {
        // With only the primary in view, `in_memory>=2` cannot be met,
        // so the gate must stay put. This fail-closed behaviour is what
        // the design relies on; the dev-evaluator footgun is handled
        // earlier, by the `--standalone` validation in `server.rs`.
        let primary_only = [[100u64, 100u64]];
        let view = CursorView::new(&primary_only);
        let gate = DurabilityMode::Hybrid.to_policy().evaluate(&view);
        assert_eq!(gate, 0, "hybrid stalls on single-node view");
    }

    #[test]
    fn mode_hybrid_advances_with_two_nodes_acking_in_memory() {
        // Primary: persisted=100, in_memory=100.
        // Replica: in_memory=80, persisted=0.
        // Each clause contributes its nth-largest cursor:
        //   persisted>=1 -> largest persisted            = 100
        //   in_memory>=2 -> second-largest in_memory     = 80
        // and the gate is the minimum of those: min(100, 80) = 80.
        let primary_and_replica = [[100u64, 100u64], [80u64, 0u64]];
        let view = CursorView::new(&primary_and_replica);
        let gate = DurabilityMode::Hybrid.to_policy().evaluate(&view);
        assert_eq!(gate, 80);
    }
}