mod common;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use net_kit::Net;
#[test]
fn net_is_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Net>();
}
#[test]
fn concurrent_registration_yields_unique_handles() {
let rt = common::build_runtime();
let net = Arc::new(Net::new());
rt.block_on(async { net.start().await.unwrap() });
const THREADS: usize = 8;
const PER_THREAD: usize = 200;
let barrier = Arc::new(Barrier::new(THREADS));
let mut joins = Vec::new();
for _ in 0..THREADS {
let net = Arc::clone(&net);
let barrier = Arc::clone(&barrier);
joins.push(thread::spawn(move || {
barrier.wait();
let mut local = Vec::with_capacity(PER_THREAD);
for _ in 0..PER_THREAD {
local.push(
net.register(Box::new(|_| {}))
.unwrap()
.expect("register ok"),
);
}
local
}));
}
let mut all = Vec::new();
for j in joins {
all.extend(j.join().expect("worker thread panicked"));
}
assert_eq!(all.len(), THREADS * PER_THREAD);
let mut keys: Vec<String> = all.iter().map(|h| format!("{h:?}")).collect();
keys.sort();
keys.dedup();
assert_eq!(
keys.len(),
all.len(),
"concurrently allocated handles must be unique"
);
net.shutdown().unwrap();
}
#[test]
fn concurrent_mixed_operations() {
let rt = common::build_runtime();
let net = Arc::new(Net::new());
rt.block_on(async { net.start().await.unwrap() });
let seeded: Arc<std::sync::Mutex<Vec<_>>> = Arc::new(std::sync::Mutex::new(
(0..500)
.map(|_| net.register(Box::new(|_| {})).unwrap().unwrap())
.collect(),
));
let errors = Arc::new(AtomicUsize::new(0));
let mut joins = Vec::new();
for _ in 0..3 {
let net = Arc::clone(&net);
joins.push(thread::spawn(move || {
for _ in 0..300 {
if net.register(Box::new(|_| {})).unwrap().is_none() {
}
}
}));
}
{
let net = Arc::clone(&net);
let seeded = Arc::clone(&seeded);
joins.push(thread::spawn(move || {
loop {
let next = seeded.lock().unwrap().pop();
match next {
Some(h) => {
let _ = net.unregister(h);
}
None => break,
}
}
}));
}
for _ in 0..2 {
let net = Arc::clone(&net);
let errors = Arc::clone(&errors);
joins.push(thread::spawn(move || {
for _ in 0..500 {
let s = net.local_network_reachability().unwrap();
if !matches!(
s,
net_kit::NetworkStatus::Available | net_kit::NetworkStatus::Unavailable
) {
errors.fetch_add(1, Ordering::SeqCst);
}
}
}));
}
for j in joins {
j.join().expect("worker thread panicked");
}
assert_eq!(
errors.load(Ordering::SeqCst),
0,
"no invalid status observed"
);
net.shutdown().unwrap();
}
#[test]
fn shutdown_racing_with_operations_is_safe() {
let rt = common::build_runtime();
let net = Arc::new(Net::new());
rt.block_on(async { net.start().await.unwrap() });
let net_ops = Arc::clone(&net);
let worker = thread::spawn(move || {
for _ in 0..2_000 {
let _ = net_ops.register(Box::new(|_| {}));
let _ = net_ops.local_network_reachability();
let _ = net_ops.clear_all_listener();
}
});
net.shutdown().unwrap();
worker.join().expect("operations thread must not panic");
}