reddb_server/replication/
fence.rs1pub use reddb_file::FileTermStore;
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum FenceBoundary {
43 Apply,
45 Handshake,
47 Lease,
49}
50
51impl FenceBoundary {
52 pub fn as_str(self) -> &'static str {
53 match self {
54 Self::Apply => "apply",
55 Self::Handshake => "handshake",
56 Self::Lease => "lease",
57 }
58 }
59}
60
61#[inline]
64pub fn term_is_stale(incoming_term: u64, current_term: u64) -> bool {
65 incoming_term < current_term
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct StreamHandshake {
71 pub peer_id: String,
72 pub term: u64,
73}
74
75impl StreamHandshake {
76 pub fn new(peer_id: impl Into<String>, term: u64) -> Self {
77 Self {
78 peer_id: peer_id.into(),
79 term,
80 }
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub struct StaleTermFenced {
88 pub boundary: FenceBoundary,
89 pub incoming_term: u64,
90 pub current_term: u64,
91}
92
93impl std::fmt::Display for StaleTermFenced {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 write!(
96 f,
97 "fenced stale-term {} message: incoming term {} is behind current term {}",
98 self.boundary.as_str(),
99 self.incoming_term,
100 self.current_term
101 )
102 }
103}
104
105impl std::error::Error for StaleTermFenced {}
106
107pub type StaleTermRejection = StaleTermFenced;
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum FenceVerdict {
112 Admit { term: u64 },
114 Adopt { new_term: u64 },
117 Fenced(StaleTermFenced),
119}
120
121impl FenceVerdict {
122 pub fn is_admitted(&self) -> bool {
125 matches!(self, Self::Admit { .. } | Self::Adopt { .. })
126 }
127
128 pub fn is_fenced(&self) -> bool {
130 matches!(self, Self::Fenced(_))
131 }
132}
133
134#[derive(Debug)]
136pub enum TermStoreError {
137 Io(std::io::Error),
138 InvalidFormat(String),
139}
140
141impl std::fmt::Display for TermStoreError {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 match self {
144 Self::Io(err) => write!(f, "term store io error: {err}"),
145 Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
146 }
147 }
148}
149
150impl std::error::Error for TermStoreError {}
151
152impl From<reddb_file::RdbFileError> for TermStoreError {
153 fn from(value: reddb_file::RdbFileError) -> Self {
154 match value {
155 reddb_file::RdbFileError::Io(err) => Self::Io(err),
156 reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
157 }
158 }
159}
160
161pub trait TermStore {
166 fn load(&self) -> Result<u64, TermStoreError>;
167 fn persist(&self, term: u64) -> Result<(), TermStoreError>;
168}
169
170#[derive(Debug)]
172pub struct MemoryTermStore {
173 inner: std::sync::Mutex<u64>,
174}
175
176impl Default for MemoryTermStore {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182impl MemoryTermStore {
183 pub fn new() -> Self {
184 Self {
185 inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
186 }
187 }
188
189 pub fn seeded(term: u64) -> Self {
192 Self {
193 inner: std::sync::Mutex::new(term),
194 }
195 }
196}
197
198impl TermStore for MemoryTermStore {
199 fn load(&self) -> Result<u64, TermStoreError> {
200 Ok(*self.inner.lock().expect("term store mutex"))
201 }
202
203 fn persist(&self, term: u64) -> Result<(), TermStoreError> {
204 *self.inner.lock().expect("term store mutex") = term;
205 Ok(())
206 }
207}
208
209impl TermStore for FileTermStore {
210 fn load(&self) -> Result<u64, TermStoreError> {
211 self.load_file().map_err(TermStoreError::from)
212 }
213
214 fn persist(&self, term: u64) -> Result<(), TermStoreError> {
215 self.persist_file(term).map_err(TermStoreError::from)
216 }
217}
218
219pub struct TermFence<S: TermStore> {
222 store: S,
223}
224
225impl<S: TermStore> TermFence<S> {
226 pub fn new(store: S) -> Self {
227 Self { store }
228 }
229
230 pub fn current_term(&self) -> Result<u64, TermStoreError> {
232 self.store.load()
233 }
234
235 pub fn classify(
239 &self,
240 boundary: FenceBoundary,
241 incoming_term: u64,
242 ) -> Result<FenceVerdict, TermStoreError> {
243 let current = self.store.load()?;
244 Ok(if term_is_stale(incoming_term, current) {
245 FenceVerdict::Fenced(StaleTermFenced {
246 boundary,
247 incoming_term,
248 current_term: current,
249 })
250 } else if incoming_term > current {
251 FenceVerdict::Adopt {
252 new_term: incoming_term,
253 }
254 } else {
255 FenceVerdict::Admit { term: current }
256 })
257 }
258
259 pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
262 self.admit(FenceBoundary::Apply, incoming_term)
263 }
264
265 pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
268 self.admit(FenceBoundary::Handshake, incoming_term)
269 }
270
271 pub fn admit_stream_handshake(
273 &self,
274 handshake: &StreamHandshake,
275 ) -> Result<FenceVerdict, TermStoreError> {
276 self.admit_handshake(handshake.term)
277 }
278
279 pub fn admit_lease_write(&self, lease_term: u64) -> Result<FenceVerdict, TermStoreError> {
282 self.admit(FenceBoundary::Lease, lease_term)
283 }
284
285 fn admit(
286 &self,
287 boundary: FenceBoundary,
288 incoming_term: u64,
289 ) -> Result<FenceVerdict, TermStoreError> {
290 let verdict = self.classify(boundary, incoming_term)?;
291 if let FenceVerdict::Adopt { new_term } = verdict {
292 self.store.persist(new_term)?;
295 }
296 Ok(verdict)
297 }
298
299 pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
304 let current = self.store.load()?;
305 if new_term > current {
306 self.store.persist(new_term)?;
307 }
308 Ok(())
309 }
310}
311
312#[cfg(test)]
313mod tests {
314 use super::*;
315
316 fn fence(term: u64) -> TermFence<MemoryTermStore> {
317 TermFence::new(MemoryTermStore::seeded(term))
318 }
319
320 #[test]
323 fn apply_boundary_fences_stale_term() {
324 let f = fence(5);
325 let verdict = f.admit_record(4).unwrap();
326 assert_eq!(
327 verdict,
328 FenceVerdict::Fenced(StaleTermFenced {
329 boundary: FenceBoundary::Apply,
330 incoming_term: 4,
331 current_term: 5,
332 })
333 );
334 assert!(verdict.is_fenced());
335 assert_eq!(f.current_term().unwrap(), 5);
337 }
338
339 #[test]
340 fn apply_boundary_admits_current_term() {
341 let f = fence(5);
342 assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
343 assert_eq!(f.current_term().unwrap(), 5);
344 }
345
346 #[test]
347 fn apply_boundary_adopts_higher_term_durably() {
348 let f = fence(5);
349 assert_eq!(
350 f.admit_record(8).unwrap(),
351 FenceVerdict::Adopt { new_term: 8 }
352 );
353 assert_eq!(f.current_term().unwrap(), 8);
355 assert!(f.admit_record(5).unwrap().is_fenced());
356 }
357
358 #[test]
361 fn handshake_boundary_fences_stale_term() {
362 let f = fence(7);
363 let verdict = f.admit_handshake(6).unwrap();
364 assert_eq!(
365 verdict,
366 FenceVerdict::Fenced(StaleTermFenced {
367 boundary: FenceBoundary::Handshake,
368 incoming_term: 6,
369 current_term: 7,
370 })
371 );
372 assert!(verdict.is_fenced());
373 }
374
375 #[test]
376 fn handshake_boundary_admits_current_and_adopts_higher() {
377 let f = fence(7);
378 assert_eq!(
379 f.admit_handshake(7).unwrap(),
380 FenceVerdict::Admit { term: 7 }
381 );
382 assert_eq!(
383 f.admit_handshake(9).unwrap(),
384 FenceVerdict::Adopt { new_term: 9 }
385 );
386 assert_eq!(f.current_term().unwrap(), 9);
387 }
388
389 #[test]
392 fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
393 let f = fence(5);
395 assert!(matches!(
396 f.admit_handshake(6).unwrap(),
397 FenceVerdict::Adopt { new_term: 6 }
398 ));
399
400 assert!(f.admit_handshake(5).unwrap().is_fenced());
403 assert!(f.admit_record(5).unwrap().is_fenced());
404
405 f.adopt(6).unwrap();
408 assert!(f.admit_record(6).unwrap().is_admitted());
409 }
410
411 #[test]
412 fn classify_is_pure_and_does_not_adopt() {
413 let f = fence(3);
414 assert_eq!(
416 f.classify(FenceBoundary::Apply, 9).unwrap(),
417 FenceVerdict::Adopt { new_term: 9 }
418 );
419 assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
420 }
421
422 #[test]
423 fn adopt_never_moves_term_backwards() {
424 let f = fence(10);
425 f.adopt(4).unwrap();
426 assert_eq!(f.current_term().unwrap(), 10);
427 f.adopt(12).unwrap();
428 assert_eq!(f.current_term().unwrap(), 12);
429 }
430
431 #[test]
434 fn file_term_store_round_trips_and_defaults() {
435 let path = std::env::temp_dir().join(format!(
436 "reddb-term-fence-{}-{}.json",
437 std::process::id(),
438 crate::utils::now_unix_nanos()
439 ));
440 let _ = std::fs::remove_file(&path);
441
442 let store = FileTermStore::new(&path);
444 assert_eq!(
445 store.load().unwrap(),
446 crate::replication::DEFAULT_REPLICATION_TERM
447 );
448
449 {
452 let fence = TermFence::new(FileTermStore::new(&path));
453 assert!(matches!(
454 fence.admit_handshake(6).unwrap(),
455 FenceVerdict::Adopt { new_term: 6 }
456 ));
457 }
458 let reopened = TermFence::new(FileTermStore::new(&path));
459 assert_eq!(reopened.current_term().unwrap(), 6);
460 assert!(reopened.admit_record(5).unwrap().is_fenced());
461
462 let _ = std::fs::remove_file(&path);
463 }
464}