metrics_util/storage/
reservoir.rs

1//! An atomic sampling reservoir.
2
3use metrics::atomics::AtomicU64;
4use std::{
5    cell::UnsafeCell,
6    sync::{
7        atomic::{
8            AtomicBool, AtomicUsize,
9            Ordering::{Acquire, Relaxed, Release},
10        },
11        Mutex,
12    },
13};
14
15use rand::{rngs::OsRng, Rng, SeedableRng};
16use rand_xoshiro::Xoshiro256StarStar;
17
18thread_local! {
19    static FAST_RNG: UnsafeCell<Xoshiro256StarStar> = {
20        UnsafeCell::new(Xoshiro256StarStar::try_from_rng(&mut OsRng).unwrap())
21    };
22}
23
24fn fastrand(upper: usize) -> usize {
25    FAST_RNG.with(|rng| {
26        // SAFETY: We know it's safe to take a mutable reference since we're getting a pointer to a thread-local value,
27        // and the reference never outlives the closure executing on this thread.
28        let rng = unsafe { &mut *rng.get() };
29        rng.random_range(0..upper)
30    })
31}
32
33struct Reservoir {
34    values: Box<[AtomicU64]>,
35    count: AtomicUsize,
36}
37
38impl Reservoir {
39    fn with_capacity(capacity: usize) -> Self {
40        let mut values = Vec::with_capacity(capacity);
41        for _ in 0..capacity {
42            values.push(AtomicU64::new(0));
43        }
44
45        Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
46    }
47
48    fn push(&self, value: f64) {
49        let idx = self.count.fetch_add(1, Relaxed);
50        if idx < self.values.len() {
51            self.values[idx].store(value.to_bits(), Relaxed);
52        } else {
53            let maybe_idx = fastrand(idx);
54            if maybe_idx < self.values.len() {
55                self.values[maybe_idx].store(value.to_bits(), Relaxed);
56            }
57        }
58    }
59
60    fn drain(&self) -> Drain<'_> {
61        let unsampled_len = self.count.load(Relaxed);
62        let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
63        Drain { reservoir: self, unsampled_len, len, idx: 0 }
64    }
65}
66
67/// A draining iterator over the samples in a reservoir.
68pub struct Drain<'a> {
69    reservoir: &'a Reservoir,
70    unsampled_len: usize,
71    len: usize,
72    idx: usize,
73}
74
75impl<'a> Drain<'a> {
76    /// Returns the sample rate of the reservoir that produced this iterator.
77    ///
78    /// The sample rate is the ratio of the number of samples pushed into the reservoir to the number of samples held in
79    /// the reservoir. When the reservoir has not been filled, the sample rate is 1.0. When more samples have been
80    /// pushed into the reservoir than its overall capacity, the sample rate is `size / count`, where `size` is the
81    /// reservoir's capacity and `count` is the number of samples pushed into the reservoir.
82    ///
83    /// For example, if the reservoir holds 1,000 samples, and 100,000 values were pushed into the reservoir, the sample
84    /// rate would be 0.01 (100,000 / 1,000).
85    pub fn sample_rate(&self) -> f64 {
86        if self.unsampled_len == self.len {
87            1.0
88        } else {
89            self.len as f64 / self.unsampled_len as f64
90        }
91    }
92}
93
94impl<'a> Iterator for Drain<'a> {
95    type Item = f64;
96
97    fn next(&mut self) -> Option<Self::Item> {
98        if self.idx < self.len {
99            let value = f64::from_bits(self.reservoir.values[self.idx].load(Relaxed));
100            self.idx += 1;
101            Some(value)
102        } else {
103            None
104        }
105    }
106}
107
108impl ExactSizeIterator for Drain<'_> {
109    fn len(&self) -> usize {
110        self.len - self.idx
111    }
112}
113
114impl<'a> Drop for Drain<'a> {
115    fn drop(&mut self) {
116        self.reservoir.count.store(0, Release);
117    }
118}
119
120/// An atomic sampling reservoir.
121///
122/// [Reservoir sampling][rs] is a technique used to produce a statistically representative sample of a data stream, in a
123/// fixed space, without knowing the length of the stream in advance. `AtomicSamplingReservoir` is a thread-safe version of a
124/// sampling reservoir, based on Vitter's ["Algorithm R"][vitter_paper].
125///
126/// Utilizes an A/B-based storage mechanism to avoid contention between producers and the consumer, and a fast,
127/// thread-local PRNG ([Xoshiro256**][xoshiro256starstar]) to limit the per-call sampling overhead.
128///
129/// [rs]: https://en.wikipedia.org/wiki/Reservoir_sampling
130/// [vitter_paper]: https://www.cs.umd.edu/~samir/498/vitter.pdf
131/// [xoshiro256starstar]: https://prng.di.unimi.it
132pub struct AtomicSamplingReservoir {
133    primary: Reservoir,
134    secondary: Reservoir,
135    use_primary: AtomicBool,
136    swap: Mutex<()>,
137}
138
139impl AtomicSamplingReservoir {
140    /// Creates a new `AtomicSamplingReservoir` that stores up to `size` samples.
141    pub fn new(size: usize) -> Self {
142        Self {
143            primary: Reservoir::with_capacity(size),
144            secondary: Reservoir::with_capacity(size),
145            use_primary: AtomicBool::new(true),
146            swap: Mutex::new(()),
147        }
148    }
149
150    /// Returns `true` if the reservoir is empty.
151    pub fn is_empty(&self) -> bool {
152        let use_primary = self.use_primary.load(Acquire);
153        if use_primary {
154            self.primary.count.load(Relaxed) == 0
155        } else {
156            self.secondary.count.load(Relaxed) == 0
157        }
158    }
159
160    /// Pushes a sample into the reservoir.
161    pub fn push(&self, value: f64) {
162        let use_primary = self.use_primary.load(Relaxed);
163        if use_primary {
164            self.primary.push(value);
165        } else {
166            self.secondary.push(value);
167        };
168    }
169
170    /// Consumes all samples in the reservoir, passing them to the provided closure.
171    ///
172    /// The underlying storage is swapped before the closure is called, and the previous storage is consumed.
173    pub fn consume<F>(&self, mut f: F)
174    where
175        F: FnMut(Drain<'_>),
176    {
177        let _guard = self.swap.lock().unwrap();
178
179        // Swap the active reservoir.
180        let use_primary = self.use_primary.load(Acquire);
181        self.use_primary.store(!use_primary, Release);
182
183        // Consume the previous reservoir.
184        let drain = if use_primary { self.primary.drain() } else { self.secondary.drain() };
185
186        f(drain);
187    }
188}