use crate::serde_json::{self, Value as JsonValue};
/// Identifies the boundary at which an incoming term is being checked.
///
/// The value is carried into [`StaleTermFenced`] so a rejection records where
/// it happened.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceBoundary {
/// Boundary used by `TermFence::admit_record`.
Apply,
/// Boundary used by `TermFence::admit_handshake`.
Handshake,
}
impl FenceBoundary {
pub fn as_str(self) -> &'static str {
match self {
Self::Apply => "apply",
Self::Handshake => "handshake",
}
}
}
/// Details of a message rejected because its term is lower than the current one.
///
/// Built by `TermFence::classify` when `incoming_term < current`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StaleTermFenced {
/// Boundary at which the check was made.
pub boundary: FenceBoundary,
/// Term carried by the rejected message.
pub incoming_term: u64,
/// Term loaded from the store at the time of the check.
pub current_term: u64,
}
impl std::fmt::Display for StaleTermFenced {
    /// Renders a one-line description naming the boundary label, the incoming
    /// term and the current term it fell behind.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.boundary.as_str();
        let (incoming, current) = (self.incoming_term, self.current_term);
        write!(
            f,
            "fenced stale-term {} message: incoming term {} is behind current term {}",
            label, incoming, current
        )
    }
}
// Lets a fence rejection be used as a `std::error::Error`; no methods are
// overridden, so it reports no `source()`.
impl std::error::Error for StaleTermFenced {}
/// Outcome of comparing an incoming term against the stored current term.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceVerdict {
/// The incoming term equals the current term; `term` is that value.
Admit { term: u64 },
/// The incoming term is greater than the current term; `new_term` is the
/// incoming value.
Adopt { new_term: u64 },
/// The incoming term is lower than the current term.
Fenced(StaleTermFenced),
}
impl FenceVerdict {
pub fn is_admitted(&self) -> bool {
matches!(self, Self::Admit { .. } | Self::Adopt { .. })
}
pub fn is_fenced(&self) -> bool {
matches!(self, Self::Fenced(_))
}
}
/// Errors returned by [`TermStore`] implementations.
#[derive(Debug)]
pub enum TermStoreError {
/// An underlying I/O operation failed.
Io(std::io::Error),
/// Stored data could not be parsed or serialized, or lacked a usable term;
/// the string describes the problem.
InvalidFormat(String),
}
impl std::fmt::Display for TermStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(err) => write!(f, "term store io error: {err}"),
Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
}
}
}
// `source()` is not overridden: the wrapped `std::io::Error` is surfaced only
// through the `Display` text of the `Io` variant.
impl std::error::Error for TermStoreError {}
/// Storage backend for the current replication term used by [`TermFence`].
///
/// Both methods take `&self`, so implementations that mutate state need
/// interior mutability (as `MemoryTermStore` does with a `Mutex`).
pub trait TermStore {
/// Returns the currently stored term.
fn load(&self) -> Result<u64, TermStoreError>;
/// Replaces the stored term with `term`. The trait itself does not enforce
/// monotonicity; `TermFence` only calls this with a term greater than the
/// one it just loaded.
fn persist(&self, term: u64) -> Result<(), TermStoreError>;
}
/// [`TermStore`] that keeps the term in process memory behind a mutex.
///
/// Nothing is written to disk; this is the store the unit tests in this file
/// use via `MemoryTermStore::seeded`.
#[derive(Debug)]
pub struct MemoryTermStore {
// Current term; the mutex provides the interior mutability `TermStore`
// needs because its methods take `&self`.
inner: std::sync::Mutex<u64>,
}
impl Default for MemoryTermStore {
fn default() -> Self {
Self::new()
}
}
impl MemoryTermStore {
pub fn new() -> Self {
Self {
inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
}
}
pub fn seeded(term: u64) -> Self {
Self {
inner: std::sync::Mutex::new(term),
}
}
}
impl TermStore for MemoryTermStore {
    /// Returns the term currently held in memory. Never returns `Err`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    fn load(&self) -> Result<u64, TermStoreError> {
        let guard = self.inner.lock().expect("term store mutex");
        Ok(*guard)
    }

    /// Overwrites the in-memory term with `term`. Never returns `Err`.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
        let mut guard = self.inner.lock().expect("term store mutex");
        *guard = term;
        Ok(())
    }
}
/// [`TermStore`] backed by a JSON file at `path`.
///
/// Derives `Debug` so it matches `MemoryTermStore` and can be embedded in
/// other `Debug` types and log output.
#[derive(Debug)]
pub struct FileTermStore {
    // Location of the JSON document holding the term.
    path: std::path::PathBuf,
}
impl FileTermStore {
pub fn new(path: impl Into<std::path::PathBuf>) -> Self {
Self { path: path.into() }
}
}
impl TermStore for FileTermStore {
    /// Reads the term from the JSON file.
    ///
    /// Returns `crate::replication::DEFAULT_REPLICATION_TERM` when the file
    /// does not exist.
    ///
    /// # Errors
    /// * `TermStoreError::InvalidFormat` if the file is not valid JSON or has
    ///   no `"term"` member readable as a `u64`.
    /// * `TermStoreError::Io` for any read failure other than "not found".
    fn load(&self) -> Result<u64, TermStoreError> {
        match std::fs::read(&self.path) {
            Ok(bytes) => {
                let json: JsonValue = serde_json::from_slice(&bytes)
                    .map_err(|err| TermStoreError::InvalidFormat(format!("parse: {err}")))?;
                json.get("term")
                    .and_then(JsonValue::as_u64)
                    .ok_or_else(|| TermStoreError::InvalidFormat("missing term".into()))
            }
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                Ok(crate::replication::DEFAULT_REPLICATION_TERM)
            }
            Err(err) => Err(TermStoreError::Io(err)),
        }
    }

    /// Writes `term` to the JSON file by staging it in a sibling temp file,
    /// syncing that file, and renaming it over the destination.
    ///
    /// # Errors
    /// * `TermStoreError::InvalidFormat` if `term` cannot be represented
    ///   exactly in the JSON number, or serialization fails.
    /// * `TermStoreError::Io` if creating the parent directory, writing or
    ///   syncing the temp file, or the rename fails.
    fn persist(&self, term: u64) -> Result<(), TermStoreError> {
        // The JSON number is built from an `f64`, which holds integers exactly
        // only up to 2^53. Refuse larger terms instead of silently storing a
        // rounded value that would differ from the term just adopted.
        const MAX_EXACT_TERM: u64 = 1 << 53;
        if term > MAX_EXACT_TERM {
            return Err(TermStoreError::InvalidFormat(format!(
                "serialize: term {term} cannot be stored exactly (maximum {MAX_EXACT_TERM})"
            )));
        }

        let mut obj = serde_json::Map::new();
        obj.insert("term".to_string(), JsonValue::Number(term as f64));
        let bytes = serde_json::to_vec(&JsonValue::Object(obj))
            .map_err(|err| TermStoreError::InvalidFormat(format!("serialize: {err}")))?;
        let payload: &[u8] = bytes.as_ref();

        let parent = self.path.parent();
        if let Some(dir) = parent {
            std::fs::create_dir_all(dir).map_err(TermStoreError::Io)?;
        }

        // Keep the write handle open so the sync is issued on a writable
        // descriptor, and fail the persist if the data did not reach disk:
        // renaming an unsynced file would publish a term that may not survive
        // a crash.
        let tmp = self.path.with_extension("term.tmp");
        let staged = std::fs::File::create(&tmp).and_then(|mut file| {
            std::io::Write::write_all(&mut file, payload)?;
            file.sync_all()
        });
        if let Err(err) = staged {
            // Best-effort cleanup; the original error is what matters.
            let _ = std::fs::remove_file(&tmp);
            return Err(TermStoreError::Io(err));
        }

        if let Err(err) = std::fs::rename(&tmp, &self.path) {
            let _ = std::fs::remove_file(&tmp);
            return Err(TermStoreError::Io(err));
        }

        // Syncing the directory is best-effort: opening a directory as a file
        // is not supported on every platform. A bare relative file name has an
        // empty parent, which means the current directory.
        if let Some(dir) = parent {
            let dir = if dir.as_os_str().is_empty() {
                std::path::Path::new(".")
            } else {
                dir
            };
            if let Ok(handle) = std::fs::File::open(dir) {
                let _ = handle.sync_all();
            }
        }
        Ok(())
    }
}
/// Term fence that admits, adopts or rejects incoming terms by comparing them
/// with the term held in a [`TermStore`].
///
/// Derives `Debug` (available whenever the store is `Debug`) so it matches
/// the other public types in this module.
#[derive(Debug)]
pub struct TermFence<S: TermStore> {
    // Backend that supplies and records the current term.
    store: S,
}
impl<S: TermStore> TermFence<S> {
pub fn new(store: S) -> Self {
Self { store }
}
pub fn current_term(&self) -> Result<u64, TermStoreError> {
self.store.load()
}
pub fn classify(
&self,
boundary: FenceBoundary,
incoming_term: u64,
) -> Result<FenceVerdict, TermStoreError> {
let current = self.store.load()?;
Ok(if incoming_term < current {
FenceVerdict::Fenced(StaleTermFenced {
boundary,
incoming_term,
current_term: current,
})
} else if incoming_term > current {
FenceVerdict::Adopt {
new_term: incoming_term,
}
} else {
FenceVerdict::Admit { term: current }
})
}
pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
self.admit(FenceBoundary::Apply, incoming_term)
}
pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
self.admit(FenceBoundary::Handshake, incoming_term)
}
fn admit(
&self,
boundary: FenceBoundary,
incoming_term: u64,
) -> Result<FenceVerdict, TermStoreError> {
let verdict = self.classify(boundary, incoming_term)?;
if let FenceVerdict::Adopt { new_term } = verdict {
self.store.persist(new_term)?;
}
Ok(verdict)
}
pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
let current = self.store.load()?;
if new_term > current {
self.store.persist(new_term)?;
}
Ok(())
}
}
// Unit tests for the fence verdicts, term adoption and the file-backed store.
#[cfg(test)]
mod tests {
use super::*;
// Builds a fence over an in-memory store seeded with `term`.
fn fence(term: u64) -> TermFence<MemoryTermStore> {
TermFence::new(MemoryTermStore::seeded(term))
}
// A record with a term below the current one is fenced and leaves the
// stored term unchanged.
#[test]
fn apply_boundary_fences_stale_term() {
let f = fence(5);
let verdict = f.admit_record(4).unwrap();
assert_eq!(
verdict,
FenceVerdict::Fenced(StaleTermFenced {
boundary: FenceBoundary::Apply,
incoming_term: 4,
current_term: 5,
})
);
assert!(verdict.is_fenced());
assert_eq!(f.current_term().unwrap(), 5);
}
// A record carrying exactly the current term is admitted without changing it.
#[test]
fn apply_boundary_admits_current_term() {
let f = fence(5);
assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
assert_eq!(f.current_term().unwrap(), 5);
}
// A record with a higher term is adopted, the store moves to it, and the
// previous term is fenced afterwards.
#[test]
fn apply_boundary_adopts_higher_term_durably() {
let f = fence(5);
assert_eq!(
f.admit_record(8).unwrap(),
FenceVerdict::Adopt { new_term: 8 }
);
assert_eq!(f.current_term().unwrap(), 8);
assert!(f.admit_record(5).unwrap().is_fenced());
}
// Same stale-term rejection, but the verdict records the handshake boundary.
#[test]
fn handshake_boundary_fences_stale_term() {
let f = fence(7);
let verdict = f.admit_handshake(6).unwrap();
assert_eq!(
verdict,
FenceVerdict::Fenced(StaleTermFenced {
boundary: FenceBoundary::Handshake,
incoming_term: 6,
current_term: 7,
})
);
assert!(verdict.is_fenced());
}
// Handshakes admit the current term and adopt (and store) a higher one.
#[test]
fn handshake_boundary_admits_current_and_adopts_higher() {
let f = fence(7);
assert_eq!(
f.admit_handshake(7).unwrap(),
FenceVerdict::Admit { term: 7 }
);
assert_eq!(
f.admit_handshake(9).unwrap(),
FenceVerdict::Adopt { new_term: 9 }
);
assert_eq!(f.current_term().unwrap(), 9);
}
// After the fence moves to term 6, messages at term 5 are fenced at both
// boundaries, while term 6 is admitted.
#[test]
fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
let f = fence(5);
assert!(matches!(
f.admit_handshake(6).unwrap(),
FenceVerdict::Adopt { new_term: 6 }
));
assert!(f.admit_handshake(5).unwrap().is_fenced());
assert!(f.admit_record(5).unwrap().is_fenced());
f.adopt(6).unwrap();
assert!(f.admit_record(6).unwrap().is_admitted());
}
// `classify` reports `Adopt` for a higher term but must not persist it.
#[test]
fn classify_is_pure_and_does_not_adopt() {
let f = fence(3);
assert_eq!(
f.classify(FenceBoundary::Apply, 9).unwrap(),
FenceVerdict::Adopt { new_term: 9 }
);
assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
}
// `adopt` ignores lower terms and persists higher ones.
#[test]
fn adopt_never_moves_term_backwards() {
let f = fence(10);
f.adopt(4).unwrap();
assert_eq!(f.current_term().unwrap(), 10);
f.adopt(12).unwrap();
assert_eq!(f.current_term().unwrap(), 12);
}
// A missing file yields the default term; an adopted term is visible to a
// fresh `FileTermStore` opened on the same path.
#[test]
fn file_term_store_round_trips_and_defaults() {
// Process id plus a nanosecond timestamp keeps the temp path unique per run.
let path = std::env::temp_dir().join(format!(
"reddb-term-fence-{}-{}.json",
std::process::id(),
crate::utils::now_unix_nanos()
));
let _ = std::fs::remove_file(&path);
let store = FileTermStore::new(&path);
assert_eq!(
store.load().unwrap(),
crate::replication::DEFAULT_REPLICATION_TERM
);
{
let fence = TermFence::new(FileTermStore::new(&path));
assert!(matches!(
fence.admit_handshake(6).unwrap(),
FenceVerdict::Adopt { new_term: 6 }
));
}
let reopened = TermFence::new(FileTermStore::new(&path));
assert_eq!(reopened.current_term().unwrap(), 6);
assert!(reopened.admit_record(5).unwrap().is_fenced());
// Best-effort cleanup of the temp file.
let _ = std::fs::remove_file(&path);
}
}