//! Testing and benchmarking tools for concurrent Rust code
//!
//! This crate groups together a bunch of utilities which I've found useful when
//! testing and benchmarking Rust concurrency primitives in the triple_buffer
//! and spmc_buffer crates.
//!
//! If it proves popular, other testing and benchmarking tools may be added,
//! based on user demand.
//!
//! # Examples
//!
//! For examples of this crate at work, look at its "tests" and "benchs"
//! submodules, which showcase expected usage.
#![warn(
anonymous_parameters,
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
nonstandard_style,
rust_2018_idioms,
single_use_lifetimes,
trivial_casts,
trivial_numeric_casts,
unreachable_pub,
unused_extern_crates,
unused_qualifications,
variant_size_differences
)]
pub mod noinline;
pub mod race_cell;
use crossbeam_utils::thread;
use std::sync::{
atomic::{AtomicBool, Ordering},
Barrier,
};
/// Test that running two operations concurrently works
///
/// Running multi-threaded code sequentially only exercises part of its
/// behaviour: the interesting bugs appear when operations race against each
/// other. This function runs two closures at the same time, on two threads,
/// released together by a barrier so that they start as simultaneously as
/// the OS scheduler allows.
///
/// Ideally this would accept an arbitrary number of closures, but Rust has
/// no variadic generics yet, so one function is provided per supported
/// closure count; this is the two-closure version.
///
/// # Panics
///
/// This function will propagate panics from the inner functors.
///
pub fn concurrent_test_2(f1: impl FnOnce() + Send, f2: impl FnOnce() + Send) {
    // Rendezvous point so both closures begin together
    let start_line = Barrier::new(2);
    thread::scope(|scope| {
        // f1 runs on a helper thread...
        scope.spawn(|_| {
            start_line.wait();
            noinline::call_once(f1);
        });
        // ...while f2 runs on the caller's thread
        start_line.wait();
        noinline::call_once(f2);
    })
    .expect("Failed to join thread running f1");
}
/// Test that running three operations concurrently works
///
/// Three-closure variant of concurrent_test_2: all three functors are
/// released by a shared barrier, two on helper threads and one on the
/// caller's thread. The (small) duplication with concurrent_test_2 exists
/// because Rust lacks variadic generics; a future variadic design would
/// make it obsolete.
///
/// # Panics
///
/// This function will propagate panics from the inner functors.
///
pub fn concurrent_test_3(
    f1: impl FnOnce() + Send,
    f2: impl FnOnce() + Send,
    f3: impl FnOnce() + Send,
) {
    // Rendezvous point so all three closures begin together
    let start_line = Barrier::new(3);
    thread::scope(|scope| {
        // f1 and f2 each run on their own helper thread...
        scope.spawn(|_| {
            start_line.wait();
            noinline::call_once(f1);
        });
        scope.spawn(|_| {
            start_line.wait();
            noinline::call_once(f2);
        });
        // ...while f3 runs on the caller's thread
        start_line.wait();
        noinline::call_once(f3);
    })
    .expect("Failed to join threads running f1 and f2");
}
/// Perform some operation while another is running in a loop in another thread
///
/// Benchmarking multi-threaded operations in isolation only tells half of the
/// story: synchronization and memory contention can dominate real-world
/// performance. This helper measures one operation while an "antagonist"
/// operation hammers away in a background thread, and returns the benchmark
/// closure's result once it completes.
///
/// Note that the antagonist function must be designed in such a way as not to
/// be optimized out by the compiler when run in a tight loop. Here are some
/// ways to do this:
///
/// - You can hide the fact that the code is run in a loop by preventing the
///   compiler from inlining it there, see this crate's `noinline::call_mut()`.
/// - You can obscure the fact that inputs are always the same and outputs are
///   are not used by using `core::hint::black_box()` on nightly Rust, or its
///   emulation by the Criterion benchmarking crate.
/// - You can generate inputs that the compiler cannot guess using a random
///   number generator, and use your outputs by sending them through some sort
///   of reduction function (sum, min, max...) and checking the result.
///
pub fn run_under_contention<AntagonistResult, BenchmarkResult>(
    mut antagonist: impl FnMut() -> AntagonistResult + Send,
    mut benchmark: impl FnMut() -> BenchmarkResult,
) -> BenchmarkResult {
    // Both threads start together; the flag later tells the antagonist to stop.
    // Relaxed ordering suffices: the flag carries no data dependency, it is a
    // pure shutdown signal.
    let sync_point = Barrier::new(2);
    let keep_running = AtomicBool::new(true);
    thread::scope(|scope| {
        // Antagonist loops in the background until told to stop
        scope.spawn(|_| {
            sync_point.wait();
            while keep_running.load(Ordering::Relaxed) {
                antagonist();
            }
        });
        // Benchmark runs once on the caller's thread, then shuts the
        // antagonist down; the scope join happens before we return.
        sync_point.wait();
        let output = benchmark();
        keep_running.store(false, Ordering::Relaxed);
        output
    })
    .expect("Failed to join antagonist thread")
}
/// Examples of concurrent testing code
#[cfg(test)]
mod tests {
    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        time::Duration,
    };

    // Check the behaviour of concurrent atomic swaps and fetch-adds
    #[test]
    fn swap_and_fetch_add() {
        // Amount of atomic operations to check
        const ATOMIC_OPS_COUNT: usize = 100_000_000;

        // Create a shared atomic variable
        let atom = AtomicUsize::new(0);

        // Check that concurrent atomic operations work correctly
        let mut last_value = 0;
        super::concurrent_test_2(
            || {
                // One thread continuously increments the atomic variable...
                for _ in 1..=ATOMIC_OPS_COUNT {
                    let former_atom = atom.fetch_add(1, Ordering::Relaxed);
                    // Invariant: only this closure ever increments, and the
                    // other one only resets to zero, so the pre-increment
                    // value must be either the last value we wrote or 0
                    // (if a swap landed in between).
                    assert!((former_atom == 0) || (former_atom == last_value));
                    last_value = former_atom + 1;
                }
            },
            || {
                // ...as another continuously resets it to zero
                for _ in 1..=ATOMIC_OPS_COUNT {
                    let former_atom = atom.swap(0, Ordering::Relaxed);
                    // The counter can never exceed the total number of
                    // increments the other closure performs.
                    assert!(former_atom <= ATOMIC_OPS_COUNT);
                }
            },
        );
    }

    // Check the behaviour of concurrent fetch-and/or/xors
    #[test]
    fn fetch_and_or_xor() {
        // Amount of atomic operations to check
        const ATOMIC_OPS_COUNT: usize = 30_000_000;

        // Create a shared atomic variable. Even though this is an atomic Usize,
        // we will only use the 16 low-order bits for maximal portability.
        let atom = AtomicUsize::new(0);

        // Masks used by each atomic operation. The XOR and OR masks touch
        // disjoint bit groups, so each group can be validated independently.
        const AND_MASK: usize = 0b0000_0000_0000_0000; // Clear all bits
        const XOR_MASK: usize = 0b0000_1111_0000_1111; // Flip some bits
        const OR_MASK: usize = 0b1111_0000_1111_0000; // Set other bits

        // Check that concurrent atomic operations work correctly by ensuring
        // that at any point in time, only the 16 low-order bits can be set, and
        // the grouped sets of bits in the masks above are either all set or
        // all cleared in any observable state. This holds because each RMW
        // operation applies its whole mask atomically.
        super::concurrent_test_3(
            || {
                // One thread runs fetch-ands in a loop...
                for _ in 1..=ATOMIC_OPS_COUNT {
                    let old_val = atom.fetch_and(AND_MASK, Ordering::Relaxed);
                    // No bit outside the low 16 may ever be observed set
                    assert_eq!(old_val & 0b1111_1111_1111_1111, old_val);
                    // Each mask's bit group is all-set or all-clear
                    assert!((old_val & XOR_MASK == XOR_MASK) || (old_val & XOR_MASK == 0));
                    assert!((old_val & OR_MASK == OR_MASK) || (old_val & OR_MASK == 0));
                }
            },
            || {
                // ...another runs fetch-ors in a loop...
                for _ in 1..=ATOMIC_OPS_COUNT {
                    let old_val = atom.fetch_or(OR_MASK, Ordering::Relaxed);
                    assert_eq!(old_val & 0b1111_1111_1111_1111, old_val);
                    assert!((old_val & XOR_MASK == XOR_MASK) || (old_val & XOR_MASK == 0));
                    assert!((old_val & OR_MASK == OR_MASK) || (old_val & OR_MASK == 0));
                }
            },
            || {
                // ...and the last one runs fetch-xors in a loop...
                for _ in 1..=ATOMIC_OPS_COUNT {
                    let old_val = atom.fetch_xor(XOR_MASK, Ordering::Relaxed);
                    assert_eq!(old_val & 0b1111_1111_1111_1111, old_val);
                    assert!((old_val & XOR_MASK == XOR_MASK) || (old_val & XOR_MASK == 0));
                    assert!((old_val & OR_MASK == OR_MASK) || (old_val & OR_MASK == 0));
                }
            },
        );
    }

    // Show how adversarial code is actually run in concurrent "benchmarking"
    #[test]
    fn antagonist_showcase() {
        // The antagonist increments a counter while the "benchmark" sleeps
        let atom = AtomicUsize::new(0);
        super::run_under_contention(
            || atom.fetch_add(1, Ordering::Relaxed),
            || std::thread::sleep(Duration::from_millis(100)),
        );
        // NOTE(review): timing-dependent threshold — assumes the antagonist
        // can perform >100k relaxed fetch_adds in ~100ms, which any modern
        // CPU should manage, but a heavily loaded machine could flake here.
        assert!(atom.load(Ordering::Relaxed) > 100000);
    }
}