use core::num::NonZeroUsize;

use delegate::delegate;

use crate::item::{InitializedItem, Item};
use crate::InfinitySamplerIndexer;
pub use crate::rate::SamplingRate;
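/// Fixed-capacity sample buffer over `N` slots (`N` must be a power of two).
/// The first `N` writes fill the buffer front to back; later writes recycle
/// slots according to the indexer's overwrite pattern, and each stored item
/// remembers its 1-based insertion index so insertion order can be restored.
///
/// `buf` is an `Option` only so that consuming methods can move the array out.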
pub struct RawReservoir<T, const N: usize> {
buf: Option<[Item<T>; N]>,
iter: InfinitySamplerIndexer<N>,
fill_level: usize,
}
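/// Outcome of offering a value to a sampler: the value was either stored or
/// handed back to the caller.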
pub enum SamplingOutcome<T> {
Consumed,
Discarded(T),
}
impl<T: Clone, const N: usize> Clone for RawReservoir<T, N> {
fn clone(&self) -> Self {
Self {
buf: self.buf.clone(),
iter: self.iter.clone(),
fill_level: self.fill_level,
}
}
}
impl<T, const N: usize> RawReservoir<T, N> {
    /// Length of one full overwrite cycle: `log2(N)` passes of `N / 2`
    /// writes each.
    const INDEXING_LOOP_SIZE: usize = N / 2 * N.trailing_zeros() as usize;
    /// Bits addressing a position inside a single `N / 2`-wide pass.
    const IN_GROUP_BITS: u32 = (N / 2).trailing_zeros();
    const EMPTY: Item<T> = Item::empty();
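    /// Creates an empty reservoir.
    ///
    /// Panics if `N` is not a power of two (this includes `N == 0`).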
pub const fn new() -> Self {
assert!(
N.is_power_of_two(),
"Buffer capacity must be a power of two"
);
Self {
buf: Some([Self::EMPTY; N]),
iter: InfinitySamplerIndexer::new(),
fill_level: 0,
}
}
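    /// Maximum number of samples the reservoir can hold.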
pub const fn capacity(&self) -> usize {
N
}
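    /// Number of slots currently holding a sample; saturates at `N`.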
pub const fn len(&self) -> usize {
self.fill_level
}
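    /// Consumes the reservoir, returning the raw slot array (in storage
    /// order) together with the indexer state.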
pub fn into_inner(mut self) -> ([Item<T>; N], InfinitySamplerIndexer<N>) {
let buf = self.buf.take();
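        // SAFETY: `buf` is only taken by consuming methods, so it is still
        // `Some` at this point.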
(unsafe { buf.unwrap_unchecked() }, self.iter)
}
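    /// Returns the initialized samples in storage order, which is generally
    /// not insertion order.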
pub fn as_unordered_slice(&self) -> &[InitializedItem<T>] {
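        // SAFETY: `buf` is `Some` (see `into_inner`), slots `..fill_level`
        // are always initialized, and `InitializedItem<T>` is a
        // layout-compatible view of a written `Item<T>`.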
unsafe {
&*(&self.buf.as_ref().unwrap_unchecked()[..self.fill_level] as *const [Item<T>]
as *const [InitializedItem<T>])
}
}
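    /// Which overwrite pass (0-based) a cycle position falls into: every
    /// consecutive run of `N / 2` writes belongs to one pass, and the result
    /// is also the exponent of that pass's pattern base.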
#[inline(always)]
#[allow(dead_code)]
const fn _pattern_base_exp_for_in_loop_index(in_loop_index: usize) -> usize {
in_loop_index >> Self::IN_GROUP_BITS
}
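    /// Pattern base `2^e` of the pass containing `in_loop_index`; this is
    /// the first slot the pass overwrites.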
#[inline(always)]
#[allow(dead_code)]
pub(crate) const fn _pattern_base_for_in_loop_index(in_loop_index: usize) -> usize {
1 << (Self::_pattern_base_exp_for_in_loop_index(in_loop_index) as u32)
}
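    /// Left offset (lowest slot) of the pattern that a cycle position writes
    /// to: pass `e` interleaves `2^e` patterns starting at slots
    /// `2^e..2^(e+1)`, selected by the in-pass position divided by the group
    /// size.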
#[inline(always)]
#[allow(dead_code)]
pub(crate) const fn _pattern_index_for_in_loop_index(mut in_loop_index: usize) -> usize {
        in_loop_index %= Self::INDEXING_LOOP_SIZE;
let pattern_base_exp = Self::_pattern_base_exp_for_in_loop_index(in_loop_index);
let pattern_base = Self::_pattern_base_for_in_loop_index(in_loop_index);
pattern_base
+ ((in_loop_index >> (Self::IN_GROUP_BITS as usize - pattern_base_exp))
& ((1 << pattern_base_exp) - 1))
}
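    /// Reference implementation of the storage-index mapping, kept for
    /// comparison against the optimized version.
    ///
    /// The first `N` writes land at `inner_index` directly. After that,
    /// writes cycle through passes of `N / 2` overwrites each: pass 0 hits
    /// slots `1, 3, 5, ...` (step 2), pass 1 hits `2, 6, ...` then
    /// `3, 7, ...` (step 4), and so on, doubling the step and the number of
    /// groups while halving the group size, until the cycle restarts.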
pub(crate) fn _naive_storage_index(inner_index: usize) -> usize {
if inner_index < N {
inner_index
} else {
            let in_loop_index = (inner_index - N) % Self::INDEXING_LOOP_SIZE;
            // Walk the passes: each pass holds `group_instances` groups of
            // `group_size` consecutive overwrites, placed `group_step` slots
            // apart; every pass the group size halves (bottoming out at 1)
            // while the step and the number of groups double.
            let mut pattern_index = 1;
            let mut group_size = N / 2;
            let mut group_step = 2;
            let mut group_instances = 1;
            let mut remainder = in_loop_index;
'outer: loop {
for _ in 0..group_instances {
if remainder < group_size {
break 'outer;
}
pattern_index += 1;
remainder -= group_size;
}
group_size = (group_size / 2).max(1);
group_step *= 2;
group_instances *= 2;
}
pattern_index + remainder * group_step
}
}
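    /// Closed-form equivalent of `_naive_storage_index`: derives the pass,
    /// pattern and step with shifts and masks instead of walking the groups.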
pub(crate) const fn _optimized_storage_index(inner_index: usize) -> usize {
if inner_index < N {
inner_index
} else {
            let in_loop_index = (inner_index - N) % Self::INDEXING_LOOP_SIZE;
            // Lowest slot touched by this write's pattern.
            let pattern_index = Self::_pattern_index_for_in_loop_index(in_loop_index);
            let left_offset = pattern_index;
            let pattern_base_exp = Self::_pattern_base_exp_for_in_loop_index(in_loop_index);
            let pattern_base = Self::_pattern_base_for_in_loop_index(in_loop_index);
            // Writes sharing one pattern within the pass.
            let group_size = N / 2 / pattern_base;
            let pattern_offset = in_loop_index % group_size;
            // Slots touched by one pattern are `2^(e + 1)` apart.
            let pattern_step = 1 << (pattern_base_exp as u32 + 1);
            let idx = left_offset + pattern_offset * pattern_step;
debug_assert!(idx < N);
idx
}
}
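    /// Writes `value` to the slot chosen by the indexer, tagging it with its
    /// 1-based insertion index.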
pub fn write(&mut self, value: T) {
let iter_position = self.iter.position();
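        // SAFETY: the indexer yields a storage index for every write and
        // never returns `None`.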
let insert_index = unsafe { self.iter.next().unwrap_unchecked() };
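        // SAFETY: `buf` is `Some` (see `into_inner`), `insert_index` is in
        // bounds for the buffer, and `iter_position + 1` is never zero.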
unsafe {
self.buf.as_mut().unwrap_unchecked()[insert_index]
.write(NonZeroUsize::new_unchecked(iter_position + 1), value);
}
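        // Saturating increment: the fill level never exceeds `N`.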
self.fill_level = self.fill_level.min(N - 1) + 1;
}
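    /// Consumes the reservoir and iterates over the samples in insertion
    /// order; empty slots are sorted past the end and never yielded.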
    pub fn into_ordered_iter(mut self) -> ReservoirOrderedIter<T, N> {
        // Sort occupied slots by insertion index; empty slots sort last.
        // SAFETY (both blocks): `buf` is `Some` until taken below, and
        // consuming `self` means it can never be taken twice.
        unsafe { self.buf.as_mut().unwrap_unchecked() }.sort_unstable_by_key(|item| {
            match item.insertion_index {
                Some(index) => index.into(),
                None => usize::MAX,
            }
        });
        ReservoirOrderedIter {
            buf: unsafe { self.buf.take().unwrap_unchecked() },
            len: self.len(),
            pos: 0,
        }
    }
}
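// A small consistency check, added as a sketch: the module name and the
// choice of `N = 16` are illustrative, not from the original source. It
// exercises the two storage-index implementations documented above and
// asserts they agree over the initial fill plus two full overwrite cycles.
#[cfg(test)]
mod storage_index_sketch {
    use super::RawReservoir;

    #[test]
    fn naive_and_optimized_storage_indices_agree() {
        type R = RawReservoir<u8, 16>;
        for inner_index in 0..(16 + 2 * R::INDEXING_LOOP_SIZE) {
            let naive = R::_naive_storage_index(inner_index);
            let optimized = R::_optimized_storage_index(inner_index);
            assert_eq!(naive, optimized);
            // Every computed index must land inside the buffer.
            assert!(optimized < 16);
        }
    }
}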
pub struct ReservoirOrderedIter<T, const N: usize> {
buf: [Item<T>; N],
len: usize,
pos: usize,
}
impl<T, const N: usize> ReservoirOrderedIter<T, N> {
    /// Number of items not yet yielded.
    pub fn len(&self) -> usize {
        self.len - self.pos
    }
    /// The items not yet yielded, in insertion order.
    pub fn as_slice(&self) -> &[InitializedItem<T>] {
        // SAFETY: slots `pos..len` are initialized and have not been taken
        // by `next` yet; `InitializedItem<T>` is a layout-compatible view
        // of a written `Item<T>`.
        unsafe {
            &*(&self.buf[self.pos..self.len] as *const [Item<T>] as *const [InitializedItem<T>])
        }
    }
}
impl<T, const N: usize> ExactSizeIterator for ReservoirOrderedIter<T, N> {}
impl<T, const N: usize> Iterator for ReservoirOrderedIter<T, N> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.pos == self.len {
return None;
}
let idx = self.pos;
self.pos += 1;
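        // SAFETY: `idx < len`, so this slot is initialized and has not been
        // taken yet (each index is visited exactly once).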
Some(unsafe { self.buf[idx].take_unchecked() })
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len - self.pos, Some(self.len - self.pos))
}
}
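/// A `RawReservoir` paired with a decaying `SamplingRate`: as the stream
/// grows, an ever-smaller fraction of offered values is accepted, so the `N`
/// retained samples stay spread across the whole stream.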
#[derive(Clone)]
pub struct SamplingReservoir<T, const N: usize> {
inner: RawReservoir<T, N>,
sample_rate: SamplingRate,
}
impl<T, const N: usize> SamplingReservoir<T, N> {
delegate! {
to self.inner {
pub const fn capacity(&self) -> usize;
pub const fn len(&self) -> usize;
pub fn into_inner(self) -> ([Item<T>; N], InfinitySamplerIndexer<N>);
pub fn as_unordered_slice(&self) -> &[InitializedItem<T>];
pub fn into_ordered_iter(self) -> ReservoirOrderedIter<T, N>;
}
}
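    /// Creates an empty reservoir that initially accepts every offered value.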
pub const fn new() -> Self {
Self {
inner: RawReservoir::new(),
sample_rate: SamplingRate::new(1),
}
}
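    /// Current sampling rate.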
pub fn sampling_rate(&self) -> &SamplingRate {
&self.sample_rate
}
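    /// Offers `value` to the reservoir.
    ///
    /// Once the buffer has filled, the acceptance rate is halved at the
    /// start of every overwrite pass (each run of `N / 2` accepted writes),
    /// so the retained samples stay evenly spaced over the stream.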
    pub fn sample(&mut self, value: T) -> SamplingOutcome<T> {
        if self.sample_rate.step() {
            let position = self.inner.iter.position();
            // Halve the acceptance rate at the start of every overwrite
            // pass, i.e. after each run of `N / 2` accepted writes once the
            // buffer has filled.
            if position >= N && (position - N) % (N / 2) == 0 {
                self.sample_rate.div(2);
            }
            self.inner.write(value);
            SamplingOutcome::Consumed
        } else {
            SamplingOutcome::Discarded(value)
        }
    }
}
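// A minimal usage sketch, not part of the original file: it drives only the
// public API above and assumes `SamplingRate::step()` accepts every value
// while the rate is still at its initial value of 1.
#[cfg(test)]
mod usage_sketch {
    use super::SamplingReservoir;

    #[test]
    fn retains_an_ordered_subset_of_a_long_stream() {
        let mut reservoir = SamplingReservoir::<u32, 16>::new();
        for value in 0..1_000u32 {
            // Each value is either stored or handed back in the outcome.
            let _ = reservoir.sample(value);
        }
        assert_eq!(reservoir.capacity(), 16);
        assert_eq!(reservoir.len(), 16);
        // Insertion order is restored on the way out, so the retained
        // subsequence of `0..1000` comes back strictly increasing.
        let mut count = 0u32;
        let mut last = None;
        for value in reservoir.into_ordered_iter() {
            if let Some(prev) = last {
                assert!(prev < value);
            }
            last = Some(value);
            count += 1;
        }
        assert_eq!(count, 16);
    }
}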