use crate::remote::NodeId;
use crate::time::{TimeSource, TimerDriverHandle, WallClock};
use crate::types::Time;
use parking_lot::Mutex;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Common interface over the logical clock families in this module
/// (Lamport, vector, and hybrid).
///
/// Implementations are shared across threads (`Send + Sync`) and mutate
/// internal state through `&self`.
pub trait LogicalClock: Send + Sync {
    /// The timestamp type this clock produces. `PartialOrd` (not `Ord`)
    /// because vector timestamps can be incomparable (concurrent).
    type Time: Clone + PartialOrd + Send + Sync + 'static;

    /// Advances the clock for a local event and returns the new time.
    #[must_use]
    fn tick(&self) -> Self::Time;

    /// Merges a timestamp received from another node and returns the
    /// advanced local time.
    #[must_use]
    fn receive(&self, sender_time: &Self::Time) -> Self::Time;

    /// Observes the current time without advancing the clock.
    #[must_use]
    fn now(&self) -> Self::Time;
}
/// A point in Lamport logical time: a plain monotonically increasing counter.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LamportTime(u64);

impl LamportTime {
    /// Returns the underlying counter value.
    #[must_use]
    pub const fn raw(self) -> u64 {
        self.0
    }

    /// Wraps a raw counter value in a `LamportTime`.
    #[must_use]
    pub const fn from_raw(value: u64) -> Self {
        Self(value)
    }
}

/// A thread-safe Lamport clock backed by a single atomic counter.
pub struct LamportClock {
    counter: AtomicU64,
}

impl LamportClock {
    /// Creates a clock starting at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::with_start(0)
    }

    /// Creates a clock whose counter starts at `start`.
    #[must_use]
    pub fn with_start(start: u64) -> Self {
        Self {
            counter: AtomicU64::new(start),
        }
    }

    /// Reads the current value without advancing the clock.
    #[must_use]
    pub fn now(&self) -> LamportTime {
        LamportTime(self.counter.load(Ordering::Acquire))
    }

    /// Advances the counter by one and returns the new time.
    ///
    /// # Panics
    /// Panics if the counter would overflow `u64::MAX`.
    #[must_use]
    pub fn tick(&self) -> LamportTime {
        // `fetch_update` performs the same Acquire-load / AcqRel-CAS retry
        // loop the hand-rolled version did.
        let previous = self
            .counter
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |value| {
                Some(
                    value
                        .checked_add(1)
                        .expect("Lamport clock overflowed while ticking"),
                )
            })
            .unwrap_or_else(|_| unreachable!("update closure always returns Some"));
        // `checked_add` succeeded in the winning iteration, so this cannot wrap.
        LamportTime(previous + 1)
    }

    /// Merges a remote timestamp: the counter becomes `max(local, sender) + 1`.
    ///
    /// # Panics
    /// Panics if the merged counter would overflow `u64::MAX`.
    #[must_use]
    pub fn receive(&self, sender: LamportTime) -> LamportTime {
        let previous = self
            .counter
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |value| {
                Some(
                    value
                        .max(sender.raw())
                        .checked_add(1)
                        .expect("Lamport clock overflowed while merging a received time"),
                )
            })
            .unwrap_or_else(|_| unreachable!("update closure always returns Some"));
        // Recompute the stored value from the winning previous value.
        LamportTime(previous.max(sender.raw()) + 1)
    }
}
impl Default for LamportClock {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for LamportClock {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LamportClock")
.field("counter", &self.counter.load(Ordering::Relaxed))
.finish()
}
}
impl LogicalClock for LamportClock {
type Time = LamportTime;
fn tick(&self) -> Self::Time {
Self::tick(self)
}
fn receive(&self, sender_time: &Self::Time) -> Self::Time {
Self::receive(self, *sender_time)
}
fn now(&self) -> Self::Time {
Self::now(self)
}
}
/// A hybrid logical clock timestamp: a physical component plus a logical
/// tie-breaker used when physical time does not advance.
///
/// The derived `Ord` compares `physical` first, then `logical` (field order
/// below), which is exactly the HLC total order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HybridTime {
    physical: Time,
    logical: u64,
}

impl HybridTime {
    /// Builds a timestamp from its physical and logical parts.
    #[must_use]
    pub const fn new(physical: Time, logical: u64) -> Self {
        Self { physical, logical }
    }

    /// The physical (wall or virtual clock) component.
    #[must_use]
    pub const fn physical(self) -> Time {
        self.physical
    }

    /// The logical tie-breaker component.
    #[must_use]
    pub const fn logical(self) -> u64 {
        self.logical
    }
}
/// Mutable interior of a [`HybridClock`]: the last physical time that was
/// issued and the logical counter used to break ties at that instant.
#[derive(Debug)]
struct HybridState {
    last_physical: Time,
    logical: u64,
}

