reddb_server/replication/swap_db.rs
1//! Stay-readable re-bootstrap with an atomic dataset swap (issue #837,
2//! PRD #819).
3//!
4//! When a replica must re-bootstrap — discard its current dataset and
5//! load a fresh snapshot from the primary — it must not go dark. Read
6//! capacity is most precious exactly then, because a re-bootstrap is
7//! often triggered *because* another node is already down. [`SwapDb`]
8//! keeps the old data fully readable for the entire rebuild and swaps
9//! to the fresh dataset in one atomic step at the end.
10//!
11//! ## The two-state guarantee
12//!
13//! * **Stay-readable.** Non-causal reads ([`SwapDb::read_noncausal`])
14//! are *always* served from the currently-installed dataset — the
15//! old data throughout the rebuild, the new data after the swap.
16//! They never block and never fail.
17//! * **Causal correctness.** While a rebuild is in flight the node's
18//! applied frontier describes data it is *about to throw away*, so a
19//! bookmark read served from it could observe a commit that the
20//! post-swap dataset has not yet reached. [`SwapDb::read_causal`]
21//! therefore refuses ([`RebootstrapInProgress`]) for the duration of
22//! the rebuild; the caller routes that read to a caught-up peer. The
23//! same signal is surfaced on the wire via
24//! [`crate::replication::primary::ReplicaState::rebootstrapping`] so
25//! the *client* routing table excludes the node before the read ever
26//! reaches it.
27//!
28//! ## Atomicity
29//!
30//! The installed dataset is an `Arc<D>` behind an `RwLock`. A reader
31//! clones the `Arc` under a short read lock and then works against its
32//! own handle. [`SwapDb::complete_rebootstrap`] takes the write lock
33//! just long enough to replace the pointer. A reader that captured the
34//! old `Arc` before the swap keeps observing a complete old dataset;
35//! a reader that captures after the swap sees a complete new one.
36//! There is no window in which a half-built dataset is visible — the
37//! swap publishes the fresh `D` only once it is fully constructed.
38
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::sync::{Arc, RwLock};
41
/// Refusal returned by [`SwapDb::read_causal`] while a re-bootstrap is in
/// flight.
///
/// Serving a bookmark (causal) read from a dataset that is about to be
/// discarded could let a client observe a commit the post-swap dataset has
/// not reached yet, so the node declines instead. Treat this as a routing
/// signal rather than an application-facing failure: retry the read against
/// a caught-up peer or the primary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RebootstrapInProgress;

impl std::fmt::Display for RebootstrapInProgress {
    /// Render the refusal as one human-readable sentence.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MESSAGE: &str =
            "node is re-bootstrapping; causal read must route to a caught-up peer";
        f.write_str(MESSAGE)
    }
}

