Skip to main content

reddb_server/replication/
fence.rs

//! Stale-term fencing for a returning ex-primary (issue #835, PRD #819, ADR 0030).
//!
//! After a failover the cluster serves a *new term*. A former primary that
//! rejoins on its old, **stale** term must not be able to corrupt the new
//! timeline — its term-stamped writes and its stream handshakes have to be
//! refused until it re-syncs and adopts the new term. This module provides the
//! reusable term-comparison primitive shared by both fencing boundaries:
//!
//! * **Apply boundary** — a replica rejects a WAL/logical record whose term
//!   is behind its current term. The live replica apply path enforces this
//!   directly in [`super::logical::LogicalChangeApplier::apply`] (it already
//!   tracks the last-applied term); [`TermFence::admit_record`] is the same
//!   rule expressed over a durable term so it survives a restart.
//! * **Handshake boundary** — when a node opens a replication stream it
//!   declares the term it is streaming under. [`TermFence::admit_handshake`]
//!   refuses a handshake whose declared term is behind the current term, so a
//!   stale ex-primary cannot even establish the stream.
//!
//! The decision is deliberately the data-path twin of the election-side
//! [`super::RefusalReason::StaleTerm`]:
//!
//! * `incoming == current` → **admit** at the live term;
//! * `incoming  > current` → a newer timeline supersedes ours, so **adopt**
//!   the new term (persisted durably) and then admit. This is how a replica
//!   moves forward when the legitimate new primary streams to it;
//! * `incoming  < current` → **fenced**: a superseded primary, refused.
//!
//! The current term is held behind a [`TermStore`] so adoption is durable —
//! a replica that crashes after adopting term *N* comes back fencing stale
//! term *N-1* records rather than briefly accepting them. Production wires the
//! file-backed store alongside the node's other durable replication state;
//! tests use the in-memory store.
33
34use crate::serde_json::{self, Value as JsonValue};
35
/// Which boundary a term-stamped message is crossing when the fence judges it.
///
/// The term rule is the same at both boundaries; the variant only feeds
/// diagnostics (see [`FenceBoundary::as_str`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceBoundary {
    /// A replica applying a WAL/logical record.
    Apply,
    /// A node opening a replication stream and declaring the term it streams under.
    Handshake,
}

impl FenceBoundary {
    /// Stable lowercase label for this boundary, used in diagnostic text.
    pub fn as_str(self) -> &'static str {
        match self {
            FenceBoundary::Handshake => "handshake",
            FenceBoundary::Apply => "apply",
        }
    }
}
54
55/// Why the term fence refused a message: the incoming term is behind the
56/// current term, so the sender is a deposed primary on a superseded timeline.
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub struct StaleTermFenced {
59    pub boundary: FenceBoundary,
60    pub incoming_term: u64,
61    pub current_term: u64,
62}
63
64impl std::fmt::Display for StaleTermFenced {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        write!(
67            f,
68            "fenced stale-term {} message: incoming term {} is behind current term {}",
69            self.boundary.as_str(),
70            self.incoming_term,
71            self.current_term
72        )
73    }
74}
75
76impl std::error::Error for StaleTermFenced {}
77
78/// The verdict of the term fence for one incoming term-stamped message.
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum FenceVerdict {
81    /// `incoming == current`: admit at the live term.
82    Admit { term: u64 },
83    /// `incoming > current`: a newer timeline. The fence adopted `new_term`
84    /// (persisting it) and the message is admitted under it.
85    Adopt { new_term: u64 },
86    /// `incoming < current`: stale — refused.
87    Fenced(StaleTermFenced),
88}
89
90impl FenceVerdict {
91    /// Was the message let through (either at the live term or after adopting
92    /// a newer one)?
93    pub fn is_admitted(&self) -> bool {
94        matches!(self, Self::Admit { .. } | Self::Adopt { .. })
95    }
96
97    /// Was the message fenced as stale?
98    pub fn is_fenced(&self) -> bool {
99        matches!(self, Self::Fenced(_))
100    }
101}
102
/// Failure while reading or persisting the durable current term.
#[derive(Debug)]
pub enum TermStoreError {
    /// The underlying filesystem operation failed.
    Io(std::io::Error),
    /// The stored record could not be parsed, or the term could not be
    /// encoded into the record format.
    InvalidFormat(String),
}

