#[cfg(all(feature = "sync", test))]
mod sync {
    use hdrhistogram::{sync::SyncHistogram, Histogram};
    use std::sync::{atomic, Arc};
    use std::{thread, time};

    const TRACKABLE_MAX: u64 = 3600 * 1000 * 1000;
    const SIGFIG: u8 = 3;
    const TEST_VALUE_LEVEL: u64 = 4;
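
    // Recording directly on the SyncHistogram should be visible immediately,
    // without an intervening refresh().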
    #[test]
    fn record_through() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        h.record(TEST_VALUE_LEVEL).unwrap();
        assert_eq!(h.count_at(TEST_VALUE_LEVEL), 1);
        assert_eq!(h.len(), 1);
    }
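
    // A value written through a Recorder becomes visible once the Recorder is
    // dropped (here, when the spawned thread exits) and refresh() returns.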
    #[test]
    fn recorder_drop() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let mut r = h.recorder();
        let jh = thread::spawn(move || {
            r += TEST_VALUE_LEVEL;
        });

        h.refresh();
        assert_eq!(h.count_at(TEST_VALUE_LEVEL), 1);
        assert_eq!(h.len(), 1);
        jh.join().unwrap();
    }
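
    // refresh() should also pick up a Recorder's write while that Recorder stays
    // alive: the writer thread is still parked on the barrier during the assertions.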
    #[test]
    fn record_nodrop() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let barrier = Arc::new(std::sync::Barrier::new(2));
        let mut r = h.recorder();
        let b = Arc::clone(&barrier);
        let jh = thread::spawn(move || {
            r += TEST_VALUE_LEVEL;
            b.wait();
        });

        h.refresh();
        assert_eq!(h.count_at(TEST_VALUE_LEVEL), 1);
        assert_eq!(h.len(), 1);

        barrier.wait();
        jh.join().unwrap();
    }
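
    // refresh_timeout() should return once the timeout elapses even though the
    // Recorder's write has not been merged; only the direct record() is counted.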
    #[test]
    fn phase_timeout() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        h.record(TEST_VALUE_LEVEL).unwrap();
        let mut r = h.recorder();
        r += TEST_VALUE_LEVEL;
        h.refresh_timeout(time::Duration::from_millis(100));
        assert_eq!(h.count_at(TEST_VALUE_LEVEL), 1);
        assert_eq!(h.len(), 1);
    }
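
    // Writes arrive in stages: the recorder thread produces a large batch, then one
    // more value, then drops its Recorder, with barriers pacing the two sides and a
    // refresh() after each stage.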
    #[test]
    fn recorder_drop_staged() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let barrier = Arc::new(std::sync::Barrier::new(2));
        let mut r = h.recorder();
        let b = Arc::clone(&barrier);
        let jh = thread::spawn(move || {
            let n = 10_000;
            for _ in 0..n {
                r += TEST_VALUE_LEVEL;
            }
            b.wait();
            r += TEST_VALUE_LEVEL;
            b.wait();
            drop(r);
            n + 1
        });

        // refresh once, then release the writer so it can record one more value
        h.refresh();
        barrier.wait();

        // release the writer again so it drops its recorder, then refresh and check
        // that every write (the batch plus the extra value) is accounted for
        barrier.wait();
        h.refresh();
        let n = jh.join().unwrap();
        h.refresh();

        assert_eq!(h.count_at(TEST_VALUE_LEVEL), n);
        assert_eq!(h.len(), n);
    }
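
    // A Recorder that is created and immediately dropped must not make refresh() wait.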
    #[test]
    fn phase_no_wait_after_drop() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        {
            let _ = h.recorder();
        }

        h.refresh();
        assert_eq!(h.len(), 0);
    }
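
    // Many writer threads each get their own Recorder; once they have all finished
    // (signalled via the barrier), a single refresh() should account for every write.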
    #[test]
    fn mt_record_static() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let n = 16;
        let barrier = Arc::new(std::sync::Barrier::new(n + 1));
        let jhs: Vec<_> = (0..n)
            .map(|_| {
                let mut r = h.recorder();
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    let n = 100_000;
                    for _ in 0..n {
                        r += TEST_VALUE_LEVEL;
                    }
                    barrier.wait();
                    n
                })
            })
            .collect();

        barrier.wait();
        h.refresh();
        assert_eq!(h.len(), jhs.into_iter().map(|r| r.join().unwrap()).sum());
    }
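
    // With a Recorder that never writes and is never dropped, refresh_timeout()
    // should give up after the timeout instead of blocking forever.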
    #[test]
    fn refresh_times_out() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let _r = h.recorder();
        h.refresh_timeout(time::Duration::from_millis(100));
    }
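
    // Like mt_record_static, but each thread periodically replaces its Recorder with
    // a clone (dropping the old one), so recorders come and go while the test runs.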
    #[test]
    fn mt_record_dynamic() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let n = 16;
        let barrier = Arc::new(std::sync::Barrier::new(n + 1));
        let jhs: Vec<_> = (0..n)
            .map(|_| {
                let mut r = h.recorder();
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    let n = 30_000;
                    for i in 0..n {
                        if i % 1_000 == 0 {
                            r = r.clone();
                        }
                        r += TEST_VALUE_LEVEL;
                    }
                    barrier.wait();
                    n as u64
                })
            })
            .collect();

        barrier.wait();
        h.refresh();
        assert_eq!(h.len(), jhs.into_iter().map(|r| r.join().unwrap()).sum());
    }
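
    // A Recorder marked idle() must not hold up refresh(); after the idle guard is
    // dropped, a write made through the recorder is visible via the final refresh().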
    #[test]
    fn idle_recorder() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let barrier = Arc::new(std::sync::Barrier::new(2));
        let mut r = h.recorder();
        let i = r.idle();

        // neither refresh should block while the recorder is idle
        h.refresh();
        h.refresh();
        drop(i);

        let b = Arc::clone(&barrier);
        let jh = thread::spawn(move || {
            r += TEST_VALUE_LEVEL;
            b.wait();
        });

        barrier.wait();
        h.refresh();
        assert_eq!(h.count_at(TEST_VALUE_LEVEL), 1);
        assert_eq!(h.len(), 1);
        jh.join().unwrap();
    }
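
    // An idle recorder can hand out further recorders and later be reactivated;
    // writes from both writer threads should be counted once they are told to stop.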
    #[test]
    fn clone_idle_recorder() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        let done = Arc::new(atomic::AtomicBool::new(false));
        let r = h.recorder().into_idle();

        // refreshes must not block on the idle recorder
        h.refresh();
        h.refresh();

        // spawn a writer using a recorder obtained from the idle one
        let mut r2 = r.recorder();
        let d = Arc::clone(&done);
        let jh = thread::spawn(move || {
            let mut i = 0;
            while !d.load(atomic::Ordering::SeqCst) {
                r2 += TEST_VALUE_LEVEL;
                i += 1;
            }
            i
        });

        h.refresh();

        // reactivate the idle recorder and spawn a second writer with it
        let mut r = r.activate();
        let d = Arc::clone(&done);
        let jh2 = thread::spawn(move || {
            let mut i = 0;
            while !d.load(atomic::Ordering::SeqCst) {
                r += TEST_VALUE_LEVEL;
                i += 1;
            }
            i
        });

        h.refresh();
        done.store(true, atomic::Ordering::SeqCst);

        // the writers exit their loops and drop their recorders; refresh again so
        // every write is folded in before checking the totals
        h.refresh();
        let n = jh.join().unwrap() + jh2.join().unwrap();
        h.refresh();

        assert_eq!(h.count_at(TEST_VALUE_LEVEL), n);
        assert_eq!(h.len(), n);
    }
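
    // Same setup as phase_timeout: one direct write plus one unsynchronized
    // Recorder write, checked through refresh_timeout().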
    #[test]
    fn concurrent_writes() {
        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
            .unwrap()
            .into();
        h.record(TEST_VALUE_LEVEL).unwrap();
        let mut r = h.recorder();
        r += TEST_VALUE_LEVEL;
        h.refresh_timeout(time::Duration::from_millis(100));
        assert_eq!(h.count_at(TEST_VALUE_LEVEL), 1);
        assert_eq!(h.len(), 1);
    }
}