use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Cloneable handle around a shared "cutover requested" flag.
///
/// The flag lives behind an [`Arc`], so every clone of a handle reads and
/// writes the same underlying [`AtomicBool`].
#[derive(Debug, Clone, Default)]
pub struct CutoverHandle {
    /// Shared flag; `false` until [`CutoverHandle::request`] is called.
    requested: Arc<AtomicBool>,
}

impl CutoverHandle {
    /// Creates a handle whose flag starts out unset.
    pub fn new() -> Self {
        Self {
            requested: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Sets the flag and returns the value it held *before* this call.
    ///
    /// The first call on a fresh handle (or any of its clones) returns
    /// `false`; subsequent calls return `true`.
    pub fn request(&self) -> bool {
        // OR-ing with `true` always leaves the flag set and yields the prior value.
        self.requested.fetch_or(true, Ordering::SeqCst)
    }

    /// Returns the current value of the flag.
    pub fn is_requested(&self) -> bool {
        let flag: &AtomicBool = &self.requested;
        flag.load(Ordering::SeqCst)
    }
}
/// Tracks, across successive observations, whether the lag between a source
/// position and an applied position is within a fixed threshold.
///
/// NOTE(review): the `*_lsn` / `*_bytes` names suggest log sequence numbers
/// measured in bytes; confirm against callers.
#[derive(Debug, Clone, Copy)]
pub struct LagSampler {
    /// Largest lag (inclusive) that still counts as caught up.
    threshold_bytes: u64,
    /// Outcome of the most recent observation; `false` before the first one.
    caught_up: bool,
}

impl LagSampler {
    /// Creates a sampler with the given inclusive threshold, initially *not*
    /// caught up.
    pub fn new(threshold_bytes: u64) -> Self {
        let caught_up = false;
        Self {
            threshold_bytes,
            caught_up,
        }
    }

    /// Returns whether the most recent observation was within the threshold.
    pub fn is_caught_up(&self) -> bool {
        self.caught_up
    }

    /// Returns `source_lsn - applied_lsn`, clamped to zero when the applied
    /// position is ahead of the source position.
    pub fn lag_bytes(source_lsn: u64, applied_lsn: u64) -> u64 {
        source_lsn.saturating_sub(applied_lsn)
    }

    /// Records one observation and reports how the caught-up state changed.
    ///
    /// The lag is computed with [`LagSampler::lag_bytes`]; a lag less than or
    /// equal to the threshold counts as caught up. The returned [`Transition`]
    /// combines the previous state with the new one and carries the lag.
    pub fn observe(&mut self, source_lsn: u64, applied_lsn: u64) -> Transition {
        let lag = Self::lag_bytes(source_lsn, applied_lsn);

        // Remember the old state before overwriting it with the new one.
        let was_caught_up = self.caught_up;
        self.caught_up = lag <= self.threshold_bytes;

        if self.caught_up {
            if was_caught_up {
                Transition::StillCaughtUp { lag }
            } else {
                Transition::JustCaughtUp { lag }
            }
        } else if was_caught_up {
            Transition::FellBehind { lag }
        } else {
            Transition::StillBehind { lag }
        }
    }
}
/// Change in caught-up state between two consecutive observations.
///
/// Every variant carries the `lag` value measured for the observation that
/// produced it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transition {
    /// Previously behind, now within the threshold.
    JustCaughtUp { lag: u64 },
    /// Previously within the threshold and still within it.
    StillCaughtUp { lag: u64 },
    /// Previously within the threshold, now beyond it.
    FellBehind { lag: u64 },
    /// Previously behind and still behind.
    StillBehind { lag: u64 },
}

impl Transition {
    /// Returns the lag carried by this transition, whichever variant it is.
    pub fn lag(&self) -> u64 {
        match *self {
            Self::JustCaughtUp { lag }
            | Self::StillCaughtUp { lag }
            | Self::FellBehind { lag }
            | Self::StillBehind { lag } => lag,
        }
    }

