Skip to main content

reddb_server/replication/
fence.rs

//! Stale-term fencing for a returning ex-primary (issue #835, PRD #819, ADR 0030).
//!
//! After a failover the cluster serves a *new term*. A former primary that
//! rejoins on its old, **stale** term must not be able to corrupt the new
//! timeline — its term-stamped writes and its stream handshakes have to be
//! refused until it re-syncs and adopts the new term. This module is the
//! reusable term-comparison primitive that all three fencing boundaries share:
//!
//! * **Apply boundary** — a replica rejects a WAL/logical record whose term
//!   is behind its current term. The live replica apply path enforces this
//!   directly in [`super::logical::LogicalChangeApplier::apply`] (it already
//!   tracks the last-applied term); [`TermFence::admit_record`] is the same
//!   rule expressed over a durable term so it survives a restart.
//! * **Handshake boundary** — when a node opens a replication stream it
//!   declares the term it is streaming under. [`TermFence::admit_handshake`]
//!   refuses a handshake whose declared term is behind the current term, so a
//!   stale ex-primary cannot even establish the stream.
//! * **Lease boundary** — a serverless writer lease is stamped with the term
//!   it was taken under; a holder whose term is behind the current term fails
//!   closed before mutating remote artifacts.
//!
//! The decision is deliberately the data-path twin of the election-side
//! [`super::RefusalReason::StaleTerm`]:
//!
//! * `incoming == current` → **admit** at the live term;
//! * `incoming  > current` → a newer timeline supersedes ours, so **adopt**
//!   the new term (persisted durably) and then admit. This is how a replica
//!   moves forward when the legitimate new primary streams to it;
//! * `incoming  < current` → **fenced**: a superseded primary, refused.
//!
//! The current term is held behind a [`TermStore`] so adoption is durable —
//! a replica that crashes after adopting term *N* comes back fencing stale
//! term *N-1* records rather than briefly accepting them. Production wires the
//! file-backed store alongside the node's other durable replication state;
//! tests use the in-memory store.
36
37pub use reddb_file::FileTermStore;
38
/// Identifies which fencing boundary is evaluating a term-stamped message.
///
/// The variant only feeds diagnostics; the term rule itself is the same at
/// every boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceBoundary {
    /// A replica applying a WAL/logical record.
    Apply,
    /// A replication stream handshake that declares the streamer's term.
    Handshake,
    /// A serverless writer lease about to mutate under its lease term.
    Lease,
}

impl FenceBoundary {
    /// Lowercase label for this boundary, suitable for diagnostic text.
    pub fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            FenceBoundary::Apply => "apply",
            FenceBoundary::Handshake => "handshake",
            FenceBoundary::Lease => "lease",
        };
        label
    }
}
60
/// Shared stale-term predicate used by every fencing decision.
///
/// Returns `true` only when `incoming_term` is strictly older than
/// `current_term`. An equal term is a retry and a newer term is an adoption
/// candidate, so neither counts as stale.
#[inline]
pub fn term_is_stale(incoming_term: u64, current_term: u64) -> bool {
    current_term > incoming_term
}
67
/// What the admitting replica sees when a peer opens a replication stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamHandshake {
    /// Identifier of the peer opening the stream.
    pub peer_id: String,
    /// Term the peer declares it is streaming under.
    pub term: u64,
}