/// A hybrid logical clock (HLC): combines a physical [`TimeSource`] with a
/// logical counter so timestamps stay monotone even when the physical source
/// stalls or runs backwards. State is guarded by a mutex.
pub struct HybridClock {
    time_source: Arc<dyn TimeSource>,
    state: Mutex<HybridState>,
}
impl HybridClock {
    /// Creates a hybrid clock seeded with the source's current physical time
    /// and a zero logical counter.
    #[must_use]
    pub fn new(time_source: Arc<dyn TimeSource>) -> Self {
        let now = time_source.now();
        Self {
            time_source,
            state: Mutex::new(HybridState {
                last_physical: now,
                logical: 0,
            }),
        }
    }

    /// Observes the current hybrid time without advancing the clock.
    #[must_use]
    pub fn now(&self) -> HybridTime {
        let state = self.state.lock();
        let physical = self.physical_now(&state);
        // If physical time has not moved past the last issued timestamp,
        // report the stored logical counter; otherwise the newer physical
        // instant has seen no ticks yet, so its logical part is zero.
        let logical = if physical == state.last_physical {
            state.logical
        } else {
            0
        };
        HybridTime::new(physical, logical)
    }

    /// Advances the clock for a local event and returns the new timestamp.
    ///
    /// # Panics
    /// Panics if the logical counter overflows while physical time stands
    /// still.
    #[must_use]
    pub fn tick(&self) -> HybridTime {
        let mut state = self.state.lock();
        let physical = self.physical_now(&state);
        if physical == state.last_physical {
            // Physical time stalled: bump the logical tie-breaker instead.
            state.logical = state
                .logical
                .checked_add(1)
                .expect("Hybrid clock logical counter overflowed while ticking");
        } else {
            // Physical time advanced: adopt it and reset the logical counter.
            state.last_physical = physical;
            state.logical = 0;
        }
        HybridTime::new(state.last_physical, state.logical)
    }

    /// Merges a remote timestamp using the standard HLC receive rule and
    /// returns the advanced local timestamp.
    ///
    /// # Panics
    /// Panics if the logical counter overflows during the merge.
    #[must_use]
    pub fn receive(&self, sender: HybridTime) -> HybridTime {
        let mut state = self.state.lock();
        let physical_now = self.physical_now(&state);
        // New physical component: the max of current physical time, the last
        // issued physical time, and the sender's physical time.
        // (`physical_now` is already >= `last_physical`, see below.)
        let max_physical = physical_now.max(state.last_physical).max(sender.physical);
        // Branch order matters: the "both equal" case must be checked first
        // so the logical counter advances past BOTH sides' counters.
        let next_logical = if max_physical == state.last_physical && max_physical == sender.physical
        {
            // Local and remote share the winning physical time: advance past
            // the larger of the two logical counters.
            state
                .logical
                .max(sender.logical)
                .checked_add(1)
                .expect("Hybrid clock logical counter overflowed while merging equal physical time")
        } else if max_physical == state.last_physical {
            // Local physical time dominates: advance the local counter only.
            state.logical.checked_add(1).expect(
                "Hybrid clock logical counter overflowed while advancing local logical time",
            )
        } else if max_physical == sender.physical {
            // Remote physical time dominates: advance past its counter.
            sender.logical.checked_add(1).expect(
                "Hybrid clock logical counter overflowed while incorporating a remote physical time",
            )
        } else {
            // Fresh physical time beats both sides: the counter restarts.
            0
        };
        state.last_physical = max_physical;
        state.logical = next_logical;
        HybridTime::new(state.last_physical, state.logical)
    }

    /// Reads the time source, clamped so the physical component never runs
    /// backwards relative to the last issued timestamp.
    fn physical_now(&self, state: &HybridState) -> Time {
        let physical = self.time_source.now();
        if physical < state.last_physical {
            state.last_physical
        } else {
            physical
        }
    }
}
/// Debug output exposes a snapshot of the guarded state; the trailing `..`
/// from `finish_non_exhaustive` marks the omitted time source.
impl fmt::Debug for HybridClock {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let guard = self.state.lock();
        let mut builder = f.debug_struct("HybridClock");
        builder.field("last_physical", &guard.last_physical);
        builder.field("logical", &guard.logical);
        builder.finish_non_exhaustive()
    }
}
impl LogicalClock for HybridClock {
type Time = HybridTime;
fn tick(&self) -> Self::Time {
Self::tick(self)
}
fn receive(&self, sender_time: &Self::Time) -> Self::Time {
Self::receive(self, *sender_time)
}
fn now(&self) -> Self::Time {
Self::now(self)
}
}
/// A mutex-guarded [`VectorClock`] bound to the local node's identity.
pub struct VectorClockHandle {
    node: NodeId,
    clock: Mutex<VectorClock>,
}

impl VectorClockHandle {
    /// Creates a handle for `node` starting from an empty vector clock.
    #[must_use]
    pub fn new(node: NodeId) -> Self {
        let clock = Mutex::new(VectorClock::new());
        Self { node, clock }
    }

