#![cfg(loom)]
#![allow(
clippy::disallowed_methods,
reason = "loom test uses std::sync types; no real poison concern"
)]
use loom::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use loom::sync::Arc;
use loom::thread;
/// Model of a statistics block made of five independent atomic counters.
///
/// Every access uses `Ordering::Relaxed`; the counters are never read or
/// written as a group.
#[derive(Default)]
struct StatsModel {
    /// Bumped by one on every `record_in` call.
    packets_in: AtomicU64,
    /// Bumped by one on every `record_out` call.
    packets_out: AtomicU64,
    /// Bumped by one on every `record_drop` call.
    packets_dropped: AtomicU64,
    /// Running sum of the `bytes` passed to `record_in`.
    bytes_in: AtomicU64,
    /// Running sum of the `bytes` passed to `record_out`.
    bytes_out: AtomicU64,
}

impl StatsModel {
    /// Adds `delta` to `cell` with relaxed ordering, discarding the old value.
    fn bump(cell: &AtomicU64, delta: u64) {
        cell.fetch_add(delta, Ordering::Relaxed);
    }

    /// Reads `cell` with relaxed ordering.
    fn peek(cell: &AtomicU64) -> u64 {
        cell.load(Ordering::Relaxed)
    }

    /// Counts one inbound packet carrying `bytes` bytes.
    fn record_in(&self, bytes: u64) {
        Self::bump(&self.packets_in, 1);
        Self::bump(&self.bytes_in, bytes);
    }

    /// Counts one outbound packet carrying `bytes` bytes.
    fn record_out(&self, bytes: u64) {
        Self::bump(&self.packets_out, 1);
        Self::bump(&self.bytes_out, bytes);
    }

    /// Counts one dropped packet.
    fn record_drop(&self) {
        Self::bump(&self.packets_dropped, 1);
    }

