use crate::item::{InitializedItem, Item};
use core::hint::unreachable_unchecked;
use core::num::NonZeroUsize;
pub use crate::rate::SamplingRate;
/// Fixed-capacity store with `N` slots that keeps a subset of the values
/// offered to `sample`, consulting a `SamplingRate` to decide which values
/// are stored.
#[derive(Clone)]
pub struct SamplingReservoir<T, const N: usize> {
/// Backing storage. Set to `Some` by `new`; the consuming methods
/// `into_inner` and `into_ordered_iter` move it out with `take()`.
/// The remaining methods access it through `unwrap_unchecked`, i.e. they
/// assume it is still `Some`.
buf: Option<[Item<T>; N]>,
/// Number of occupied slots; grows by one per write until it reaches `N`.
fill_level: usize,
/// Consulted through `step()` on every call to `sample`; starts out as
/// `SamplingRate::new(1)`.
sample_rate: SamplingRate,
/// Count of values written into the buffer (reported by `samples_stored`).
inner_index: usize,
/// Count of values offered to `sample`, stored or not (reported by
/// `samples_seen`).
outer_index: usize,
}
impl<T, const N: usize> SamplingReservoir<T, N> {
    /// Placeholder used to fill every slot of a freshly created buffer.
    const EMPTY: Item<T> = Item::empty();
    /// Number of trailing zero bits of `N`; equals `log2(N)` when `N` is a
    /// power of two (which `new` asserts).
    const LOG_N: u32 = N.trailing_zeros();
    /// Mask of the low bits below `N / 2`; `sample` uses it to detect every
    /// `N / 2`-th stored value once `N` values have been stored.
    const WRAPAROUND_MASK: usize = N / 2 - 1;

    /// Creates an empty reservoir whose sampling rate starts at
    /// `SamplingRate::new(1)`.
    ///
    /// # Panics
    ///
    /// Panics if `N <= 1` or if `N` is not a power of two.
    pub const fn new() -> Self {
        assert!(N > 1);
        assert!(
            N.is_power_of_two(),
            "Buffer capacity must be a power of two"
        );
        Self {
            outer_index: 0,
            inner_index: 0,
            fill_level: 0,
            sample_rate: SamplingRate::new(1),
            buf: Some([Self::EMPTY; N]),
        }
    }

    /// Total number of slots, i.e. the const parameter `N`.
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Number of slots that currently hold a value.
    pub const fn len(&self) -> usize {
        self.fill_level
    }

    /// Consumes the reservoir and hands back the raw slot array, including
    /// any slots that were never written.
    pub fn into_inner(mut self) -> [Item<T>; N] {
        // SAFETY: `new` stores `Some(..)` and, within this impl, only methods
        // that consume `self` ever take the buffer out.
        unsafe { self.buf.take().unwrap_unchecked() }
    }

    /// Borrows the occupied slots in storage order (not insertion order).
    pub fn as_unordered_slice(&self) -> &[InitializedItem<T>] {
        // SAFETY: see `into_inner` — the buffer is `Some` while `self` is alive.
        let slots = unsafe { self.buf.as_ref().unwrap_unchecked() };
        let occupied: *const [Item<T>] = &slots[..self.fill_level];
        // SAFETY: the first `fill_level` slots have been written. The cast
        // presumably relies on `InitializedItem<T>` having the same layout as
        // `Item<T>` — both are defined elsewhere; TODO confirm.
        unsafe { &*(occupied as *const [InitializedItem<T>]) }
    }

    /// Consumes the reservoir and returns an iterator over the stored values,
    /// ordered by ascending insertion index. Slots without an insertion index
    /// sort last and lie beyond the iterator's `len`.
    pub fn into_ordered_iter(mut self) -> ReservoirOrderedIter<T, N> {
        let len = self.len();
        // SAFETY: see `into_inner` — the buffer is `Some` while `self` is alive.
        let mut buf = unsafe { self.buf.take().unwrap_unchecked() };
        buf.sort_unstable_by_key(|slot| slot.insertion_index.map_or(usize::MAX, Into::into));
        ReservoirOrderedIter { buf, len, pos: 0 }
    }

    /// The sampling rate currently in effect.
    pub fn sampling_rate(&self) -> &SamplingRate {
        &self.sample_rate
    }

    /// How many values have been written into the buffer so far.
    pub fn samples_stored(&self) -> usize {
        self.inner_index
    }

    /// How many values have been offered to `sample` so far.
    pub fn samples_seen(&self) -> usize {
        self.outer_index
    }

    /// Maps a zero-based position in the input stream to a slot index.
    ///
    /// Position `0` always maps to slot `0`; every later position cycles
    /// through slots `1..N`.
    pub(crate) fn storage_index_for_outer_index(outer_index: usize) -> usize {
        let cycle_len = N - 1;
        if cycle_len == 0 {
            // SAFETY: relies on `N > 1`, which `new` asserts. This hint lets
            // the compiler assume a non-zero divisor below; calling this
            // function with `N == 1` would be undefined behaviour.
            unsafe { unreachable_unchecked() }
        }
        if outer_index == 0 {
            0
        } else {
            (outer_index - 1) % cycle_len + 1
        }
    }

