use std::time::Duration;
use crate::error::{RepError, Result};
/// Consistency requirement a replica must satisfy, as evaluated by
/// `ConsistencyPolicy::check_consistency`.
///
/// The default policy is [`ConsistencyPolicy::NoConsistency`].
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ConsistencyPolicy {
    /// No requirement: the check always succeeds and there is no timeout.
    #[default]
    NoConsistency,

    /// The replica may trail the master by at most `max_lag`.
    TimeConsistency {
        /// Largest tolerated lag. The check compares the VLSN gap between
        /// master and replica against this value in whole milliseconds.
        max_lag: Duration,
        /// Timeout attached to the policy and reported by `timeout()`.
        /// NOTE(review): presumably how long a caller waits for the replica
        /// to catch up; nothing in this file enforces it.
        timeout: Duration,
    },

    /// The replica must have reached a specific VLSN.
    CommitPointConsistency {
        /// VLSN the replica's current position must be at or beyond.
        vlsn: i64,
        /// Timeout attached to the policy; reported by `timeout()` and carried
        /// in the error returned when the commit point has not been reached.
        timeout: Duration,
    },
}
impl ConsistencyPolicy {
    /// Checks whether a replica positioned at `current_vlsn` satisfies this policy.
    ///
    /// * `NoConsistency` is always satisfied.
    /// * `TimeConsistency` compares the gap `master_vlsn - current_vlsn` directly
    ///   against `max_lag` expressed in whole milliseconds, i.e. one VLSN of lag is
    ///   counted as one millisecond. A replica level with or ahead of the master is
    ///   always satisfied.
    /// * `CommitPointConsistency` is satisfied once `current_vlsn` has reached the
    ///   policy's `vlsn`; `master_vlsn` is not consulted.
    ///
    /// Returns `Ok(true)` when the policy is satisfied; this function never
    /// returns `Ok(false)`.
    ///
    /// # Errors
    ///
    /// * [`RepError::ReplicaLagExceeded`] when the lag exceeds `max_lag`.
    /// * [`RepError::ConsistencyTimeout`] (carrying the policy's timeout) when the
    ///   commit point has not been reached.
    pub fn check_consistency(&self, current_vlsn: i64, master_vlsn: i64) -> Result<bool> {
        match self {
            Self::NoConsistency => Ok(true),
            Self::TimeConsistency { max_lag, .. } => {
                // Saturating subtraction keeps extreme VLSN pairs from overflowing.
                let lag_vlsns = master_vlsn.saturating_sub(current_vlsn);
                if lag_vlsns <= 0 {
                    // Replica is level with or ahead of the master: no lag at all.
                    return Ok(true);
                }
                // Lossless: the gap is strictly positive at this point.
                let lag_ms = lag_vlsns.unsigned_abs();
                // `as_millis` yields a u128; clamp before narrowing so that a very
                // large `max_lag` saturates instead of wrapping to a tiny limit.
                let limit_ms = max_lag.as_millis().min(u128::from(u64::MAX)) as u64;
                if lag_ms <= limit_ms {
                    Ok(true)
                } else {
                    Err(RepError::ReplicaLagExceeded { lag_ms, limit_ms })
                }
            }
            Self::CommitPointConsistency { vlsn, timeout } => {
                if current_vlsn >= *vlsn {
                    Ok(true)
                } else {
                    Err(RepError::ConsistencyTimeout(*timeout))
                }
            }
        }
    }