    /// Returns `(packets_in, packets_out, packets_dropped, bytes_in, bytes_out)`.
    ///
    /// Each counter is loaded separately, in that order.
    fn totals(&self) -> (u64, u64, u64, u64, u64) {
        let Self {
            packets_in,
            packets_out,
            packets_dropped,
            bytes_in,
            bytes_out,
        } = self;
        (
            Self::peek(packets_in),
            Self::peek(packets_out),
            Self::peek(packets_dropped),
            Self::peek(bytes_in),
            Self::peek(bytes_out),
        )
    }
}
/// One thread records an inbound packet while another records an outbound
/// packet and a drop; after both joins every counter must hold exactly the
/// recorded amount.
#[test]
fn stream_stats_counter_battery_is_atomic_under_concurrent_record() {
    loom::model(|| {
        let shared = Arc::new(StatsModel::default());

        let inbound_view = Arc::clone(&shared);
        let inbound = thread::spawn(move || inbound_view.record_in(100));

        let outbound_view = Arc::clone(&shared);
        let outbound = thread::spawn(move || {
            outbound_view.record_out(250);
            outbound_view.record_drop();
        });

        inbound.join().unwrap();
        outbound.join().unwrap();

        // Snapshot once, then check each slot of the tuple individually.
        let snapshot = shared.totals();
        assert_eq!(snapshot.0, 1);
        assert_eq!(snapshot.1, 1);
        assert_eq!(snapshot.2, 1);
        assert_eq!(snapshot.3, 100);
        assert_eq!(snapshot.4, 250);
    });
}
/// Tries to take one unit from `counter` without ever going below zero.
///
/// Returns `true` if this call performed the decrement, or `false` if the
/// counter was observed at zero. All atomic operations use relaxed ordering.
fn try_decrement_burst(counter: &AtomicU64) -> bool {
    loop {
        let observed = counter.load(Ordering::Relaxed);
        // `checked_sub` is `None` only when `observed` is zero: nothing to take.
        let Some(next) = observed.checked_sub(1) else {
            return false;
        };
        let outcome =
            counter.compare_exchange_weak(observed, next, Ordering::Relaxed, Ordering::Relaxed);
        if outcome.is_ok() {
            return true;
        }
        // The exchange failed (value changed or spurious failure): re-read and retry.
    }
}
/// Two threads each attempt one decrement of a counter that starts at 2.
/// Both attempts must succeed and the counter must end at exactly 0.
#[test]
fn burst_cas_decrement_never_underflows_under_contention() {
    loom::model(|| {
        let counter = Arc::new(AtomicU64::new(2));
        let decremented_a = Arc::new(AtomicBool::new(false));
        let decremented_b = Arc::new(AtomicBool::new(false));

        // Spawns a worker that records into `flag` whether its decrement succeeded.
        let spawn_decrementer = |flag: &Arc<AtomicBool>| {
            let budget = Arc::clone(&counter);
            let succeeded = Arc::clone(flag);
            thread::spawn(move || {
                if try_decrement_burst(&budget) {
                    succeeded.store(true, Ordering::Relaxed);
                }
            })
        };

        let worker_a = spawn_decrementer(&decremented_a);
        let worker_b = spawn_decrementer(&decremented_b);
        worker_a.join().unwrap();
        worker_b.join().unwrap();

        let final_count = counter.load(Ordering::Relaxed);
        assert_eq!(
            final_count, 0,
            "CAS loop preserves the sum: initial(2) - decrements(2) = 0. \
             A non-zero final count means the loop failed to decrement; a \
             value near u64::MAX means the CAS loop regressed to load+sub.",
        );
        assert!(
            decremented_a.load(Ordering::Relaxed),
            "thread A did not observe a successful decrement",
        );
        assert!(
            decremented_b.load(Ordering::Relaxed),
            "thread B did not observe a successful decrement",
        );
    });
}
/// Two threads race for the single remaining unit of a counter that starts
/// at 1. Exactly one may win, and the counter must end at 0.
#[test]
fn burst_cas_decrement_caps_at_initial_count_under_contention() {
    loom::model(|| {
        let counter = Arc::new(AtomicU64::new(1));
        let winners = Arc::new(AtomicU64::new(0));

        let mut contenders = Vec::new();
        for _ in 0..2 {
            let budget = Arc::clone(&counter);
            let tally = Arc::clone(&winners);
            contenders.push(thread::spawn(move || {
                if try_decrement_burst(&budget) {
                    tally.fetch_add(1, Ordering::Relaxed);
                }
            }));
        }
        contenders
            .into_iter()
            .for_each(|contender| contender.join().unwrap());

        let final_count = counter.load(Ordering::Relaxed);
        // Keep this binding's name: the message below captures it by name.
        let winner_count = winners.load(Ordering::Relaxed);
        assert_eq!(
            final_count, 0,
            "counter must end at exactly 0 — any other value means the \
             CAS loop is racy. winners saw {winner_count} successful decrements",
        );
        assert_eq!(
            winner_count, 1,
            "exactly one thread must win the last decrement when initial=1",
        );
    });
}
/// Model of a two-stage authorization check: a bloom-style bit consulted
/// first, followed by a `verified` flag.
struct AuthBloomModel {
    /// Set to 1 by `authorize` with a relaxed store.
    bloom_bit: AtomicU64,
    /// Set to 1 by `authorize` with a release store, after `bloom_bit`.
    verified: AtomicU64,
}

impl AuthBloomModel {
    /// Creates a model with both cells at zero.
    fn new() -> Self {
        let bloom_bit = AtomicU64::new(0);
        let verified = AtomicU64::new(0);
        Self { bloom_bit, verified }
    }

    /// Sets the bloom bit (relaxed), then publishes `verified` (release).
    fn authorize(&self) {
        self.bloom_bit.store(1, Ordering::Relaxed);
        self.verified.store(1, Ordering::Release);
    }

