use alloc::vec::Vec;
/// Side on which a feature's cumulative sum has grown past the threshold.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum DriftDirection {
    /// The upward cumulative sum is strictly larger than the downward one.
    Increase,
    /// The downward cumulative sum is at least as large as the upward one.
    Decrease,
}
/// Alert emitted for one feature whose cumulative sum exceeds the threshold.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PerFeatureCusumAlert {
    /// Index of the feature that raised the alert.
    pub feature_index: usize,
    /// Which of the two cumulative sums dominated when the alert fired.
    pub direction: DriftDirection,
    /// Value of the dominating cumulative sum when the alert fired.
    pub magnitude: f64,
    /// Number of consecutive updates (this one included) that were above the
    /// threshold.
    pub duration_samples: u64,
}
/// Two-sided CUSUM state tracked for a single feature.
///
/// With the `serde` feature enabled, deserialization is routed through
/// `PerFeatureCusumAccumulatorShadow` so the decoded fields are validated
/// before an accumulator is produced.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(try_from = "PerFeatureCusumAccumulatorShadow")
)]
pub struct PerFeatureCusumAccumulator {
    /// Cumulative sum of deviations above the reference (floored at zero).
    pub s_pos: f64,
    /// Cumulative sum of deviations below the reference (floored at zero).
    pub s_neg: f64,
    /// Baseline value that samples are compared against.
    pub reference: f64,
    /// `true` once `reference` holds a real baseline.
    pub reference_set: bool,
    /// Count of consecutive updates during which a sum exceeded the threshold.
    pub drift_samples: u64,
}
/// Unvalidated wire representation of [`PerFeatureCusumAccumulator`].
///
/// Serde decodes into this type first; the `TryFrom` conversion then checks
/// the values before handing out the public accumulator.
#[cfg(feature = "serde")]
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::missing_docs_in_private_items)]
struct PerFeatureCusumAccumulatorShadow {
    s_pos: f64,
    s_neg: f64,
    reference: f64,
    reference_set: bool,
    drift_samples: u64,
}
/// Validating conversion used by serde when decoding an accumulator.
#[cfg(feature = "serde")]
impl TryFrom<PerFeatureCusumAccumulatorShadow> for PerFeatureCusumAccumulator {
    type Error = crate::error::RcfError;