// The refusal has no underlying cause, so the default `source()` (`None`)
// is the right answer.
impl std::error::Error for RebootstrapInProgress {}
61
62/// A dataset that stays readable across an atomic re-bootstrap swap.
63///
64/// Generic over the installed dataset `D` so the replication engine,
65/// the integration tests, and any future caller share one swap
66/// discipline rather than re-implementing the lock dance. `D` is held
67/// behind an `Arc`, so a "swap" is a single pointer write and old
68/// readers keep their snapshot alive.
69pub struct SwapDb<D> {
70 /// The currently-installed dataset. Readers clone the `Arc`; the
71 /// rebuild replaces the pointer under the write lock.
72 current: RwLock<Arc<D>>,
73 /// `true` from [`Self::begin_rebootstrap`] until the matching
74 /// [`Self::complete_rebootstrap`]. Gates causal reads and is the
75 /// value mirrored into the topology advertisement.
76 rebootstrapping: AtomicBool,
77}
78
79impl<D> SwapDb<D> {
80 /// Install `data` as the initial dataset. The node starts *not*
81 /// re-bootstrapping — it is serving normally.
82 pub fn new(data: D) -> Self {
83 Self {
84 current: RwLock::new(Arc::new(data)),
85 rebootstrapping: AtomicBool::new(false),
86 }
87 }
88
89 /// `true` while a re-bootstrap is in flight. This is exactly the
90 /// value the topology advertiser surfaces as
91 /// `ReplicaInfo::rebootstrapping`.
92 pub fn is_rebootstrapping(&self) -> bool {
93 self.rebootstrapping.load(Ordering::Acquire)
94 }
95
96 /// The currently-installed dataset, cloned as an `Arc`. Always
97 /// available — this is the stay-readable path. During a rebuild it
98 /// returns the *old* data; after [`Self::complete_rebootstrap`] it
99 /// returns the new data.
100 pub fn snapshot(&self) -> Arc<D> {
101 Arc::clone(&self.current.read().unwrap_or_else(|e| e.into_inner()))
102 }
103
104 /// Serve a non-causal read: always the currently-installed
105 /// dataset, rebuild in flight or not. Never blocks on the rebuild,
106 /// never fails. Identical to [`Self::snapshot`]; named for intent
107 /// at the call site.
108 pub fn read_noncausal(&self) -> Arc<D> {
109 self.snapshot()
110 }
111
112 /// Serve a causal (bookmark) read.
113 ///
114 /// Returns the installed dataset only when the node is *not*
115 /// re-bootstrapping. While a rebuild is in flight it returns
116 /// [`RebootstrapInProgress`] so the caller bounces the read to a
117 /// caught-up peer — never serving a bookmark from data the node is
118 /// about to discard.
119 pub fn read_causal(&self) -> Result<Arc<D>, RebootstrapInProgress> {
120 if self.is_rebootstrapping() {
121 return Err(RebootstrapInProgress);
122 }
123 Ok(self.snapshot())
124 }
125
126 /// Enter the re-bootstrap state. Idempotent: calling it while
127 /// already rebuilding is a no-op. The installed dataset is left
128 /// untouched, so non-causal reads keep flowing from the old data
129 /// while the fresh snapshot loads in the background.
130 pub fn begin_rebootstrap(&self) {
131 self.rebootstrapping.store(true, Ordering::Release);
132 }
133
134 /// Atomically install `fresh` as the new dataset and leave the
135 /// re-bootstrap state.
136 ///
137 /// The pointer swap happens under the write lock; the
138 /// `rebootstrapping` flag is cleared only *after* the new dataset
139 /// is published, so there is no instant at which the node both
140 /// claims to be caught up and still serves the old data to a
141 /// causal reader. Returns the previously-installed dataset (the
142 /// old `Arc`) so the caller can keep or drop it; outstanding
143 /// readers that already cloned it stay valid regardless.
144 pub fn complete_rebootstrap(&self, fresh: D) -> Arc<D> {
145 let new = Arc::new(fresh);
146 let old = {
147 let mut guard = self.current.write().unwrap_or_else(|e| e.into_inner());
148 std::mem::replace(&mut *guard, new)
149 };
150 // Publish the new dataset before re-enabling causal reads.
151 self.rebootstrapping.store(false, Ordering::Release);
152 old
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn serves_noncausal_reads_from_old_data_during_rebuild() {
        let db = SwapDb::new(vec![1, 2, 3]);
        db.begin_rebootstrap();
        assert!(db.is_rebootstrapping());
        // Old data stays readable for non-causal reads.
        assert_eq!(*db.read_noncausal(), vec![1, 2, 3]);
    }

    #[test]
    fn refuses_causal_reads_during_rebuild() {
        let db = SwapDb::new(vec![1, 2, 3]);
        assert!(db.read_causal().is_ok());
        db.begin_rebootstrap();
        assert_eq!(db.read_causal(), Err(RebootstrapInProgress));
    }

    #[test]
    fn swap_replaces_data_and_resumes_causal_reads() {
        let db = SwapDb::new(vec![1, 2, 3]);
        db.begin_rebootstrap();
        let old = db.complete_rebootstrap(vec![9, 9, 9, 9]);
        assert_eq!(*old, vec![1, 2, 3]);
        assert!(!db.is_rebootstrapping());
        // New data is now served on both paths.
        assert_eq!(*db.read_noncausal(), vec![9, 9, 9, 9]);
        assert_eq!(*db.read_causal().expect("causal ok"), vec![9, 9, 9, 9]);
    }

    #[test]
    fn swap_is_atomic_old_reader_keeps_complete_old_dataset() {
        let db = SwapDb::new(vec![1, 2, 3]);
        // Capture the dataset before the swap.
        let pre = db.read_noncausal();
        db.begin_rebootstrap();
        db.complete_rebootstrap(vec![7, 8]);
        // The pre-swap handle still observes the *whole* old dataset,
        // never a torn or half-built view.
        assert_eq!(*pre, vec![1, 2, 3]);
        // A fresh read sees the new dataset.
        assert_eq!(*db.read_noncausal(), vec![7, 8]);
    }

    #[test]
    fn begin_rebootstrap_is_idempotent() {
        let db = SwapDb::new(0u64);
        db.begin_rebootstrap();
        db.begin_rebootstrap();
        assert!(db.is_rebootstrapping());
        db.complete_rebootstrap(42);
        assert!(!db.is_rebootstrapping());
        assert_eq!(*db.snapshot(), 42);
    }

    #[test]
    fn rebuild_then_swap_cycle_can_repeat() {
        let db = SwapDb::new(1u32);
        for n in 2..=5 {
            db.begin_rebootstrap();
            assert!(db.read_causal().is_err());
            db.complete_rebootstrap(n);
            assert_eq!(*db.read_causal().expect("ok"), n);
        }
    }

    #[test]
    fn begin_after_complete_reenters_rebuild() {
        let db = SwapDb::new(1u8);
        db.begin_rebootstrap();
        db.complete_rebootstrap(2);
        // A fresh rebuild after a completed one must gate causal reads again.
        db.begin_rebootstrap();
        assert!(db.is_rebootstrapping());
        assert_eq!(db.read_causal(), Err(RebootstrapInProgress));
        assert_eq!(*db.read_noncausal(), 2);
    }

    #[test]
    fn complete_without_begin_still_swaps() {
        let db = SwapDb::new("old");
        let prev = db.complete_rebootstrap("new");
        assert_eq!(*prev, "old");
        assert!(!db.is_rebootstrapping());
        assert_eq!(*db.read_causal().expect("not rebuilding"), "new");
    }

    #[test]
    fn concurrent_readers_never_observe_a_torn_dataset() {
        const LEN: usize = 64;
        // Every installed dataset is uniform (all elements equal its
        // generation), so a torn view would show mixed values.
        let db = Arc::new(SwapDb::new(vec![0u32; LEN]));
        let readers: Vec<_> = (0..4)
            .map(|_| {
                let db = Arc::clone(&db);
                thread::spawn(move || {
                    for _ in 0..1_000 {
                        let snap = db.read_noncausal();
                        assert_eq!(snap.len(), LEN);
                        let first = snap[0];
                        assert!(snap.iter().all(|&x| x == first), "torn dataset");
                    }
                })
            })
            .collect();
        for generation in 1..=50u32 {
            db.begin_rebootstrap();
            db.complete_rebootstrap(vec![generation; LEN]);
        }
        for reader in readers {
            reader.join().expect("reader thread panicked");
        }
        assert!(!db.is_rebootstrapping());
        assert_eq!(*db.read_causal().expect("caught up"), vec![50u32; LEN]);
    }
}
224}