    /// Returns `true` when all bits of `outer_index` below its top `LOG_N`
    /// significant bits are zero.
    #[allow(dead_code)]
    pub(crate) fn should_sample(outer_index: usize) -> bool {
        let bit_length = usize::BITS - outer_index.leading_zeros();
        let low_bit_count = bit_length.saturating_sub(Self::LOG_N);
        let low_mask = (1usize << low_bit_count) - 1;
        (outer_index & low_mask) == 0
    }

    /// Stores `value` in the slot belonging to stream position `outer_index`,
    /// tagging it with insertion index `outer_index + 1`, and bumps the fill
    /// level (capped at `N`).
    pub(crate) fn write_at_outer_index(&mut self, outer_index: usize, value: T) {
        let slot = Self::storage_index_for_outer_index(outer_index);
        // SAFETY: `outer_index + 1` is non-zero unless the addition overflows,
        // and the buffer is `Some` while `self` is alive (see `into_inner`).
        unsafe {
            let stamp = NonZeroUsize::new_unchecked(outer_index + 1);
            let slots = self.buf.as_mut().unwrap_unchecked();
            slots[slot].write(stamp, value);
        }
        self.fill_level = (self.fill_level + 1).min(N);
    }

    /// Offers one value to the reservoir.
    ///
    /// Returns `Discarded(value)` when the sampling rate's `step()` declines
    /// it. Otherwise the value is stored and `Consumed` is returned — or
    /// `ConsumedAndRateReduced { factor: 2 }` when, before storing, the
    /// stored count was `N` or `N` plus a multiple of `N / 2`; in that case
    /// `sample_rate.div(2)` is called first.
    #[inline(never)]
    pub fn sample(&mut self, value: T) -> SamplingOutcome<T> {
        self.outer_index += 1;
        if !self.sample_rate.step() {
            return SamplingOutcome::Discarded(value);
        }

        let stored_before = self.inner_index;
        let reduce_rate =
            stored_before >= N && ((stored_before - N) & Self::WRAPAROUND_MASK) == 0;
        if reduce_rate {
            self.sample_rate.div(2);
        }

        self.inner_index = stored_before + 1;
        self.write_at_outer_index(self.outer_index - 1, value);

        if reduce_rate {
            SamplingOutcome::ConsumedAndRateReduced { factor: 2 }
        } else {
            SamplingOutcome::Consumed
        }
    }
}
/// Owning iterator produced by `SamplingReservoir::into_ordered_iter`.
/// Yields the stored values in ascending insertion-index order.
pub struct ReservoirOrderedIter<T, const N: usize> {
/// Slot array, sorted by insertion index with empty slots last.
buf: [Item<T>; N],
/// Number of populated leading slots (the reservoir's `len()` at the time
/// of conversion).
len: usize,
/// Index of the next slot that `next` will take from.
pos: usize,
}
impl<T, const N: usize> ReservoirOrderedIter<T, N> {
    /// Number of items that have not been yielded yet.
    ///
    /// This inherent method takes precedence over `ExactSizeIterator::len`
    /// in method-call syntax, so it reports the same remaining count that
    /// `size_hint` does rather than the original total.
    pub fn len(&self) -> usize {
        self.len - self.pos
    }

    /// Borrows the items that have not been yielded yet, in iteration order.
    ///
    /// Slots before `pos` have already been emptied by `next` and must not be
    /// exposed as `InitializedItem`, so the view starts at `pos`.
    pub fn as_slice(&self) -> &[InitializedItem<T>] {
        let remaining: *const [Item<T>] = &self.buf[self.pos..self.len];
        // SAFETY: slots in `pos..len` are populated and not yet taken. The
        // cast presumably relies on `InitializedItem<T>` having the same
        // layout as `Item<T>` — both are defined elsewhere; TODO confirm.
        unsafe { &*(remaining as *const [InitializedItem<T>]) }
    }
}
// `size_hint` returns the exact remaining count (`len - pos`) as both bounds,
// which is what `ExactSizeIterator` requires; no methods need overriding.
impl<T, const N: usize> ExactSizeIterator for ReservoirOrderedIter<T, N> {}
impl<T, const N: usize> Iterator for ReservoirOrderedIter<T, N> {
    type Item = T;

    /// Moves the next value out of the buffer, or returns `None` once all
    /// `len` populated slots have been visited.
    fn next(&mut self) -> Option<Self::Item> {
        let cursor = self.pos;
        if cursor == self.len {
            None
        } else {
            self.pos = cursor + 1;
            // SAFETY: `cursor < len`, so the slot is populated, and `pos`
            // only moves forward, so each slot is taken at most once.
            Some(unsafe { self.buf[cursor].take_unchecked() })
        }
    }

    /// Exact number of values still to be yielded, as both lower and upper
    /// bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len - self.pos;
        (remaining, Some(remaining))
    }
}
/// Result of offering one value to `SamplingReservoir::sample`.
pub enum SamplingOutcome<T> {
/// The value was stored in the reservoir.
Consumed,
/// The value was stored, and the reservoir called `div` on its sampling
/// rate with `factor` just before storing it.
ConsumedAndRateReduced { factor: u32 },
/// The value was not stored; it is handed back to the caller.
Discarded(T),
}