Skip to main content

reddb_server/replication/
swap_db.rs

1//! Stay-readable re-bootstrap with an atomic dataset swap (issue #837,
2//! PRD #819).
3//!
4//! When a replica must re-bootstrap — discard its current dataset and
5//! load a fresh snapshot from the primary — it must not go dark. Read
6//! capacity is most precious exactly then, because a re-bootstrap is
7//! often triggered *because* another node is already down. [`SwapDb`]
8//! keeps the old data fully readable for the entire rebuild and swaps
9//! to the fresh dataset in one atomic step at the end.
10//!
11//! ## The two-state guarantee
12//!
13//! * **Stay-readable.** Non-causal reads ([`SwapDb::read_noncausal`])
14//!   are *always* served from the currently-installed dataset — the
15//!   old data throughout the rebuild, the new data after the swap.
16//!   They never block and never fail.
17//! * **Causal correctness.** While a rebuild is in flight the node's
18//!   applied frontier describes data it is *about to throw away*, so a
19//!   bookmark read served from it could observe a commit that the
20//!   post-swap dataset has not yet reached. [`SwapDb::read_causal`]
21//!   therefore refuses ([`RebootstrapInProgress`]) for the duration of
22//!   the rebuild; the caller routes that read to a caught-up peer. The
23//!   same signal is surfaced on the wire via
24//!   [`crate::replication::primary::ReplicaState::rebootstrapping`] so
25//!   the *client* routing table excludes the node before the read ever
26//!   reaches it.
27//!
28//! ## Atomicity
29//!
30//! The installed dataset is an `Arc<D>` behind an `RwLock`. A reader
31//! clones the `Arc` under a short read lock and then works against its
32//! own handle. [`SwapDb::complete_rebootstrap`] takes the write lock
33//! just long enough to replace the pointer. A reader that captured the
34//! old `Arc` before the swap keeps observing a complete old dataset;
35//! a reader that captures after the swap sees a complete new one.
36//! There is no window in which a half-built dataset is visible — the
37//! swap publishes the fresh `D` only once it is fully constructed.
38
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::sync::{Arc, RwLock};
41
/// Refusal returned for a causal read that arrives mid-re-bootstrap.
///
/// [`SwapDb::read_causal`] hands this back instead of a dataset while
/// the re-bootstrap flag is set. It means "route this read somewhere
/// else" — a caught-up peer, or the primary — and is meant to be
/// handled by the routing layer rather than surfaced to the
/// application as a hard error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RebootstrapInProgress;
50
51impl std::fmt::Display for RebootstrapInProgress {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(
54            f,
55            "node is re-bootstrapping; causal read must route to a caught-up peer"
56        )
57    }
58}
59
// Empty impl: every `Error` method keeps its provided default, which
// fits a unit struct that wraps no underlying cause.
impl std::error::Error for RebootstrapInProgress {}
61
/// A dataset holder that stays readable across an atomic re-bootstrap
/// swap.
///
/// Generic over the installed dataset `D`, so every caller shares one
/// swap discipline instead of re-implementing the locking. `D` lives
/// behind an `Arc`: installing a new dataset is a single pointer
/// replacement, and any reader that already cloned the old `Arc`
/// keeps that dataset alive for as long as it holds the handle.
pub struct SwapDb<D> {
    /// The currently-installed dataset. Readers clone the `Arc` under
    /// a read lock; a completed rebuild replaces the pointer under the
    /// write lock.
    current: RwLock<Arc<D>>,
    /// `true` from [`Self::begin_rebootstrap`] until the matching
    /// [`Self::complete_rebootstrap`]. Gates causal reads; per the
    /// module docs it is also the value surfaced in the topology
    /// advertisement.
    rebootstrapping: AtomicBool,
}

