1use core::mem::MaybeUninit;
2use heapless::Vec;
3
4pub use crate::rate::SamplingRate;
5
6#[derive(Clone)]
18pub struct SamplingReservoir<T, const N: usize> {
19 buf: Option<Vec<T, N>>,
20 sample_rate: SamplingRate,
21 inner_index: usize,
22 outer_index: usize,
23}
24
25impl<T, const N: usize> SamplingReservoir<T, N> {
26 const LOG_N: u32 = N.trailing_zeros();
27
28 const WRAPAROUND_MASK: usize = N / 2 - 1;
30
31 pub const fn new() -> Self {
34 assert!(N > 1);
35 assert!(
36 N.is_power_of_two(),
37 "Buffer capacity must be a power of two"
38 );
39 Self {
40 buf: Some(Vec::new()),
41 sample_rate: SamplingRate::new(1),
42 inner_index: 0,
43 outer_index: 0,
44 }
45 }
46
47 pub const fn capacity(&self) -> usize {
49 N
50 }
51
52 pub fn len(&self) -> usize {
54 unsafe { self.buf.as_ref().unwrap_unchecked() }.len()
55 }
56
57 pub fn is_empty(&self) -> bool {
58 self.len() == 0
59 }
60
61 pub fn into_inner(mut self) -> Vec<T, N> {
63 let buf = self.buf.take();
64 unsafe { buf.unwrap_unchecked() }
65 }
66
67 pub fn as_unordered_slice(&self) -> &[T] {
69 let buf = unsafe { self.buf.as_ref().unwrap_unchecked() };
71 &(buf)[..self.len()]
72 }
73
74 pub fn ordered_iter(&self) -> impl Iterator<Item = &T> {
77 ReservoirOrderedIter2 {
78 inner: ReservoirOrderedIndexIter {
79 pos: 0,
80 len: self.len(),
81 samples_seen: self.samples_seen(),
82 samples_stored: self.samples_stored(),
83 },
84 buf: self,
85 }
86 }
87
88 pub fn into_ordered_iter(self) -> impl Iterator<Item = T> {
90 OwningReservoirOrderedIter {
91 inner: ReservoirOrderedIndexIter {
92 pos: 0,
93 len: self.len(),
94 samples_seen: self.samples_seen(),
95 samples_stored: self.samples_stored(),
96 },
97 buf: self.buf,
98 }
99 }
100
101 pub fn sampling_rate(&self) -> &SamplingRate {
103 &self.sample_rate
104 }
105
106 pub fn samples_stored(&self) -> usize {
108 self.inner_index
109 }
110
111 pub fn samples_seen(&self) -> usize {
113 self.outer_index
114 }
115
116 pub(crate) fn storage_index_for_outer_index(outer_index: usize) -> usize {
117 match outer_index {
118 0 => 0,
119 i => ((i - 1) % (N - 1)) + 1,
120 }
121 }
122
123 #[allow(dead_code)]
124 pub(crate) fn should_sample(outer_index: usize) -> bool {
125 let significant_bits = usize::BITS - outer_index.leading_zeros();
126 let counter_bits = significant_bits.saturating_sub(Self::LOG_N);
127 let mask = (1 << counter_bits) - 1;
128 outer_index & mask == 0
129 }
130
131 pub(crate) fn write_at_outer_index(&mut self, outer_index: usize, value: T) {
133 let insert_index = Self::storage_index_for_outer_index(outer_index);
134
135 let buf = unsafe { self.buf.as_mut().unwrap_unchecked() };
136 if insert_index == buf.len() {
137 let _ = buf.push(value);
138 } else {
139 buf[insert_index] = value;
140 }
141 }
142
143 #[inline(never)]
148 pub fn sample(&mut self, value: T) -> SamplingOutcome<T> {
149 self.outer_index += 1;
150 if !self.sample_rate.step() {
151 return SamplingOutcome::Discarded(value);
152 }
153 let mut result = SamplingOutcome::Consumed;
154
155 if self.inner_index >= N && (self.inner_index - N) & Self::WRAPAROUND_MASK == 0 {
156 self.sample_rate.div(2);
157 result = SamplingOutcome::ConsumedAndRateReduced { factor: 2 };
158 }
159 self.inner_index += 1;
160 self.write_at_outer_index(self.outer_index - 1, value);
161 result
162 }
163}
164
165struct ReservoirOrderedIndexIter<const N: usize> {
166 pos: usize,
167 len: usize,
168 samples_stored: usize,
169 samples_seen: usize,
170}
171
172impl<const N: usize> ExactSizeIterator for ReservoirOrderedIndexIter<N> {}
173
174impl<const N: usize> Iterator for ReservoirOrderedIndexIter<N> {
175 type Item = usize;
176
177 fn next(&mut self) -> Option<Self::Item> {
178 if self.pos == self.len {
179 return None;
180 }
181
182 if self.samples_seen < N {
183 self.pos += 1;
184 return Some(self.pos - 1);
185 }
186
187 let log = usize::BITS - ((self.samples_seen - 1) / (N - 1)).leading_zeros() - 1;
188 let step_lower = 1 << log;
189 let step_upper = step_lower << 1;
190
191 let n_upper_steps = self.samples_stored % (N / 2);
192
193 let outer_index = if self.pos < n_upper_steps {
194 self.pos * step_upper
195 } else if self.pos < N - n_upper_steps {
196 n_upper_steps * step_upper + (self.pos - n_upper_steps) * step_lower
197 } else {
198 n_upper_steps * step_upper
199 + (N - n_upper_steps * 2) * step_lower
200 + (self.pos - (N - n_upper_steps)) * step_upper
201 };
202 let idx = SamplingReservoir::<(), N>::storage_index_for_outer_index(outer_index);
203 self.pos += 1;
204
205 Some(idx)
206 }
207
208 fn size_hint(&self) -> (usize, Option<usize>) {
209 (self.len - self.pos, Some(self.len - self.pos))
210 }
211}
212
213struct ReservoirOrderedIter2<'a, T, const N: usize> {
214 buf: &'a SamplingReservoir<T, N>,
215 inner: ReservoirOrderedIndexIter<N>,
216}
217
218impl<T, const N: usize> ExactSizeIterator for ReservoirOrderedIter2<'_, T, N> {}
219
220impl<'a, T, const N: usize> Iterator for ReservoirOrderedIter2<'a, T, N> {
221 type Item = &'a T;
222
223 fn next(&mut self) -> Option<Self::Item> {
224 let idx = self.inner.next()?;
225 Some(&self.buf.as_unordered_slice()[idx])
226 }
227
228 fn size_hint(&self) -> (usize, Option<usize>) {
229 self.inner.size_hint()
230 }
231}
232
233struct OwningReservoirOrderedIter<T, const N: usize> {
234 buf: Option<Vec<T, N>>,
235 inner: ReservoirOrderedIndexIter<N>,
236}
237
238impl<T, const N: usize> ExactSizeIterator for OwningReservoirOrderedIter<T, N> {}
239
240impl<T, const N: usize> OwningReservoirOrderedIter<T, N> {
241 fn get_item_ref(&mut self, idx: usize) -> &mut MaybeUninit<T> {
242 unsafe {
243 &mut *(self.buf.as_mut().unwrap_unchecked().as_mut_ptr().add(idx)
244 as *mut MaybeUninit<T>)
245 }
246 }
247}
248
249impl<T, const N: usize> Iterator for OwningReservoirOrderedIter<T, N> {
250 type Item = T;
251
252 fn next(&mut self) -> Option<Self::Item> {
253 let idx = self.inner.next()?;
254 Some(unsafe {
255 core::mem::replace(self.get_item_ref(idx), MaybeUninit::uninit()).assume_init()
256 })
257 }
258
259 fn size_hint(&self) -> (usize, Option<usize>) {
260 self.inner.size_hint()
261 }
262}
263
264impl<T, const N: usize> Drop for OwningReservoirOrderedIter<T, N> {
265 fn drop(&mut self) {
266 for _ in self.by_ref() {}
268 core::mem::forget(self.buf.take());
269 }
270}
271
272pub enum SamplingOutcome<T> {
273 Consumed,
274 ConsumedAndRateReduced { factor: u32 },
275 Discarded(T),
276}