    /// Returns a snapshot of the current vector clock.
    #[must_use]
    pub fn current(&self) -> VectorClock {
        let guard = self.clock.lock();
        guard.clone()
    }
}
/// Debug output includes the owning node and a locked snapshot of the clock.
impl fmt::Debug for VectorClockHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = self.clock.lock();
        f.debug_struct("VectorClockHandle")
            .field("node", &self.node)
            .field("clock", &*snapshot)
            .finish()
    }
}
/// `LogicalClock` implementation: mutates the vector clock under the lock and
/// hands back clones of the updated state.
impl LogicalClock for VectorClockHandle {
    type Time = VectorClock;

    fn tick(&self) -> Self::Time {
        let mut guard = self.clock.lock();
        guard.increment(&self.node);
        guard.clone()
    }

    fn receive(&self, sender_time: &Self::Time) -> Self::Time {
        let mut guard = self.clock.lock();
        guard.receive(&self.node, sender_time);
        guard.clone()
    }

    fn now(&self) -> Self::Time {
        let guard = self.clock.lock();
        guard.clone()
    }
}
/// A timestamp from any of the supported logical clock families.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LogicalTime {
    /// A Lamport counter value.
    Lamport(LamportTime),
    /// A full vector clock snapshot.
    Vector(VectorClock),
    /// A hybrid (physical + logical) timestamp.
    Hybrid(HybridTime),
}
impl LogicalTime {
    /// Which clock family produced this timestamp.
    #[must_use]
    pub const fn kind(&self) -> LogicalClockKind {
        match self {
            Self::Lamport(_) => LogicalClockKind::Lamport,
            Self::Vector(_) => LogicalClockKind::Vector,
            Self::Hybrid(_) => LogicalClockKind::Hybrid,
        }
    }

    /// Classifies the causal relationship between two timestamps.
    ///
    /// Vector timestamps use their native causal comparison; every other
    /// combination — including mismatched kinds — falls back to
    /// `partial_cmp`, where incomparable pairs count as concurrent.
    #[must_use]
    pub fn causal_order(&self, other: &Self) -> CausalOrder {
        if let (Self::Vector(a), Self::Vector(b)) = (self, other) {
            return a.causal_order(b);
        }
        match self.partial_cmp(other) {
            Some(std::cmp::Ordering::Less) => CausalOrder::Before,
            Some(std::cmp::Ordering::Greater) => CausalOrder::After,
            Some(std::cmp::Ordering::Equal) => CausalOrder::Equal,
            None => CausalOrder::Concurrent,
        }
    }
}
/// Timestamps of the same kind compare via their inner orders; timestamps of
/// different kinds are incomparable (`None`).
impl PartialOrd for LogicalTime {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        match (self, other) {
            (Self::Lamport(a), Self::Lamport(b)) => PartialOrd::partial_cmp(a, b),
            (Self::Vector(a), Self::Vector(b)) => PartialOrd::partial_cmp(a, b),
            (Self::Hybrid(a), Self::Hybrid(b)) => PartialOrd::partial_cmp(a, b),
            _ => None,
        }
    }
}
/// Discriminant identifying which logical clock family a value belongs to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogicalClockKind {
    Lamport,
    Vector,
    Hybrid,
}

/// Configuration describing which logical clock [`build_handle`] should
/// construct.
///
/// [`build_handle`]: LogicalClockMode::build_handle
#[derive(Clone, Debug)]
pub enum LogicalClockMode {
    Lamport,
    Vector {
        /// Identity of the local node; vector clocks keep one slot per node.
        node: NodeId,
    },
    Hybrid,
}

/// A cheaply cloneable shared handle to one of the clock implementations.
#[derive(Clone)]
pub enum LogicalClockHandle {
    Lamport(Arc<LamportClock>),
    Vector(Arc<VectorClockHandle>),
    Hybrid(Arc<HybridClock>),
}
impl LogicalClockHandle {
    /// Which clock family this handle wraps.
    #[must_use]
    pub const fn kind(&self) -> LogicalClockKind {
        match self {
            Self::Lamport(_) => LogicalClockKind::Lamport,
            Self::Vector(_) => LogicalClockKind::Vector,
            Self::Hybrid(_) => LogicalClockKind::Hybrid,
        }
    }

    /// Advances the wrapped clock for a local event.
    #[must_use]
    pub fn tick(&self) -> LogicalTime {
        match self {
            Self::Lamport(inner) => LogicalTime::Lamport(inner.tick()),
            Self::Vector(inner) => LogicalTime::Vector(inner.tick()),
            Self::Hybrid(inner) => LogicalTime::Hybrid(inner.tick()),
        }
    }

    /// Merges a received timestamp of the matching kind.
    ///
    /// NOTE(review): when the sender's timestamp kind does not match this
    /// handle's kind, the remote time is silently discarded and the local
    /// clock just ticks — callers relying on causality across mixed clock
    /// kinds should be aware of this fallback.
    #[must_use]
    pub fn receive(&self, sender_time: &LogicalTime) -> LogicalTime {
        match (self, sender_time) {
            (Self::Lamport(inner), LogicalTime::Lamport(remote)) => {
                LogicalTime::Lamport(inner.receive(*remote))
            }
            (Self::Vector(inner), LogicalTime::Vector(remote)) => {
                LogicalTime::Vector(inner.receive(remote))
            }
            (Self::Hybrid(inner), LogicalTime::Hybrid(remote)) => {
                LogicalTime::Hybrid(inner.receive(*remote))
            }
            _ => self.tick(),
        }
    }