    /// Returns the timeout attached to this policy, or `None` for
    /// `NoConsistency`, which has none.
    pub fn timeout(&self) -> Option<Duration> {
        match self {
            Self::NoConsistency => None,
            Self::TimeConsistency { timeout, .. }
            | Self::CommitPointConsistency { timeout, .. } => Some(*timeout),
        }
    }
}
impl std::fmt::Display for ConsistencyPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsistencyPolicy::NoConsistency => write!(f, "NoConsistency"),
ConsistencyPolicy::TimeConsistency { max_lag, timeout } => {
write!(
f,
"TimeConsistency(max_lag={:?}, timeout={:?})",
max_lag, timeout
)
}
ConsistencyPolicy::CommitPointConsistency { vlsn, timeout } => {
write!(
f,
"CommitPointConsistency(vlsn={}, timeout={:?})",
vlsn, timeout
)
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TimeConsistency` policy from a lag in milliseconds and a
    /// timeout in seconds.
    fn time_policy(max_lag_ms: u64, timeout_secs: u64) -> ConsistencyPolicy {
        ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(max_lag_ms),
            timeout: Duration::from_secs(timeout_secs),
        }
    }

    /// The 100 ms lag / 5 s timeout fixture shared by the lag tests.
    fn standard_time_policy() -> ConsistencyPolicy {
        time_policy(100, 5)
    }

    /// Builds a `CommitPointConsistency` policy with a timeout in seconds.
    fn commit_policy(vlsn: i64, timeout_secs: u64) -> ConsistencyPolicy {
        ConsistencyPolicy::CommitPointConsistency {
            vlsn,
            timeout: Duration::from_secs(timeout_secs),
        }
    }

    #[test]
    fn test_no_consistency_always_passes() {
        let policy = ConsistencyPolicy::NoConsistency;
        // Replica behind, level with, and ahead of the master.
        for &(current, master) in &[(0, 1000), (1000, 1000), (1000, 0)] {
            assert!(matches!(policy.check_consistency(current, master), Ok(true)));
        }
    }

    #[test]
    fn test_no_consistency_timeout_is_none() {
        assert_eq!(ConsistencyPolicy::NoConsistency.timeout(), None);
    }

    #[test]
    fn test_time_consistency_within_lag() {
        let outcome = standard_time_policy().check_consistency(950, 1000);
        assert!(matches!(outcome, Ok(true)));
    }

    #[test]
    fn test_time_consistency_at_limit() {
        // A gap of exactly 100 is still acceptable.
        let outcome = standard_time_policy().check_consistency(900, 1000);
        assert!(matches!(outcome, Ok(true)));
    }

    #[test]
    fn test_time_consistency_exceeds_lag() {
        let err = standard_time_policy()
            .check_consistency(800, 1000)
            .unwrap_err();
        match &err {
            RepError::ReplicaLagExceeded { lag_ms, limit_ms } => {
                assert_eq!(*lag_ms, 200);
                assert_eq!(*limit_ms, 100);
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_time_consistency_replica_ahead() {
        let outcome = standard_time_policy().check_consistency(1000, 500);
        assert!(matches!(outcome, Ok(true)));
    }

    #[test]
    fn test_time_consistency_timeout() {
        let expected = Some(Duration::from_secs(5));
        assert_eq!(standard_time_policy().timeout(), expected);
    }

    #[test]
    fn test_commit_point_satisfied() {
        let policy = commit_policy(500, 10);
        // Exactly at the commit point, then past it.
        for &current in &[500, 600] {
            assert!(matches!(policy.check_consistency(current, 1000), Ok(true)));
        }
    }

    #[test]
    fn test_commit_point_not_satisfied() {
        let err = commit_policy(500, 10)
            .check_consistency(400, 1000)
            .unwrap_err();
        match &err {
            RepError::ConsistencyTimeout(d) => {
                assert_eq!(*d, Duration::from_secs(10));
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_commit_point_timeout() {
        let expected = Some(Duration::from_secs(10));
        assert_eq!(commit_policy(100, 10).timeout(), expected);
    }

    #[test]
    fn test_default_is_no_consistency() {
        let default_policy = ConsistencyPolicy::default();
        assert_eq!(default_policy, ConsistencyPolicy::NoConsistency);
    }

    #[test]
    fn test_display_no_consistency() {
        let rendered = ConsistencyPolicy::NoConsistency.to_string();
        assert_eq!(rendered, "NoConsistency");
    }

    #[test]
    fn test_display_time_consistency() {
        let rendered = time_policy(500, 10).to_string();
        assert!(rendered.contains("TimeConsistency"));
        assert!(rendered.contains("500ms"));
    }

    #[test]
    fn test_display_commit_point() {
        let rendered = commit_policy(42, 5).to_string();
        assert!(rendered.contains("CommitPointConsistency"));
        assert!(rendered.contains("42"));
    }

    #[test]
    fn test_clone_and_eq() {
        let original = standard_time_policy();
        let copy = original.clone();
        assert_eq!(original, copy);
    }
}