    /// Returns `true` only for [`Transition::JustCaughtUp`].
    pub fn just_caught_up(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::JustCaughtUp { .. } => true,
            Self::StillCaughtUp { .. } | Self::FellBehind { .. } | Self::StillBehind { .. } => {
                false
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed handle has no request recorded.
    #[test]
    fn handle_starts_unrequested() {
        let handle = CutoverHandle::new();
        assert!(!handle.is_requested());
    }

    /// `request` reports the previous flag value and leaves the flag set.
    #[test]
    fn handle_request_is_idempotent() {
        let handle = CutoverHandle::new();

        // First request: the flag was previously unset.
        let previously_set = handle.request();
        assert!(!previously_set);
        assert!(handle.is_requested());

        // Second request: the flag was already set.
        let previously_set = handle.request();
        assert!(previously_set);
    }

    /// A request made through one clone is visible through another.
    #[test]
    fn handle_clones_share_state() {
        let original = CutoverHandle::new();
        let observer = original.clone();
        original.request();
        assert!(observer.is_requested());
    }

    /// The lag is the difference of the two positions, never negative.
    #[test]
    fn lag_bytes_clamps_to_zero_when_target_ahead() {
        // (source_lsn, applied_lsn, expected lag)
        let cases: &[(u64, u64, u64)] = &[(100, 200, 0), (200, 100, 100), (100, 100, 0)];
        for &(source_lsn, applied_lsn, expected) in cases {
            assert_eq!(LagSampler::lag_bytes(source_lsn, applied_lsn), expected);
        }
    }

    /// Crossing under the threshold for the first time yields `JustCaughtUp`.
    #[test]
    fn sampler_first_catch_up_emits_just_caught_up() {
        let mut sampler = LagSampler::new(8);

        let behind = sampler.observe(1000, 500);
        assert_eq!(behind, Transition::StillBehind { lag: 500 });
        assert!(!sampler.is_caught_up());

        let caught_up = sampler.observe(1000, 995);
        assert_eq!(caught_up, Transition::JustCaughtUp { lag: 5 });
        assert!(sampler.is_caught_up());
    }

    /// Staying under the threshold yields `StillCaughtUp`, not a second `JustCaughtUp`.
    #[test]
    fn sampler_does_not_re_emit_when_still_caught_up() {
        let mut sampler = LagSampler::new(8);
        sampler.observe(1000, 500);
        sampler.observe(1000, 1000);

        let steady = sampler.observe(1010, 1005);
        assert_eq!(steady, Transition::StillCaughtUp { lag: 5 });
    }

    /// Exceeding the threshold after being caught up yields `FellBehind`.
    #[test]
    fn sampler_emits_fell_behind_on_lag_spike() {
        let mut sampler = LagSampler::new(8);
        sampler.observe(1000, 1000);

        let spike = sampler.observe(2000, 1000);
        assert_eq!(spike, Transition::FellBehind { lag: 1000 });
        assert!(!sampler.is_caught_up());
    }

    /// `lag` and `just_caught_up` read the expected data out of a transition.
    #[test]
    fn transition_lag_accessor() {
        let just = Transition::JustCaughtUp { lag: 7 };
        let behind = Transition::StillBehind { lag: 99 };
        assert_eq!(just.lag(), 7);
        assert_eq!(behind.lag(), 99);

        assert!(Transition::JustCaughtUp { lag: 0 }.just_caught_up());
        assert!(!Transition::FellBehind { lag: 0 }.just_caught_up());
    }

    /// A lag exactly equal to the threshold counts as caught up.
    #[test]
    fn threshold_inclusive_boundary_counts_as_caught_up() {
        let mut sampler = LagSampler::new(8);
        let at_boundary = sampler.observe(108, 100);
        assert_eq!(at_boundary, Transition::JustCaughtUp { lag: 8 });
    }
}