use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::catalog::ClusterCatalog;
use crate::error::Result;
/// In-memory cluster epoch for this process.
///
/// Initialised to 0 and private to this module; the functions below read it,
/// overwrite it, raise it to a peer's value, or increment it.
static LOCAL_CLUSTER_EPOCH: AtomicU64 = AtomicU64::new(0);
/// Returns the local cluster epoch as currently held in memory.
///
/// The value is read with `Acquire` ordering, pairing with the `Release` /
/// `AcqRel` writes performed by the other functions in this module.
pub fn current_local_cluster_epoch() -> u64 {
    let epoch = LOCAL_CLUSTER_EPOCH.load(Ordering::Acquire);
    epoch
}
/// Unconditionally replaces the in-memory local cluster epoch with `value`.
///
/// This is a plain store, not a max: unlike [`observe_peer_cluster_epoch`],
/// it will lower the epoch when `value` is smaller than the current one.
/// Nothing is persisted by this call.
pub fn set_local_cluster_epoch(value: u64) {
    let epoch = &LOCAL_CLUSTER_EPOCH;
    epoch.store(value, Ordering::Release);
}
/// Folds an epoch reported by a peer into the local cluster epoch.
///
/// The local epoch is atomically raised to `peer_epoch` if that is larger and
/// left untouched otherwise, so this call never moves the epoch backwards.
///
/// Returns the larger of the previous local epoch and `peer_epoch`, i.e. the
/// value the local epoch held immediately after this update.
pub fn observe_peer_cluster_epoch(peer_epoch: u64) -> u64 {
    let before = LOCAL_CLUSTER_EPOCH.fetch_max(peer_epoch, Ordering::AcqRel);
    if peer_epoch > before {
        peer_epoch
    } else {
        before
    }
}
/// Advances the local cluster epoch by one and writes the new value to `catalog`.
///
/// The in-memory counter is incremented first and the result is then persisted
/// with `ClusterCatalog::save_cluster_epoch`. The increment-and-persist sequence
/// runs under a function-local mutex so that concurrent callers persist their
/// epochs in the order they were allocated. Without it, a caller holding a
/// smaller epoch could write after a caller holding a larger one and leave the
/// catalog behind the in-memory value.
///
/// Returns the newly allocated epoch.
///
/// # Errors
///
/// Returns the error from `save_cluster_epoch` if persisting fails. The
/// in-memory epoch is not rolled back in that case, so it may be ahead of the
/// persisted value until a later bump succeeds.
pub fn bump_local_cluster_epoch(catalog: &ClusterCatalog) -> Result<u64> {
    static BUMP_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
    // The mutex guards no data of its own, so a poisoned lock is safe to reuse.
    let _serialize = BUMP_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    let new_epoch = LOCAL_CLUSTER_EPOCH.fetch_add(1, Ordering::AcqRel) + 1;
    catalog.save_cluster_epoch(new_epoch)?;
    Ok(new_epoch)
}
/// Seeds the in-memory local cluster epoch from the value stored in `catalog`.
///
/// A catalog with no stored epoch is treated as epoch 0. The stored value is
/// merged with an atomic max, so an in-memory epoch that is already higher is
/// kept as is.
///
/// Returns the larger of the stored epoch and the previous in-memory epoch.
///
/// # Errors
///
/// Propagates any error returned by `load_cluster_epoch`.
pub fn init_local_cluster_epoch_from_catalog(catalog: &Arc<ClusterCatalog>) -> Result<u64> {
    let stored = catalog.load_cluster_epoch()?.unwrap_or_default();
    let before = LOCAL_CLUSTER_EPOCH.fetch_max(stored, Ordering::AcqRel);
    let effective = if stored > before { stored } else { before };
    Ok(effective)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    // Every test mutates the same global epoch, so they must not overlap.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the test lock (recovering it if an earlier test panicked while
    /// holding it) and zeroes the global epoch before handing back the guard.
    fn reset() -> MutexGuard<'static, ()> {
        let guard = match TEST_LOCK.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        LOCAL_CLUSTER_EPOCH.store(0, Ordering::Release);
        guard
    }

    #[test]
    fn observe_is_monotonic_max() {
        let _guard = reset();
        // (peer epoch offered, local epoch expected afterwards)
        for (peer, expected) in [(5_u64, 5_u64), (3, 5), (7, 7)] {
            assert_eq!(observe_peer_cluster_epoch(peer), expected);
            assert_eq!(current_local_cluster_epoch(), expected);
        }
    }

    #[test]
    fn set_overrides_for_init() {
        let _guard = reset();
        set_local_cluster_epoch(42);
        let observed = current_local_cluster_epoch();
        assert_eq!(observed, 42);
    }

    #[test]
    fn observe_zero_is_noop() {
        let _guard = reset();
        set_local_cluster_epoch(9);
        let after_observe = observe_peer_cluster_epoch(0);
        assert_eq!(after_observe, 9);
        assert_eq!(current_local_cluster_epoch(), 9);
    }

    #[test]
    fn bump_increments_and_persists() {
        let _guard = reset();
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("cluster.redb");
        let catalog = ClusterCatalog::open(&db_path).unwrap();
        set_local_cluster_epoch(10);
        let bumped = bump_local_cluster_epoch(&catalog).unwrap();
        assert_eq!(bumped, 11);
        assert_eq!(current_local_cluster_epoch(), 11);
        let persisted = catalog.load_cluster_epoch().unwrap();
        assert_eq!(persisted, Some(11));
    }

    #[test]
    fn init_from_catalog_loads_persisted_value() {
        let _guard = reset();
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("cluster.redb");
        let catalog = Arc::new(ClusterCatalog::open(&db_path).unwrap());
        catalog.save_cluster_epoch(123).unwrap();
        let loaded = init_local_cluster_epoch_from_catalog(&catalog).unwrap();
        assert_eq!(loaded, 123);
        assert_eq!(current_local_cluster_epoch(), 123);
    }

    #[test]
    fn init_with_no_persisted_value_starts_at_zero() {
        let _guard = reset();
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("cluster.redb");
        let catalog = Arc::new(ClusterCatalog::open(&db_path).unwrap());
        let loaded = init_local_cluster_epoch_from_catalog(&catalog).unwrap();
        assert_eq!(loaded, 0);
        assert_eq!(current_local_cluster_epoch(), 0);
    }
}