// metrics_util/storage/reservoir.rs

use metrics::atomics::AtomicU64;

use std::{
    cell::UnsafeCell,
    sync::{
        atomic::{
            AtomicBool, AtomicUsize,
            Ordering::{Acquire, Relaxed, Release},
        },
        Mutex,
    },
};

use rand::{rngs::OsRng, Rng, SeedableRng};
use rand_xoshiro::Xoshiro256StarStar;
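// Fast thread-local PRNG used for slot selection, seeded once per thread from the OS entropy
// source.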
thread_local! {
    static FAST_RNG: UnsafeCell<Xoshiro256StarStar> = {
        UnsafeCell::new(Xoshiro256StarStar::try_from_rng(&mut OsRng).unwrap())
    };
}
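/// Returns a uniformly distributed `usize` in `0..upper` from the thread-local PRNG.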
fn fastrand(upper: usize) -> usize {
    FAST_RNG.with(|rng| {
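        // SAFETY: The mutable reference to the thread-local RNG never escapes this closure,
        // and only the owning thread can reach the cell, so no aliasing is possible.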
        let rng = unsafe { &mut *rng.get() };
        rng.random_range(0..upper)
    })
}
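/// A single reservoir: a fixed-size buffer of sampled values (stored as raw `f64` bits) and a
/// count of all values observed so far.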
struct Reservoir {
    values: Box<[AtomicU64]>,
    count: AtomicUsize,
}

impl Reservoir {
    fn with_capacity(capacity: usize) -> Self {
        let mut values = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            values.push(AtomicU64::new(0));
        }

        Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
    }
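    /// Stores `value`, using a reservoir-sampling scheme in the style of Algorithm R: the
    /// first `capacity` values fill the buffer directly, and each subsequent value overwrites
    /// a uniformly chosen slot with probability `capacity / n`, where `n` is the number of
    /// values previously observed.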
    fn push(&self, value: f64) {
        let idx = self.count.fetch_add(1, Relaxed);
        if idx < self.values.len() {
            self.values[idx].store(value.to_bits(), Relaxed);
        } else {
            let maybe_idx = fastrand(idx);
            if maybe_idx < self.values.len() {
                self.values[maybe_idx].store(value.to_bits(), Relaxed);
            }
        }
    }
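    /// Returns a [`Drain`] over the sampled values; dropping the drain resets the reservoir.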
    fn drain(&self) -> Drain<'_> {
        let unsampled_len = self.count.load(Relaxed);
        let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
        Drain { reservoir: self, unsampled_len, len, idx: 0 }
    }
}
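/// A draining iterator over the values sampled by a reservoir.
///
/// Dropping the drain resets the source reservoir, readying it to sample a fresh window of
/// values.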
pub struct Drain<'a> {
    reservoir: &'a Reservoir,
    unsampled_len: usize,
    len: usize,
    idx: usize,
}

impl<'a> Drain<'a> {
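    /// Returns the rate at which pushed values were sampled into the reservoir: the number of
    /// stored values divided by the number of values observed, or 1.0 if every observed value
    /// was stored.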
    pub fn sample_rate(&self) -> f64 {
        if self.unsampled_len == self.len {
            1.0
        } else {
            self.len as f64 / self.unsampled_len as f64
        }
    }
}

impl<'a> Iterator for Drain<'a> {
    type Item = f64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.idx < self.len {
            let value = f64::from_bits(self.reservoir.values[self.idx].load(Relaxed));
            self.idx += 1;
            Some(value)
        } else {
            None
        }
    }
}

impl ExactSizeIterator for Drain<'_> {
    fn len(&self) -> usize {
        self.len - self.idx
    }
}

impl<'a> Drop for Drain<'a> {
    fn drop(&mut self) {
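        // Reset the count so the reservoir starts sampling from scratch on its next use.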
        self.reservoir.count.store(0, Release);
    }
}
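/// An atomic sampling reservoir.
///
/// [Reservoir sampling][rs] maintains a statistically representative, fixed-size sample of a
/// stream of values without needing to know the stream's length in advance. This implementation
/// keeps two reservoirs, only one of which is active at a time: writers push into the active
/// side while a consumer can swap the sides and drain the values collected on the other.
///
/// [rs]: https://en.wikipedia.org/wiki/Reservoir_sampling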
pub struct AtomicSamplingReservoir {
    primary: Reservoir,
    secondary: Reservoir,
    use_primary: AtomicBool,
    swap: Mutex<()>,
}

impl AtomicSamplingReservoir {
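    /// Creates a new `AtomicSamplingReservoir` whose two reservoirs each hold up to `size`
    /// values.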
    pub fn new(size: usize) -> Self {
        Self {
            primary: Reservoir::with_capacity(size),
            secondary: Reservoir::with_capacity(size),
            use_primary: AtomicBool::new(true),
            swap: Mutex::new(()),
        }
    }
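    /// Returns `true` if the currently active reservoir holds no values.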
    pub fn is_empty(&self) -> bool {
        let use_primary = self.use_primary.load(Acquire);
        if use_primary {
            self.primary.count.load(Relaxed) == 0
        } else {
            self.secondary.count.load(Relaxed) == 0
        }
    }
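    /// Pushes `value` into the currently active reservoir.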
    pub fn push(&self, value: f64) {
        let use_primary = self.use_primary.load(Relaxed);
        if use_primary {
            self.primary.push(value);
        } else {
            self.secondary.push(value);
        }
    }
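    /// Swaps the active reservoir, then passes a [`Drain`] of the previously active side to
    /// `f`. Dropping the drain resets that side, so it is empty the next time it becomes
    /// active. The internal mutex ensures only one consumer swaps and drains at a time.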
    pub fn consume<F>(&self, mut f: F)
    where
        F: FnMut(Drain<'_>),
    {
        let _guard = self.swap.lock().unwrap();

        let use_primary = self.use_primary.load(Acquire);
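        // Flip the active side so new values land in the other reservoir while this one is
        // drained.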
        self.use_primary.store(!use_primary, Release);

        let drain = if use_primary { self.primary.drain() } else { self.secondary.drain() };

        f(drain);
    }
}
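#[cfg(test)]
mod tests {
    use super::*;

    // A minimal usage sketch, added for illustration (not part of the original module): push
    // more values than one reservoir can hold, then consume and check the drain's bookkeeping.
    #[test]
    fn push_then_consume_reports_sample_rate() {
        let reservoir = AtomicSamplingReservoir::new(16);
        for i in 0..64 {
            reservoir.push(i as f64);
        }
        assert!(!reservoir.is_empty());

        reservoir.consume(|mut drain| {
            // 64 values were observed but only 16 slots exist, so the drain yields 16 values
            // and reports a sample rate of 16/64 = 0.25.
            assert_eq!(drain.len(), 16);
            assert_eq!(drain.sample_rate(), 0.25);
            assert!(drain.all(|value| (0.0..64.0).contains(&value)));
        });

        // `consume` swapped to the untouched secondary reservoir, so the active side is empty.
        assert!(reservoir.is_empty());
    }
}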