    /// Returns the fast-path verdict:
    ///
    /// * `0` — the bloom bit was read as zero;
    /// * `1` — the bloom bit was non-zero and `verified` was read as 1;
    /// * `2` — the bloom bit was non-zero but `verified` was not 1.
    fn check_fast(&self) -> u8 {
        let bloom_hit = self.bloom_bit.load(Ordering::Relaxed) != 0;
        if !bloom_hit {
            return 0;
        }
        match self.verified.load(Ordering::Acquire) {
            1 => 1,
            _ => 2,
        }
    }
}
/// Runs `authorize` and `check_fast` on separate threads and checks that the
/// verdict is one of the three values `check_fast` is written to return.
#[test]
fn auth_bloom_authorize_check_fast_concurrent_verdict_is_documented() {
    loom::model(|| {
        let model = Arc::new(AuthBloomModel::new());

        let writer_view = Arc::clone(&model);
        let producer = thread::spawn(move || writer_view.authorize());

        let reader_view = Arc::clone(&model);
        let consumer = thread::spawn(move || reader_view.check_fast());

        producer.join().unwrap();
        // Keep this binding's name: the message below captures it by name.
        let verdict = consumer.join().unwrap();
        let documented = matches!(verdict, 0 | 1 | 2);
        assert!(
            documented,
            "check_fast returned undocumented verdict {verdict}",
        );
    });
}
/// Checks the verdict seen by the main thread after `authorize` has run on
/// another thread and that thread has been joined.
///
/// The join orders both stores made by `authorize` before the loads in
/// `check_fast`, so the verdict must not be `0` and must in fact be exactly
/// `1`. A verdict of `2` here would mean the `verified` store was not
/// visible after the join.
#[test]
fn auth_bloom_post_authorize_check_never_denies() {
    loom::model(|| {
        let m = Arc::new(AuthBloomModel::new());
        let producer = {
            let m = Arc::clone(&m);
            thread::spawn(move || m.authorize())
        };
        producer.join().unwrap();

        let verdict = m.check_fast();
        assert_ne!(
            verdict, 0,
            "check_fast after a joined authorize must never return Denied",
        );
        // Stronger than "not denied": with both stores visible the only
        // possible verdict is 1, so also reject the bloom-set/unverified case.
        assert_eq!(
            verdict, 1,
            "check_fast after a joined authorize must observe the verified store",
        );
    });
}
/// Raises `cell` to `seq` if `seq` is greater than the value currently
/// stored; otherwise leaves `cell` untouched.
///
/// Implemented as a relaxed compare-exchange loop that retries with the
/// value reported by each failed exchange.
fn record_tail_seq_monotonic_max(cell: &AtomicU64, seq: u64) {
    let mut seen = cell.load(Ordering::Relaxed);
    loop {
        // The stored value is already at least `seq`: nothing to publish.
        if seq <= seen {
            return;
        }
        let attempt = cell.compare_exchange_weak(seen, seq, Ordering::Relaxed, Ordering::Relaxed);
        if let Err(latest) = attempt {
            // Retry against the value the failed exchange reported.
            seen = latest;
        } else {
            return;
        }
    }
}
/// Two threads propose (5, 10) and (7, 3) respectively; once both have been
/// joined the cell must hold the largest proposal, 10.
#[test]
fn record_tail_seq_converges_on_max_under_concurrent_updates() {
    loom::model(|| {
        let tail = Arc::new(AtomicU64::new(0));

        let first_view = Arc::clone(&tail);
        let first = thread::spawn(move || {
            record_tail_seq_monotonic_max(&first_view, 5);
            record_tail_seq_monotonic_max(&first_view, 10);
        });

        let second_view = Arc::clone(&tail);
        let second = thread::spawn(move || {
            record_tail_seq_monotonic_max(&second_view, 7);
            record_tail_seq_monotonic_max(&second_view, 3);
        });

        first.join().unwrap();
        second.join().unwrap();

        let settled = tail.load(Ordering::Relaxed);
        assert_eq!(
            settled, 10,
            "monotonic-max CAS must converge on max(proposals); \
             a value below 10 means the loop dropped a proposal",
        );
    });
}
#[test]
fn record_tail_seq_lower_proposal_does_not_regress_existing() {
loom::model(|| {
let tail = Arc::new(AtomicU64::new(0));
let high = {
let t = tail.clone();
thread::spawn(move || record_tail_seq_monotonic_max(&t, 100))
};
let low = {
let t = tail.clone();
thread::spawn(move || record_tail_seq_monotonic_max(&t, 5))
};
high.join().unwrap();
low.join().unwrap();
assert_eq!(
tail.load(Ordering::Relaxed),
100,
"lower proposal must never regress the committed max",
);
});
}
/// Model of a replication metrics block made of three independent atomic
/// counters, all accessed with `Ordering::Relaxed`.
#[derive(Default)]
struct ReplicationMetricsModel {
    /// Running sum of the `bytes` passed to `incr_sync_bytes`.
    sync_bytes_total: AtomicU64,
    /// Bumped by one on every `incr_leader_change` call.
    leader_changes_total: AtomicU64,
    /// Bumped by one on every `incr_under_capacity` call.
    under_capacity_total: AtomicU64,
}

