reddb_server/replication/
fence.rs1use crate::serde_json::{self, Value as JsonValue};
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum FenceBoundary {
40 Apply,
42 Handshake,
44}
45
46impl FenceBoundary {
47 pub fn as_str(self) -> &'static str {
48 match self {
49 Self::Apply => "apply",
50 Self::Handshake => "handshake",
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub struct StaleTermFenced {
59 pub boundary: FenceBoundary,
60 pub incoming_term: u64,
61 pub current_term: u64,
62}
63
64impl std::fmt::Display for StaleTermFenced {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(
67 f,
68 "fenced stale-term {} message: incoming term {} is behind current term {}",
69 self.boundary.as_str(),
70 self.incoming_term,
71 self.current_term
72 )
73 }
74}
75
76impl std::error::Error for StaleTermFenced {}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum FenceVerdict {
81 Admit { term: u64 },
83 Adopt { new_term: u64 },
86 Fenced(StaleTermFenced),
88}
89
90impl FenceVerdict {
91 pub fn is_admitted(&self) -> bool {
94 matches!(self, Self::Admit { .. } | Self::Adopt { .. })
95 }
96
97 pub fn is_fenced(&self) -> bool {
99 matches!(self, Self::Fenced(_))
100 }
101}
102
103#[derive(Debug)]
105pub enum TermStoreError {
106 Io(std::io::Error),
107 InvalidFormat(String),
108}
109
110impl std::fmt::Display for TermStoreError {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 Self::Io(err) => write!(f, "term store io error: {err}"),
114 Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
115 }
116 }
117}
118
119impl std::error::Error for TermStoreError {}
120
121pub trait TermStore {
126 fn load(&self) -> Result<u64, TermStoreError>;
127 fn persist(&self, term: u64) -> Result<(), TermStoreError>;
128}
129
130#[derive(Debug)]
132pub struct MemoryTermStore {
133 inner: std::sync::Mutex<u64>,
134}
135
136impl Default for MemoryTermStore {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl MemoryTermStore {
143 pub fn new() -> Self {
144 Self {
145 inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
146 }
147 }
148
149 pub fn seeded(term: u64) -> Self {
152 Self {
153 inner: std::sync::Mutex::new(term),
154 }
155 }
156}
157
158impl TermStore for MemoryTermStore {
159 fn load(&self) -> Result<u64, TermStoreError> {
160 Ok(*self.inner.lock().expect("term store mutex"))
161 }
162
163 fn persist(&self, term: u64) -> Result<(), TermStoreError> {
164 *self.inner.lock().expect("term store mutex") = term;
165 Ok(())
166 }
167}
168
169pub struct FileTermStore {
174 path: std::path::PathBuf,
175}
176
177impl FileTermStore {
178 pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
179 Self { path: path.into() }
180 }
181}
182
183impl TermStore for FileTermStore {
184 fn load(&self) -> Result<u64, TermStoreError> {
185 match std::fs::read(&self.path) {
186 Ok(bytes) => {
187 let json: JsonValue = serde_json::from_slice(&bytes)
188 .map_err(|err| TermStoreError::InvalidFormat(format!("parse: {err}")))?;
189 json.get("term")
190 .and_then(JsonValue::as_u64)
191 .ok_or_else(|| TermStoreError::InvalidFormat("missing term".into()))
192 }
193 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
194 Ok(crate::replication::DEFAULT_REPLICATION_TERM)
195 }
196 Err(err) => Err(TermStoreError::Io(err)),
197 }
198 }
199
200 fn persist(&self, term: u64) -> Result<(), TermStoreError> {
201 let mut obj = serde_json::Map::new();
202 obj.insert("term".to_string(), JsonValue::Number(term as f64));
203 let bytes = serde_json::to_vec(&JsonValue::Object(obj))
204 .map_err(|err| TermStoreError::InvalidFormat(format!("serialize: {err}")))?;
205 if let Some(parent) = self.path.parent() {
206 std::fs::create_dir_all(parent).map_err(TermStoreError::Io)?;
207 }
208 let tmp = self.path.with_extension("term.tmp");
209 std::fs::write(&tmp, &bytes).map_err(TermStoreError::Io)?;
210 if let Ok(f) = std::fs::File::open(&tmp) {
211 let _ = f.sync_all();
212 }
213 std::fs::rename(&tmp, &self.path).map_err(TermStoreError::Io)?;
214 if let Some(parent) = self.path.parent() {
218 if let Ok(dir) = std::fs::File::open(parent) {
219 let _ = dir.sync_all();
220 }
221 }
222 Ok(())
223 }
224}
225
226pub struct TermFence<S: TermStore> {
229 store: S,
230}
231
232impl<S: TermStore> TermFence<S> {
233 pub fn new(store: S) -> Self {
234 Self { store }
235 }
236
237 pub fn current_term(&self) -> Result<u64, TermStoreError> {
239 self.store.load()
240 }
241
242 pub fn classify(
246 &self,
247 boundary: FenceBoundary,
248 incoming_term: u64,
249 ) -> Result<FenceVerdict, TermStoreError> {
250 let current = self.store.load()?;
251 Ok(if incoming_term < current {
252 FenceVerdict::Fenced(StaleTermFenced {
253 boundary,
254 incoming_term,
255 current_term: current,
256 })
257 } else if incoming_term > current {
258 FenceVerdict::Adopt {
259 new_term: incoming_term,
260 }
261 } else {
262 FenceVerdict::Admit { term: current }
263 })
264 }
265
266 pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
269 self.admit(FenceBoundary::Apply, incoming_term)
270 }
271
272 pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
275 self.admit(FenceBoundary::Handshake, incoming_term)
276 }
277
278 fn admit(
279 &self,
280 boundary: FenceBoundary,
281 incoming_term: u64,
282 ) -> Result<FenceVerdict, TermStoreError> {
283 let verdict = self.classify(boundary, incoming_term)?;
284 if let FenceVerdict::Adopt { new_term } = verdict {
285 self.store.persist(new_term)?;
288 }
289 Ok(verdict)
290 }
291
292 pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
297 let current = self.store.load()?;
298 if new_term > current {
299 self.store.persist(new_term)?;
300 }
301 Ok(())
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308
309 fn fence(term: u64) -> TermFence<MemoryTermStore> {
310 TermFence::new(MemoryTermStore::seeded(term))
311 }
312
313 #[test]
316 fn apply_boundary_fences_stale_term() {
317 let f = fence(5);
318 let verdict = f.admit_record(4).unwrap();
319 assert_eq!(
320 verdict,
321 FenceVerdict::Fenced(StaleTermFenced {
322 boundary: FenceBoundary::Apply,
323 incoming_term: 4,
324 current_term: 5,
325 })
326 );
327 assert!(verdict.is_fenced());
328 assert_eq!(f.current_term().unwrap(), 5);
330 }
331
332 #[test]
333 fn apply_boundary_admits_current_term() {
334 let f = fence(5);
335 assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
336 assert_eq!(f.current_term().unwrap(), 5);
337 }
338
339 #[test]
340 fn apply_boundary_adopts_higher_term_durably() {
341 let f = fence(5);
342 assert_eq!(
343 f.admit_record(8).unwrap(),
344 FenceVerdict::Adopt { new_term: 8 }
345 );
346 assert_eq!(f.current_term().unwrap(), 8);
348 assert!(f.admit_record(5).unwrap().is_fenced());
349 }
350
351 #[test]
354 fn handshake_boundary_fences_stale_term() {
355 let f = fence(7);
356 let verdict = f.admit_handshake(6).unwrap();
357 assert_eq!(
358 verdict,
359 FenceVerdict::Fenced(StaleTermFenced {
360 boundary: FenceBoundary::Handshake,
361 incoming_term: 6,
362 current_term: 7,
363 })
364 );
365 assert!(verdict.is_fenced());
366 }
367
368 #[test]
369 fn handshake_boundary_admits_current_and_adopts_higher() {
370 let f = fence(7);
371 assert_eq!(
372 f.admit_handshake(7).unwrap(),
373 FenceVerdict::Admit { term: 7 }
374 );
375 assert_eq!(
376 f.admit_handshake(9).unwrap(),
377 FenceVerdict::Adopt { new_term: 9 }
378 );
379 assert_eq!(f.current_term().unwrap(), 9);
380 }
381
382 #[test]
385 fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
386 let f = fence(5);
388 assert!(matches!(
389 f.admit_handshake(6).unwrap(),
390 FenceVerdict::Adopt { new_term: 6 }
391 ));
392
393 assert!(f.admit_handshake(5).unwrap().is_fenced());
396 assert!(f.admit_record(5).unwrap().is_fenced());
397
398 f.adopt(6).unwrap();
401 assert!(f.admit_record(6).unwrap().is_admitted());
402 }
403
404 #[test]
405 fn classify_is_pure_and_does_not_adopt() {
406 let f = fence(3);
407 assert_eq!(
409 f.classify(FenceBoundary::Apply, 9).unwrap(),
410 FenceVerdict::Adopt { new_term: 9 }
411 );
412 assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
413 }
414
415 #[test]
416 fn adopt_never_moves_term_backwards() {
417 let f = fence(10);
418 f.adopt(4).unwrap();
419 assert_eq!(f.current_term().unwrap(), 10);
420 f.adopt(12).unwrap();
421 assert_eq!(f.current_term().unwrap(), 12);
422 }
423
424 #[test]
427 fn file_term_store_round_trips_and_defaults() {
428 let path = std::env::temp_dir().join(format!(
429 "reddb-term-fence-{}-{}.json",
430 std::process::id(),
431 crate::utils::now_unix_nanos()
432 ));
433 let _ = std::fs::remove_file(&path);
434
435 let store = FileTermStore::new(&path);
437 assert_eq!(
438 store.load().unwrap(),
439 crate::replication::DEFAULT_REPLICATION_TERM
440 );
441
442 {
445 let fence = TermFence::new(FileTermStore::new(&path));
446 assert!(matches!(
447 fence.admit_handshake(6).unwrap(),
448 FenceVerdict::Adopt { new_term: 6 }
449 ));
450 }
451 let reopened = TermFence::new(FileTermStore::new(&path));
452 assert_eq!(reopened.current_term().unwrap(), 6);
453 assert!(reopened.admit_record(5).unwrap().is_fenced());
454
455 let _ = std::fs::remove_file(&path);
456 }
457}