#![cfg(loom)]
#![forbid(unsafe_code)]
use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::{Arc, Mutex};
use std::collections::HashMap;
/// Shared bookkeeping for the subscriptions that are currently registered.
struct SubRegistry {
    /// Registered subscription ids. Only key presence is used, so the value is `()`.
    subscribers: HashMap<u64, ()>,
}
/// Handle for one subscription registered in a shared [`SubRegistry`].
///
/// Cloning produces another handle with the same `sub_id` that points at the
/// same registry and the same deactivation counter.
#[derive(Clone)]
struct Sub {
    /// Registry shared by all subscription handles.
    state: Arc<Mutex<SubRegistry>>,
    /// Incremented each time `unsubscribe` decides the producer must be deactivated.
    deactivate_count: Arc<AtomicUsize>,
    /// The counter is only ever incremented when this flag is `true`.
    is_producer: bool,
    /// Key of this subscription in `SubRegistry::subscribers`.
    sub_id: u64,
}

impl Sub {
    /// Removes this subscription from the registry.
    ///
    /// If this call removed the final remaining entry and `is_producer` is
    /// set, `deactivate_count` is incremented by one.
    ///
    /// A call that removes nothing (the id is not in the registry, for example
    /// because a clone of this handle already unsubscribed) never counts as
    /// the last unsubscribe, so repeated calls cannot increment the counter
    /// more than once for the same emptying of the registry.
    ///
    /// # Panics
    ///
    /// Panics if locking the registry mutex returns an error (poisoned lock).
    fn unsubscribe(&self) {
        let was_last = {
            let mut registry = self.state.lock().unwrap();
            // `remove` reports whether the id was actually present; an
            // already-empty registry on its own is not a transition to empty.
            let removed = registry.subscribers.remove(&self.sub_id).is_some();
            removed && registry.subscribers.is_empty()
        };
        // The guard is dropped at the end of the block above, so the counter
        // is updated without holding the registry lock.
        if was_last && self.is_producer {
            self.deactivate_count.fetch_add(1, Ordering::SeqCst);
        }
    }
}
/// Two producer subscriptions (ids 1 and 2) are the only registry entries and
/// are unsubscribed from two loom threads. After both threads are joined, the
/// deactivation counter must be exactly 1 in every execution loom runs.
#[test]
fn concurrent_drop_of_last_two_subs_fires_deactivate_exactly_once() {
    loom::model(|| {
        // The registry holds exactly the two subscriptions that are about to go away.
        let subscribers: HashMap<u64, ()> = (1u64..=2).map(|id| (id, ())).collect();
        let shared_state = Arc::new(Mutex::new(SubRegistry { subscribers }));
        let deactivations = Arc::new(AtomicUsize::new(0));

        let first = Sub {
            state: Arc::clone(&shared_state),
            deactivate_count: Arc::clone(&deactivations),
            is_producer: true,
            sub_id: 1,
        };
        // The second handle takes over the original registry `Arc`.
        let second = Sub {
            state: shared_state,
            deactivate_count: Arc::clone(&deactivations),
            is_producer: true,
            sub_id: 2,
        };

        let first_drop = loom::thread::spawn(move || first.unsubscribe());
        let second_drop = loom::thread::spawn(move || second.unsubscribe());
        for worker in [first_drop, second_drop] {
            worker.join().unwrap();
        }

        let final_count = deactivations.load(Ordering::SeqCst);
        assert_eq!(
            final_count, 1,
            "producer_deactivate must fire exactly once across all interleavings of the last two unsubscribes; got {final_count}"
        );
    });
}
/// The registry starts with ids 1, 2 and 3, but only ids 1 and 2 are
/// unsubscribed (from two loom threads). Id 3 stays registered, so the
/// deactivation counter must still be 0 once both threads are joined.
#[test]
fn concurrent_drop_with_one_remaining_sub_does_not_fire_deactivate() {
    loom::model(|| {
        // Id 3 is never unsubscribed in this test, so the registry never empties.
        let subscribers: HashMap<u64, ()> = (1u64..=3).map(|id| (id, ())).collect();
        let shared_state = Arc::new(Mutex::new(SubRegistry { subscribers }));
        let deactivations = Arc::new(AtomicUsize::new(0));

        let first = Sub {
            state: Arc::clone(&shared_state),
            deactivate_count: Arc::clone(&deactivations),
            is_producer: true,
            sub_id: 1,
        };
        // The second handle takes over the original registry `Arc`.
        let second = Sub {
            state: shared_state,
            deactivate_count: Arc::clone(&deactivations),
            is_producer: true,
            sub_id: 2,
        };

        let first_drop = loom::thread::spawn(move || first.unsubscribe());
        let second_drop = loom::thread::spawn(move || second.unsubscribe());
        for worker in [first_drop, second_drop] {
            worker.join().unwrap();
        }

        let final_count = deactivations.load(Ordering::SeqCst);
        assert_eq!(
            final_count, 0,
            "producer_deactivate must NOT fire while subs remain; got {final_count}"
        );
    });
}
/// A single subscription with `is_producer: false` is the only registry entry
/// and is unsubscribed on a loom thread. Although this empties the registry,
/// the deactivation counter must remain 0.
#[test]
fn drop_of_last_sub_on_non_producer_node_does_not_deactivate() {
    loom::model(|| {
        let shared_state = Arc::new(Mutex::new(SubRegistry {
            subscribers: HashMap::from([(1u64, ())]),
        }));
        let deactivations = Arc::new(AtomicUsize::new(0));

        // Not a producer: removing the last entry must leave the counter untouched.
        let only_sub = Sub {
            state: shared_state,
            deactivate_count: Arc::clone(&deactivations),
            is_producer: false,
            sub_id: 1,
        };

        let worker = loom::thread::spawn(move || only_sub.unsubscribe());
        worker.join().unwrap();

        let final_count = deactivations.load(Ordering::SeqCst);
        assert_eq!(
            final_count, 0,
            "producer_deactivate must not fire for non-producer nodes; got {final_count}"
        );
    });
}