    /// Accepts the decoded fields only when `s_pos`, `s_neg` and `reference`
    /// are all finite and neither cumulative sum is negative.
    ///
    /// # Errors
    ///
    /// Returns `RcfError::InvalidConfig` describing the offending values
    /// when either check fails.
    fn try_from(raw: PerFeatureCusumAccumulatorShadow) -> Result<Self, Self::Error> {
        let PerFeatureCusumAccumulatorShadow {
            s_pos,
            s_neg,
            reference,
            reference_set,
            drift_samples,
        } = raw;

        // Finiteness is checked first so that NaN never reaches the sign test.
        let all_finite = [s_pos, s_neg, reference].iter().all(|v| v.is_finite());
        if !all_finite {
            return Err(crate::error::RcfError::InvalidConfig(
                alloc::format!(
                    "PerFeatureCusumAccumulator: non-finite field (s_pos={}, s_neg={}, reference={})",
                    s_pos, s_neg, reference
                )
                .into(),
            ));
        }

        let has_negative_sum = s_pos < 0.0 || s_neg < 0.0;
        if has_negative_sum {
            return Err(crate::error::RcfError::InvalidConfig(
                alloc::format!(
                    "PerFeatureCusumAccumulator: cumulative sums must be non-negative (s_pos={}, s_neg={})",
                    s_pos, s_neg
                )
                .into(),
            ));
        }

        Ok(Self {
            s_pos,
            s_neg,
            reference,
            reference_set,
            drift_samples,
        })
    }
}
impl PerFeatureCusumAccumulator {
    /// Creates an accumulator with both sums at zero and no reference yet.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            s_pos: 0.0,
            s_neg: 0.0,
            reference: 0.0,
            reference_set: false,
            drift_samples: 0,
        }
    }

    /// Restores the state produced by [`Self::new`], discarding the reference
    /// together with both cumulative sums and the drift counter.
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Returns the larger of the two cumulative sums.
    #[must_use]
    pub fn magnitude(&self) -> f64 {
        self.s_pos.max(self.s_neg)
    }

    /// Feeds one sample into the accumulator.
    ///
    /// * `value` – the new observation for this feature.
    /// * `slack` – allowance subtracted from every deviation before it is
    ///   accumulated.
    /// * `threshold` – a cumulative sum strictly above this raises an alert.
    /// * `feature_index` – copied verbatim into the returned alert.
    ///
    /// The first finite sample is stored as the reference and never alerts.
    /// After that, each call updates both sums and returns `Some(alert)` while
    /// either sum is above `threshold`; otherwise the drift counter is cleared
    /// and `None` is returned.
    ///
    /// Non-finite samples (`NaN`, `±∞`) are ignored: the state is left
    /// untouched and `None` is returned.
    pub fn update(
        &mut self,
        value: f64,
        slack: f64,
        threshold: f64,
        feature_index: usize,
    ) -> Option<PerFeatureCusumAlert> {
        // A NaN/∞ sample must not become the reference (every later
        // difference would be NaN and the feature could never alert), must
        // not wipe accumulated evidence (`NaN.max(0.0)` is `0.0`), and must
        // not pin a sum at infinity.
        if !value.is_finite() {
            return None;
        }

        if !self.reference_set {
            self.reference = value;
            self.reference_set = true;
            return None;
        }

        let deviation = value - self.reference;
        // Floor at zero as CUSUM requires, and cap at `f64::MAX` so that the
        // sums stay finite even if the arithmetic overflows.
        self.s_pos = (self.s_pos + (deviation - slack)).max(0.0).min(f64::MAX);
        self.s_neg = (self.s_neg - (deviation + slack)).max(0.0).min(f64::MAX);

        if self.s_pos > threshold || self.s_neg > threshold {
            self.drift_samples = self.drift_samples.saturating_add(1);
            // Report whichever side is larger; ties resolve to `Decrease`.
            let (direction, magnitude) = if self.s_pos > self.s_neg {
                (DriftDirection::Increase, self.s_pos)
            } else {
                (DriftDirection::Decrease, self.s_neg)
            };
            Some(PerFeatureCusumAlert {
                feature_index,
                direction,
                magnitude,
                duration_samples: self.drift_samples,
            })
        } else {
            self.drift_samples = 0;
            None
        }
    }
}
impl Default for PerFeatureCusumAccumulator {
fn default() -> Self {
Self::new()
}
}
/// Tuning parameters shared by every per-feature accumulator.
///
/// With the `serde` feature enabled, deserialization is routed through
/// `PerFeatureCusumConfigShadow` so the decoded values are validated first.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(
    feature = "serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(try_from = "PerFeatureCusumConfigShadow")
)]
pub struct PerFeatureCusumConfig {
    /// Allowance subtracted from each deviation before it is accumulated.
    pub slack: f64,
    /// Cumulative-sum level above which an alert is raised.
    pub threshold: f64,
}
/// Unvalidated wire representation of [`PerFeatureCusumConfig`].
///
/// Serde decodes into this type first; the `TryFrom` conversion then checks
/// the values before handing out the public configuration.
#[cfg(feature = "serde")]
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::missing_docs_in_private_items)]
struct PerFeatureCusumConfigShadow {
    slack: f64,
    threshold: f64,
}
/// Validating conversion used by serde when decoding a configuration.
#[cfg(feature = "serde")]
impl TryFrom<PerFeatureCusumConfigShadow> for PerFeatureCusumConfig {
    type Error = crate::error::RcfError;

