use std::sync::atomic::{AtomicU64, Ordering};
/// A Lamport logical clock stored in a single atomic counter.
///
/// All methods take `&self` and use sequentially consistent atomic
/// operations, so one instance can be shared between threads.
///
/// The counter never moves backwards outside of the test-only `reset`:
/// once it reaches `u64::MAX` it saturates there instead of wrapping to zero.
#[derive(Debug)]
pub struct LamportClock {
    /// Current logical time.
    counter: AtomicU64,
}

impl LamportClock {
    /// Creates a clock whose logical time starts at zero.
    pub const fn new() -> Self {
        Self::with_value(0)
    }

    /// Creates a clock whose logical time starts at `initial_value`.
    pub const fn with_value(initial_value: u64) -> Self {
        Self {
            counter: AtomicU64::new(initial_value),
        }
    }

    /// Atomically moves the counter to `max(current, floor) + 1`, saturating
    /// at `u64::MAX`, and returns the value observed just before the update.
    fn advance(&self, floor: u64) -> u64 {
        let step = |current: u64| Some(current.max(floor).saturating_add(1));
        match self
            .counter
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, step)
        {
            // `step` never yields `None`, so the update always succeeds; both
            // variants carry the previously stored value either way.
            Ok(previous) | Err(previous) => previous,
        }
    }

    /// Advances the clock by one and returns the timestamp held *before* the
    /// increment (the first tick of a fresh clock returns `0`).
    ///
    /// At `u64::MAX` the clock stays put rather than wrapping around.
    pub fn tick(&self) -> u64 {
        // A floor of zero never raises the counter, so this is a plain increment.
        self.advance(0)
    }

    /// Merges a timestamp received from another clock.
    ///
    /// Afterwards the local time is `max(local, remote_clock) + 1`, computed
    /// in one atomic step and saturating at `u64::MAX`.
    pub fn sync(&self, remote_clock: u64) {
        self.advance(remote_clock);
    }

    /// Returns the current logical time without advancing it.
    pub fn now(&self) -> u64 {
        self.counter.load(Ordering::SeqCst)
    }

    /// Sets the clock back to zero. Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn reset(&self) {
        self.counter.store(0, Ordering::SeqCst);
    }
}
impl Default for LamportClock {
    /// Returns a clock whose logical time starts at zero.
    fn default() -> Self {
        Self::with_value(0)
    }
}
/// Process-wide logical clock, starting at zero.
///
/// This is the instance that `init_from_env` synchronizes and that
/// `propagate_to_env` reads.
pub static GLOBAL_CLOCK: LamportClock = LamportClock::new();
/// Seeds `GLOBAL_CLOCK` from the `RENACER_LOGICAL_CLOCK` environment variable.
///
/// When the variable is present and parses as a `u64`, the global clock is
/// synchronized with that value and a debug line is written to stderr.
/// A missing, non-Unicode, or unparsable value is ignored and the clock is
/// left untouched.
pub fn init_from_env() {
    let inherited = std::env::var("RENACER_LOGICAL_CLOCK")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());

    if let Some(parent_clock) = inherited {
        GLOBAL_CLOCK.sync(parent_clock);
        eprintln!("DEBUG: Initialized Lamport clock from parent: {parent_clock}");
    }
}
/// Publishes the current value of `GLOBAL_CLOCK` in the
/// `RENACER_LOGICAL_CLOCK` environment variable as a decimal string.
///
/// The clock itself is only read, not advanced.
// NOTE(review): this mutates the process environment; see the
// `std::env::set_var` documentation for its multi-threading caveats.
pub fn propagate_to_env() {
    std::env::set_var("RENACER_LOGICAL_CLOCK", GLOBAL_CLOCK.now().to_string());
}
/// Unit tests for `LamportClock` and the environment helpers.
///
/// Every test that touches the process environment or `GLOBAL_CLOCK` is
/// marked `#[serial]`: both are process-global, and the test harness runs
/// tests on multiple threads.
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    #[test]
    fn test_tick_increments() {
        let clock = LamportClock::new();
        // `tick` reports the value held before the increment.
        assert_eq!(clock.tick(), 0);
        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);
        assert_eq!(clock.now(), 3);
    }

    #[test]
    fn test_sync_with_higher_remote_clock() {
        let clock = LamportClock::new();
        clock.tick();
        clock.sync(10);
        // max(1, 10) + 1
        assert_eq!(clock.now(), 11);
    }

    #[test]
    fn test_sync_with_lower_remote_clock() {
        let clock = LamportClock::new();
        for _ in 0..5 {
            clock.tick();
        }
        clock.sync(2);
        // max(5, 2) + 1
        assert_eq!(clock.now(), 6);
    }

    #[test]
    fn test_happens_before_ordering() {
        let clock_a = LamportClock::new();
        let clock_b = LamportClock::new();

        let t1 = clock_a.tick();
        let t2 = clock_a.tick();

        // B receives A's timestamp, so B's next event must be ordered after it.
        clock_b.sync(t2);
        let t3 = clock_b.tick();

        assert!(t1 < t2);
        assert!(t2 < t3);
    }

    #[test]
    fn test_with_value() {
        let clock = LamportClock::with_value(100);
        assert_eq!(clock.now(), 100);
        assert_eq!(clock.tick(), 100);
        assert_eq!(clock.now(), 101);
    }

    #[test]
    fn test_reset() {
        let clock = LamportClock::new();
        clock.tick();
        clock.tick();
        assert_eq!(clock.now(), 2);
        clock.reset();
        assert_eq!(clock.now(), 0);
    }

    #[test]
    fn test_thread_safety() {
        use std::sync::Arc;
        use std::thread;

        let clock = Arc::new(LamportClock::new());

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let clock_clone = Arc::clone(&clock);
                thread::spawn(move || {
                    for _ in 0..100 {
                        clock_clone.tick();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("ticking thread panicked");
        }

        // 10 threads x 100 ticks, none lost.
        assert_eq!(clock.now(), 1000);
    }

    #[test]
    #[serial] // mutates the process environment, like the other env tests
    fn test_env_propagation() {
        let clock = LamportClock::new();
        for _ in 0..42 {
            clock.tick();
        }
        let current = clock.now();

        std::env::set_var("TEST_RENACER_LOGICAL_CLOCK", current.to_string());
        let clock_str = std::env::var("TEST_RENACER_LOGICAL_CLOCK")
            .expect("TEST_RENACER_LOGICAL_CLOCK was set just above");
        // Clean up before asserting so a failure does not leak the variable.
        std::env::remove_var("TEST_RENACER_LOGICAL_CLOCK");

        let parent_clock = clock_str
            .parse::<u64>()
            .expect("value written from a u64 must parse back as a u64");

        let child_clock = LamportClock::new();
        child_clock.sync(parent_clock);

        assert!(child_clock.now() > current);
        assert_eq!(child_clock.now(), 43);
    }

    #[test]
    fn test_default_trait() {
        let clock = LamportClock::default();
        assert_eq!(clock.now(), 0);
        assert_eq!(clock.tick(), 0);
        assert_eq!(clock.now(), 1);
    }

    #[test]
    #[serial]
    fn test_propagate_to_env() {
        GLOBAL_CLOCK.reset();
        GLOBAL_CLOCK.tick();
        GLOBAL_CLOCK.tick();
        GLOBAL_CLOCK.tick();

        propagate_to_env();
        let env_val = std::env::var("RENACER_LOGICAL_CLOCK")
            .expect("propagate_to_env should have set RENACER_LOGICAL_CLOCK");

        // Restore global state before asserting so later tests start clean.
        std::env::remove_var("RENACER_LOGICAL_CLOCK");
        GLOBAL_CLOCK.reset();

        assert_eq!(env_val, "3");
    }

    #[test]
    #[serial]
    fn test_init_from_env_with_valid_clock() {
        GLOBAL_CLOCK.reset();
        std::env::set_var("RENACER_LOGICAL_CLOCK", "50");

        init_from_env();
        let observed = GLOBAL_CLOCK.now();

        std::env::remove_var("RENACER_LOGICAL_CLOCK");
        GLOBAL_CLOCK.reset();

        assert_eq!(observed, 51);
    }

    #[test]
    #[serial]
    fn test_init_from_env_with_missing_env() {
        GLOBAL_CLOCK.reset();
        std::env::remove_var("RENACER_LOGICAL_CLOCK");

        init_from_env();

        assert_eq!(GLOBAL_CLOCK.now(), 0);
    }

    #[test]
    #[serial]
    fn test_init_from_env_with_invalid_value() {
        GLOBAL_CLOCK.reset();
        std::env::set_var("RENACER_LOGICAL_CLOCK", "not_a_number");

        init_from_env();
        let observed = GLOBAL_CLOCK.now();

        std::env::remove_var("RENACER_LOGICAL_CLOCK");

        // An unparsable value must leave the clock untouched.
        assert_eq!(observed, 0);
    }

    #[test]
    #[serial]
    fn test_global_clock_tick() {
        GLOBAL_CLOCK.reset();

        let t1 = GLOBAL_CLOCK.tick();
        let t2 = GLOBAL_CLOCK.tick();
        let t3 = GLOBAL_CLOCK.tick();
        let end = GLOBAL_CLOCK.now();

        GLOBAL_CLOCK.reset();

        assert_eq!(t1, 0);
        assert_eq!(t2, 1);
        assert_eq!(t3, 2);
        assert_eq!(end, 3);
    }
}