impl std::fmt::Display for TermStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "term store io error: {err}"),
            Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
        }
    }
}

impl std::error::Error for TermStoreError {
    /// Expose the wrapped I/O error so error-chain reporters (and callers that
    /// downcast the source) can reach the root cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::InvalidFormat(_) => None,
        }
    }
}
120
121/// Durable store for a node's current replication term. The default (when
122/// nothing was ever written) is
123/// [`DEFAULT_REPLICATION_TERM`](crate::replication::DEFAULT_REPLICATION_TERM),
124/// matching the term records carry before any failover.
125pub trait TermStore {
126    fn load(&self) -> Result<u64, TermStoreError>;
127    fn persist(&self, term: u64) -> Result<(), TermStoreError>;
128}
129
130/// In-memory term store for tests and ephemeral nodes.
131#[derive(Debug)]
132pub struct MemoryTermStore {
133    inner: std::sync::Mutex<u64>,
134}
135
136impl Default for MemoryTermStore {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl MemoryTermStore {
143    pub fn new() -> Self {
144        Self {
145            inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
146        }
147    }
148
149    /// Seed an initial term — used by tests to simulate a node that already
150    /// adopted a term before a restart.
151    pub fn seeded(term: u64) -> Self {
152        Self {
153            inner: std::sync::Mutex::new(term),
154        }
155    }
156}
157
158impl TermStore for MemoryTermStore {
159    fn load(&self) -> Result<u64, TermStoreError> {
160        Ok(*self.inner.lock().expect("term store mutex"))
161    }
162
163    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
164        *self.inner.lock().expect("term store mutex") = term;
165        Ok(())
166    }
167}
168
169/// File-backed term store. Persisted with the atomic temp-file + rename +
170/// parent-dir fsync discipline used for the durable last-vote
171/// ([`super::FileLastVoteStore`]) so a crash mid-write never yields a torn
172/// record and an adopted term cannot be silently lost.
173pub struct FileTermStore {
174    path: std::path::PathBuf,
175}
176
177impl FileTermStore {
178    pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
179        Self { path: path.into() }
180    }
181}
182
183impl TermStore for FileTermStore {
184    fn load(&self) -> Result<u64, TermStoreError> {
185        match std::fs::read(&self.path) {
186            Ok(bytes) => {
187                let json: JsonValue = serde_json::from_slice(&bytes)
188                    .map_err(|err| TermStoreError::InvalidFormat(format!("parse: {err}")))?;
189                json.get("term")
190                    .and_then(JsonValue::as_u64)
191                    .ok_or_else(|| TermStoreError::InvalidFormat("missing term".into()))
192            }
193            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
194                Ok(crate::replication::DEFAULT_REPLICATION_TERM)
195            }
196            Err(err) => Err(TermStoreError::Io(err)),
197        }
198    }
199
200    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
201        let mut obj = serde_json::Map::new();
202        obj.insert("term".to_string(), JsonValue::Number(term as f64));
203        let bytes = serde_json::to_vec(&JsonValue::Object(obj))
204            .map_err(|err| TermStoreError::InvalidFormat(format!("serialize: {err}")))?;
205        if let Some(parent) = self.path.parent() {
206            std::fs::create_dir_all(parent).map_err(TermStoreError::Io)?;
207        }
208        let tmp = self.path.with_extension("term.tmp");
209        std::fs::write(&tmp, &bytes).map_err(TermStoreError::Io)?;
210        if let Ok(f) = std::fs::File::open(&tmp) {
211            let _ = f.sync_all();
212        }
213        std::fs::rename(&tmp, &self.path).map_err(TermStoreError::Io)?;
214        // fsync the parent dir so the rename itself is durable — otherwise a
215        // crash could leave the directory entry pointing at the old term and
216        // let the node briefly accept records it had already fenced.
217        if let Some(parent) = self.path.parent() {
218            if let Ok(dir) = std::fs::File::open(parent) {
219                let _ = dir.sync_all();
220            }
221        }
222        Ok(())
223    }
224}
225
226/// The stale-term fence. Wraps a durable [`TermStore`] and applies the term
227/// rule at the apply and handshake boundaries.
228pub struct TermFence<S: TermStore> {
229    store: S,
230}
231
232impl<S: TermStore> TermFence<S> {
233    pub fn new(store: S) -> Self {
234        Self { store }
235    }
236
237    /// The node's current (highest adopted) term.
238    pub fn current_term(&self) -> Result<u64, TermStoreError> {
239        self.store.load()
240    }
241
242    /// Classify `incoming_term` against the current term **without** mutating
243    /// anything. Pure read — useful for probing a decision before committing
244    /// to the adoption side-effect.
245    pub fn classify(
246        &self,
247        boundary: FenceBoundary,
248        incoming_term: u64,
249    ) -> Result<FenceVerdict, TermStoreError> {
250        let current = self.store.load()?;
251        Ok(if incoming_term < current {
252            FenceVerdict::Fenced(StaleTermFenced {
253                boundary,
254                incoming_term,
255                current_term: current,
256            })
257        } else if incoming_term > current {
258            FenceVerdict::Adopt {
259                new_term: incoming_term,
260            }
261        } else {
262            FenceVerdict::Admit { term: current }
263        })
264    }
265
266    /// Admit (or fence) a term-stamped **record** at the apply boundary. On a
267    /// newer term the fence adopts it durably before returning `Adopt`.
268    pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
269        self.admit(FenceBoundary::Apply, incoming_term)
270    }
271
272    /// Admit (or fence) a stream **handshake** declaring `incoming_term`. On a
273    /// newer term the fence adopts it durably before returning `Adopt`.
274    pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
275        self.admit(FenceBoundary::Handshake, incoming_term)
276    }
277
278    fn admit(
279        &self,
280        boundary: FenceBoundary,
281        incoming_term: u64,
282    ) -> Result<FenceVerdict, TermStoreError> {
283        let verdict = self.classify(boundary, incoming_term)?;
284        if let FenceVerdict::Adopt { new_term } = verdict {
285            // Persist the adopted term before admitting so the advance is
286            // durable: a crash right after this cannot un-adopt the new term.
287            self.store.persist(new_term)?;
288        }
289        Ok(verdict)
290    }
291
292    /// Force the current term to `new_term`, persisting it. Used after a node
293    /// re-syncs under a known term (e.g. a deposed primary rejoining as a
294    /// replica) so it stops fencing the timeline it has now adopted. Never
295    /// moves the term backwards.
296    pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
297        let current = self.store.load()?;
298        if new_term > current {
299            self.store.persist(new_term)?;
300        }
301        Ok(())
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;

    fn fence(term: u64) -> TermFence<MemoryTermStore> {
        TermFence::new(MemoryTermStore::seeded(term))
    }

    /// Unique scratch file under the temp dir. On drop it is removed together
    /// with the store's `.term.tmp` sibling, so a failing assertion does not
    /// leak files.
    struct ScratchPath(std::path::PathBuf);

    impl ScratchPath {
        fn new(tag: &str) -> Self {
            let path = std::env::temp_dir().join(format!(
                "reddb-term-fence-{}-{}-{}.json",
                tag,
                std::process::id(),
                crate::utils::now_unix_nanos()
            ));
            let _ = std::fs::remove_file(&path);
            Self(path)
        }
    }

    impl Drop for ScratchPath {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
            let _ = std::fs::remove_file(self.0.with_extension("term.tmp"));
        }
    }

    // ---- Apply boundary ----

    #[test]
    fn apply_boundary_fences_stale_term() {
        let f = fence(5);
        let verdict = f.admit_record(4).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Apply,
                incoming_term: 4,
                current_term: 5,
            })
        );
        assert!(verdict.is_fenced());
        // A fenced record must not move the current term.
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn apply_boundary_admits_current_term() {
        let f = fence(5);
        assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn apply_boundary_adopts_higher_term_durably() {
        let f = fence(5);
        assert_eq!(
            f.admit_record(8).unwrap(),
            FenceVerdict::Adopt { new_term: 8 }
        );
        // Adoption persisted: the old term is now fenced.
        assert_eq!(f.current_term().unwrap(), 8);
        assert!(f.admit_record(5).unwrap().is_fenced());
    }

    // ---- Handshake boundary ----

    #[test]
    fn handshake_boundary_fences_stale_term() {
        let f = fence(7);
        let verdict = f.admit_handshake(6).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Handshake,
                incoming_term: 6,
                current_term: 7,
            })
        );
        assert!(verdict.is_fenced());
    }

    #[test]
    fn handshake_boundary_admits_current_and_adopts_higher() {
        let f = fence(7);
        assert_eq!(
            f.admit_handshake(7).unwrap(),
            FenceVerdict::Admit { term: 7 }
        );
        assert_eq!(
            f.admit_handshake(9).unwrap(),
            FenceVerdict::Adopt { new_term: 9 }
        );
        assert_eq!(f.current_term().unwrap(), 9);
    }

    // ---- End-to-end: returning ex-primary on a stale term ----

    #[test]
    fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
        // The replica adopted the new term 6 (handshake from the new primary).
        let f = fence(5);
        assert!(matches!(
            f.admit_handshake(6).unwrap(),
            FenceVerdict::Adopt { new_term: 6 }
        ));

        // The ex-primary returns on its stale term 5: both its handshake and
        // its records are fenced.
        assert!(f.admit_handshake(5).unwrap().is_fenced());
        assert!(f.admit_record(5).unwrap().is_fenced());

        // Only after it re-syncs and adopts the new term can it participate —
        // here it rejoins as a replica that has caught up to term 6.
        f.adopt(6).unwrap();
        assert!(f.admit_record(6).unwrap().is_admitted());
    }

    #[test]
    fn classify_is_pure_and_does_not_adopt() {
        let f = fence(3);
        // Classifying a higher term reports Adopt but must NOT persist it.
        assert_eq!(
            f.classify(FenceBoundary::Apply, 9).unwrap(),
            FenceVerdict::Adopt { new_term: 9 }
        );
        assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
    }

    #[test]
    fn adopt_never_moves_term_backwards() {
        let f = fence(10);
        f.adopt(4).unwrap();
        assert_eq!(f.current_term().unwrap(), 10);
        f.adopt(12).unwrap();
        assert_eq!(f.current_term().unwrap(), 12);
    }

    // ---- File-backed durability ----

    #[test]
    fn file_term_store_round_trips_and_defaults() {
        let scratch = ScratchPath::new("roundtrip");
        let path = &scratch.0;

        // Missing file → default base term.
        let store = FileTermStore::new(path);
        assert_eq!(
            store.load().unwrap(),
            crate::replication::DEFAULT_REPLICATION_TERM
        );

        // Adopt across a "restart": a fresh store at the same path still
        // fences the old term.
        {
            let fence = TermFence::new(FileTermStore::new(path));
            assert!(matches!(
                fence.admit_handshake(6).unwrap(),
                FenceVerdict::Adopt { new_term: 6 }
            ));
        }
        let reopened = TermFence::new(FileTermStore::new(path));
        assert_eq!(reopened.current_term().unwrap(), 6);
        assert!(reopened.admit_record(5).unwrap().is_fenced());
    }

    #[test]
    fn file_term_store_rejects_malformed_records() {
        let scratch = ScratchPath::new("malformed");
        let store = FileTermStore::new(&scratch.0);

        // Not JSON at all.
        std::fs::write(&scratch.0, b"not json").unwrap();
        assert!(matches!(store.load(), Err(TermStoreError::InvalidFormat(_))));

        // Valid JSON, but no `term` field.
        std::fs::write(&scratch.0, br#"{"other": 1}"#).unwrap();
        assert!(matches!(store.load(), Err(TermStoreError::InvalidFormat(_))));
    }
}