use std::collections::VecDeque;
use crate::error::{Result, TimeSeriesError};
#[derive(Debug, Clone)]
struct Xoshiro256 {
s: [u64; 4],
}
impl Xoshiro256 {
fn from_seed(seed: u64) -> Self {
let mut z = seed;
let mut s = [0u64; 4];
for si in s.iter_mut() {
z = z.wrapping_add(0x9e3779b97f4a7c15);
let mut tmp = z;
tmp = (tmp ^ (tmp >> 30)).wrapping_mul(0xbf58476d1ce4e5b9);
tmp = (tmp ^ (tmp >> 27)).wrapping_mul(0x94d049bb133111eb);
*si = tmp ^ (tmp >> 31);
}
Self { s }
}
fn next_u64(&mut self) -> u64 {
let result = self.s[1].wrapping_mul(5).rotate_left(7).wrapping_mul(9);
let t = self.s[1] << 17;
self.s[2] ^= self.s[0];
self.s[3] ^= self.s[1];
self.s[1] ^= self.s[2];
self.s[0] ^= self.s[3];
self.s[2] ^= t;
self.s[3] = self.s[3].rotate_left(45);
result
}
fn gen_range(&mut self, n: u64) -> u64 {
debug_assert!(n > 0);
let threshold = n.wrapping_neg() % n;
loop {
let r = self.next_u64();
if r >= threshold {
return r % n;
}
}
}
fn gen_f64(&mut self) -> f64 {
(self.next_u64() >> 11) as f64 * (1.0 / (1u64 << 53) as f64)
}
}
#[derive(Debug, Clone)]
pub struct ReservoirSampler<T: Clone> {
capacity: usize,
reservoir: Vec<T>,
n_seen: u64,
rng: Xoshiro256,
}
impl<T: Clone> ReservoirSampler<T> {
pub fn new(capacity: usize, seed: u64) -> Result<Self> {
if capacity == 0 {
return Err(TimeSeriesError::InvalidParameter {
name: "capacity".to_string(),
message: "Reservoir capacity must be at least 1".to_string(),
});
}
Ok(Self {
capacity,
reservoir: Vec::with_capacity(capacity),
n_seen: 0,
rng: Xoshiro256::from_seed(seed),
})
}
pub fn update(&mut self, item: T) {
self.n_seen += 1;
if self.reservoir.len() < self.capacity {
self.reservoir.push(item);
} else {
let j = self.rng.gen_range(self.n_seen) as usize;
if j < self.capacity {
self.reservoir[j] = item;
}
}
}
pub fn sample(&self) -> &[T] {
&self.reservoir
}
pub fn n_seen(&self) -> u64 {
self.n_seen
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn is_full(&self) -> bool {
self.reservoir.len() == self.capacity
}
pub fn reset(&mut self) {
self.reservoir.clear();
self.n_seen = 0;
}
}
#[derive(Debug, Clone)]
pub struct WeightedReservoir<T: Clone> {
capacity: usize,
heap: Vec<(f64, T)>,
min_key: f64,
min_idx: usize,
n_seen: u64,
rng: Xoshiro256,
}
impl<T: Clone> WeightedReservoir<T> {
pub fn new(capacity: usize, seed: u64) -> Result<Self> {
if capacity == 0 {
return Err(TimeSeriesError::InvalidParameter {
name: "capacity".to_string(),
message: "Reservoir capacity must be at least 1".to_string(),
});
}
Ok(Self {
capacity,
heap: Vec::with_capacity(capacity),
min_key: 0.0,
min_idx: 0,
n_seen: 0,
rng: Xoshiro256::from_seed(seed),
})
}
pub fn update(&mut self, item: T, weight: f64) -> Result<()> {
if weight <= 0.0 {
return Err(TimeSeriesError::InvalidParameter {
name: "weight".to_string(),
message: "Weights must be strictly positive".to_string(),
});
}
self.n_seen += 1;
let u = self.rng.gen_f64().max(f64::EPSILON);
let key = u.ln() / weight;
if self.heap.len() < self.capacity {
self.heap.push((key, item));
if self.heap.len() == self.capacity {
self.refresh_min();
}
} else if key > self.min_key {
self.heap[self.min_idx] = (key, item);
self.refresh_min();
}
Ok(())
}
fn refresh_min(&mut self) {
self.min_key = f64::INFINITY;
self.min_idx = 0;
for (i, (k, _)) in self.heap.iter().enumerate() {
if *k < self.min_key {
self.min_key = *k;
self.min_idx = i;
}
}
}
pub fn sample(&self) -> &[(f64, T)] {
&self.heap
}
pub fn items(&self) -> Vec<&T> {
self.heap.iter().map(|(_, item)| item).collect()
}
pub fn n_seen(&self) -> u64 {
self.n_seen
}
pub fn capacity(&self) -> usize {
self.capacity
}
}
#[derive(Debug, Clone)]
pub struct TimedObservation {
pub value: f64,
pub timestamp: f64,
}
#[derive(Debug, Clone)]
pub struct TimeSeriesReservoir {
inner: WeightedReservoir<TimedObservation>,
decay_rate: f64,
current_time: f64,
}
impl TimeSeriesReservoir {
pub fn new(capacity: usize, decay_rate: f64, seed: u64) -> Result<Self> {
if decay_rate < 0.0 {
return Err(TimeSeriesError::InvalidParameter {
name: "decay_rate".to_string(),
message: "Decay rate must be non-negative".to_string(),
});
}
Ok(Self {
inner: WeightedReservoir::new(capacity, seed)?,
decay_rate,
current_time: 0.0,
})
}
pub fn update(&mut self, value: f64, timestamp: f64) -> Result<()> {
if timestamp < self.current_time {
return Err(TimeSeriesError::InvalidInput(
"Timestamps must be non-decreasing".to_string(),
));
}
self.current_time = timestamp;
let weight = if self.decay_rate == 0.0 {
1.0
} else {
1.0_f64.exp() };
self.inner.update(TimedObservation { value, timestamp }, weight)
}
pub fn sample(&self) -> Vec<&TimedObservation> {
self.inner.items()
}
pub fn current_time(&self) -> f64 {
self.current_time
}
pub fn capacity(&self) -> usize {
self.inner.capacity()
}
pub fn n_seen(&self) -> u64 {
self.inner.n_seen()
}
}
#[derive(Debug, Clone)]
pub struct StreamStats {
pub n_seen: u64,
pub reservoir: ReservoirSampler<f64>,
mean_acc: f64,
m2_acc: f64,
window: VecDeque<f64>,
window_size: usize,
}
impl StreamStats {
pub fn new(reservoir_capacity: usize, window_size: usize, seed: u64) -> Result<Self> {
if window_size == 0 {
return Err(TimeSeriesError::InvalidParameter {
name: "window_size".to_string(),
message: "Window size must be at least 1".to_string(),
});
}
Ok(Self {
n_seen: 0,
reservoir: ReservoirSampler::new(reservoir_capacity, seed)?,
mean_acc: 0.0,
m2_acc: 0.0,
window: VecDeque::with_capacity(window_size + 1),
window_size,
})
}
pub fn update(&mut self, x: f64) {
self.n_seen += 1;
let delta = x - self.mean_acc;
self.mean_acc += delta / self.n_seen as f64;
let delta2 = x - self.mean_acc;
self.m2_acc += delta * delta2;
self.reservoir.update(x);
self.window.push_back(x);
if self.window.len() > self.window_size {
self.window.pop_front();
}
}
pub fn mean(&self) -> f64 {
self.mean_acc
}
pub fn variance(&self) -> f64 {
if self.n_seen < 2 {
0.0
} else {
self.m2_acc / (self.n_seen - 1) as f64
}
}
pub fn std(&self) -> f64 {
self.variance().sqrt()
}
pub fn recent_window(&self) -> &VecDeque<f64> {
&self.window
}
pub fn window_mean(&self) -> f64 {
if self.window.is_empty() {
0.0
} else {
self.window.iter().sum::<f64>() / self.window.len() as f64
}
}
pub fn window_variance(&self) -> f64 {
if self.window.len() < 2 {
return 0.0;
}
let m = self.window_mean();
let var: f64 = self.window.iter().map(|&x| (x - m).powi(2)).sum::<f64>();
var / (self.window.len() - 1) as f64
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_reservoir_sampler_fills() {
let mut sampler = ReservoirSampler::<u32>::new(10, 42).expect("failed to create sampler");
for i in 0..5u32 {
sampler.update(i);
}
assert_eq!(sampler.sample().len(), 5);
assert!(!sampler.is_full());
for i in 5..15u32 {
sampler.update(i);
}
assert_eq!(sampler.sample().len(), 10);
assert!(sampler.is_full());
}
#[test]
fn test_reservoir_sampler_n_seen() {
let mut sampler = ReservoirSampler::<i32>::new(5, 0).expect("failed to create sampler");
for i in 0..100 {
sampler.update(i);
}
assert_eq!(sampler.n_seen(), 100);
assert_eq!(sampler.sample().len(), 5);
}
#[test]
fn test_reservoir_uniformity() {
let n_trials = 2000;
let mut counts = [0u32; 10];
for seed in 0..n_trials as u64 {
let mut sampler = ReservoirSampler::<usize>::new(5, seed).expect("failed to create sampler");
for i in 0..10 {
sampler.update(i);
}
for &item in sampler.sample() {
counts[item] += 1;
}
}
let expected = 1000.0f64;
for (i, &cnt) in counts.iter().enumerate() {
let diff = (cnt as f64 - expected).abs() / expected;
assert!(
diff < 0.15,
"Item {i} has biased count {cnt}, expected ≈{expected}"
);
}
}
#[test]
fn test_reservoir_invalid_capacity() {
assert!(ReservoirSampler::<i32>::new(0, 0).is_err());
}
#[test]
fn test_weighted_reservoir_basic() {
let mut wr = WeightedReservoir::<usize>::new(5, 7).expect("failed to create wr");
for i in 0..20 {
wr.update(i, (i + 1) as f64).expect("unexpected None or Err");
}
assert_eq!(wr.sample().len(), 5);
}
#[test]
fn test_weighted_reservoir_invalid_weight() {
let mut wr = WeightedReservoir::<i32>::new(5, 0).expect("failed to create wr");
assert!(wr.update(1, 0.0).is_err());
assert!(wr.update(1, -1.0).is_err());
}
#[test]
fn test_time_series_reservoir_basic() {
let mut tsr = TimeSeriesReservoir::new(20, 0.0, 0).expect("failed to create tsr");
for t in 0..200 {
tsr.update(t as f64, t as f64).expect("unexpected None or Err");
}
assert_eq!(tsr.sample().len(), 20);
assert_eq!(tsr.n_seen(), 200);
}
#[test]
fn test_time_series_reservoir_monotone_timestamps() {
let mut tsr = TimeSeriesReservoir::new(5, 0.1, 0).expect("failed to create tsr");
tsr.update(1.0, 1.0).expect("unexpected None or Err");
tsr.update(2.0, 2.0).expect("unexpected None or Err");
assert!(tsr.update(3.0, 1.5).is_err());
}
#[test]
fn test_stream_stats_basic() {
let mut ss = StreamStats::new(50, 100, 0).expect("failed to create ss");
for x in 1..=100 {
ss.update(x as f64);
}
assert_eq!(ss.n_seen, 100);
assert!((ss.mean() - 50.5).abs() < 1e-9, "mean = {}", ss.mean());
assert!(ss.std() > 0.0);
assert_eq!(ss.reservoir.sample().len(), 50);
assert_eq!(ss.recent_window().len(), 100);
}
#[test]
fn test_stream_stats_window_trimming() {
let mut ss = StreamStats::new(10, 5, 0).expect("failed to create ss");
for x in 0..20 {
ss.update(x as f64);
}
assert_eq!(ss.recent_window().len(), 5);
let w: Vec<f64> = ss.recent_window().iter().copied().collect();
assert_eq!(w, vec![15.0, 16.0, 17.0, 18.0, 19.0]);
}
#[test]
fn test_stream_stats_window_mean_var() {
let mut ss = StreamStats::new(10, 10, 0).expect("failed to create ss");
for &x in &[2.0f64, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0] {
ss.update(x);
}
let wm = ss.window_mean();
assert!((wm - 5.0).abs() < 1e-9, "window mean = {wm}");
}
}