use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RebootstrapInProgress;
impl std::fmt::Display for RebootstrapInProgress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"node is re-bootstrapping; causal read must route to a caught-up peer"
)
}
}
impl std::error::Error for RebootstrapInProgress {}
pub struct SwapDb<D> {
current: RwLock<Arc<D>>,
rebootstrapping: AtomicBool,
}
impl<D> SwapDb<D> {
pub fn new(data: D) -> Self {
Self {
current: RwLock::new(Arc::new(data)),
rebootstrapping: AtomicBool::new(false),
}
}
pub fn is_rebootstrapping(&self) -> bool {
self.rebootstrapping.load(Ordering::Acquire)
}
pub fn snapshot(&self) -> Arc<D> {
Arc::clone(&self.current.read().unwrap_or_else(|e| e.into_inner()))
}
pub fn read_noncausal(&self) -> Arc<D> {
self.snapshot()
}
pub fn read_causal(&self) -> Result<Arc<D>, RebootstrapInProgress> {
if self.is_rebootstrapping() {
return Err(RebootstrapInProgress);
}
Ok(self.snapshot())
}
pub fn begin_rebootstrap(&self) {
self.rebootstrapping.store(true, Ordering::Release);
}
pub fn complete_rebootstrap(&self, fresh: D) -> Arc<D> {
let new = Arc::new(fresh);
let old = {
let mut guard = self.current.write().unwrap_or_else(|e| e.into_inner());
std::mem::replace(&mut *guard, new)
};
self.rebootstrapping.store(false, Ordering::Release);
old
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serves_noncausal_reads_from_old_data_during_rebuild() {
let db = SwapDb::new(vec![1, 2, 3]);
db.begin_rebootstrap();
assert!(db.is_rebootstrapping());
assert_eq!(*db.read_noncausal(), vec![1, 2, 3]);
}
#[test]
fn refuses_causal_reads_during_rebuild() {
let db = SwapDb::new(vec![1, 2, 3]);
assert!(db.read_causal().is_ok());
db.begin_rebootstrap();
assert_eq!(db.read_causal(), Err(RebootstrapInProgress));
}
#[test]
fn swap_replaces_data_and_resumes_causal_reads() {
let db = SwapDb::new(vec![1, 2, 3]);
db.begin_rebootstrap();
let old = db.complete_rebootstrap(vec![9, 9, 9, 9]);
assert_eq!(*old, vec![1, 2, 3]);
assert!(!db.is_rebootstrapping());
assert_eq!(*db.read_noncausal(), vec![9, 9, 9, 9]);
assert_eq!(*db.read_causal().expect("causal ok"), vec![9, 9, 9, 9]);
}
#[test]
fn swap_is_atomic_old_reader_keeps_complete_old_dataset() {
let db = SwapDb::new(vec![1, 2, 3]);
let pre = db.read_noncausal();
db.begin_rebootstrap();
db.complete_rebootstrap(vec![7, 8]);
assert_eq!(*pre, vec![1, 2, 3]);
assert_eq!(*db.read_noncausal(), vec![7, 8]);
}
#[test]
fn begin_rebootstrap_is_idempotent() {
let db = SwapDb::new(0u64);
db.begin_rebootstrap();
db.begin_rebootstrap();
assert!(db.is_rebootstrapping());
db.complete_rebootstrap(42);
assert!(!db.is_rebootstrapping());
assert_eq!(*db.snapshot(), 42);
}
#[test]
fn rebuild_then_swap_cycle_can_repeat() {
let db = SwapDb::new(1u32);
for n in 2..=5 {
db.begin_rebootstrap();
assert!(db.read_causal().is_err());
db.complete_rebootstrap(n);
assert_eq!(*db.read_causal().expect("ok"), n);
}
}
}