reddb_server/replication/
fence.rs1pub use reddb_file::FileTermStore;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum FenceBoundary {
40 Apply,
42 Handshake,
44}
45
46impl FenceBoundary {
47 pub fn as_str(self) -> &'static str {
48 match self {
49 Self::Apply => "apply",
50 Self::Handshake => "handshake",
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub struct StaleTermFenced {
59 pub boundary: FenceBoundary,
60 pub incoming_term: u64,
61 pub current_term: u64,
62}
63
64impl std::fmt::Display for StaleTermFenced {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(
67 f,
68 "fenced stale-term {} message: incoming term {} is behind current term {}",
69 self.boundary.as_str(),
70 self.incoming_term,
71 self.current_term
72 )
73 }
74}
75
76impl std::error::Error for StaleTermFenced {}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum FenceVerdict {
81 Admit { term: u64 },
83 Adopt { new_term: u64 },
86 Fenced(StaleTermFenced),
88}
89
90impl FenceVerdict {
91 pub fn is_admitted(&self) -> bool {
94 matches!(self, Self::Admit { .. } | Self::Adopt { .. })
95 }
96
97 pub fn is_fenced(&self) -> bool {
99 matches!(self, Self::Fenced(_))
100 }
101}
102
103#[derive(Debug)]
105pub enum TermStoreError {
106 Io(std::io::Error),
107 InvalidFormat(String),
108}
109
110impl std::fmt::Display for TermStoreError {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 Self::Io(err) => write!(f, "term store io error: {err}"),
114 Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
115 }
116 }
117}
118
119impl std::error::Error for TermStoreError {}
120
121impl From<reddb_file::RdbFileError> for TermStoreError {
122 fn from(value: reddb_file::RdbFileError) -> Self {
123 match value {
124 reddb_file::RdbFileError::Io(err) => Self::Io(err),
125 reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
126 }
127 }
128}
129
130pub trait TermStore {
135 fn load(&self) -> Result<u64, TermStoreError>;
136 fn persist(&self, term: u64) -> Result<(), TermStoreError>;
137}
138
139#[derive(Debug)]
141pub struct MemoryTermStore {
142 inner: std::sync::Mutex<u64>,
143}
144
145impl Default for MemoryTermStore {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151impl MemoryTermStore {
152 pub fn new() -> Self {
153 Self {
154 inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
155 }
156 }
157
158 pub fn seeded(term: u64) -> Self {
161 Self {
162 inner: std::sync::Mutex::new(term),
163 }
164 }
165}
166
167impl TermStore for MemoryTermStore {
168 fn load(&self) -> Result<u64, TermStoreError> {
169 Ok(*self.inner.lock().expect("term store mutex"))
170 }
171
172 fn persist(&self, term: u64) -> Result<(), TermStoreError> {
173 *self.inner.lock().expect("term store mutex") = term;
174 Ok(())
175 }
176}
177
178impl TermStore for FileTermStore {
179 fn load(&self) -> Result<u64, TermStoreError> {
180 self.load_file().map_err(TermStoreError::from)
181 }
182
183 fn persist(&self, term: u64) -> Result<(), TermStoreError> {
184 self.persist_file(term).map_err(TermStoreError::from)
185 }
186}
187
188pub struct TermFence<S: TermStore> {
191 store: S,
192}
193
194impl<S: TermStore> TermFence<S> {
195 pub fn new(store: S) -> Self {
196 Self { store }
197 }
198
199 pub fn current_term(&self) -> Result<u64, TermStoreError> {
201 self.store.load()
202 }
203
204 pub fn classify(
208 &self,
209 boundary: FenceBoundary,
210 incoming_term: u64,
211 ) -> Result<FenceVerdict, TermStoreError> {
212 let current = self.store.load()?;
213 Ok(if incoming_term < current {
214 FenceVerdict::Fenced(StaleTermFenced {
215 boundary,
216 incoming_term,
217 current_term: current,
218 })
219 } else if incoming_term > current {
220 FenceVerdict::Adopt {
221 new_term: incoming_term,
222 }
223 } else {
224 FenceVerdict::Admit { term: current }
225 })
226 }
227
228 pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
231 self.admit(FenceBoundary::Apply, incoming_term)
232 }
233
234 pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
237 self.admit(FenceBoundary::Handshake, incoming_term)
238 }
239
240 fn admit(
241 &self,
242 boundary: FenceBoundary,
243 incoming_term: u64,
244 ) -> Result<FenceVerdict, TermStoreError> {
245 let verdict = self.classify(boundary, incoming_term)?;
246 if let FenceVerdict::Adopt { new_term } = verdict {
247 self.store.persist(new_term)?;
250 }
251 Ok(verdict)
252 }
253
254 pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
259 let current = self.store.load()?;
260 if new_term > current {
261 self.store.persist(new_term)?;
262 }
263 Ok(())
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270
271 fn fence(term: u64) -> TermFence<MemoryTermStore> {
272 TermFence::new(MemoryTermStore::seeded(term))
273 }
274
275 #[test]
278 fn apply_boundary_fences_stale_term() {
279 let f = fence(5);
280 let verdict = f.admit_record(4).unwrap();
281 assert_eq!(
282 verdict,
283 FenceVerdict::Fenced(StaleTermFenced {
284 boundary: FenceBoundary::Apply,
285 incoming_term: 4,
286 current_term: 5,
287 })
288 );
289 assert!(verdict.is_fenced());
290 assert_eq!(f.current_term().unwrap(), 5);
292 }
293
294 #[test]
295 fn apply_boundary_admits_current_term() {
296 let f = fence(5);
297 assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
298 assert_eq!(f.current_term().unwrap(), 5);
299 }
300
301 #[test]
302 fn apply_boundary_adopts_higher_term_durably() {
303 let f = fence(5);
304 assert_eq!(
305 f.admit_record(8).unwrap(),
306 FenceVerdict::Adopt { new_term: 8 }
307 );
308 assert_eq!(f.current_term().unwrap(), 8);
310 assert!(f.admit_record(5).unwrap().is_fenced());
311 }
312
313 #[test]
316 fn handshake_boundary_fences_stale_term() {
317 let f = fence(7);
318 let verdict = f.admit_handshake(6).unwrap();
319 assert_eq!(
320 verdict,
321 FenceVerdict::Fenced(StaleTermFenced {
322 boundary: FenceBoundary::Handshake,
323 incoming_term: 6,
324 current_term: 7,
325 })
326 );
327 assert!(verdict.is_fenced());
328 }
329
330 #[test]
331 fn handshake_boundary_admits_current_and_adopts_higher() {
332 let f = fence(7);
333 assert_eq!(
334 f.admit_handshake(7).unwrap(),
335 FenceVerdict::Admit { term: 7 }
336 );
337 assert_eq!(
338 f.admit_handshake(9).unwrap(),
339 FenceVerdict::Adopt { new_term: 9 }
340 );
341 assert_eq!(f.current_term().unwrap(), 9);
342 }
343
344 #[test]
347 fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
348 let f = fence(5);
350 assert!(matches!(
351 f.admit_handshake(6).unwrap(),
352 FenceVerdict::Adopt { new_term: 6 }
353 ));
354
355 assert!(f.admit_handshake(5).unwrap().is_fenced());
358 assert!(f.admit_record(5).unwrap().is_fenced());
359
360 f.adopt(6).unwrap();
363 assert!(f.admit_record(6).unwrap().is_admitted());
364 }
365
366 #[test]
367 fn classify_is_pure_and_does_not_adopt() {
368 let f = fence(3);
369 assert_eq!(
371 f.classify(FenceBoundary::Apply, 9).unwrap(),
372 FenceVerdict::Adopt { new_term: 9 }
373 );
374 assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
375 }
376
377 #[test]
378 fn adopt_never_moves_term_backwards() {
379 let f = fence(10);
380 f.adopt(4).unwrap();
381 assert_eq!(f.current_term().unwrap(), 10);
382 f.adopt(12).unwrap();
383 assert_eq!(f.current_term().unwrap(), 12);
384 }
385
386 #[test]
389 fn file_term_store_round_trips_and_defaults() {
390 let path = std::env::temp_dir().join(format!(
391 "reddb-term-fence-{}-{}.json",
392 std::process::id(),
393 crate::utils::now_unix_nanos()
394 ));
395 let _ = std::fs::remove_file(&path);
396
397 let store = FileTermStore::new(&path);
399 assert_eq!(
400 store.load().unwrap(),
401 crate::replication::DEFAULT_REPLICATION_TERM
402 );
403
404 {
407 let fence = TermFence::new(FileTermStore::new(&path));
408 assert!(matches!(
409 fence.admit_handshake(6).unwrap(),
410 FenceVerdict::Adopt { new_term: 6 }
411 ));
412 }
413 let reopened = TermFence::new(FileTermStore::new(&path));
414 assert_eq!(reopened.current_term().unwrap(), 6);
415 assert!(reopened.admit_record(5).unwrap().is_fenced());
416
417 let _ = std::fs::remove_file(&path);
418 }
419}