use metrics::atomics::AtomicU64;
use std::{
cell::UnsafeCell,
sync::{
atomic::{
AtomicBool, AtomicUsize,
Ordering::{AcqRel, Acquire, Relaxed, Release},
},
Mutex,
},
};
use rand::{rngs::OsRng, Rng, SeedableRng};
use rand_xoshiro::Xoshiro256StarStar;
thread_local! {
static FAST_RNG: UnsafeCell<Xoshiro256StarStar> = {
UnsafeCell::new(Xoshiro256StarStar::try_from_rng(&mut OsRng).unwrap())
};
}
fn fastrand(upper: usize) -> usize {
FAST_RNG.with(|rng| {
let rng = unsafe { &mut *rng.get() };
rng.random_range(0..upper)
})
}
struct Reservoir {
values: Box<[AtomicU64]>,
count: AtomicUsize,
}
impl Reservoir {
fn with_capacity(capacity: usize) -> Self {
let mut values = Vec::with_capacity(capacity);
for _ in 0..capacity {
values.push(AtomicU64::new(0));
}
Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
}
fn push(&self, value: f64) {
let idx = self.count.fetch_add(1, Relaxed);
if idx < self.values.len() {
self.values[idx].store(value.to_bits(), Relaxed);
} else {
let maybe_idx = fastrand(idx);
if maybe_idx < self.values.len() {
self.values[maybe_idx].store(value.to_bits(), Relaxed);
}
}
}
fn drain(&self) -> Drain<'_> {
let unsampled_len = self.count.load(Relaxed);
let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
Drain { reservoir: self, unsampled_len, len, idx: 0 }
}
}
pub struct Drain<'a> {
reservoir: &'a Reservoir,
unsampled_len: usize,
len: usize,
idx: usize,
}
impl<'a> Drain<'a> {
pub fn sample_rate(&self) -> f64 {
if self.unsampled_len == self.len {
1.0
} else {
self.len as f64 / self.unsampled_len as f64
}
}
}
impl<'a> Iterator for Drain<'a> {
type Item = f64;
fn next(&mut self) -> Option<Self::Item> {
if self.idx < self.len {
let value = f64::from_bits(self.reservoir.values[self.idx].load(Relaxed));
self.idx += 1;
Some(value)
} else {
None
}
}
}
impl ExactSizeIterator for Drain<'_> {
fn len(&self) -> usize {
self.len - self.idx
}
}
impl<'a> Drop for Drain<'a> {
fn drop(&mut self) {
self.reservoir.count.store(0, Release);
}
}
pub struct AtomicSamplingReservoir {
primary: Reservoir,
secondary: Reservoir,
use_primary: AtomicBool,
swap: Mutex<()>,
}
impl AtomicSamplingReservoir {
pub fn new(size: usize) -> Self {
Self {
primary: Reservoir::with_capacity(size),
secondary: Reservoir::with_capacity(size),
use_primary: AtomicBool::new(true),
swap: Mutex::new(()),
}
}
pub fn is_empty(&self) -> bool {
let use_primary = self.use_primary.load(Acquire);
if use_primary {
self.primary.count.load(Relaxed) == 0
} else {
self.secondary.count.load(Relaxed) == 0
}
}
pub fn push(&self, value: f64) {
let use_primary = self.use_primary.load(Relaxed);
if use_primary {
self.primary.push(value);
} else {
self.secondary.push(value);
};
}
pub fn consume<F>(&self, mut f: F)
where
F: FnMut(Drain<'_>),
{
let _guard = self.swap.lock().unwrap();
let use_primary = self.use_primary.fetch_xor(true, AcqRel);
let drain = if use_primary { self.primary.drain() } else { self.secondary.drain() };
f(drain);
}
}