impl StreamHandshake {
    /// Build a handshake from anything convertible into an owned peer id plus
    /// the declared term.
    pub fn new(peer_id: impl Into<String>, term: u64) -> Self {
        let peer_id: String = peer_id.into();
        StreamHandshake { peer_id, term }
    }
}
83
84/// Why the term fence refused a message: the incoming term is behind the
85/// current term, so the sender is a deposed primary on a superseded timeline.
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub struct StaleTermFenced {
88    pub boundary: FenceBoundary,
89    pub incoming_term: u64,
90    pub current_term: u64,
91}
92
93impl std::fmt::Display for StaleTermFenced {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        write!(
96            f,
97            "fenced stale-term {} message: incoming term {} is behind current term {}",
98            self.boundary.as_str(),
99            self.incoming_term,
100            self.current_term
101        )
102    }
103}
104
105impl std::error::Error for StaleTermFenced {}
106
107pub type StaleTermRejection = StaleTermFenced;
108
109/// The verdict of the term fence for one incoming term-stamped message.
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum FenceVerdict {
112    /// `incoming == current`: admit at the live term.
113    Admit { term: u64 },
114    /// `incoming > current`: a newer timeline. The fence adopted `new_term`
115    /// (persisting it) and the message is admitted under it.
116    Adopt { new_term: u64 },
117    /// `incoming < current`: stale — refused.
118    Fenced(StaleTermFenced),
119}
120
121impl FenceVerdict {
122    /// Was the message let through (either at the live term or after adopting
123    /// a newer one)?
124    pub fn is_admitted(&self) -> bool {
125        matches!(self, Self::Admit { .. } | Self::Adopt { .. })
126    }
127
128    /// Was the message fenced as stale?
129    pub fn is_fenced(&self) -> bool {
130        matches!(self, Self::Fenced(_))
131    }
132}
133
/// Failure to read or persist the durable current term.
#[derive(Debug)]
pub enum TermStoreError {
    /// An I/O error reported while accessing the store.
    Io(std::io::Error),
    /// The store reported content or an operation it could not accept; the
    /// string carries the description.
    InvalidFormat(String),
}

impl std::fmt::Display for TermStoreError {
    /// Prefixes the wrapped detail with the kind of failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
            Self::Io(err) => write!(f, "term store io error: {err}"),
        }
    }
}

