infinity_sampler/
buf.rs

1use core::mem::MaybeUninit;
2use heapless::Vec;
3
4pub use crate::rate::SamplingRate;
5
6/// # Infinity Sampler
7///
8/// See the [top-level doc](crate) for an example.
9///
10/// The sampling rate gets halved after every `N/2` stored values, which is the same
11/// as every `N*2^X` values observed by the sampler.
12///
13/// Feed the values into the reservoir using [sample()](Self::sample) and then
14/// turn it into an ordered iterator with [into_ordered_iter()](Self::into_ordered_iter).
15///
16/// The buffer size must be a power of two.
17#[derive(Clone)]
18pub struct SamplingReservoir<T, const N: usize> {
19    buf: Option<Vec<T, N>>,
20    sample_rate: SamplingRate,
21    inner_index: usize,
22    outer_index: usize,
23}
24
25impl<T, const N: usize> SamplingReservoir<T, N> {
26    const LOG_N: u32 = N.trailing_zeros();
27
28    // For panic-free `x % (N / 2) == 0` operation
29    const WRAPAROUND_MASK: usize = N / 2 - 1;
30
31    /// Creates a empty reservoir, allocating an uninitialized buffer.
32    /// Panics if `N` is not a power of two.
33    pub const fn new() -> Self {
34        assert!(N > 1);
35        assert!(
36            N.is_power_of_two(),
37            "Buffer capacity must be a power of two"
38        );
39        Self {
40            buf: Some(Vec::new()),
41            sample_rate: SamplingRate::new(1),
42            inner_index: 0,
43            outer_index: 0,
44        }
45    }
46
47    /// Returns N, the capacity of the internal buffer.
48    pub const fn capacity(&self) -> usize {
49        N
50    }
51
52    /// Get the number of currently stored items. Can be from 0 to N-1 and never decreases.
53    pub fn len(&self) -> usize {
54        unsafe { self.buf.as_ref().unwrap_unchecked() }.len()
55    }
56
57    pub fn is_empty(&self) -> bool {
58        self.len() == 0
59    }
60
61    /// Consume self and return the internal item buffer.
62    pub fn into_inner(mut self) -> Vec<T, N> {
63        let buf = self.buf.take();
64        unsafe { buf.unwrap_unchecked() }
65    }
66
67    /// Get a view into the occupied part of the internal buffer.
68    pub fn as_unordered_slice(&self) -> &[T] {
69        // SAFETY: values up to fill_level are initialized
70        let buf = unsafe { self.buf.as_ref().unwrap_unchecked() };
71        &(buf)[..self.len()]
72    }
73
74    /// Return an iterator over
75    /// the items in chronological order - *O(N)*.
76    pub fn ordered_iter(&self) -> impl Iterator<Item = &T> {
77        ReservoirOrderedIter2 {
78            inner: ReservoirOrderedIndexIter {
79                pos: 0,
80                len: self.len(),
81                samples_seen: self.samples_seen(),
82                samples_stored: self.samples_stored(),
83            },
84            buf: self,
85        }
86    }
87
88    /// This is irreversible and consumes the reservoir.
89    pub fn into_ordered_iter(self) -> impl Iterator<Item = T> {
90        OwningReservoirOrderedIter {
91            inner: ReservoirOrderedIndexIter {
92                pos: 0,
93                len: self.len(),
94                samples_seen: self.samples_seen(),
95                samples_stored: self.samples_stored(),
96            },
97            buf: self.buf,
98        }
99    }
100
101    /// Returns a reference to the current sampling rate.
102    pub fn sampling_rate(&self) -> &SamplingRate {
103        &self.sample_rate
104    }
105
106    /// Returns the total number of samples written into the buffer since the beginning.
107    pub fn samples_stored(&self) -> usize {
108        self.inner_index
109    }
110
111    /// Returns the total number of samples observed by the sampler since the beginning.
112    pub fn samples_seen(&self) -> usize {
113        self.outer_index
114    }
115
116    pub(crate) fn storage_index_for_outer_index(outer_index: usize) -> usize {
117        match outer_index {
118            0 => 0,
119            i => ((i - 1) % (N - 1)) + 1,
120        }
121    }
122
123    #[allow(dead_code)]
124    pub(crate) fn should_sample(outer_index: usize) -> bool {
125        let significant_bits = usize::BITS - outer_index.leading_zeros();
126        let counter_bits = significant_bits.saturating_sub(Self::LOG_N);
127        let mask = (1 << counter_bits) - 1;
128        outer_index & mask == 0
129    }
130
131    /// Unconditionally stores a value in the reservoir.
132    pub(crate) fn write_at_outer_index(&mut self, outer_index: usize, value: T) {
133        let insert_index = Self::storage_index_for_outer_index(outer_index);
134
135        let buf = unsafe { self.buf.as_mut().unwrap_unchecked() };
136        if insert_index == buf.len() {
137            let _ = buf.push(value);
138        } else {
139            buf[insert_index] = value;
140        }
141    }
142
143    /// Observe a value and possibly store it - *O(1)*.
144    ///
145    /// Performs a sampling "step", consuming the value and storing it into the buffer,
146    /// or returning it back if it's discarded due to the sampling rate.
147    #[inline(never)]
148    pub fn sample(&mut self, value: T) -> SamplingOutcome<T> {
149        self.outer_index += 1;
150        if !self.sample_rate.step() {
151            return SamplingOutcome::Discarded(value);
152        }
153        let mut result = SamplingOutcome::Consumed;
154
155        if self.inner_index >= N && (self.inner_index - N) & Self::WRAPAROUND_MASK == 0 {
156            self.sample_rate.div(2);
157            result = SamplingOutcome::ConsumedAndRateReduced { factor: 2 };
158        }
159        self.inner_index += 1;
160        self.write_at_outer_index(self.outer_index - 1, value);
161        result
162    }
163}
164
165struct ReservoirOrderedIndexIter<const N: usize> {
166    pos: usize,
167    len: usize,
168    samples_stored: usize,
169    samples_seen: usize,
170}
171
172impl<const N: usize> ExactSizeIterator for ReservoirOrderedIndexIter<N> {}
173
174impl<const N: usize> Iterator for ReservoirOrderedIndexIter<N> {
175    type Item = usize;
176
177    fn next(&mut self) -> Option<Self::Item> {
178        if self.pos == self.len {
179            return None;
180        }
181
182        if self.samples_seen < N {
183            self.pos += 1;
184            return Some(self.pos - 1);
185        }
186
187        let log = usize::BITS - ((self.samples_seen - 1) / (N - 1)).leading_zeros() - 1;
188        let step_lower = 1 << log;
189        let step_upper = step_lower << 1;
190
191        let n_upper_steps = self.samples_stored % (N / 2);
192
193        let outer_index = if self.pos < n_upper_steps {
194            self.pos * step_upper
195        } else if self.pos < N - n_upper_steps {
196            n_upper_steps * step_upper + (self.pos - n_upper_steps) * step_lower
197        } else {
198            n_upper_steps * step_upper
199                + (N - n_upper_steps * 2) * step_lower
200                + (self.pos - (N - n_upper_steps)) * step_upper
201        };
202        let idx = SamplingReservoir::<(), N>::storage_index_for_outer_index(outer_index);
203        self.pos += 1;
204
205        Some(idx)
206    }
207
208    fn size_hint(&self) -> (usize, Option<usize>) {
209        (self.len - self.pos, Some(self.len - self.pos))
210    }
211}
212
213struct ReservoirOrderedIter2<'a, T, const N: usize> {
214    buf: &'a SamplingReservoir<T, N>,
215    inner: ReservoirOrderedIndexIter<N>,
216}
217
218impl<T, const N: usize> ExactSizeIterator for ReservoirOrderedIter2<'_, T, N> {}
219
220impl<'a, T, const N: usize> Iterator for ReservoirOrderedIter2<'a, T, N> {
221    type Item = &'a T;
222
223    fn next(&mut self) -> Option<Self::Item> {
224        let idx = self.inner.next()?;
225        Some(&self.buf.as_unordered_slice()[idx])
226    }
227
228    fn size_hint(&self) -> (usize, Option<usize>) {
229        self.inner.size_hint()
230    }
231}
232
233struct OwningReservoirOrderedIter<T, const N: usize> {
234    buf: Option<Vec<T, N>>,
235    inner: ReservoirOrderedIndexIter<N>,
236}
237
238impl<T, const N: usize> ExactSizeIterator for OwningReservoirOrderedIter<T, N> {}
239
240impl<T, const N: usize> OwningReservoirOrderedIter<T, N> {
241    fn get_item_ref(&mut self, idx: usize) -> &mut MaybeUninit<T> {
242        unsafe {
243            &mut *(self.buf.as_mut().unwrap_unchecked().as_mut_ptr().add(idx)
244                as *mut MaybeUninit<T>)
245        }
246    }
247}
248
249impl<T, const N: usize> Iterator for OwningReservoirOrderedIter<T, N> {
250    type Item = T;
251
252    fn next(&mut self) -> Option<Self::Item> {
253        let idx = self.inner.next()?;
254        Some(unsafe {
255            core::mem::replace(self.get_item_ref(idx), MaybeUninit::uninit()).assume_init()
256        })
257    }
258
259    fn size_hint(&self) -> (usize, Option<usize>) {
260        self.inner.size_hint()
261    }
262}
263
264impl<T, const N: usize> Drop for OwningReservoirOrderedIter<T, N> {
265    fn drop(&mut self) {
266        // Consume remaining items
267        for _ in self.by_ref() {}
268        core::mem::forget(self.buf.take());
269    }
270}
271
/// Result of a single sampling step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SamplingOutcome<T> {
    /// The value was stored in the reservoir.
    Consumed,
    /// The value was stored, and the sampling rate was divided by `factor`.
    ConsumedAndRateReduced { factor: u32 },
    /// The value was not stored and is handed back to the caller.
    Discarded(T),
}