Skip to main content

reddb_server/replication/
fence.rs

1//! Stale-term fencing for a returning ex-primary (issue #835, PRD #819, ADR 0030).
2//!
3//! After a failover the cluster serves a *new term*. A former primary that
4//! rejoins on its old, **stale** term must not be able to corrupt the new
5//! timeline — its term-stamped writes and its stream handshakes have to be
6//! refused until it re-syncs and adopts the new term. This module is the
7//! reusable term-comparison primitive both fencing boundaries share:
8//!
9//! * **Apply boundary** — a replica rejects a WAL/logical record whose term
10//!   is behind its current term. The live replica apply path enforces this
11//!   directly in [`super::logical::LogicalChangeApplier::apply`] (it already
12//!   tracks the last-applied term); [`TermFence::admit_record`] is the same
13//!   rule expressed over a durable term so it survives a restart.
14//! * **Handshake boundary** — when a node opens a replication stream it
15//!   declares the term it is streaming under. [`TermFence::admit_handshake`]
16//!   refuses a handshake whose declared term is behind the current term, so a
17//!   stale ex-primary cannot even establish the stream.
18//!
19//! The decision is deliberately the data-path twin of the election-side
20//! [`super::RefusalReason::StaleTerm`]:
21//!
22//! * `incoming == current` → **admit** at the live term;
23//! * `incoming  > current` → a newer timeline supersedes ours, so **adopt**
24//!   the new term (persisted durably) and then admit. This is how a replica
25//!   moves forward when the legitimate new primary streams to it;
26//! * `incoming  < current` → **fenced**: a superseded primary, refused.
27//!
28//! The current term is held behind a [`TermStore`] so adoption is durable —
29//! a replica that crashes after adopting term *N* comes back fencing stale
30//! term *N-1* records rather than briefly accepting them. Production wires the
31//! file-backed store alongside the node's other durable replication state;
32//! tests use the in-memory store.
33
34pub use reddb_file::FileTermStore;
35
/// Which replication boundary a term-stamped message is crossing.
///
/// Purely informational: the term rule is the same at both boundaries; the
/// value only flows into diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceBoundary {
    /// A replica applying a WAL/logical record.
    Apply,
    /// A replication stream handshake in which the streamer declares its term.
    Handshake,
}

impl FenceBoundary {
    /// Stable lowercase label for this boundary, used in error/log text.
    pub fn as_str(self) -> &'static str {
        match self {
            FenceBoundary::Handshake => "handshake",
            FenceBoundary::Apply => "apply",
        }
    }
}
54
55/// Why the term fence refused a message: the incoming term is behind the
56/// current term, so the sender is a deposed primary on a superseded timeline.
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub struct StaleTermFenced {
59    pub boundary: FenceBoundary,
60    pub incoming_term: u64,
61    pub current_term: u64,
62}
63
64impl std::fmt::Display for StaleTermFenced {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        write!(
67            f,
68            "fenced stale-term {} message: incoming term {} is behind current term {}",
69            self.boundary.as_str(),
70            self.incoming_term,
71            self.current_term
72        )
73    }
74}
75
76impl std::error::Error for StaleTermFenced {}
77
78/// The verdict of the term fence for one incoming term-stamped message.
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum FenceVerdict {
81    /// `incoming == current`: admit at the live term.
82    Admit { term: u64 },
83    /// `incoming > current`: a newer timeline. The fence adopted `new_term`
84    /// (persisting it) and the message is admitted under it.
85    Adopt { new_term: u64 },
86    /// `incoming < current`: stale — refused.
87    Fenced(StaleTermFenced),
88}
89
90impl FenceVerdict {
91    /// Was the message let through (either at the live term or after adopting
92    /// a newer one)?
93    pub fn is_admitted(&self) -> bool {
94        matches!(self, Self::Admit { .. } | Self::Adopt { .. })
95    }
96
97    /// Was the message fenced as stale?
98    pub fn is_fenced(&self) -> bool {
99        matches!(self, Self::Fenced(_))
100    }
101}
102
/// Error reading or persisting the durable current term.
#[derive(Debug)]
pub enum TermStoreError {
    /// The backing storage could not be read or written.
    Io(std::io::Error),
    /// A non-I/O failure reported by the store; carries the store's message.
    InvalidFormat(String),
}

impl std::fmt::Display for TermStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "term store io error: {err}"),
            Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
        }
    }
}