// The wrapped I/O error is already part of the `Display` text, so the default
// `Error` methods are kept as they are.
impl std::error::Error for TermStoreError {}
151
152impl From<reddb_file::RdbFileError> for TermStoreError {
153    fn from(value: reddb_file::RdbFileError) -> Self {
154        match value {
155            reddb_file::RdbFileError::Io(err) => Self::Io(err),
156            reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
157        }
158    }
159}
160
161/// Durable store for a node's current replication term. The default (when
162/// nothing was ever written) is
163/// [`DEFAULT_REPLICATION_TERM`](crate::replication::DEFAULT_REPLICATION_TERM),
164/// matching the term records carry before any failover.
165pub trait TermStore {
166    fn load(&self) -> Result<u64, TermStoreError>;
167    fn persist(&self, term: u64) -> Result<(), TermStoreError>;
168}
169
170/// In-memory term store for tests and ephemeral nodes.
171#[derive(Debug)]
172pub struct MemoryTermStore {
173    inner: std::sync::Mutex<u64>,
174}
175
176impl Default for MemoryTermStore {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182impl MemoryTermStore {
183    pub fn new() -> Self {
184        Self {
185            inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
186        }
187    }
188
189    /// Seed an initial term — used by tests to simulate a node that already
190    /// adopted a term before a restart.
191    pub fn seeded(term: u64) -> Self {
192        Self {
193            inner: std::sync::Mutex::new(term),
194        }
195    }
196}
197
198impl TermStore for MemoryTermStore {
199    fn load(&self) -> Result<u64, TermStoreError> {
200        Ok(*self.inner.lock().expect("term store mutex"))
201    }
202
203    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
204        *self.inner.lock().expect("term store mutex") = term;
205        Ok(())
206    }
207}
208
209impl TermStore for FileTermStore {
210    fn load(&self) -> Result<u64, TermStoreError> {
211        self.load_file().map_err(TermStoreError::from)
212    }
213
214    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
215        self.persist_file(term).map_err(TermStoreError::from)
216    }
217}
218
219/// The stale-term fence. Wraps a durable [`TermStore`] and applies the term
220/// rule at the apply and handshake boundaries.
221pub struct TermFence<S: TermStore> {
222    store: S,
223}
224
225impl<S: TermStore> TermFence<S> {
226    pub fn new(store: S) -> Self {
227        Self { store }
228    }
229
230    /// The node's current (highest adopted) term.
231    pub fn current_term(&self) -> Result<u64, TermStoreError> {
232        self.store.load()
233    }
234
235    /// Classify `incoming_term` against the current term **without** mutating
236    /// anything. Pure read — useful for probing a decision before committing
237    /// to the adoption side-effect.
238    pub fn classify(
239        &self,
240        boundary: FenceBoundary,
241        incoming_term: u64,
242    ) -> Result<FenceVerdict, TermStoreError> {
243        let current = self.store.load()?;
244        Ok(if term_is_stale(incoming_term, current) {
245            FenceVerdict::Fenced(StaleTermFenced {
246                boundary,
247                incoming_term,
248                current_term: current,
249            })
250        } else if incoming_term > current {
251            FenceVerdict::Adopt {
252                new_term: incoming_term,
253            }
254        } else {
255            FenceVerdict::Admit { term: current }
256        })
257    }
258
259    /// Admit (or fence) a term-stamped **record** at the apply boundary. On a
260    /// newer term the fence adopts it durably before returning `Adopt`.
261    pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
262        self.admit(FenceBoundary::Apply, incoming_term)
263    }
264
265    /// Admit (or fence) a stream **handshake** declaring `incoming_term`. On a
266    /// newer term the fence adopts it durably before returning `Adopt`.
267    pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
268        self.admit(FenceBoundary::Handshake, incoming_term)
269    }
270
271    /// Admit (or fence) a stream handshake carrying peer metadata.
272    pub fn admit_stream_handshake(
273        &self,
274        handshake: &StreamHandshake,
275    ) -> Result<FenceVerdict, TermStoreError> {
276        self.admit_handshake(handshake.term)
277    }
278
279    /// Admit (or fence) a lease-backed write whose lease was taken under
280    /// `lease_term`.
281    pub fn admit_lease_write(&self, lease_term: u64) -> Result<FenceVerdict, TermStoreError> {
282        self.admit(FenceBoundary::Lease, lease_term)
283    }
284
285    fn admit(
286        &self,
287        boundary: FenceBoundary,
288        incoming_term: u64,
289    ) -> Result<FenceVerdict, TermStoreError> {
290        let verdict = self.classify(boundary, incoming_term)?;
291        if let FenceVerdict::Adopt { new_term } = verdict {
292            // Persist the adopted term before admitting so the advance is
293            // durable: a crash right after this cannot un-adopt the new term.
294            self.store.persist(new_term)?;
295        }
296        Ok(verdict)
297    }
298
299    /// Force the current term to `new_term`, persisting it. Used after a node
300    /// re-syncs under a known term (e.g. a deposed primary rejoining as a
301    /// replica) so it stops fencing the timeline it has now adopted. Never
302    /// moves the term backwards.
303    pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
304        let current = self.store.load()?;
305        if new_term > current {
306            self.store.persist(new_term)?;
307        }
308        Ok(())
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Fence over an in-memory store that already holds `term`.
    fn fence(term: u64) -> TermFence<MemoryTermStore> {
        TermFence::new(MemoryTermStore::seeded(term))
    }

    // ---- Apply boundary ----

    #[test]
    fn apply_boundary_fences_stale_term() {
        let f = fence(5);
        let verdict = f.admit_record(4).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Apply,
                incoming_term: 4,
                current_term: 5,
            })
        );
        assert!(verdict.is_fenced());
        // A fenced record must not move the current term.
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn apply_boundary_admits_current_term() {
        let f = fence(5);
        assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
        assert_eq!(f.current_term().unwrap(), 5);
    }

    #[test]
    fn apply_boundary_adopts_higher_term_durably() {
        let f = fence(5);
        assert_eq!(
            f.admit_record(8).unwrap(),
            FenceVerdict::Adopt { new_term: 8 }
        );
        // Adoption persisted: the old term is now fenced.
        assert_eq!(f.current_term().unwrap(), 8);
        assert!(f.admit_record(5).unwrap().is_fenced());
    }

    // ---- Handshake boundary ----

    #[test]
    fn handshake_boundary_fences_stale_term() {
        let f = fence(7);
        let verdict = f.admit_handshake(6).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Handshake,
                incoming_term: 6,
                current_term: 7,
            })
        );
        assert!(verdict.is_fenced());
    }

    #[test]
    fn handshake_boundary_admits_current_and_adopts_higher() {
        let f = fence(7);
        assert_eq!(
            f.admit_handshake(7).unwrap(),
            FenceVerdict::Admit { term: 7 }
        );
        assert_eq!(
            f.admit_handshake(9).unwrap(),
            FenceVerdict::Adopt { new_term: 9 }
        );
        assert_eq!(f.current_term().unwrap(), 9);
    }

    #[test]
    fn stream_handshake_is_judged_by_its_declared_term() {
        let f = fence(7);
        // Peer metadata does not change the rule: only the term is compared.
        assert_eq!(
            f.admit_stream_handshake(&StreamHandshake::new("ex-primary", 6))
                .unwrap(),
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Handshake,
                incoming_term: 6,
                current_term: 7,
            })
        );
        assert_eq!(
            f.admit_stream_handshake(&StreamHandshake::new("new-primary", 8))
                .unwrap(),
            FenceVerdict::Adopt { new_term: 8 }
        );
        assert_eq!(f.current_term().unwrap(), 8);
    }

    // ---- Lease boundary ----

    #[test]
    fn lease_boundary_fences_stale_term() {
        let f = fence(3);
        let verdict = f.admit_lease_write(2).unwrap();
        assert_eq!(
            verdict,
            FenceVerdict::Fenced(StaleTermFenced {
                boundary: FenceBoundary::Lease,
                incoming_term: 2,
                current_term: 3,
            })
        );
        // A fenced lease write must not move the current term.
        assert_eq!(f.current_term().unwrap(), 3);
    }

    #[test]
    fn lease_boundary_admits_current_and_adopts_higher() {
        let f = fence(3);
        assert_eq!(
            f.admit_lease_write(3).unwrap(),
            FenceVerdict::Admit { term: 3 }
        );
        assert_eq!(
            f.admit_lease_write(4).unwrap(),
            FenceVerdict::Adopt { new_term: 4 }
        );
        assert_eq!(f.current_term().unwrap(), 4);
    }

    // ---- Diagnostics ----

    #[test]
    fn fenced_message_names_boundary_and_terms() {
        let fenced = StaleTermFenced {
            boundary: FenceBoundary::Lease,
            incoming_term: 2,
            current_term: 3,
        };
        assert_eq!(
            fenced.to_string(),
            "fenced stale-term lease message: incoming term 2 is behind current term 3"
        );
    }

    // ---- End-to-end: returning ex-primary on a stale term ----

    #[test]
    fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
        // The replica adopted the new term 6 (handshake from the new primary).
        let f = fence(5);
        assert!(matches!(
            f.admit_handshake(6).unwrap(),
            FenceVerdict::Adopt { new_term: 6 }
        ));

        // The ex-primary returns on its stale term 5: both its handshake and
        // its records are fenced.
        assert!(f.admit_handshake(5).unwrap().is_fenced());
        assert!(f.admit_record(5).unwrap().is_fenced());

        // Only after it re-syncs and adopts the new term can it participate —
        // here it rejoins as a replica that has caught up to term 6.
        f.adopt(6).unwrap();
        assert!(f.admit_record(6).unwrap().is_admitted());
    }

    #[test]
    fn classify_is_pure_and_does_not_adopt() {
        let f = fence(3);
        // Classifying a higher term reports Adopt but must NOT persist it.
        assert_eq!(
            f.classify(FenceBoundary::Apply, 9).unwrap(),
            FenceVerdict::Adopt { new_term: 9 }
        );
        assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
    }

    #[test]
    fn adopt_never_moves_term_backwards() {
        let f = fence(10);
        f.adopt(4).unwrap();
        assert_eq!(f.current_term().unwrap(), 10);
        f.adopt(12).unwrap();
        assert_eq!(f.current_term().unwrap(), 12);
    }

    // ---- File-backed durability ----

    #[test]
    fn file_term_store_round_trips_and_defaults() {
        let path = std::env::temp_dir().join(format!(
            "reddb-term-fence-{}-{}.json",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        let _ = std::fs::remove_file(&path);

        // Missing file → default base term.
        let store = FileTermStore::new(&path);
        assert_eq!(
            store.load().unwrap(),
            crate::replication::DEFAULT_REPLICATION_TERM
        );

        // Adopt across a "restart": a fresh store at the same path still
        // fences the old term.
        {
            let fence = TermFence::new(FileTermStore::new(&path));
            assert!(matches!(
                fence.admit_handshake(6).unwrap(),
                FenceVerdict::Adopt { new_term: 6 }
            ));
        }
        let reopened = TermFence::new(FileTermStore::new(&path));
        assert_eq!(reopened.current_term().unwrap(), 6);
        assert!(reopened.admit_record(5).unwrap().is_fenced());

        let _ = std::fs::remove_file(&path);
    }
}
464}