use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use noxu_sync::{Condvar, Mutex};
use crate::error::{RepError, Result};
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ConsistencyPolicy {
#[default]
NoConsistency,
TimeConsistency {
max_lag: Duration,
timeout: Duration,
},
CommitPointConsistency {
vlsn: i64,
timeout: Duration,
},
}
impl ConsistencyPolicy {
pub fn commit_point(token: &crate::CommitToken, timeout: Duration) -> Self {
ConsistencyPolicy::CommitPointConsistency {
vlsn: token.vlsn() as i64,
timeout,
}
}
pub fn check_consistency(
&self,
current_vlsn: i64,
master_vlsn: i64,
) -> Result<bool> {
match self {
ConsistencyPolicy::NoConsistency => Ok(true),
ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
let lag_vlsns = master_vlsn.saturating_sub(current_vlsn);
if lag_vlsns < 0 {
return Ok(true);
}
let lag_ms = lag_vlsns as u64;
let limit_ms = max_lag.as_millis() as u64;
if lag_ms <= limit_ms {
Ok(true)
} else {
Err(RepError::ReplicaLagExceeded { lag_ms, limit_ms })
}
}
ConsistencyPolicy::CommitPointConsistency { vlsn, .. } => {
if current_vlsn >= *vlsn {
Ok(true)
} else {
Err(RepError::ConsistencyTimeout(
self.timeout().unwrap_or(Duration::ZERO),
))
}
}
}
}
pub fn timeout(&self) -> Option<Duration> {
match self {
ConsistencyPolicy::NoConsistency => None,
ConsistencyPolicy::TimeConsistency { timeout, .. } => {
Some(*timeout)
}
ConsistencyPolicy::CommitPointConsistency { timeout, .. } => {
Some(*timeout)
}
}
}
}
impl std::fmt::Display for ConsistencyPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsistencyPolicy::NoConsistency => write!(f, "NoConsistency"),
ConsistencyPolicy::TimeConsistency { max_lag, timeout } => {
write!(
f,
"TimeConsistency(max_lag={:?}, timeout={:?})",
max_lag, timeout
)
}
ConsistencyPolicy::CommitPointConsistency { vlsn, timeout } => {
write!(
f,
"CommitPointConsistency(vlsn={}, timeout={:?})",
vlsn, timeout
)
}
}
}
}
#[derive(Clone)]
pub struct ConsistencyTracker {
last_applied_vlsn: Arc<AtomicU64>,
master_vlsn: Arc<AtomicU64>,
signal: Arc<(Mutex<()>, Condvar)>,
}
impl ConsistencyTracker {
const RECHECK_TICK: Duration = Duration::from_millis(5);
pub fn new(last_applied_vlsn: Arc<AtomicU64>) -> Self {
Self {
last_applied_vlsn,
master_vlsn: Arc::new(AtomicU64::new(0)),
signal: Arc::new((Mutex::new(()), Condvar::new())),
}
}
pub fn last_applied_vlsn(&self) -> u64 {
self.last_applied_vlsn.load(Ordering::Acquire)
}
pub fn set_master_vlsn(&self, vlsn: u64) {
self.master_vlsn.fetch_max(vlsn, Ordering::AcqRel);
}
pub fn master_vlsn(&self) -> u64 {
self.master_vlsn.load(Ordering::Acquire)
}
pub fn notify_applied(&self) {
let (_lock, cv) = &*self.signal;
cv.notify_all();
}
pub fn await_consistency(&self, policy: &ConsistencyPolicy) -> Result<()> {
let target_vlsn = match policy {
ConsistencyPolicy::NoConsistency => return Ok(()),
ConsistencyPolicy::CommitPointConsistency { vlsn, .. } => {
*vlsn as u64
}
ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
let master = self.master_vlsn();
let slack = max_lag.as_millis() as u64;
master.saturating_sub(slack)
}
};
if self.last_applied_vlsn() >= target_vlsn {
return Ok(());
}
let timeout = policy.timeout().unwrap_or(Duration::ZERO);
let deadline = Instant::now() + timeout;
let (lock, cv) = &*self.signal;
let mut guard = lock.lock();
loop {
if self.last_applied_vlsn() >= target_vlsn {
return Ok(());
}
let now = Instant::now();
if now >= deadline {
return Err(self.timeout_error(policy, target_vlsn));
}
let remaining = deadline - now;
let wait = remaining.min(Self::RECHECK_TICK);
let _ = cv.wait_for(&mut guard, wait);
}
}
fn timeout_error(
&self,
policy: &ConsistencyPolicy,
target_vlsn: u64,
) -> RepError {
match policy {
ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
let lag_ms =
self.master_vlsn().saturating_sub(self.last_applied_vlsn());
RepError::ReplicaLagExceeded {
lag_ms,
limit_ms: max_lag.as_millis() as u64,
}
}
_ => {
let _ = target_vlsn;
RepError::ConsistencyTimeout(
policy.timeout().unwrap_or(Duration::ZERO),
)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_no_consistency_always_passes() {
let policy = ConsistencyPolicy::NoConsistency;
assert!(policy.check_consistency(0, 1000).unwrap());
assert!(policy.check_consistency(1000, 1000).unwrap());
assert!(policy.check_consistency(1000, 0).unwrap());
}
#[test]
fn test_no_consistency_timeout_is_none() {
let policy = ConsistencyPolicy::NoConsistency;
assert!(policy.timeout().is_none());
}
#[test]
fn test_time_consistency_within_lag() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
assert!(policy.check_consistency(950, 1000).unwrap());
}
#[test]
fn test_time_consistency_at_limit() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
assert!(policy.check_consistency(900, 1000).unwrap());
}
#[test]
fn test_time_consistency_exceeds_lag() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
let result = policy.check_consistency(800, 1000);
assert!(result.is_err());
match result.unwrap_err() {
RepError::ReplicaLagExceeded { lag_ms, limit_ms } => {
assert_eq!(lag_ms, 200);
assert_eq!(limit_ms, 100);
}
other => panic!("unexpected error: {:?}", other),
}
}
#[test]
fn test_time_consistency_replica_ahead() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
assert!(policy.check_consistency(1000, 500).unwrap());
}
#[test]
fn test_time_consistency_timeout() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
assert_eq!(policy.timeout(), Some(Duration::from_secs(5)));
}
#[test]
fn test_commit_point_satisfied() {
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 500,
timeout: Duration::from_secs(10),
};
assert!(policy.check_consistency(500, 1000).unwrap());
assert!(policy.check_consistency(600, 1000).unwrap());
}
#[test]
fn test_commit_point_not_satisfied() {
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 500,
timeout: Duration::from_secs(10),
};
let result = policy.check_consistency(400, 1000);
assert!(result.is_err());
match result.unwrap_err() {
RepError::ConsistencyTimeout(d) => {
assert_eq!(d, Duration::from_secs(10));
}
other => panic!("unexpected error: {:?}", other),
}
}
#[test]
fn test_commit_point_timeout() {
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 100,
timeout: Duration::from_secs(10),
};
assert_eq!(policy.timeout(), Some(Duration::from_secs(10)));
}
#[test]
fn test_default_is_no_consistency() {
assert_eq!(
ConsistencyPolicy::default(),
ConsistencyPolicy::NoConsistency
);
}
#[test]
fn test_display_no_consistency() {
assert_eq!(
ConsistencyPolicy::NoConsistency.to_string(),
"NoConsistency"
);
}
#[test]
fn test_display_time_consistency() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(500),
timeout: Duration::from_secs(10),
};
let s = policy.to_string();
assert!(s.contains("TimeConsistency"));
assert!(s.contains("500ms"));
}
#[test]
fn test_display_commit_point() {
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 42,
timeout: Duration::from_secs(5),
};
let s = policy.to_string();
assert!(s.contains("CommitPointConsistency"));
assert!(s.contains("42"));
}
#[test]
fn test_clone_and_eq() {
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
let cloned = policy.clone();
assert_eq!(policy, cloned);
}
#[test]
fn test_tracker_no_consistency_never_blocks() {
let applied = Arc::new(AtomicU64::new(0));
let tracker = ConsistencyTracker::new(applied);
tracker.set_master_vlsn(10_000);
tracker.await_consistency(&ConsistencyPolicy::NoConsistency).unwrap();
}
#[test]
fn test_tracker_commit_point_already_satisfied() {
let applied = Arc::new(AtomicU64::new(500));
let tracker = ConsistencyTracker::new(applied);
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 500,
timeout: Duration::from_secs(5),
};
tracker.await_consistency(&policy).unwrap();
}
#[test]
fn test_tracker_commit_point_blocks_then_satisfied() {
let applied = Arc::new(AtomicU64::new(0));
let tracker = ConsistencyTracker::new(Arc::clone(&applied));
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 7,
timeout: Duration::from_secs(5),
};
let tracker_bg = tracker.clone();
let applied_bg = Arc::clone(&applied);
let bg = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(50));
applied_bg.store(7, Ordering::Release);
tracker_bg.notify_applied();
});
let start = Instant::now();
tracker.await_consistency(&policy).unwrap();
assert!(start.elapsed() >= Duration::from_millis(40));
assert!(applied.load(Ordering::Acquire) >= 7);
bg.join().unwrap();
}
#[test]
fn test_tracker_commit_point_times_out() {
let applied = Arc::new(AtomicU64::new(0));
let tracker = ConsistencyTracker::new(applied);
let policy = ConsistencyPolicy::CommitPointConsistency {
vlsn: 100,
timeout: Duration::from_millis(80),
};
let start = Instant::now();
let err = tracker.await_consistency(&policy).unwrap_err();
assert!(start.elapsed() < Duration::from_secs(2));
assert!(matches!(err, RepError::ConsistencyTimeout(_)));
}
#[test]
fn test_tracker_time_blocks_then_catches_up() {
let applied = Arc::new(AtomicU64::new(0));
let tracker = ConsistencyTracker::new(Arc::clone(&applied));
tracker.set_master_vlsn(1000);
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
let tracker_bg = tracker.clone();
let applied_bg = Arc::clone(&applied);
let bg = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(40));
applied_bg.store(920, Ordering::Release);
tracker_bg.notify_applied();
});
let start = Instant::now();
tracker.await_consistency(&policy).unwrap();
assert!(start.elapsed() >= Duration::from_millis(30));
bg.join().unwrap();
}
#[test]
fn test_tracker_time_times_out() {
let applied = Arc::new(AtomicU64::new(0));
let tracker = ConsistencyTracker::new(applied);
tracker.set_master_vlsn(1000);
let policy = ConsistencyPolicy::TimeConsistency {
max_lag: Duration::from_millis(10),
timeout: Duration::from_millis(80),
};
let start = Instant::now();
let err = tracker.await_consistency(&policy).unwrap_err();
assert!(start.elapsed() < Duration::from_secs(2));
assert!(matches!(err, RepError::ReplicaLagExceeded { .. }));
}
}