    /// Observes the wrapped clock without advancing it.
    #[must_use]
    pub fn now(&self) -> LogicalTime {
        match self {
            Self::Lamport(inner) => LogicalTime::Lamport(inner.now()),
            Self::Vector(inner) => LogicalTime::Vector(inner.now()),
            Self::Hybrid(inner) => LogicalTime::Hybrid(inner.now()),
        }
    }
}
/// Debug prints only the variant name; inner clock state is omitted.
impl fmt::Debug for LogicalClockHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Lamport(_) => "LogicalClockHandle::Lamport",
            Self::Vector(_) => "LogicalClockHandle::Vector",
            Self::Hybrid(_) => "LogicalClockHandle::Hybrid",
        };
        f.write_str(label)
    }
}
/// The default handle wraps a Lamport clock starting at zero.
impl Default for LogicalClockHandle {
    fn default() -> Self {
        Self::Lamport(Arc::new(LamportClock::default()))
    }
}
impl LogicalClockMode {
    /// Instantiates the clock described by this mode.
    ///
    /// For `Hybrid`, the supplied timer driver (if any) is used as the
    /// physical time source; otherwise the clock falls back to the wall
    /// clock.
    #[must_use]
    pub fn build_handle(&self, timer_driver: Option<TimerDriverHandle>) -> LogicalClockHandle {
        match self {
            Self::Lamport => LogicalClockHandle::Lamport(Arc::new(LamportClock::new())),
            Self::Vector { node } => {
                let handle = VectorClockHandle::new(node.clone());
                LogicalClockHandle::Vector(Arc::new(handle))
            }
            Self::Hybrid => {
                let time_source: Arc<dyn TimeSource> = if let Some(driver) = timer_driver {
                    Arc::new(TimerDriverSource::new(driver))
                } else {
                    Arc::new(WallClock::new())
                };
                LogicalClockHandle::Hybrid(Arc::new(HybridClock::new(time_source)))
            }
        }
    }
}
/// Adapter exposing a [`TimerDriverHandle`] as a [`TimeSource`].
#[derive(Clone)]
struct TimerDriverSource {
    timer: TimerDriverHandle,
}

impl TimerDriverSource {
    /// Wraps the given timer driver handle.
    fn new(timer: TimerDriverHandle) -> Self {
        Self { timer }
    }
}

/// Debug output deliberately omits the timer handle.
impl fmt::Debug for TimerDriverSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // An empty `debug_struct(..).finish()` renders as just the type
        // name, so a plain string write is equivalent.
        f.write_str("TimerDriverSource")
    }
}
/// Forwards `now` straight to the underlying timer driver.
impl TimeSource for TimerDriverSource {
    fn now(&self) -> Time {
        self.timer.now()
    }
}
/// A vector clock: one monotonically increasing counter per node.
///
/// Nodes absent from the map are read as 0 (see `get`). Note that the
/// derived `PartialEq` is structural over the stored entries, while
/// `causal_order` compares component-wise with the implicit zeros.
#[derive(Clone, PartialEq, Eq)]
pub struct VectorClock {
    entries: BTreeMap<NodeId, u64>,
}
impl VectorClock {
    /// Creates an empty (all-zero) vector clock.
    #[must_use]
    pub fn new() -> Self {
        Self {
            entries: BTreeMap::new(),
        }
    }

    /// Creates a clock whose counter for `node` starts at 1.
    #[must_use]
    pub fn for_node(node: &NodeId) -> Self {
        let mut vc = Self::new();
        vc.entries.insert(node.clone(), 1);
        vc
    }

    /// Returns the counter for `node`, treating absent entries as 0.
    #[must_use]
    pub fn get(&self, node: &NodeId) -> u64 {
        self.entries.get(node).copied().unwrap_or(0)
    }

    /// Increments `node`'s counter by one and returns the new value.
    ///
    /// # Panics
    /// Panics if the counter would overflow `u64::MAX`.
    pub fn increment(&mut self, node: &NodeId) -> u64 {
        let entry = self.entries.entry(node.clone()).or_insert(0);
        *entry = entry
            .checked_add(1)
            .expect("Vector clock counter overflowed while incrementing");
        *entry
    }

    /// Raises `node`'s counter to `value` if that is an increase; setting 0
    /// or a smaller value is a no-op, keeping the clock monotone.
    pub fn set(&mut self, node: &NodeId, value: u64) {
        if value == 0 {
            // 0 is the implicit default; storing it would only bloat the map.
            return;
        }
        // Single-lookup entry API instead of insert-then-compare.
        self.entries
            .entry(node.clone())
            .and_modify(|entry| *entry = (*entry).max(value))
            .or_insert(value);
    }

