pub use reddb_file::FileTermStore;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceBoundary {
Apply,
Handshake,
}
impl FenceBoundary {
pub fn as_str(self) -> &'static str {
match self {
Self::Apply => "apply",
Self::Handshake => "handshake",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StaleTermFenced {
pub boundary: FenceBoundary,
pub incoming_term: u64,
pub current_term: u64,
}
impl std::fmt::Display for StaleTermFenced {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"fenced stale-term {} message: incoming term {} is behind current term {}",
self.boundary.as_str(),
self.incoming_term,
self.current_term
)
}
}
impl std::error::Error for StaleTermFenced {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FenceVerdict {
Admit { term: u64 },
Adopt { new_term: u64 },
Fenced(StaleTermFenced),
}
impl FenceVerdict {
pub fn is_admitted(&self) -> bool {
matches!(self, Self::Admit { .. } | Self::Adopt { .. })
}
pub fn is_fenced(&self) -> bool {
matches!(self, Self::Fenced(_))
}
}
#[derive(Debug)]
pub enum TermStoreError {
Io(std::io::Error),
InvalidFormat(String),
}
impl std::fmt::Display for TermStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(err) => write!(f, "term store io error: {err}"),
Self::InvalidFormat(msg) => write!(f, "invalid term store format: {msg}"),
}
}
}
impl std::error::Error for TermStoreError {}
impl From<reddb_file::RdbFileError> for TermStoreError {
fn from(value: reddb_file::RdbFileError) -> Self {
match value {
reddb_file::RdbFileError::Io(err) => Self::Io(err),
reddb_file::RdbFileError::InvalidOperation(msg) => Self::InvalidFormat(msg),
}
}
}
pub trait TermStore {
fn load(&self) -> Result<u64, TermStoreError>;
fn persist(&self, term: u64) -> Result<(), TermStoreError>;
}
#[derive(Debug)]
pub struct MemoryTermStore {
inner: std::sync::Mutex<u64>,
}
impl Default for MemoryTermStore {
fn default() -> Self {
Self::new()
}
}
impl MemoryTermStore {
pub fn new() -> Self {
Self {
inner: std::sync::Mutex::new(crate::replication::DEFAULT_REPLICATION_TERM),
}
}
pub fn seeded(term: u64) -> Self {
Self {
inner: std::sync::Mutex::new(term),
}
}
}
impl TermStore for MemoryTermStore {
fn load(&self) -> Result<u64, TermStoreError> {
Ok(*self.inner.lock().expect("term store mutex"))
}
fn persist(&self, term: u64) -> Result<(), TermStoreError> {
*self.inner.lock().expect("term store mutex") = term;
Ok(())
}
}
impl TermStore for FileTermStore {
fn load(&self) -> Result<u64, TermStoreError> {
self.load_file().map_err(TermStoreError::from)
}
fn persist(&self, term: u64) -> Result<(), TermStoreError> {
self.persist_file(term).map_err(TermStoreError::from)
}
}
pub struct TermFence<S: TermStore> {
store: S,
}
impl<S: TermStore> TermFence<S> {
pub fn new(store: S) -> Self {
Self { store }
}
pub fn current_term(&self) -> Result<u64, TermStoreError> {
self.store.load()
}
pub fn classify(
&self,
boundary: FenceBoundary,
incoming_term: u64,
) -> Result<FenceVerdict, TermStoreError> {
let current = self.store.load()?;
Ok(if incoming_term < current {
FenceVerdict::Fenced(StaleTermFenced {
boundary,
incoming_term,
current_term: current,
})
} else if incoming_term > current {
FenceVerdict::Adopt {
new_term: incoming_term,
}
} else {
FenceVerdict::Admit { term: current }
})
}
pub fn admit_record(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
self.admit(FenceBoundary::Apply, incoming_term)
}
pub fn admit_handshake(&self, incoming_term: u64) -> Result<FenceVerdict, TermStoreError> {
self.admit(FenceBoundary::Handshake, incoming_term)
}
fn admit(
&self,
boundary: FenceBoundary,
incoming_term: u64,
) -> Result<FenceVerdict, TermStoreError> {
let verdict = self.classify(boundary, incoming_term)?;
if let FenceVerdict::Adopt { new_term } = verdict {
self.store.persist(new_term)?;
}
Ok(verdict)
}
pub fn adopt(&self, new_term: u64) -> Result<(), TermStoreError> {
let current = self.store.load()?;
if new_term > current {
self.store.persist(new_term)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn fence(term: u64) -> TermFence<MemoryTermStore> {
TermFence::new(MemoryTermStore::seeded(term))
}
#[test]
fn apply_boundary_fences_stale_term() {
let f = fence(5);
let verdict = f.admit_record(4).unwrap();
assert_eq!(
verdict,
FenceVerdict::Fenced(StaleTermFenced {
boundary: FenceBoundary::Apply,
incoming_term: 4,
current_term: 5,
})
);
assert!(verdict.is_fenced());
assert_eq!(f.current_term().unwrap(), 5);
}
#[test]
fn apply_boundary_admits_current_term() {
let f = fence(5);
assert_eq!(f.admit_record(5).unwrap(), FenceVerdict::Admit { term: 5 });
assert_eq!(f.current_term().unwrap(), 5);
}
#[test]
fn apply_boundary_adopts_higher_term_durably() {
let f = fence(5);
assert_eq!(
f.admit_record(8).unwrap(),
FenceVerdict::Adopt { new_term: 8 }
);
assert_eq!(f.current_term().unwrap(), 8);
assert!(f.admit_record(5).unwrap().is_fenced());
}
#[test]
fn handshake_boundary_fences_stale_term() {
let f = fence(7);
let verdict = f.admit_handshake(6).unwrap();
assert_eq!(
verdict,
FenceVerdict::Fenced(StaleTermFenced {
boundary: FenceBoundary::Handshake,
incoming_term: 6,
current_term: 7,
})
);
assert!(verdict.is_fenced());
}
#[test]
fn handshake_boundary_admits_current_and_adopts_higher() {
let f = fence(7);
assert_eq!(
f.admit_handshake(7).unwrap(),
FenceVerdict::Admit { term: 7 }
);
assert_eq!(
f.admit_handshake(9).unwrap(),
FenceVerdict::Adopt { new_term: 9 }
);
assert_eq!(f.current_term().unwrap(), 9);
}
#[test]
fn returning_ex_primary_is_fenced_until_it_adopts_new_term() {
let f = fence(5);
assert!(matches!(
f.admit_handshake(6).unwrap(),
FenceVerdict::Adopt { new_term: 6 }
));
assert!(f.admit_handshake(5).unwrap().is_fenced());
assert!(f.admit_record(5).unwrap().is_fenced());
f.adopt(6).unwrap();
assert!(f.admit_record(6).unwrap().is_admitted());
}
#[test]
fn classify_is_pure_and_does_not_adopt() {
let f = fence(3);
assert_eq!(
f.classify(FenceBoundary::Apply, 9).unwrap(),
FenceVerdict::Adopt { new_term: 9 }
);
assert_eq!(f.current_term().unwrap(), 3, "classify must not mutate");
}
#[test]
fn adopt_never_moves_term_backwards() {
let f = fence(10);
f.adopt(4).unwrap();
assert_eq!(f.current_term().unwrap(), 10);
f.adopt(12).unwrap();
assert_eq!(f.current_term().unwrap(), 12);
}
#[test]
fn file_term_store_round_trips_and_defaults() {
let path = std::env::temp_dir().join(format!(
"reddb-term-fence-{}-{}.json",
std::process::id(),
crate::utils::now_unix_nanos()
));
let _ = std::fs::remove_file(&path);
let store = FileTermStore::new(&path);
assert_eq!(
store.load().unwrap(),
crate::replication::DEFAULT_REPLICATION_TERM
);
{
let fence = TermFence::new(FileTermStore::new(&path));
assert!(matches!(
fence.admit_handshake(6).unwrap(),
FenceVerdict::Adopt { new_term: 6 }
));
}
let reopened = TermFence::new(FileTermStore::new(&path));
assert_eq!(reopened.current_term().unwrap(), 6);
assert!(reopened.admit_record(5).unwrap().is_fenced());
let _ = std::fs::remove_file(&path);
}
}