impl std::error::Error for TermStoreError {
    /// Expose the wrapped `io::Error` so callers walking the error chain can
    /// downcast it (e.g. to inspect its `ErrorKind`).
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::InvalidFormat(_) => None,
        }
    }
}
120
121impl From<reddb_file::RdbFileError> for TermStoreError {
122    fn from(value: reddb_file::RdbFileError) -> Self {
123        match value {
124            reddb_file::RdbFileError::Io(err) => Self::Io(err),
125            reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
126        }
127    }
128}
129
130/// Durable store for a node's current replication term. The default (when
131/// nothing was ever written) is
132/// [`DEFAULT_REPLICATION_TERM`](crate::replication::DEFAULT_REPLICATION_TERM),
133/// matching the term records carry before any failover.
134pub trait TermStore {
135    fn load(&self) -> Result<u64, TermStoreError>;
136    fn persist(&self, term: u64) -> Result<(), TermStoreError>;
137}
138
139/// In-memory term store for tests and ephemeral nodes.
140#[derive(Debug)]
141pub struct MemoryTermStore {
142    inner: std::sync::Mutex<u64>,
143}
144
145impl Default for MemoryTermStore {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl MemoryTermStore {
152    pub fn new() -> Self {
153        Self {
154            inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
155        }
156    }
157
158    /// Seed an initial term — used by tests to simulate a node that already
159    /// adopted a term before a restart.
160    pub fn seeded(term: u64) -> Self {
161        Self {
162            inner: std::sync::Mutex::new(term),
163        }
164    }
165}
166
167impl TermStore for MemoryTermStore {
168    fn load(&self) -> Result<u64, TermStoreError> {
169        Ok(*self.inner.lock().expect("term store mutex"))
170    }
171
172    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
173        *self.inner.lock().expect("term store mutex") = term;
174        Ok(())
175    }
176}
177
178impl TermStore for FileTermStore {
179    fn load(&self) -> Result<u64, TermStoreError> {
180        self.load_file().map_err(TermStoreError::from)
181    }
182
183    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
184        self.persist_file(term).map_err(TermStoreError::from)
185    }
186}
187
188/// The stale-term fence. Wraps a durable [`TermStore`] and applies the term
189/// rule at the apply and handshake boundaries.
190pub struct TermFence<S: TermStore> {
191    store: S,
192}
193
194impl<S: TermStore> TermFence<S> {
195    pub fn new(store: S) -> Self {
196        Self { store }
197    }
198
199    /// The node's current (highest adopted) term.
200    pub fn current_term(&self) -> Result<u64, TermStoreError> {
201        self.store.load()
202    }
203
204    /// Classify `incoming_term` against the current term **without** mutating
205    /// anything. Pure read — useful for probing a decision before committing
206    /// to the adoption side-effect.
207    pub fn classify(
208        &self,
209        boundary: FenceBoundary,
210        incoming_term: u64,
211    ) -> Result<FenceVerdict, TermStoreError> {
212        let current = self.store.load()?;
213        Ok(if incoming_term < current {
214            FenceVerdict::Fenced(StaleTermFenced {
215                boundary,
216                incoming_term,
217                current_term: current,
218            })
219        } else if incoming_term > current {
220            FenceVerdict::Adopt {
221                new_term: incoming_term,
222            }
223        } else {
224            FenceVerdict::Admit { term: current }
225        })
226    }
227
228    /// Admit (or fence) a term-stamped **record** at the apply boundary. On a
229    /// newer term the fence adopts it durably before returning `Adopt`.
230    pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
231        self.admit(FenceBoundary::Apply, incoming_term)
232    }
233
234    /// Admit (or fence) a stream **handshake** declaring `incoming_term`. On a
235    /// newer term the fence adopts it durably before returning `Adopt`.
236    pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
237        self.admit(FenceBoundary::Handshake, incoming_term)
238    }
239
240    fn admit(
241        &self,
242        boundary: FenceBoundary,
243        incoming_term: u64,
244    ) -> Result<FenceVerdict, TermStoreError> {
245        let verdict = self.classify(boundary, incoming_term)?;
246        if let FenceVerdict::Adopt { new_term } = verdict {
247            // Persist the adopted term before admitting so the advance is
248            // durable: a crash right after this cannot un-adopt the new term.
249            self.store.persist(new_term)?;
250        }
251        Ok(verdict)
252    }
253
254    /// Force the current term to `new_term`, persisting it. Used after a node
255    /// re-syncs under a known term (e.g. a deposed primary rejoining as a
256    /// replica) so it stops fencing the timeline it has now adopted. Never
257    /// moves the term backwards.
258    pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
259        let current = self.store.load()?;
260        if new_term > current {
261            self.store.persist(new_term)?;
262        }
263        Ok(())
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    fn fence(term: u64) -> TermFence<MemoryTermStore> {
        TermFence::new(MemoryTermStore::seeded(term))
    }

    /// Deletes the wrapped path on drop, so a failing assertion in a
    /// file-backed test does not leak its temp file.
    struct TempPath(std::path::PathBuf);

    impl Drop for TempPath {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    // ---- Apply boundary ----

    #[test]
    fn apply_boundary_fences_stale_term() {
        let f = fence(5);
        let verdict = f.admit_record(4).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Apply,
                incoming_term: 4,
                current_term: 5,
            })
        );
        assert!(verdict.is_fenced());
        // A fenced record must not move the current term.
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn apply_boundary_admits_current_term() {
        let f = fence(5);
        assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn apply_boundary_adopts_higher_term_durably() {
        let f = fence(5);
        assert_eq!(
            f.admit_record(8).unwrap(),
            FenceVerdict::Adopt { new_term: 8 }
        );
        // Adoption persisted: the old term is now fenced.
        assert_eq!(f.current_term().unwrap(), 8);
        assert!(f.admit_record(5).unwrap().is_fenced());
    }

    // ---- Handshake boundary ----

    #[test]
    fn handshake_boundary_fences_stale_term() {
        let f = fence(7);
        let verdict = f.admit_handshake(6).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Handshake,
                incoming_term: 6,
                current_term: 7,
            })
        );
        assert!(verdict.is_fenced());
        // A fenced handshake must not move the current term either.
        assert_eq!(f.current_term().unwrap(), 7);
    }

    #[test]
    fn handshake_boundary_admits_current_and_adopts_higher() {
        let f = fence(7);
        assert_eq!(
            f.admit_handshake(7).unwrap(),
            FenceVerdict::Admit { term: 7 }
        );
        assert_eq!(
            f.admit_handshake(9).unwrap(),
            FenceVerdict::Adopt { new_term: 9 }
        );
        assert_eq!(f.current_term().unwrap(), 9);
    }

    // ---- End-to-end: returning ex-primary on a stale term ----

    #[test]
    fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
        // The replica adopted the new term 6 (handshake from the new primary).
        let f = fence(5);
        assert!(matches!(
            f.admit_handshake(6).unwrap(),
            FenceVerdict::Adopt { new_term: 6 }
        ));

        // The ex-primary returns on its stale term 5: both its handshake and
        // its records are fenced.
        assert!(f.admit_handshake(5).unwrap().is_fenced());
        assert!(f.admit_record(5).unwrap().is_fenced());

        // Only after it re-syncs and adopts the new term can it participate —
        // here it rejoins as a replica that has caught up to term 6.
        f.adopt(6).unwrap();
        assert!(f.admit_record(6).unwrap().is_admitted());
    }

    #[test]
    fn classify_is_pure_and_does_not_adopt() {
        let f = fence(3);
        // Classifying a higher term reports Adopt but must NOT persist it.
        assert_eq!(
            f.classify(FenceBoundary::Apply, 9).unwrap(),
            FenceVerdict::Adopt { new_term: 9 }
        );
        assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
    }

    #[test]
    fn classify_reports_fenced_and_admit_without_mutating() {
        let f = fence(5);
        assert!(f.classify(FenceBoundary::Handshake, 4).unwrap().is_fenced());
        assert_eq!(
            f.classify(FenceBoundary::Apply, 5).unwrap(),
            FenceVerdict::Admit { term: 5 }
        );
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn adopt_never_moves_term_backwards() {
        let f = fence(10);
        f.adopt(4).unwrap();
        assert_eq!(f.current_term().unwrap(), 10);
        f.adopt(12).unwrap();
        assert_eq!(f.current_term().unwrap(), 12);
    }

    // ---- Verdict helpers & diagnostics ----

    #[test]
    fn verdict_helpers_partition_variants() {
        for admitted in [
            FenceVerdict::Admit { term: 3 },
            FenceVerdict::Adopt { new_term: 4 },
        ] {
            assert!(admitted.is_admitted());
            assert!(!admitted.is_fenced());
        }
        let fenced = FenceVerdict::Fenced(StaleTermFenced {
            boundary: FenceBoundary::Apply,
            incoming_term: 1,
            current_term: 2,
        });
        assert!(fenced.is_fenced());
        assert!(!fenced.is_admitted());
    }

    #[test]
    fn stale_term_display_names_boundary_and_terms() {
        let err = StaleTermFenced {
            boundary: FenceBoundary::Handshake,
            incoming_term: 6,
            current_term: 7,
        };
        assert_eq!(
            err.to_string(),
            "fenced stale-term handshake message: incoming term 6 is behind current term 7"
        );
    }

    // ---- File-backed durability ----

    #[test]
    fn file_term_store_round_trips_and_defaults() {
        // The guard removes the file when the test ends, even if an
        // assertion below panics.
        let tmp = TempPath(std::env::temp_dir().join(format!(
            "reddb-term-fence-{}-{}.json",
            std::process::id(),
            crate::utils::now_unix_nanos()
        )));
        let path = &tmp.0;
        let _ = std::fs::remove_file(path);

        // Missing file → default base term.
        let store = FileTermStore::new(path);
        assert_eq!(
            store.load().unwrap(),
            crate::replication::DEFAULT_REPLICATION_TERM
        );

        // Adopt across a "restart": a fresh store at the same path still
        // fences the old term.
        {
            let fence = TermFence::new(FileTermStore::new(path));
            assert!(matches!(
                fence.admit_handshake(6).unwrap(),
                FenceVerdict::Adopt { new_term: 6 }
            ));
        }
        let reopened = TermFence::new(FileTermStore::new(path));
        assert_eq!(reopened.current_term().unwrap(), 6);
        assert!(reopened.admit_record(5).unwrap().is_fenced());
    }
}