/// Hand-written so that `SwapDb<D>` is printable for *every* `D`.
///
/// A derived impl would require `D: Debug` and would dump the whole
/// installed dataset into the output. Only the re-bootstrap flag is
/// reported; the dataset is elided (shown as `..`).
impl<D> std::fmt::Debug for SwapDb<D> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let in_flight = self.rebootstrapping.load(Ordering::Acquire);
        f.debug_struct("SwapDb")
            .field("rebootstrapping", &in_flight)
            .finish_non_exhaustive()
    }
}
78
79impl<D> SwapDb<D> {
80    /// Install `data` as the initial dataset. The node starts *not*
81    /// re-bootstrapping — it is serving normally.
82    pub fn new(data: D) -> Self {
83        Self {
84            current: RwLock::new(Arc::new(data)),
85            rebootstrapping: AtomicBool::new(false),
86        }
87    }
88
89    /// `true` while a re-bootstrap is in flight. This is exactly the
90    /// value the topology advertiser surfaces as
91    /// `ReplicaInfo::rebootstrapping`.
92    pub fn is_rebootstrapping(&self) -> bool {
93        self.rebootstrapping.load(Ordering::Acquire)
94    }
95
96    /// The currently-installed dataset, cloned as an `Arc`. Always
97    /// available — this is the stay-readable path. During a rebuild it
98    /// returns the *old* data; after [`Self::complete_rebootstrap`] it
99    /// returns the new data.
100    pub fn snapshot(&self) -> Arc<D> {
101        Arc::clone(&self.current.read().unwrap_or_else(|e| e.into_inner()))
102    }
103
104    /// Serve a non-causal read: always the currently-installed
105    /// dataset, rebuild in flight or not. Never blocks on the rebuild,
106    /// never fails. Identical to [`Self::snapshot`]; named for intent
107    /// at the call site.
108    pub fn read_noncausal(&self) -> Arc<D> {
109        self.snapshot()
110    }
111
112    /// Serve a causal (bookmark) read.
113    ///
114    /// Returns the installed dataset only when the node is *not*
115    /// re-bootstrapping. While a rebuild is in flight it returns
116    /// [`RebootstrapInProgress`] so the caller bounces the read to a
117    /// caught-up peer — never serving a bookmark from data the node is
118    /// about to discard.
119    pub fn read_causal(&self) -> Result<Arc<D>, RebootstrapInProgress> {
120        if self.is_rebootstrapping() {
121            return Err(RebootstrapInProgress);
122        }
123        Ok(self.snapshot())
124    }
125
126    /// Enter the re-bootstrap state. Idempotent: calling it while
127    /// already rebuilding is a no-op. The installed dataset is left
128    /// untouched, so non-causal reads keep flowing from the old data
129    /// while the fresh snapshot loads in the background.
130    pub fn begin_rebootstrap(&self) {
131        self.rebootstrapping.store(true, Ordering::Release);
132    }
133
134    /// Atomically install `fresh` as the new dataset and leave the
135    /// re-bootstrap state.
136    ///
137    /// The pointer swap happens under the write lock; the
138    /// `rebootstrapping` flag is cleared only *after* the new dataset
139    /// is published, so there is no instant at which the node both
140    /// claims to be caught up and still serves the old data to a
141    /// causal reader. Returns the previously-installed dataset (the
142    /// old `Arc`) so the caller can keep or drop it; outstanding
143    /// readers that already cloned it stay valid regardless.
144    pub fn complete_rebootstrap(&self, fresh: D) -> Arc<D> {
145        let new = Arc::new(fresh);
146        let old = {
147            let mut guard = self.current.write().unwrap_or_else(|e| e.into_inner());
148            std::mem::replace(&mut *guard, new)
149        };
150        // Publish the new dataset before re-enabling causal reads.
151        self.rebootstrapping.store(false, Ordering::Release);
152        old
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// While the re-bootstrap flag is set, non-causal reads must still
    /// return the dataset that was installed before the rebuild began.
    #[test]
    fn serves_noncausal_reads_from_old_data_during_rebuild() {
        let store = SwapDb::new(vec![1, 2, 3]);
        store.begin_rebootstrap();
        assert!(store.is_rebootstrapping());

        let served = store.read_noncausal();
        assert_eq!(*served, vec![1, 2, 3]);
    }

    /// Causal reads succeed before the rebuild and are refused with
    /// `RebootstrapInProgress` once it has begun.
    #[test]
    fn refuses_causal_reads_during_rebuild() {
        let store = SwapDb::new(vec![1, 2, 3]);
        assert!(store.read_causal().is_ok());

        store.begin_rebootstrap();
        let refused = store.read_causal();
        assert_eq!(refused, Err(RebootstrapInProgress));
    }

    /// Completing the rebuild hands back the old dataset, clears the
    /// flag, and serves the new dataset on both read paths.
    #[test]
    fn swap_replaces_data_and_resumes_causal_reads() {
        let store = SwapDb::new(vec![1, 2, 3]);
        store.begin_rebootstrap();

        let previous = store.complete_rebootstrap(vec![9, 9, 9, 9]);
        assert_eq!(*previous, vec![1, 2, 3]);
        assert!(!store.is_rebootstrapping());

        let noncausal = store.read_noncausal();
        assert_eq!(*noncausal, vec![9, 9, 9, 9]);
        let causal = store.read_causal().expect("causal ok");
        assert_eq!(*causal, vec![9, 9, 9, 9]);
    }

    /// A handle captured before the swap keeps observing the complete
    /// old dataset; a read taken after the swap observes the new one.
    #[test]
    fn swap_is_atomic_old_reader_keeps_complete_old_dataset() {
        let store = SwapDb::new(vec![1, 2, 3]);
        let captured_before_swap = store.read_noncausal();

        store.begin_rebootstrap();
        store.complete_rebootstrap(vec![7, 8]);

        assert_eq!(*captured_before_swap, vec![1, 2, 3]);
        let captured_after_swap = store.read_noncausal();
        assert_eq!(*captured_after_swap, vec![7, 8]);
    }

    /// Two `begin_rebootstrap` calls are undone by one
    /// `complete_rebootstrap`.
    #[test]
    fn begin_rebootstrap_is_idempotent() {
        let store = SwapDb::new(0u64);
        for _ in 0..2 {
            store.begin_rebootstrap();
        }
        assert!(store.is_rebootstrapping());

        store.complete_rebootstrap(42);
        assert!(!store.is_rebootstrapping());
        assert_eq!(*store.snapshot(), 42);
    }

    /// The begin → refuse → complete → serve cycle works repeatedly on
    /// the same holder.
    #[test]
    fn rebuild_then_swap_cycle_can_repeat() {
        let store = SwapDb::new(1u32);
        for generation in 2..=5u32 {
            store.begin_rebootstrap();
            assert!(store.read_causal().is_err());

            store.complete_rebootstrap(generation);
            let served = store.read_causal().expect("ok");
            assert_eq!(*served, generation);
        }
    }
}