    /// Accepts the decoded values only when `slack` is finite and not
    /// negative and `threshold` is finite and strictly positive.
    ///
    /// # Errors
    ///
    /// Returns `RcfError::InvalidConfig` naming the rejected value.
    fn try_from(raw: PerFeatureCusumConfigShadow) -> Result<Self, Self::Error> {
        let PerFeatureCusumConfigShadow { slack, threshold } = raw;

        // NaN fails `is_finite`, so the ordered comparison only ever sees
        // real numbers.
        let slack_ok = slack.is_finite() && slack >= 0.0;
        if !slack_ok {
            let message = alloc::format!(
                "PerFeatureCusumConfig: slack must be finite and ≥ 0, got {}",
                slack
            );
            return Err(crate::error::RcfError::InvalidConfig(message.into()));
        }

        let threshold_ok = threshold.is_finite() && threshold > 0.0;
        if !threshold_ok {
            let message = alloc::format!(
                "PerFeatureCusumConfig: threshold must be finite and > 0, got {}",
                threshold
            );
            return Err(crate::error::RcfError::InvalidConfig(message.into()));
        }

        Ok(Self { slack, threshold })
    }
}
/// Default tuning: `slack = 0.5`, `threshold = 5.0`.
impl Default for PerFeatureCusumConfig {
    fn default() -> Self {
        let slack = 0.5;
        let threshold = 5.0;
        Self { slack, threshold }
    }
}
/// Outcome of feeding one `D`-dimensional sample to the detector.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PerFeatureCusumResult<const D: usize> {
    /// Magnitude reported for each feature, in feature order.
    #[cfg_attr(feature = "serde", serde(with = "crate::serde_util::fixed_array_f64"))]
    pub per_feature_magnitude: [f64; D],
    /// Largest entry of `per_feature_magnitude`, never below zero.
    pub max_magnitude: f64,
    /// Alerts raised by this sample, ordered by ascending feature index.
    pub alerts: Vec<PerFeatureCusumAlert>,
}
/// Drift detector that runs one independent CUSUM accumulator per feature of
/// a `D`-dimensional input.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PerFeatureCusum<const D: usize> {
    /// One accumulator per feature, indexed by feature position.
    #[cfg_attr(feature = "serde", serde(with = "serde_accumulators"))]
    accumulators: [PerFeatureCusumAccumulator; D],
    /// Slack and threshold applied to every feature.
    config: PerFeatureCusumConfig,
    /// Number of samples observed since construction or the last reset.
    total_samples: u64,
}
impl<const D: usize> PerFeatureCusum<D> {
#[must_use]
pub const fn new(config: PerFeatureCusumConfig) -> Self {
Self {
accumulators: [PerFeatureCusumAccumulator::new(); D],
config,
total_samples: 0,
}
}
#[must_use]
pub const fn config(&self) -> &PerFeatureCusumConfig {
&self.config
}
#[must_use]
pub const fn total_samples(&self) -> u64 {
self.total_samples
}
#[must_use]
pub const fn accumulators(&self) -> &[PerFeatureCusumAccumulator; D] {
&self.accumulators
}
#[must_use]
pub fn active_drifts(&self) -> usize {
self.accumulators
.iter()
.filter(|a| a.drift_samples > 0)
.count()
}
pub fn set_reference(&mut self, means: &[f64; D]) {
for (acc, &mean) in self.accumulators.iter_mut().zip(means.iter()) {
acc.reference = mean;
acc.reference_set = true;
}
}
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn observe(&mut self, input: &[f64; D]) -> PerFeatureCusumResult<D> {
let mut per_feature_magnitude = [0.0_f64; D];
let mut alerts: Vec<PerFeatureCusumAlert> = Vec::new();
for (i, &value) in input.iter().enumerate() {
let pre_magnitude = self.accumulators[i].magnitude();
per_feature_magnitude[i] = pre_magnitude;
if let Some(alert) =
self.accumulators[i].update(value, self.config.slack, self.config.threshold, i)
{
per_feature_magnitude[i] = alert.magnitude;
alerts.push(alert);
}
}
self.total_samples += 1;
let max_magnitude = per_feature_magnitude
.iter()
.copied()
.fold(0.0_f64, f64::max);
PerFeatureCusumResult {
per_feature_magnitude,
max_magnitude,
alerts,
}
}
pub fn reset(&mut self) {
for acc in &mut self.accumulators {
acc.reset();
}
self.total_samples = 0;
}
}
/// Serde adapter for the fixed-size accumulator array of `PerFeatureCusum`.
///
/// The array is written as a sequence and must decode back to exactly `D`
/// elements.
#[cfg(feature = "serde")]
mod serde_accumulators {
    use super::PerFeatureCusumAccumulator;
    use alloc::vec::Vec;
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    /// Serializes the accumulators as a plain sequence.
    pub fn serialize<S: Serializer, const D: usize>(
        accs: &[PerFeatureCusumAccumulator; D],
        s: S,
    ) -> Result<S::Ok, S::Error> {
        accs.as_slice().serialize(s)
    }