    /// Returns the component-wise maximum (least upper bound) of two clocks.
    #[must_use]
    pub fn merge(&self, other: &Self) -> Self {
        // Delegate to `merge_in` so the merge rule lives in exactly one place.
        let mut result = self.clone();
        result.merge_in(other);
        result
    }

    /// Merges `other` into `self`, taking the component-wise maximum.
    pub fn merge_in(&mut self, other: &Self) {
        for (node, &value) in &other.entries {
            self.entries
                .entry(node.clone())
                .and_modify(|entry| *entry = (*entry).max(value))
                .or_insert(value);
        }
    }

    /// Applies the vector clock receive rule: merge the remote clock, then
    /// increment the local node's own counter.
    ///
    /// # Panics
    /// Panics if the local counter overflows while incrementing.
    pub fn receive(&mut self, local_node: &NodeId, remote_clock: &Self) {
        self.merge_in(remote_clock);
        self.increment(local_node);
    }

    /// Classifies the causal relationship between two clocks.
    #[must_use]
    pub fn causal_order(&self, other: &Self) -> CausalOrder {
        let mut self_leq_other = true;
        let mut other_leq_self = true;
        // Walk the union of node ids so entries missing on either side are
        // still compared (as 0).
        let all_nodes: std::collections::BTreeSet<&NodeId> =
            self.entries.keys().chain(other.entries.keys()).collect();
        for node in all_nodes {
            let a = self.get(node);
            let b = other.get(node);
            if a > b {
                self_leq_other = false;
            }
            if b > a {
                other_leq_self = false;
            }
            // Once both directions have failed, the clocks are provably
            // concurrent; no need to inspect the remaining nodes.
            if !self_leq_other && !other_leq_self {
                return CausalOrder::Concurrent;
            }
        }
        match (self_leq_other, other_leq_self) {
            (true, true) => CausalOrder::Equal,
            (true, false) => CausalOrder::Before,
            (false, true) => CausalOrder::After,
            (false, false) => CausalOrder::Concurrent,
        }
    }

    /// True when `self` causally precedes `other` strictly.
    #[must_use]
    pub fn happens_before(&self, other: &Self) -> bool {
        self.causal_order(other) == CausalOrder::Before
    }

    /// True when neither clock causally precedes the other.
    #[must_use]
    pub fn is_concurrent_with(&self, other: &Self) -> bool {
        self.causal_order(other) == CausalOrder::Concurrent
    }

    /// Number of nodes with a stored entry.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.entries.len()
    }

    /// True when no entries have been recorded for any node.
    #[must_use]
    pub fn is_zero(&self) -> bool {
        self.entries.is_empty()
    }

    /// Iterates over `(node, counter)` pairs in node order.
    pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &u64)> {
        self.entries.iter()
    }
}
impl Default for VectorClock {
fn default() -> Self {
Self::new()
}
}
/// Maps the causal classification onto `Option<Ordering>`: concurrent clocks
/// are incomparable (`None`).
impl PartialOrd for VectorClock {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let ordering = match self.causal_order(other) {
            CausalOrder::Before => std::cmp::Ordering::Less,
            CausalOrder::After => std::cmp::Ordering::Greater,
            CausalOrder::Equal => std::cmp::Ordering::Equal,
            CausalOrder::Concurrent => return None,
        };
        Some(ordering)
    }
}
/// Debug renders as `VC{A:1, B:2}`.
impl fmt::Debug for VectorClock {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("VC{")?;
        let mut first = true;
        for (node, value) in &self.entries {
            if !first {
                f.write_str(", ")?;
            }
            first = false;
            write!(f, "{}:{}", node.as_str(), value)?;
        }
        f.write_str("}")
    }
}

/// Display renders as `[A=1, B=2]`.
impl fmt::Display for VectorClock {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("[")?;
        let mut first = true;
        for (node, value) in &self.entries {
            if !first {
                f.write_str(", ")?;
            }
            first = false;
            write!(f, "{}={}", node.as_str(), value)?;
        }
        f.write_str("]")
    }
}
/// The four possible causal relationships between two timestamps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CausalOrder {
    /// `self` happened strictly before `other`.
    Before,
    /// `self` happened strictly after `other`.
    After,
    /// Both timestamps are component-wise identical.
    Equal,
    /// Neither timestamp causally precedes the other.
    Concurrent,
}
/// An event payload stamped with its origin node and the vector clock at the
/// moment it was recorded.
#[derive(Clone, Debug)]
pub struct CausalEvent<T> {
    pub origin: NodeId,
    pub clock: VectorClock,
    pub event: T,
}

impl<T> CausalEvent<T> {
    /// Bundles an event with its origin node and clock stamp.
    pub fn new(origin: NodeId, clock: VectorClock, event: T) -> Self {
        Self {
            origin,
            clock,
            event,
        }
    }

    /// True when this event causally precedes `other` (compares clocks only;
    /// the payloads and origins are not consulted).
    pub fn happens_before<U>(&self, other: &CausalEvent<U>) -> bool {
        self.clock.happens_before(&other.clock)
    }

