#![allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
use crate::error::{MemoryError, Result};
use crate::hyperdim::HVec10240;
use crate::reservoir_sparse::SparseWeights;
use rand::rngs::StdRng;
use rand::{RngExt, SeedableRng};
#[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))]
use rayon::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(not(target_arch = "wasm32"))]
use {std::time::Instant, tracing::instrument};
#[derive(Debug, Default)]
struct ReservoirMetrics {
steps_total: AtomicU64,
step_latency_us_total: AtomicU64,
step_latency_count: AtomicU64,
nodes_active: AtomicU64,
}
#[derive(Debug, Clone, Default)]
pub struct ReservoirMetricsSnapshot {
pub reservoir_steps_total: u64,
pub avg_reservoir_step_latency_us: f64,
pub reservoir_nodes_active: u64,
}
pub struct ReservoirStepOutput<'a> {
pub state: &'a [f32],
pub state_norm: f64,
pub change_norm: f64,
}
impl ReservoirMetrics {
fn observe_step(&self, latency_us: u64, nodes_active: u64) {
self.steps_total.fetch_add(1, Ordering::Relaxed);
self.step_latency_us_total
.fetch_add(latency_us, Ordering::Relaxed);
self.step_latency_count.fetch_add(1, Ordering::Relaxed);
self.nodes_active.store(nodes_active, Ordering::Relaxed);
}
fn snapshot(&self) -> ReservoirMetricsSnapshot {
let count = self.step_latency_count.load(Ordering::Relaxed);
let total = self.step_latency_us_total.load(Ordering::Relaxed);
let avg = if count == 0 {
0.0
} else {
total as f64 / count as f64
};
ReservoirMetricsSnapshot {
reservoir_steps_total: self.steps_total.load(Ordering::Relaxed),
avg_reservoir_step_latency_us: avg,
reservoir_nodes_active: self.nodes_active.load(Ordering::Relaxed),
}
}
}
pub struct Reservoir {
size: usize,
input_size: usize,
state: Vec<f32>,
scratch: Vec<f32>,
prev_state: Vec<f32>, w_in: SparseWeights,
w_res: SparseWeights,
input_cache: Vec<f32>,
input_projection: Vec<f32>,
input_version: u32,
node_versions: Vec<u32>,
update_stride: usize,
update_phase: usize,
spectral_radius: f32,
alpha: f32,
pub(crate) beta: f32, state_norm_sq: f64, metrics: ReservoirMetrics,
}
impl Reservoir {
pub const DEFAULT_SIZE: usize = 50000;
pub const DEFAULT_RADIUS: f32 = 0.95;
pub const DEFAULT_ALPHA: f32 = 0.3;
const INPUT_DEGREE: usize = 4;
const RESERVOIR_DEGREE: usize = 8;
const RESERVOIR_LOCAL_WINDOW: usize = 512;
const PARTIAL_UPDATE_STRIDE: usize = 32;
pub const MAX_SIZE: usize = 100_000;
pub(crate) fn validate_params(size: usize, input_size: usize, chaos: f32) -> Result<()> {
if size == 0 || size > Self::MAX_SIZE {
return Err(MemoryError::InvalidInput {
field: "reservoir_size".into(),
reason: format!("must be 1..={}", Self::MAX_SIZE),
});
}
if input_size == 0 || input_size > Self::MAX_SIZE {
return Err(MemoryError::InvalidInput {
field: "reservoir_input_size".into(),
reason: format!("must be 1..={}", Self::MAX_SIZE),
});
}
if !chaos.is_finite() || chaos < 0.0 {
return Err(MemoryError::InvalidInput {
field: "chaos_strength".into(),
reason: "must be finite and non-negative".into(),
});
}
Ok(())
}
pub fn new(input_size: usize, size: usize) -> Result<Self> {
Self::new_seeded(input_size, size, rand::rng().random())
}
pub fn new_seeded(input_size: usize, size: usize, seed: u64) -> Result<Self> {
Self::validate_params(size, input_size, 0.0)?;
let mut rng = StdRng::seed_from_u64(seed);
let w_in = SparseWeights::build(size, input_size, Self::INPUT_DEGREE, &mut rng);
let mut w_res = SparseWeights::build_local_reservoir(
size,
Self::RESERVOIR_DEGREE.min(size),
Self::RESERVOIR_LOCAL_WINDOW.min(size),
&mut rng,
);
let current_radius = Self::estimate_spectral_radius(&w_res, size);
if current_radius > 0.0 {
let scale = Self::DEFAULT_RADIUS / current_radius;
w_res.scale(scale);
}
Ok(Self {
size,
input_size,
state: vec![0.0; size],
scratch: vec![0.0; size],
prev_state: vec![0.0; size], w_in,
w_res,
input_cache: vec![0.0; input_size],
input_projection: vec![0.0; size],
input_version: 0,
node_versions: vec![0; size],
update_stride: Self::PARTIAL_UPDATE_STRIDE,
update_phase: 0,
spectral_radius: Self::DEFAULT_RADIUS,
alpha: Self::DEFAULT_ALPHA,
beta: 0.0, state_norm_sq: 0.0,
metrics: ReservoirMetrics::default(),
})
}
#[cfg_attr(not(target_arch = "wasm32"), instrument(skip(self)))]
pub fn step(&mut self, input: &[f32]) -> Result<ReservoirStepOutput<'_>> {
#[cfg(not(target_arch = "wasm32"))]
let started = Instant::now();
if input.len() != self.input_size {
return Err(MemoryError::reservoir(format!(
"Input size mismatch: expected {}, got {}",
self.input_size,
input.len()
)));
}
if self.input_cache != input {
self.input_cache.copy_from_slice(input);
self.input_version = self.input_version.wrapping_add(1);
}
let one_minus_alpha = 1.0 - self.alpha;
let beta = self.beta;
let update_phase = self.update_phase;
let mut change_norm_sq = 0.0;
for i in (update_phase..self.size).step_by(self.update_stride) {
if self.node_versions[i] != self.input_version {
self.input_projection[i] = self.w_in.dot_row(i, input);
self.node_versions[i] = self.input_version;
}
let res_sum = self.w_res.dot_row(i, &self.state);
let activated = fast_tanh(self.input_projection[i] + res_sum);
let current_val = self.state[i];
let inertial = beta * (current_val - self.prev_state[i]);
self.scratch[i] =
current_val.mul_add(one_minus_alpha, activated.mul_add(self.alpha, inertial));
let diff = self.scratch[i] - current_val;
change_norm_sq += diff * diff;
}
for i in (update_phase..self.size).step_by(self.update_stride) {
let old_val = self.state[i];
let new_val = self.scratch[i];
self.prev_state[i] = old_val;
self.state[i] = new_val;
self.state_norm_sq +=
(f64::from(new_val)).mul_add(f64::from(new_val), -f64::from(old_val).powi(2));
}
if update_phase == 0 {
self.state_norm_sq = self.state.iter().map(|&x| f64::from(x).powi(2)).sum();
}
self.update_phase = (update_phase + 1) % self.update_stride;
#[cfg(not(target_arch = "wasm32"))]
let latency_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
#[cfg(target_arch = "wasm32")]
let latency_us = 0;
self.metrics.observe_step(latency_us, self.size as u64);
Ok(ReservoirStepOutput {
state: &self.state,
state_norm: self.state_norm_sq.sqrt(),
change_norm: (change_norm_sq as f64).sqrt(),
})
}
#[cfg_attr(not(target_arch = "wasm32"), instrument(skip(self)))]
pub fn run(&mut self, inputs: &[Vec<f32>]) -> Result<Vec<Vec<f32>>> {
let mut states = Vec::with_capacity(inputs.len());
for input in inputs {
self.step(input)?;
states.push(self.state.clone());
}
Ok(states)
}
pub fn state(&self) -> &[f32] {
&self.state
}
#[cfg_attr(not(target_arch = "wasm32"), instrument(skip(self)))]
pub fn set_spectral_radius(&mut self, radius: f32) -> Result<()> {
if !(0.9..=1.1).contains(&radius) {
return Err(MemoryError::reservoir(
"Spectral radius must be in [0.9, 1.1]".to_string(),
));
}
let current = Self::estimate_spectral_radius(&self.w_res, self.size);
if current > 0.0 {
let scale = radius / current;
self.w_res.scale(scale);
self.spectral_radius = radius;
}
Ok(())
}
#[cfg_attr(not(target_arch = "wasm32"), instrument(skip(self)))]
pub fn reset(&mut self) {
self.state.fill(0.0);
self.scratch.fill(0.0);
self.prev_state.fill(0.0);
self.state_norm_sq = 0.0;
}
#[cfg_attr(
all(not(target_arch = "wasm32"), feature = "parallel"),
instrument(skip(self))
)]
#[cfg(all(not(target_arch = "wasm32"), feature = "parallel"))]
pub fn to_hypervector(&self) -> Result<HVec10240> {
if self.size < HVec10240::DIMENSION {
return Err(MemoryError::InvalidDimension {
expected: HVec10240::DIMENSION,
actual: self.size,
});
}
let chunk_size = self.size / HVec10240::DIMENSION;
let data: Vec<u128> = (0..80)
.into_par_iter()
.map(|i| {
let mut word = 0u128;
for j in 0..128 {
let bit_index = i * 128 + j;
let sum: f32 = self.state
[(bit_index * chunk_size)..(bit_index * chunk_size + chunk_size)]
.iter()
.sum();
if sum > 0.0 {
word |= 1u128 << j;
}
}
word
})
.collect();
let data: [u128; 80] = data.try_into().map_err(|_| {
MemoryError::reservoir(
"internal: par_iter produced unexpected element count".to_string(),
)
})?;
Ok(HVec10240 { data })
}
#[cfg(any(target_arch = "wasm32", not(feature = "parallel")))]
#[cfg_attr(not(target_arch = "wasm32"), instrument(skip(self)))]
pub fn to_hypervector(&self) -> Result<HVec10240> {
if self.size < HVec10240::DIMENSION {
return Err(MemoryError::InvalidDimension {
expected: HVec10240::DIMENSION,
actual: self.size,
});
}
let chunk_size = self.size / HVec10240::DIMENSION;
let mut data = [0u128; 80];
for (i, word) in data.iter_mut().enumerate() {
for j in 0..128 {
let bit_index = i * 128 + j;
let sum: f32 = self.state
[(bit_index * chunk_size)..(bit_index * chunk_size + chunk_size)]
.iter()
.sum();
if sum > 0.0 {
*word |= 1u128 << j;
}
}
}
Ok(HVec10240 { data })
}
pub const fn size(&self) -> usize {
self.size
}
pub fn metrics_snapshot(&self) -> ReservoirMetricsSnapshot {
self.metrics.snapshot()
}
fn estimate_spectral_radius(w: &SparseWeights, size: usize) -> f32 {
let mut v = vec![1.0f32 / size as f32; size];
let mut y = vec![0.0f32; size];
for _ in 0..16 {
for (i, y_i) in y.iter_mut().enumerate() {
*y_i = w.dot_row(i, &v);
}
let mut norm = 0.0f32;
for val in &y {
norm += val * val;
}
norm = norm.sqrt();
if norm.abs() < f32::EPSILON {
return 0.0;
}
for i in 0..size {
v[i] = y[i] / norm;
}
}
let mut wv = vec![0.0f32; size];
for (i, wv_i) in wv.iter_mut().enumerate() {
*wv_i = w.dot_row(i, &v);
}
let mut numerator = 0.0f32;
let mut denominator = 0.0f32;
for i in 0..size {
numerator += v[i] * wv[i];
denominator += v[i] * v[i];
}
if denominator.abs() < f32::EPSILON {
0.0
} else {
(numerator / denominator).abs()
}
}
}
#[inline]
fn fast_tanh(x: f32) -> f32 {
let x2 = x * x;
x2.mul_add(x, 27.0 * x) / x2.mul_add(9.0, 27.0)
}
pub struct ChaoticReservoir {
base: Reservoir,
chaos_strength: f32,
rng: StdRng,
noisy_input: Vec<f32>,
}
impl ChaoticReservoir {
pub fn new(input_size: usize, size: usize, chaos_strength: f32) -> Result<Self> {
let seed = rand::rng().random();
Self::new_seeded(input_size, size, chaos_strength, seed)
}
pub fn new_seeded(
input_size: usize,
size: usize,
chaos_strength: f32,
seed: u64,
) -> Result<Self> {
Reservoir::validate_params(size, input_size, chaos_strength)?;
let mut base = Reservoir::new_seeded(input_size, size, seed)?;
base.set_spectral_radius(1.0)?;
Ok(Self {
base,
chaos_strength,
rng: StdRng::seed_from_u64(seed ^ 0xA5A5_5A5A_F0F0_0F0F),
noisy_input: vec![0.0; input_size],
})
}
pub fn step(&mut self, input: &[f32]) -> Result<ReservoirStepOutput<'_>> {
if input.len() != self.noisy_input.len() {
return Err(MemoryError::reservoir(format!(
"Input size mismatch: expected {}, got {}",
self.noisy_input.len(),
input.len()
)));
}
for (i, value) in input.iter().enumerate() {
let noise = if self.chaos_strength > 0.0 {
self.rng
.random_range(-self.chaos_strength..self.chaos_strength)
} else {
0.0
};
self.noisy_input[i] = *value + noise;
}
self.base.step(&self.noisy_input)
}
pub fn reset(&mut self) {
self.base.reset();
}
pub fn state(&self) -> &[f32] {
self.base.state()
}
pub fn to_hypervector(&self) -> Result<HVec10240> {
self.base.to_hypervector()
}
pub fn metrics_snapshot(&self) -> ReservoirMetricsSnapshot {
self.base.metrics_snapshot()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_valid() {
let r = Reservoir::new(1024, 10240).unwrap();
assert_eq!(r.size(), 10240);
}
#[test]
fn new_invalid_size() {
assert!(Reservoir::new(1024, 0).is_err());
assert!(Reservoir::new(1024, 200_000).is_err());
}
#[test]
fn step_ok() {
let mut r = Reservoir::new(1024, 10240).unwrap();
let out = r.step(&[0.0; 1024]).unwrap();
assert_eq!(out.state.len(), 10240);
}
#[test]
fn reset_clears() {
let mut r = Reservoir::new(1024, 10240).unwrap();
r.step(&[1.0; 1024]).unwrap();
r.reset();
assert!(r.state().iter().all(|x| *x == 0.0));
}
#[test]
fn spectral_radius_bounds() {
let mut r = Reservoir::new(1024, 10240).unwrap();
assert!(r.set_spectral_radius(0.8).is_err());
r.set_spectral_radius(1.0).unwrap();
}
#[test]
fn metrics_steps() {
let mut r = Reservoir::new(1024, 10240).unwrap();
r.step(&[0.0; 1024]).unwrap();
assert_eq!(r.metrics_snapshot().reservoir_steps_total, 1);
}
#[test]
fn chaotic_new() {
let c = ChaoticReservoir::new(1024, 10240, 0.1).unwrap();
assert_eq!(c.state().len(), 10240);
}
}