    /// Deserializes a sequence of accumulators into an array of length `D`.
    ///
    /// # Errors
    ///
    /// Fails with an `invalid_length` error when the decoded sequence does
    /// not contain exactly `D` elements, or with whatever error decoding an
    /// individual accumulator produces.
    pub fn deserialize<'de, DSer: Deserializer<'de>, const D: usize>(
        d: DSer,
    ) -> Result<[PerFeatureCusumAccumulator; D], DSer::Error> {
        let decoded: Vec<PerFeatureCusumAccumulator> = Vec::deserialize(d)?;
        // The array conversion succeeds only for exactly `D` elements and
        // hands the vector back otherwise, so its length can be reported.
        <[PerFeatureCusumAccumulator; D]>::try_from(decoded).map_err(|rejected| {
            // serde renders this as "invalid length N, expected <text>", so
            // the text is a noun phrase carrying the real expected count.
            serde::de::Error::invalid_length(
                rejected.len(),
                &alloc::format!("an array of {} accumulators", D).as_str(),
            )
        })
    }
}
/// Unit tests for the per-feature CUSUM detector.
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Builds a detector with the configuration every test uses
    /// (`slack = 0.5`, `threshold = 5.0`).
    fn detector<const D: usize>() -> PerFeatureCusum<D> {
        PerFeatureCusum::new(PerFeatureCusumConfig {
            slack: 0.5,
            threshold: 5.0,
        })
    }

    /// Observes `sample` exactly `times` times, discarding the results.
    fn feed<const D: usize>(det: &mut PerFeatureCusum<D>, sample: [f64; D], times: usize) {
        for _ in 0..times {
            let _ = det.observe(&sample);
        }
    }

    /// Observes `sample` at most `limit` times and returns the first alert
    /// seen, stopping as soon as one is raised.
    fn first_alert<const D: usize>(
        det: &mut PerFeatureCusum<D>,
        sample: [f64; D],
        limit: usize,
    ) -> Option<PerFeatureCusumAlert> {
        (0..limit).find_map(|_| det.observe(&sample).alerts.first().copied())
    }

    #[test]
    fn first_observation_seeds_reference() {
        let mut det = detector::<1>();
        let out = det.observe(&[100.0]);
        assert!(out.alerts.is_empty());
        let acc = &det.accumulators()[0];
        assert!(acc.reference_set);
        assert_eq!(acc.reference, 100.0);
    }

    #[test]
    fn no_alert_on_stable_signal() {
        let mut det = detector::<1>();
        for _ in 0..100 {
            assert!(det.observe(&[100.0]).alerts.is_empty());
        }
    }

    #[test]
    fn detects_upward_ramp() {
        let mut det = detector::<1>();
        let _ = det.observe(&[100.0]);
        let alert = first_alert(&mut det, [105.0], 20);
        if let Some(alert) = alert {
            assert_eq!(alert.direction, DriftDirection::Increase);
            assert_eq!(alert.feature_index, 0);
        }
        assert!(alert.is_some());
    }

    #[test]
    fn detects_downward_ramp() {
        let mut det = detector::<1>();
        let _ = det.observe(&[100.0]);
        let alert = first_alert(&mut det, [95.0], 20);
        if let Some(alert) = alert {
            assert_eq!(alert.direction, DriftDirection::Decrease);
        }
        assert!(alert.is_some());
    }

    #[test]
    fn drift_samples_counter_grows_then_resets() {
        let mut det = detector::<1>();
        let _ = det.observe(&[100.0]);
        feed(&mut det, [105.0], 20);
        assert!(det.accumulators()[0].drift_samples > 0);
        // Long enough at the reference level for the slack to drain the sum.
        feed(&mut det, [100.0], 250);
        assert_eq!(det.accumulators()[0].drift_samples, 0);
    }

    #[test]
    fn set_reference_overrides_auto_learn() {
        let mut det = detector::<2>();
        det.set_reference(&[50.0, 100.0]);
        let first = &det.accumulators()[0];
        assert!(first.reference_set);
        assert_eq!(first.reference, 50.0);
        for _ in 0..50 {
            assert!(det.observe(&[50.0, 100.0]).alerts.is_empty());
        }
    }

    #[test]
    fn max_magnitude_picks_largest_feature() {
        let mut det = detector::<3>();
        let _ = det.observe(&[0.0, 0.0, 0.0]);
        feed(&mut det, [0.0, 10.0, 0.0], 20);
        let out = det.observe(&[0.0, 10.0, 0.0]);
        let [left, middle, right] = out.per_feature_magnitude;
        assert_eq!(out.max_magnitude, middle);
        assert!(middle > left);
        assert!(middle > right);
    }

    #[test]
    fn reset_clears_state() {
        let mut det = detector::<2>();
        let _ = det.observe(&[100.0, 200.0]);
        feed(&mut det, [110.0, 220.0], 20);
        assert!(det.active_drifts() > 0);
        det.reset();
        assert_eq!(det.total_samples(), 0);
        assert_eq!(det.active_drifts(), 0);
        for acc in det.accumulators() {
            assert!(!acc.reference_set);
            assert_eq!(acc.s_pos, 0.0);
            assert_eq!(acc.s_neg, 0.0);
        }
    }

    #[test]
    fn active_drifts_counts_per_feature() {
        let mut det = detector::<2>();
        let _ = det.observe(&[100.0, 100.0]);
        feed(&mut det, [110.0, 100.0], 20);
        assert_eq!(det.active_drifts(), 1);
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn postcard_roundtrip_preserves_state() {
        let mut det = detector::<3>();
        let _ = det.observe(&[100.0, 200.0, 300.0]);
        feed(&mut det, [105.0, 200.0, 300.0], 10);
        let encoded = postcard::to_allocvec(&det).expect("serde ok");
        let restored: PerFeatureCusum<3> = postcard::from_bytes(&encoded).expect("serde ok");
        assert_eq!(restored.total_samples(), det.total_samples());
        let (after, before) = (&restored.accumulators()[0], &det.accumulators()[0]);
        assert_eq!(after.s_pos, before.s_pos);
        assert_eq!(after.reference, before.reference);
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn deserialize_rejects_nan_in_accumulator() {
        let invalid = PerFeatureCusumAccumulatorShadow {
            s_pos: f64::NAN,
            s_neg: 0.0,
            reference: 0.0,
            reference_set: true,
            drift_samples: 0,
        };
        let encoded = postcard::to_allocvec(&invalid).unwrap();
        let decoded: Result<PerFeatureCusumAccumulator, _> = postcard::from_bytes(&encoded);
        assert!(decoded.is_err());
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn deserialize_rejects_negative_cumsum() {
        let invalid = PerFeatureCusumAccumulatorShadow {
            s_pos: -1.0,
            s_neg: 0.0,
            reference: 0.0,
            reference_set: true,
            drift_samples: 0,
        };
        let encoded = postcard::to_allocvec(&invalid).unwrap();
        let decoded: Result<PerFeatureCusumAccumulator, _> = postcard::from_bytes(&encoded);
        assert!(decoded.is_err());
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn deserialize_rejects_non_positive_threshold() {
        let invalid = PerFeatureCusumConfigShadow {
            slack: 0.5,
            threshold: 0.0,
        };
        let encoded = postcard::to_allocvec(&invalid).unwrap();
        let decoded: Result<PerFeatureCusumConfig, _> = postcard::from_bytes(&encoded);
        assert!(decoded.is_err());
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn deserialize_rejects_nan_threshold() {
        let invalid = PerFeatureCusumConfigShadow {
            slack: 0.5,
            threshold: f64::NAN,
        };
        let encoded = postcard::to_allocvec(&invalid).unwrap();
        let decoded: Result<PerFeatureCusumConfig, _> = postcard::from_bytes(&encoded);
        assert!(decoded.is_err());
    }
}