use loom::sync::Arc;
use loom::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use loom::thread;
/// Loom model of a delta buffer: edges normally land in `edge_count`, but are
/// diverted to `staging_count` while a compaction is in flight.
#[derive(Debug)]
struct LoomDeltaBuffer {
// Number of edges visible in the main buffer.
edge_count: AtomicUsize,
// Monotonic sequence number handed out to each added edge.
next_seq: AtomicU64,
// True while exactly one thread holds the compaction "lock" (CAS-guarded).
compacting: AtomicBool,
// Edges added while `compacting` was observed true; folded back on finish.
staging_count: AtomicUsize,
}
impl LoomDeltaBuffer {
/// Fresh buffer: no edges, sequence numbering starts at 0, not compacting.
fn new() -> Self {
Self {
edge_count: AtomicUsize::new(0),
next_seq: AtomicU64::new(0),
compacting: AtomicBool::new(false),
staging_count: AtomicUsize::new(0),
}
}
/// Records one edge and returns its unique sequence number.
///
/// If a compaction is observed in flight the edge is counted in
/// `staging_count`, otherwise directly in `edge_count`.
///
/// NOTE(review): the load of `compacting` and the following increment are
/// not one atomic step — a compaction can finish between them, leaving the
/// increment stranded in `staging_count` until the *next*
/// `finish_compaction`. The tests below only assert coarse invariants,
/// which tolerates this window.
fn add_edge(&self) -> u64 {
let seq = self.next_seq.fetch_add(1, Ordering::AcqRel);
if self.compacting.load(Ordering::Acquire) {
self.staging_count.fetch_add(1, Ordering::AcqRel);
} else {
self.edge_count.fetch_add(1, Ordering::AcqRel);
}
seq
}
/// Attempts to become the single compactor; returns `true` on success.
/// The CAS from `false` to `true` provides mutual exclusion.
fn try_start_compaction(&self) -> bool {
self.compacting
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
/// Atomically takes all edges out of the main buffer and returns how many
/// were drained. Meaningful only while the compaction flag is held.
fn drain_for_compaction(&self) -> usize {
self.edge_count.swap(0, Ordering::AcqRel)
}
/// Folds edges staged during compaction back into the main buffer, then
/// releases the compaction flag. The flag is cleared last (Release) so the
/// staged count is merged before new writers stop staging.
fn finish_compaction(&self) {
let staged = self.staging_count.swap(0, Ordering::AcqRel);
self.edge_count.fetch_add(staged, Ordering::AcqRel);
self.compacting.store(false, Ordering::Release);
}
/// Current number of edges in the main buffer.
fn edge_count(&self) -> usize {
self.edge_count.load(Ordering::Acquire)
}
/// Current number of edges staged behind an in-flight compaction.
fn staging_count(&self) -> usize {
self.staging_count.load(Ordering::Acquire)
}
/// Whether some thread currently holds the compaction flag.
fn is_compacting(&self) -> bool {
self.compacting.load(Ordering::Acquire)
}
}
/// Loom model of a compaction checkpoint: a phase marker plus a counter
/// snapshot, gated by a `valid` flag that readers must check first.
#[derive(Debug)]
struct LoomCheckpoint {
// Compaction phase recorded at save time (1 = before drain, 2 = after drain;
// see `spawn_compaction_with_checkpoint`).
phase: AtomicUsize,
// Counter value captured alongside the phase.
counter_value: AtomicUsize,
// Set last by `save`, cleared by `invalidate`; readers gate on this.
valid: AtomicBool,
}
impl LoomCheckpoint {
/// Empty checkpoint: phase 0, counter 0, not yet valid.
fn new() -> Self {
Self {
phase: AtomicUsize::new(0),
counter_value: AtomicUsize::new(0),
valid: AtomicBool::new(false),
}
}
/// Records a phase/counter pair. `valid` is stored last with Release so a
/// reader that Acquire-loads `valid == true` also observes prior
/// phase/counter stores.
///
/// NOTE(review): the three stores are not a single atomic unit — across two
/// overlapping `save` calls a reader could pair the phase of one call with
/// the counter of another. The tests only assert `phase <= 2`, which
/// tolerates this.
fn save(&self, phase: usize, counter: usize) {
self.phase.store(phase, Ordering::Release);
self.counter_value.store(counter, Ordering::Release);
self.valid.store(true, Ordering::Release);
}
/// Whether a checkpoint has been saved and not yet invalidated.
fn is_valid(&self) -> bool {
self.valid.load(Ordering::Acquire)
}
/// Last saved phase (meaningful only while `is_valid`).
fn phase(&self) -> usize {
self.phase.load(Ordering::Acquire)
}
/// Last saved counter value (meaningful only while `is_valid`).
fn counter_value(&self) -> usize {
self.counter_value.load(Ordering::Acquire)
}
/// Marks the checkpoint stale; readers should ignore its contents.
fn invalidate(&self) {
self.valid.store(false, Ordering::Release);
}
}
/// Spawns a thread that performs one full compaction cycle (start → drain →
/// finish), publishing the drained count and whether the cycle ran at all.
fn spawn_compaction_drain(
    buffer: Arc<LoomDeltaBuffer>,
    drained: Arc<AtomicUsize>,
    compaction_ran: Arc<AtomicBool>,
) -> loom::thread::JoinHandle<()> {
    thread::spawn(move || {
        // Guard clause: bail out if another compactor already holds the flag.
        if !buffer.try_start_compaction() {
            return;
        }
        compaction_ran.store(true, Ordering::Release);
        let taken = buffer.drain_for_compaction();
        drained.store(taken, Ordering::Release);
        buffer.finish_compaction();
    })
}
/// Spawns a compactor that also tracks how many compactions are inside the
/// critical section at once; any observed overlap is reported through
/// `overlap_detected`.
fn spawn_compaction_with_overlap_tracking(
    buffer: Arc<LoomDeltaBuffer>,
    active_compactions: Arc<AtomicUsize>,
    overlap_detected: Arc<AtomicBool>,
    compaction_count: Arc<AtomicUsize>,
) -> loom::thread::JoinHandle<()> {
    thread::spawn(move || {
        if !buffer.try_start_compaction() {
            return;
        }
        // Anyone already active here means the CAS mutual exclusion broke.
        let already_active = active_compactions.fetch_add(1, Ordering::AcqRel);
        if already_active > 0 {
            overlap_detected.store(true, Ordering::Release);
        }
        compaction_count.fetch_add(1, Ordering::Relaxed);
        let _ = buffer.drain_for_compaction();
        active_compactions.fetch_sub(1, Ordering::AcqRel);
        buffer.finish_compaction();
    })
}
/// Spawns a compactor that raises `compaction_finished` once its full
/// start → drain → finish cycle has completed.
fn spawn_compaction_with_flag(
    buffer: Arc<LoomDeltaBuffer>,
    compaction_finished: Arc<AtomicBool>,
) -> loom::thread::JoinHandle<()> {
    thread::spawn(move || {
        if !buffer.try_start_compaction() {
            return;
        }
        let _ = buffer.drain_for_compaction();
        buffer.finish_compaction();
        compaction_finished.store(true, Ordering::Release);
    })
}
/// Spawns a compactor that records its progress in a checkpoint: phase 1
/// (pre-drain count) before draining, phase 2 (drained count) after, and
/// invalidates the checkpoint once compaction completes.
fn spawn_compaction_with_checkpoint(
    buffer: Arc<LoomDeltaBuffer>,
    checkpoint: Arc<LoomCheckpoint>,
) -> loom::thread::JoinHandle<()> {
    thread::spawn(move || {
        if !buffer.try_start_compaction() {
            return;
        }
        let before = buffer.edge_count();
        checkpoint.save(1, before);
        let removed = buffer.drain_for_compaction();
        checkpoint.save(2, removed);
        buffer.finish_compaction();
        checkpoint.invalidate();
    })
}
/// Spawns a writer thread that records one edge; the join handle yields the
/// sequence number assigned to that edge.
fn spawn_add_edge(buffer: Arc<LoomDeltaBuffer>) -> loom::thread::JoinHandle<u64> {
    thread::spawn(move || -> u64 { buffer.add_edge() })
}
/// Joins every handle, propagating any worker-thread panic.
///
/// Generalized from the hard-coded `[JoinHandle<()>; 2]` array to any iterable
/// of handles (arrays, `Vec`, iterators); existing `join_threads([t1, t2])`
/// call sites compile unchanged because arrays implement by-value
/// `IntoIterator`.
fn join_threads(threads: impl IntoIterator<Item = loom::thread::JoinHandle<()>>) {
    for handle in threads {
        handle.join().unwrap();
    }
}
/// Joins every handle, propagating any worker-thread panic.
///
/// Kept for call-site symmetry with `join_threads`; generalized from the
/// hard-coded `[JoinHandle<()>; 3]` array to any iterable of handles, so
/// existing `join_three_threads([t1, t2, t3])` call sites compile unchanged.
fn join_three_threads(threads: impl IntoIterator<Item = loom::thread::JoinHandle<()>>) {
    for handle in threads {
        handle.join().unwrap();
    }
}
/// CP11: a drain racing with a concurrent `add_edge` never loses the two
/// pre-seeded edges — they show up in the drained count, the buffer, or both.
#[test]
fn test_cp11_drain_during_compaction() {
    loom::model(|| {
        let buffer = Arc::new(LoomDeltaBuffer::new());
        buffer.add_edge();
        buffer.add_edge();
        assert_eq!(buffer.edge_count(), 2);
        let buffer1 = Arc::clone(&buffer);
        let buffer2 = Arc::clone(&buffer);
        let drained = Arc::new(AtomicUsize::new(0));
        let drained1 = Arc::clone(&drained);
        let compaction_ran = Arc::new(AtomicBool::new(false));
        let cr = Arc::clone(&compaction_ran);
        let t1 = spawn_compaction_drain(buffer1, drained1, cr);
        let t2 = spawn_add_edge(buffer2);
        // BUG FIX: t1 is JoinHandle<()> but t2 is JoinHandle<u64>, so they
        // cannot share one homogeneous `[JoinHandle<()>; 2]` array as the
        // original `join_threads([t1, t2])` required; join each individually.
        t1.join().unwrap();
        let _seq = t2.join().unwrap();
        let d = drained.load(Ordering::Acquire);
        let c = buffer.edge_count();
        let ran = compaction_ran.load(Ordering::Acquire);
        if ran {
            assert!(d >= 2, "Should drain at least initial edges");
            assert!(
                d + c >= 2,
                "Should have at least initial edges accounted for"
            );
        } else {
            assert_eq!(c, 3, "All edges in buffer without compaction");
        }
    });
}
/// CP12: two threads race to start a compaction; the CAS guard must never let
/// both into the critical section at once, and at least one must succeed.
#[test]
fn test_cp12_concurrent_compaction_triggers() {
    loom::model(|| {
        let buffer = Arc::new(LoomDeltaBuffer::new());
        // Seed three edges before the race begins.
        (0..3).for_each(|_| {
            buffer.add_edge();
        });
        let compaction_count = Arc::new(AtomicUsize::new(0));
        let active_compactions = Arc::new(AtomicUsize::new(0));
        let overlap_detected = Arc::new(AtomicBool::new(false));
        let t1 = spawn_compaction_with_overlap_tracking(
            Arc::clone(&buffer),
            Arc::clone(&active_compactions),
            Arc::clone(&overlap_detected),
            Arc::clone(&compaction_count),
        );
        let t2 = spawn_compaction_with_overlap_tracking(
            Arc::clone(&buffer),
            Arc::clone(&active_compactions),
            Arc::clone(&overlap_detected),
            Arc::clone(&compaction_count),
        );
        join_threads([t1, t2]);
        assert!(
            !overlap_detected.load(Ordering::Acquire),
            "Mutual exclusion violated: concurrent compactions detected"
        );
        assert!(!buffer.is_compacting(), "Compaction should be finished");
        let count = compaction_count.load(Ordering::Relaxed);
        assert!(
            count >= 1 && count <= 2,
            "At least one compaction should succeed"
        );
    });
}
/// CP13: a reader snapshotting the counters while a compaction runs never
/// observes wildly out-of-range values.
#[test]
fn test_cp13_snapshot_consistency() {
    loom::model(|| {
        let buffer = Arc::new(LoomDeltaBuffer::new());
        buffer.add_edge();
        buffer.add_edge();
        let snapshot_valid = Arc::new(AtomicBool::new(true));
        let compactor = {
            let buffer = Arc::clone(&buffer);
            thread::spawn(move || {
                if buffer.try_start_compaction() {
                    let _ = buffer.drain_for_compaction();
                    buffer.finish_compaction();
                }
            })
        };
        let reader = {
            let buffer = Arc::clone(&buffer);
            let valid = Arc::clone(&snapshot_valid);
            thread::spawn(move || {
                let edges = buffer.edge_count();
                let staged = buffer.staging_count();
                // Sanity bound: both counters should stay tiny in this model.
                if edges > 100 || staged > 100 {
                    valid.store(false, Ordering::Relaxed);
                }
            })
        };
        join_threads([compactor, reader]);
        assert!(
            snapshot_valid.load(Ordering::Relaxed),
            "Snapshots should see valid values"
        );
    });
}
/// CP14: after a compaction races with two concurrent writers, the combined
/// counters never exceed the total number of edges ever added.
#[test]
fn test_cp14_counter_sync_post_compaction() {
    loom::model(|| {
        let buffer = Arc::new(LoomDeltaBuffer::new());
        buffer.add_edge();
        buffer.add_edge();
        let buffer1 = Arc::clone(&buffer);
        let buffer2 = Arc::clone(&buffer);
        let buffer3 = Arc::clone(&buffer);
        let compaction_finished = Arc::new(AtomicBool::new(false));
        let cf = Arc::clone(&compaction_finished);
        let t1 = spawn_compaction_with_flag(buffer1, cf);
        let t2 = spawn_add_edge(buffer2);
        let t3 = spawn_add_edge(buffer3);
        // BUG FIX: t1 yields `()` while t2/t3 yield `u64`, so the three
        // handles cannot be placed in one homogeneous `[JoinHandle<()>; 3]`
        // array as the original `join_three_threads([t1, t2, t3])` required;
        // join each handle individually instead.
        t1.join().unwrap();
        let _seq2 = t2.join().unwrap();
        let _seq3 = t3.join().unwrap();
        let ec = buffer.edge_count();
        let sc = buffer.staging_count();
        if compaction_finished.load(Ordering::Acquire) {
            assert!(ec + sc <= 4, "Total should not exceed initial + 2 writes");
        } else {
            assert_eq!(ec, 4, "All edges in buffer");
            assert_eq!(sc, 0, "No staging without compaction");
        }
    });
}
/// CP15: a reader that observes a valid checkpoint mid-compaction only ever
/// sees a sane phase, and the compaction flag is released by the end.
///
/// Cleanup: removed the dead `_buffer2` clone (never used) and the two
/// statements jammed onto one line in the original.
#[test]
fn test_cp15_interrupted_compaction_recovery() {
    loom::model(|| {
        let buffer = Arc::new(LoomDeltaBuffer::new());
        let checkpoint = Arc::new(LoomCheckpoint::new());
        buffer.add_edge();
        buffer.add_edge();
        let buffer1 = Arc::clone(&buffer);
        let checkpoint1 = Arc::clone(&checkpoint);
        let checkpoint2 = Arc::clone(&checkpoint);
        let t1 = spawn_compaction_with_checkpoint(buffer1, checkpoint1);
        let t2 = thread::spawn(move || {
            if checkpoint2.is_valid() {
                let phase = checkpoint2.phase();
                let _value = checkpoint2.counter_value();
                assert!(phase <= 2, "Phase should be valid");
            }
        });
        join_threads([t1, t2]);
        assert!(!buffer.is_compacting());
    });
}
/// Two concurrent `add_edge` calls must receive distinct sequence numbers
/// drawn from {0, 1} — the fetch_add on `next_seq` can never hand out
/// duplicates.
#[test]
fn test_sequence_monotonicity() {
    loom::model(|| {
        let buffer = Arc::new(LoomDeltaBuffer::new());
        let seq1 = Arc::new(AtomicU64::new(0));
        let seq2 = Arc::new(AtomicU64::new(0));
        // Both writer threads share the same shape; factor it out.
        let writer = |buf: Arc<LoomDeltaBuffer>, slot: Arc<AtomicU64>| {
            thread::spawn(move || {
                let seq = buf.add_edge();
                slot.store(seq, Ordering::Relaxed);
            })
        };
        let t1 = writer(Arc::clone(&buffer), Arc::clone(&seq1));
        let t2 = writer(Arc::clone(&buffer), Arc::clone(&seq2));
        join_threads([t1, t2]);
        let s1_val = seq1.load(Ordering::Relaxed);
        let s2_val = seq2.load(Ordering::Relaxed);
        assert_ne!(s1_val, s2_val, "Sequence numbers should be unique");
        assert!(s1_val < 2 && s2_val < 2, "Sequences should be 0 or 1");
    });
}
/// Simulates a CSR version swap guarded by a CAS flag: a concurrent reader
/// must only ever observe the old version (1) or the new one (2).
#[test]
fn test_csr_swap_simulation() {
    loom::model(|| {
        let csr_version = Arc::new(AtomicUsize::new(1));
        let compacting = Arc::new(AtomicBool::new(false));
        let swapper = {
            let version = Arc::clone(&csr_version);
            let flag = Arc::clone(&compacting);
            thread::spawn(move || {
                let acquired = flag
                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok();
                if acquired {
                    // Bump the version, then release the guard flag.
                    let next = version.load(Ordering::Acquire) + 1;
                    version.store(next, Ordering::Release);
                    flag.store(false, Ordering::Release);
                }
            })
        };
        let reader = {
            let version = Arc::clone(&csr_version);
            thread::spawn(move || {
                let seen = version.load(Ordering::Acquire);
                assert!(seen == 1 || seen == 2, "CSR version should be valid");
            })
        };
        join_threads([swapper, reader]);
    });
}