use nexus_stats_core::Direction;
#[cfg(feature = "alloc")]
macro_rules! impl_mosum_update {
(float, $ty:ty) => {
#[inline]
pub fn update(
&mut self,
sample: $ty,
) -> Result<Option<Direction>, nexus_stats_core::DataError> {
check_finite!(sample);
let target = self.target;
let head = self.head;
let window = self.window;
let sum = self.sum;
let deviation = sample - target;
let ring = self.ring_mut();
let new_sum = sum - ring[head] + deviation;
ring[head] = deviation;
self.sum = new_sum;
self.head = (head + 1) % window;
self.count += 1;
if self.count < self.min_samples {
return Ok(Option::None);
}
Ok(if self.sum > self.threshold {
Option::Some(Direction::Rising)
} else if self.sum < -self.threshold {
Option::Some(Direction::Falling)
} else {
Option::Some(Direction::Neutral)
})
}
};
(int, $ty:ty) => {
#[inline]
#[must_use]
pub fn update(&mut self, sample: $ty) -> Option<Direction> {
let target = self.target;
let head = self.head;
let window = self.window;
let sum = self.sum;
let deviation = sample - target;
let ring = self.ring_mut();
let new_sum = sum - ring[head] + deviation;
ring[head] = deviation;
self.sum = new_sum;
self.head = (head + 1) % window;
self.count += 1;
if self.count < self.min_samples {
return Option::None;
}
if self.sum > self.threshold {
Option::Some(Direction::Rising)
} else if self.sum < -self.threshold {
Option::Some(Direction::Falling)
} else {
Option::Some(Direction::Neutral)
}
}
};
}
#[cfg(feature = "alloc")]
macro_rules! impl_mosum {
($name:ident, $builder:ident, $ty:ty, $kind:tt, $zero:expr) => {
pub struct $name {
target: $ty,
threshold: $ty,
buffer: *mut $ty,
window: usize,
head: usize,
sum: $ty,
count: u64,
min_samples: u64,
}
unsafe impl Send for $name {}
impl $name {
#[inline]
fn ring(&self) -> &[$ty] {
unsafe { core::slice::from_raw_parts(self.buffer, self.window) }
}
#[inline]
fn ring_mut(&mut self) -> &mut [$ty] {
unsafe { core::slice::from_raw_parts_mut(self.buffer, self.window) }
}
}
#[doc = stringify!($name)]
#[derive(Debug, Clone)]
pub struct $builder {
target: $ty,
window: Option<usize>,
threshold: Option<$ty>,
min_samples: Option<u64>,
}
impl $name {
#[inline]
#[must_use]
pub fn builder(target: $ty) -> $builder {
$builder {
target,
window: Option::None,
threshold: Option::None,
min_samples: Option::None,
}
}
impl_mosum_update!($kind, $ty);
#[inline]
#[must_use]
pub fn sum(&self) -> $ty { self.sum }
#[inline]
#[must_use]
pub fn window_size(&self) -> usize { self.window }
#[inline]
#[must_use]
pub fn count(&self) -> u64 { self.count }
#[inline]
#[must_use]
pub fn is_primed(&self) -> bool { self.count >= self.min_samples }
#[inline]
pub fn reset(&mut self) {
self.ring_mut().fill($zero);
self.head = 0;
self.sum = $zero;
self.count = 0;
}
}
impl Drop for $name {
fn drop(&mut self) {
unsafe {
let _ = alloc::vec::Vec::from_raw_parts(self.buffer, 0, self.window);
}
}
}
impl Clone for $name {
fn clone(&self) -> Self {
let mut vec = alloc::vec![$zero; self.window];
vec.copy_from_slice(self.ring());
let mut cloned = core::mem::ManuallyDrop::new(vec);
let buffer = cloned.as_mut_ptr();
Self {
target: self.target,
threshold: self.threshold,
buffer,
window: self.window,
head: self.head,
sum: self.sum,
count: self.count,
min_samples: self.min_samples,
}
}
}
impl core::fmt::Debug for $name {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct(stringify!($name))
.field("window", &self.window)
.field("count", &self.count)
.field("sum", &self.sum)
.finish()
}
}
impl $builder {
#[inline]
#[must_use]
pub fn window_size(mut self, n: usize) -> Self {
self.window = Option::Some(n);
self
}
#[inline]
#[must_use]
pub fn threshold(mut self, threshold: $ty) -> Self {
self.threshold = Option::Some(threshold);
self
}
#[inline]
#[must_use]
pub fn min_samples(mut self, min: u64) -> Self {
self.min_samples = Option::Some(min);
self
}
#[inline]
pub fn build(self) -> Result<$name, nexus_stats_core::ConfigError> {
let window = self.window.ok_or(nexus_stats_core::ConfigError::Missing("window_size"))?;
if window == 0 {
return Err(nexus_stats_core::ConfigError::Invalid("window_size must be > 0"));
}
let threshold = self.threshold.ok_or(nexus_stats_core::ConfigError::Missing("threshold"))?;
if threshold <= $zero {
return Err(nexus_stats_core::ConfigError::Invalid("threshold must be positive"));
}
let min_samples = self.min_samples.unwrap_or(window as u64);
let mut vec = core::mem::ManuallyDrop::new(alloc::vec![$zero; window]);
let buffer = vec.as_mut_ptr();
Ok($name {
target: self.target,
threshold,
buffer,
window,
head: 0,
sum: $zero,
count: 0,
min_samples,
})
}
}
};
}
#[cfg(feature = "alloc")]
impl_mosum!(MosumF64, MosumF64Builder, f64, float, 0.0);
#[cfg(feature = "alloc")]
impl_mosum!(MosumF32, MosumF32Builder, f32, float, 0.0);
#[cfg(feature = "alloc")]
impl_mosum!(MosumI64, MosumI64Builder, i64, int, 0);
#[cfg(feature = "alloc")]
impl_mosum!(MosumI32, MosumI32Builder, i32, int, 0);
#[cfg(feature = "alloc")]
impl_mosum!(MosumI128, MosumI128Builder, i128, int, 0);
#[cfg(all(test, feature = "alloc"))]
#[allow(clippy::float_cmp)]
mod tests {
use super::*;
#[test]
fn no_detection_at_target() {
let mut mosum = MosumF64::builder(100.0)
.window_size(10)
.threshold(50.0)
.build()
.unwrap();
for _ in 0..10 {
let _ = mosum.update(100.0);
}
for _ in 0..100 {
assert_eq!(mosum.update(100.0).unwrap(), Some(Direction::Neutral));
}
}
#[test]
fn detects_upward_spike() {
let mut mosum = MosumF64::builder(100.0)
.window_size(10)
.threshold(50.0)
.build()
.unwrap();
for _ in 0..10 {
let _ = mosum.update(100.0);
}
let mut triggered = false;
for _ in 0..10 {
if mosum.update(110.0).unwrap() == Some(Direction::Rising) {
triggered = true;
break;
}
}
assert!(triggered, "should detect upward spike");
}
#[test]
fn transient_clears_after_window() {
let mut mosum = MosumF64::builder(100.0)
.window_size(5)
.threshold(40.0)
.build()
.unwrap();
for _ in 0..5 {
let _ = mosum.update(100.0);
}
for _ in 0..5 {
let _ = mosum.update(120.0);
}
for _ in 0..5 {
let _ = mosum.update(100.0);
}
assert!(
mosum.sum().abs() < 1e-10,
"sum should return to ~0, got {}",
mosum.sum()
);
}
#[test]
#[allow(clippy::float_cmp)]
fn reset_clears_state() {
let mut mosum = MosumF64::builder(100.0)
.window_size(10)
.threshold(50.0)
.build()
.unwrap();
for _ in 0..20 {
let _ = mosum.update(120.0);
}
mosum.reset();
assert_eq!(mosum.count(), 0);
assert_eq!(mosum.sum(), 0.0);
}
#[test]
fn clone_works() {
let mut mosum = MosumF64::builder(100.0)
.window_size(5)
.threshold(50.0)
.build()
.unwrap();
for _ in 0..5 {
let _ = mosum.update(110.0);
}
let cloned = mosum.clone();
assert_eq!(cloned.count(), mosum.count());
assert_eq!(cloned.sum(), mosum.sum());
}
#[test]
fn i64_basic() {
let mut mosum = MosumI64::builder(1000)
.window_size(5)
.threshold(100)
.build()
.unwrap();
for _ in 0..5 {
let _ = mosum.update(1000);
}
assert_eq!(mosum.update(1000), Some(Direction::Neutral));
}
#[test]
fn errors_without_threshold() {
let result = MosumF64::builder(100.0).window_size(10).build();
assert!(matches!(
result,
Err(nexus_stats_core::ConfigError::Missing("threshold"))
));
}
#[test]
fn errors_without_window() {
let result = MosumF64::builder(100.0).threshold(50.0).build();
assert!(matches!(
result,
Err(nexus_stats_core::ConfigError::Missing("window_size"))
));
}
#[test]
fn i128_basic() {
let mut mosum = MosumI128::builder(1000)
.window_size(5)
.threshold(100)
.build()
.unwrap();
for _ in 0..5 {
let _ = mosum.update(1000);
}
assert_eq!(mosum.update(1000), Some(Direction::Neutral));
}
#[test]
fn rejects_nan_and_inf() {
let mut mosum = MosumF64::builder(100.0)
.window_size(10)
.threshold(50.0)
.build()
.unwrap();
assert_eq!(
mosum.update(f64::NAN).unwrap_err(),
nexus_stats_core::DataError::NotANumber
);
assert_eq!(
mosum.update(f64::INFINITY).unwrap_err(),
nexus_stats_core::DataError::Infinite
);
assert_eq!(
mosum.update(f64::NEG_INFINITY).unwrap_err(),
nexus_stats_core::DataError::Infinite
);
assert_eq!(mosum.count(), 0);
}
}