    /// True when neither event causally precedes the other.
    pub fn is_concurrent_with<U>(&self, other: &CausalEvent<U>) -> bool {
        self.clock.is_concurrent_with(&other.clock)
    }
}
/// Tracks causal history for a single node: owns the node's identity and its
/// evolving vector clock.
#[derive(Clone, Debug)]
pub struct CausalTracker {
    node: NodeId,
    clock: VectorClock,
}
impl CausalTracker {
    /// Creates a tracker for `node` starting from an empty clock.
    #[must_use]
    pub fn new(node: NodeId) -> Self {
        let clock = VectorClock::new();
        Self { node, clock }
    }

    /// Records a local event: bumps this node's counter and returns a
    /// snapshot of the updated clock.
    pub fn record_local_event(&mut self) -> VectorClock {
        self.clock.increment(&self.node);
        self.clock.clone()
    }

    /// Records a local event and wraps the payload in a [`CausalEvent`].
    pub fn record<T>(&mut self, event: T) -> CausalEvent<T> {
        let stamp = self.record_local_event();
        CausalEvent::new(self.node.clone(), stamp, event)
    }

    /// Called before sending a message; the returned clock travels with it.
    pub fn on_send(&mut self) -> VectorClock {
        self.record_local_event()
    }

    /// Called when a message arrives: merges the remote clock and bumps the
    /// local counter.
    pub fn on_receive(&mut self, remote_clock: &VectorClock) {
        self.clock.receive(&self.node, remote_clock);
    }

    /// Borrows the tracker's current clock.
    #[must_use]
    pub fn current_clock(&self) -> &VectorClock {
        &self.clock
    }