impl ReplicationMetricsModel {
    /// Adds `delta` to `cell` with relaxed ordering, discarding the old value.
    fn bump(cell: &AtomicU64, delta: u64) {
        cell.fetch_add(delta, Ordering::Relaxed);
    }

    /// Adds `bytes` to the sync-bytes counter.
    fn incr_sync_bytes(&self, bytes: u64) {
        Self::bump(&self.sync_bytes_total, bytes);
    }

    /// Adds one to the leader-change counter.
    fn incr_leader_change(&self) {
        Self::bump(&self.leader_changes_total, 1);
    }

    /// Adds one to the under-capacity counter.
    fn incr_under_capacity(&self) {
        Self::bump(&self.under_capacity_total, 1);
    }

    /// Returns `(sync_bytes_total, leader_changes_total, under_capacity_total)`.
    ///
    /// Each counter is loaded separately, in that order.
    fn totals(&self) -> (u64, u64, u64) {
        let Self {
            sync_bytes_total,
            leader_changes_total,
            under_capacity_total,
        } = self;
        let sync_bytes = sync_bytes_total.load(Ordering::Relaxed);
        let leader_changes = leader_changes_total.load(Ordering::Relaxed);
        let under_capacity = under_capacity_total.load(Ordering::Relaxed);
        (sync_bytes, leader_changes, under_capacity)
    }
}
/// Marks `flag` as set and reports whether this call was the one that
/// flipped it.
///
/// Returns `true` only when the previous value was `false`. The swap uses
/// `Ordering::AcqRel`.
fn try_first_close(flag: &AtomicBool) -> bool {
    let was_already_set = flag.swap(true, Ordering::AcqRel);
    !was_already_set
}
/// Two threads race to call `try_first_close` on the same flag. Exactly one
/// call may report a win, and the flag must be set afterwards.
#[test]
fn close_swap_pattern_exactly_one_caller_wins() {
    loom::model(|| {
        let closed = Arc::new(AtomicBool::new(false));
        let winners = Arc::new(AtomicU64::new(0));

        // Spawns a worker that bumps `winners` if its close attempt won.
        let spawn_closer = || {
            let flag = Arc::clone(&closed);
            let tally = Arc::clone(&winners);
            thread::spawn(move || {
                if try_first_close(&flag) {
                    tally.fetch_add(1, Ordering::Relaxed);
                }
            })
        };

        let first = spawn_closer();
        let second = spawn_closer();
        first.join().unwrap();
        second.join().unwrap();

        let win_total = winners.load(Ordering::Relaxed);
        assert_eq!(
            win_total,
            1,
            "exactly one caller wins the swap; a load+store regression \
             would let both threads observe `false` and both run cleanup",
        );
        let is_closed = closed.load(Ordering::Relaxed);
        assert!(
            is_closed,
            "flag must be true after both joins regardless of who won",
        );
    });
}
/// Two threads bump a mix of counters, with the leader-change counter shared
/// by both; after both joins every total must match what was recorded.
#[test]
fn replication_metrics_counters_atomic_under_concurrent_increments() {
    loom::model(|| {
        let metrics = Arc::new(ReplicationMetricsModel::default());

        let sync_view = Arc::clone(&metrics);
        let syncer = thread::spawn(move || {
            sync_view.incr_sync_bytes(1024);
            sync_view.incr_leader_change();
        });

        let capacity_view = Arc::clone(&metrics);
        let capacity_watcher = thread::spawn(move || {
            capacity_view.incr_under_capacity();
            capacity_view.incr_leader_change();
        });

        syncer.join().unwrap();
        capacity_watcher.join().unwrap();

        let snapshot = metrics.totals();
        assert_eq!(snapshot.0, 1024);
        assert_eq!(snapshot.1, 2, "both threads bumped leader_changes");
        assert_eq!(snapshot.2, 1);
    });
}
/// Three threads each bump the leader-change counter once; the total after
/// all joins must be 3.
#[test]
fn replication_metrics_same_counter_three_way_contention() {
    loom::model(|| {
        let metrics = Arc::new(ReplicationMetricsModel::default());

        let mut workers = Vec::new();
        for _ in 0..3 {
            let view = Arc::clone(&metrics);
            workers.push(thread::spawn(move || view.incr_leader_change()));
        }
        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());

        // Only the middle slot (leader changes) is of interest here.
        let leader_changes = metrics.totals().1;
        assert_eq!(leader_changes, 3, "no lost updates under contention");
    });
}