    /// The node this tracker belongs to.
    #[must_use]
    pub fn node(&self) -> &NodeId {
        &self.node
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::time::VirtualClock;
    use serde_json::{Value, json};
    use std::sync::Arc;

    /// Shorthand for building a `NodeId` in tests.
    fn node(name: &str) -> NodeId {
        NodeId::new(name)
    }

    /// Replaces real node names in snapshot output with stable placeholders.
    fn scrub_vclock_output(value: Value) -> Value {
        fn scrub_node_names(text: &str) -> String {
            text.replace("alpha-node", "[NODE_A]")
                .replace("beta-node", "[NODE_B]")
        }
        let mut scrubbed = value;
        if let Some(display) = scrubbed.pointer_mut("/display") {
            if let Some(text) = display.as_str() {
                *display = Value::String(scrub_node_names(text));
            }
        }
        if let Some(debug) = scrubbed.pointer_mut("/debug") {
            if let Some(text) = debug.as_str() {
                *debug = Value::String(scrub_node_names(text));
            }
        }
        scrubbed
    }

    #[test]
    fn empty_clocks_are_equal() {
        let a = VectorClock::new();
        let b = VectorClock::new();
        assert_eq!(a.causal_order(&b), CausalOrder::Equal);
        assert_eq!(a.partial_cmp(&b), Some(std::cmp::Ordering::Equal));
    }

    #[test]
    fn increment_creates_happens_before() {
        let n = node("A");
        let mut a = VectorClock::new();
        let b = a.clone();
        a.increment(&n);
        assert_eq!(b.causal_order(&a), CausalOrder::Before);
        assert!(b.happens_before(&a));
    }

    #[test]
    fn concurrent_detection() {
        let na = node("A");
        let nb = node("B");
        let mut a = VectorClock::new();
        let mut b = VectorClock::new();
        a.increment(&na);
        b.increment(&nb);
        assert_eq!(a.causal_order(&b), CausalOrder::Concurrent);
        assert!(a.is_concurrent_with(&b));
        assert_eq!(a.partial_cmp(&b), None);
    }

    #[test]
    fn lamport_tick_and_receive() {
        let clock = LamportClock::new();
        let t1 = clock.tick();
        let t2 = clock.tick();
        assert!(t2 > t1);
        let remote = LamportTime::from_raw(10);
        let merged = clock.receive(remote);
        assert!(merged.raw() > remote.raw());
    }

    #[test]
    #[should_panic(expected = "Lamport clock overflowed while ticking")]
    fn lamport_tick_panics_on_overflow() {
        let clock = LamportClock::with_start(u64::MAX);
        let _ = clock.tick();
    }

    #[test]
    #[should_panic(expected = "Lamport clock overflowed while merging a received time")]
    fn lamport_receive_panics_on_overflow() {
        let clock = LamportClock::with_start(u64::MAX - 1);
        let _ = clock.receive(LamportTime::from_raw(u64::MAX));
    }

    #[test]
    fn hybrid_clock_deterministic_with_virtual_time() {
        let virtual_clock = Arc::new(VirtualClock::new());
        let hlc = HybridClock::new(virtual_clock.clone());
        let t1 = hlc.tick();
        let t2 = hlc.tick();
        assert!(t2 >= t1);
        virtual_clock.advance(1_000);
        let t3 = hlc.tick();
        assert!(t3.physical() >= t2.physical());
    }

    #[test]
    fn hybrid_now_resets_logical_when_physical_advances() {
        let virtual_clock = Arc::new(VirtualClock::new());
        let hlc = HybridClock::new(virtual_clock.clone());
        let t1 = hlc.tick();
        assert_eq!(t1.logical(), 1);
        virtual_clock.advance(1_000);
        // `now` observes the advanced physical time with a fresh logical part
        // but must not mutate the stored state.
        let observed = hlc.now();
        assert!(observed.physical() > t1.physical());
        assert_eq!(observed.logical(), 0);
        let t2 = hlc.tick();
        assert!(t2 >= observed);
    }

    #[test]
    #[should_panic(expected = "Hybrid clock logical counter overflowed while ticking")]
    fn hybrid_tick_panics_on_logical_overflow() {
        let time_source: Arc<dyn TimeSource> = Arc::new(VirtualClock::new());
        let hlc = HybridClock {
            time_source,
            state: Mutex::new(HybridState {
                last_physical: Time::ZERO,
                logical: u64::MAX,
            }),
        };
        let _ = hlc.tick();
    }

    #[test]
    #[should_panic(
        expected = "Hybrid clock logical counter overflowed while merging equal physical time"
    )]
    fn hybrid_receive_panics_on_equal_physical_logical_overflow() {
        let time_source: Arc<dyn TimeSource> = Arc::new(VirtualClock::new());
        let hlc = HybridClock {
            time_source,
            state: Mutex::new(HybridState {
                last_physical: Time::ZERO,
                logical: u64::MAX,
            }),
        };
        let _ = hlc.receive(HybridTime::new(Time::ZERO, u64::MAX));
    }

    #[test]
    fn merge_is_least_upper_bound() {
        let na = node("A");
        let nb = node("B");
        let mut a = VectorClock::new();
        a.increment(&na);
        a.increment(&na);
        let mut b = VectorClock::new();
        b.increment(&nb);
        b.increment(&nb);
        b.increment(&nb);
        let merged = a.merge(&b);
        assert_eq!(merged.get(&na), 2);
        assert_eq!(merged.get(&nb), 3);
        assert!(a.happens_before(&merged));
        assert!(b.happens_before(&merged));
    }

    #[test]
    fn merge_is_commutative() {
        let na = node("A");
        let nb = node("B");
        let mut a = VectorClock::new();
        a.increment(&na);
        let mut b = VectorClock::new();
        b.increment(&nb);
        assert_eq!(a.merge(&b), b.merge(&a));
    }

    #[test]
    fn merge_is_associative() {
        let na = node("A");
        let nb = node("B");
        let nc = node("C");
        let mut a = VectorClock::new();
        a.increment(&na);
        let mut b = VectorClock::new();
        b.increment(&nb);
        let mut c = VectorClock::new();
        c.increment(&nc);
        let ab_c = a.merge(&b).merge(&c);
        let a_bc = a.merge(&b.merge(&c));
        assert_eq!(ab_c, a_bc);
    }

    #[test]
    fn merge_is_idempotent() {
        let na = node("A");
        let mut a = VectorClock::new();
        a.increment(&na);
        assert_eq!(a.merge(&a), a);
    }

    #[test]
    fn receive_merges_and_increments() {
        let na = node("A");
        let nb = node("B");
        let mut a = VectorClock::new();
        a.increment(&na);
        let mut b = VectorClock::new();
        b.increment(&nb);
        b.increment(&nb);
        a.receive(&na, &b);
        assert_eq!(a.get(&na), 2);
        assert_eq!(a.get(&nb), 2);
    }

    #[test]
    fn vector_clock_output_snapshot_scrubbed() {
        let na = node("alpha-node");
        let nb = node("beta-node");
        let mut a = VectorClock::new();
        a.increment(&na);
        a.increment(&na);
        let mut b = VectorClock::new();
        b.increment(&nb);
        let merged = a.merge(&b);
        insta::assert_json_snapshot!(
            "vector_clock_output_scrubbed",
            scrub_vclock_output(json!({
                "display": merged.to_string(),
                "debug": format!("{merged:?}"),
                "order_vs_a": format!("{:?}", merged.causal_order(&a)),
            }))
        );
    }

    #[test]
    fn for_node_initializes_to_one() {
        let n = node("X");
        let vc = VectorClock::for_node(&n);
        assert_eq!(vc.get(&n), 1);
        assert_eq!(vc.node_count(), 1);
    }

    #[test]
    fn set_is_monotone() {
        let n = node("A");
        let mut vc = VectorClock::new();
        vc.set(&n, 3);
        assert_eq!(vc.get(&n), 3);
        // A smaller value must be ignored.
        vc.set(&n, 1);
        assert_eq!(vc.get(&n), 3);
        vc.set(&n, 7);
        assert_eq!(vc.get(&n), 7);
    }

    #[test]
    #[should_panic(expected = "Vector clock counter overflowed while incrementing")]
    fn vector_clock_increment_panics_on_overflow() {
        let n = node("A");
        let mut vc = VectorClock::new();
        vc.entries.insert(n.clone(), u64::MAX);
        let _ = vc.increment(&n);
    }

    #[test]
    #[should_panic(expected = "Vector clock counter overflowed while incrementing")]
    fn vector_clock_receive_panics_on_local_overflow() {
        let local = node("A");
        let remote = node("B");
        let mut vc = VectorClock::new();
        vc.entries.insert(local.clone(), u64::MAX);
        let mut remote_clock = VectorClock::new();
        remote_clock.set(&remote, 1);
        vc.receive(&local, &remote_clock);
    }

    #[test]
    fn causal_tracker_send_receive_protocol() {
        let na = node("A");
        let nb = node("B");
        let mut tracker_a = CausalTracker::new(na.clone());
        let mut tracker_b = CausalTracker::new(nb.clone());
        tracker_a.record_local_event();
        let msg_clock = tracker_a.on_send();
        assert_eq!(msg_clock.get(&na), 2);
        tracker_b.on_receive(&msg_clock);
        assert_eq!(tracker_b.current_clock().get(&na), 2);
        assert_eq!(tracker_b.current_clock().get(&nb), 1);
        tracker_b.record_local_event();
        assert!(msg_clock.happens_before(tracker_b.current_clock()));
    }

    #[test]
    fn causal_event_ordering() {
        let na = node("A");
        let nb = node("B");
        let mut tracker_a = CausalTracker::new(na);
        let mut tracker_b = CausalTracker::new(nb);
        let e1 = tracker_a.record("event-1");
        let e2 = tracker_b.record("event-2");
        assert!(e1.is_concurrent_with(&e2));
        assert!(!e1.happens_before(&e2));
    }

    #[test]
    fn display_formatting() {
        let na = node("A");
        let nb = node("B");
        let mut vc = VectorClock::new();
        vc.increment(&na);
        vc.increment(&nb);
        vc.increment(&nb);
        let display = format!("{vc}");
        assert!(display.contains("A=1"));
        assert!(display.contains("B=2"));
    }

    #[test]
    fn partial_order_after() {
        let na = node("A");
        let mut a = VectorClock::new();
        a.increment(&na);
        let b = VectorClock::new();
        assert_eq!(a.causal_order(&b), CausalOrder::After);
        assert_eq!(a.partial_cmp(&b), Some(std::cmp::Ordering::Greater));
    }

    #[test]
    fn three_node_diamond() {
        // A fans out to B and C; both later deliver to D, which must then be
        // causally after both branches.
        let na = node("A");
        let nb = node("B");
        let nc = node("C");
        let nd = node("D");
        let mut ta = CausalTracker::new(na);
        let mut tb = CausalTracker::new(nb);
        let mut tc = CausalTracker::new(nc);
        let mut td = CausalTracker::new(nd);
        let msg_to_b = ta.on_send();
        let msg_to_c = ta.on_send();
        tb.on_receive(&msg_to_b);
        tc.on_receive(&msg_to_c);
        let b_clock = tb.on_send();
        let c_clock = tc.on_send();
        assert!(b_clock.is_concurrent_with(&c_clock));
        td.on_receive(&b_clock);
        td.on_receive(&c_clock);
        assert!(b_clock.happens_before(td.current_clock()));
        assert!(c_clock.happens_before(td.current_clock()));
    }

    #[test]
    fn hybrid_time_debug_clone_copy_hash_ord() {
        use std::collections::HashSet;
        let ht = HybridTime::new(Time::from_nanos(1_000), 3);
        let dbg = format!("{ht:?}");
        assert!(dbg.contains("HybridTime"), "{dbg}");
        let copied = ht;
        let cloned = ht;
        assert_eq!(copied, cloned);
        let earlier = HybridTime::new(Time::ZERO, 0);
        assert!(earlier < ht);
        let mut set = HashSet::new();
        set.insert(ht);
        set.insert(earlier);
        assert_eq!(set.len(), 2);
        assert!(set.contains(&ht));
    }

    #[test]
    fn logical_clock_kind_debug_clone_copy_eq() {
        let k = LogicalClockKind::Lamport;
        let dbg = format!("{k:?}");
        assert!(dbg.contains("Lamport"), "{dbg}");
        let copied = k;
        let cloned = k;
        assert_eq!(copied, cloned);
        assert_ne!(k, LogicalClockKind::Vector);
        assert_ne!(k, LogicalClockKind::Hybrid);
    }

    #[test]
    fn logical_time_debug_clone_eq() {
        let lt = LogicalTime::Lamport(LamportTime::from_raw(5));
        let dbg = format!("{lt:?}");
        assert!(dbg.contains("Lamport"), "{dbg}");
        let cloned = lt.clone();
        assert_eq!(lt, cloned);
    }

    #[test]
    fn logical_clock_mode_debug_clone() {
        let mode = LogicalClockMode::Lamport;
        let dbg = format!("{mode:?}");
        assert!(dbg.contains("Lamport"), "{dbg}");
        let cloned = mode;
        let dbg2 = format!("{cloned:?}");
        assert_eq!(dbg, dbg2);
    }
}