1use alloc::vec::Vec;
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
38#[non_exhaustive]
39pub enum DriftDirection {
40 Increase,
42 Decrease,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq)]
49#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
50pub struct PerFeatureCusumAlert {
51 pub feature_index: usize,
54 pub direction: DriftDirection,
56 pub magnitude: f64,
58 pub duration_samples: u64,
60}
61
62#[derive(Debug, Clone, Copy, PartialEq)]
64#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
65#[cfg_attr(
66 feature = "serde",
67 serde(try_from = "PerFeatureCusumAccumulatorShadow")
68)]
69pub struct PerFeatureCusumAccumulator {
70 pub s_pos: f64,
72 pub s_neg: f64,
74 pub reference: f64,
76 pub reference_set: bool,
78 pub drift_samples: u64,
81}
82
83#[cfg(feature = "serde")]
89#[derive(serde::Serialize, serde::Deserialize)]
90#[allow(clippy::missing_docs_in_private_items)]
91struct PerFeatureCusumAccumulatorShadow {
92 s_pos: f64,
93 s_neg: f64,
94 reference: f64,
95 reference_set: bool,
96 drift_samples: u64,
97}
98
99#[cfg(feature = "serde")]
100impl TryFrom<PerFeatureCusumAccumulatorShadow> for PerFeatureCusumAccumulator {
101 type Error = crate::error::RcfError;
102
103 fn try_from(raw: PerFeatureCusumAccumulatorShadow) -> Result<Self, Self::Error> {
104 if !raw.s_pos.is_finite() || !raw.s_neg.is_finite() || !raw.reference.is_finite() {
105 return Err(crate::error::RcfError::InvalidConfig(alloc::format!(
106 "PerFeatureCusumAccumulator: non-finite field (s_pos={}, s_neg={}, reference={})",
107 raw.s_pos,
108 raw.s_neg,
109 raw.reference
110 ).into()));
111 }
112 if raw.s_pos < 0.0 || raw.s_neg < 0.0 {
113 return Err(crate::error::RcfError::InvalidConfig(alloc::format!(
114 "PerFeatureCusumAccumulator: cumulative sums must be non-negative (s_pos={}, s_neg={})",
115 raw.s_pos,
116 raw.s_neg
117 ).into()));
118 }
119 Ok(Self {
120 s_pos: raw.s_pos,
121 s_neg: raw.s_neg,
122 reference: raw.reference,
123 reference_set: raw.reference_set,
124 drift_samples: raw.drift_samples,
125 })
126 }
127}
128
129impl PerFeatureCusumAccumulator {
130 #[must_use]
132 pub const fn new() -> Self {
133 Self {
134 s_pos: 0.0,
135 s_neg: 0.0,
136 reference: 0.0,
137 reference_set: false,
138 drift_samples: 0,
139 }
140 }
141
142 pub fn reset(&mut self) {
144 *self = Self::new();
145 }
146
147 #[must_use]
150 pub fn magnitude(&self) -> f64 {
151 self.s_pos.max(self.s_neg)
152 }
153
154 pub fn update(
158 &mut self,
159 value: f64,
160 slack: f64,
161 threshold: f64,
162 feature_index: usize,
163 ) -> Option<PerFeatureCusumAlert> {
164 if !self.reference_set {
165 self.reference = value;
166 self.reference_set = true;
167 return None;
168 }
169
170 self.s_pos = (self.s_pos + (value - self.reference - slack)).max(0.0);
171 self.s_neg = (self.s_neg - (value - self.reference + slack)).max(0.0);
172
173 if self.s_pos > threshold || self.s_neg > threshold {
174 self.drift_samples += 1;
175 let (direction, magnitude) = if self.s_pos > self.s_neg {
176 (DriftDirection::Increase, self.s_pos)
177 } else {
178 (DriftDirection::Decrease, self.s_neg)
179 };
180 Some(PerFeatureCusumAlert {
181 feature_index,
182 direction,
183 magnitude,
184 duration_samples: self.drift_samples,
185 })
186 } else {
187 self.drift_samples = 0;
188 None
189 }
190 }
191}
192
193impl Default for PerFeatureCusumAccumulator {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199#[derive(Debug, Clone, Copy, PartialEq)]
201#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
202#[cfg_attr(feature = "serde", serde(try_from = "PerFeatureCusumConfigShadow"))]
203pub struct PerFeatureCusumConfig {
204 pub slack: f64,
207 pub threshold: f64,
210}
211
212#[cfg(feature = "serde")]
218#[derive(serde::Serialize, serde::Deserialize)]
219#[allow(clippy::missing_docs_in_private_items)]
220struct PerFeatureCusumConfigShadow {
221 slack: f64,
222 threshold: f64,
223}
224
225#[cfg(feature = "serde")]
226impl TryFrom<PerFeatureCusumConfigShadow> for PerFeatureCusumConfig {
227 type Error = crate::error::RcfError;
228
229 fn try_from(raw: PerFeatureCusumConfigShadow) -> Result<Self, Self::Error> {
230 if !raw.slack.is_finite() || raw.slack < 0.0 {
231 return Err(crate::error::RcfError::InvalidConfig(
232 alloc::format!(
233 "PerFeatureCusumConfig: slack must be finite and ≥ 0, got {}",
234 raw.slack
235 )
236 .into(),
237 ));
238 }
239 if !raw.threshold.is_finite() || raw.threshold <= 0.0 {
240 return Err(crate::error::RcfError::InvalidConfig(
241 alloc::format!(
242 "PerFeatureCusumConfig: threshold must be finite and > 0, got {}",
243 raw.threshold
244 )
245 .into(),
246 ));
247 }
248 Ok(Self {
249 slack: raw.slack,
250 threshold: raw.threshold,
251 })
252 }
253}
254
255impl Default for PerFeatureCusumConfig {
256 fn default() -> Self {
257 Self {
258 slack: 0.5,
259 threshold: 5.0,
260 }
261 }
262}
263
264#[derive(Debug, Clone)]
266#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
267pub struct PerFeatureCusumResult<const D: usize> {
268 #[cfg_attr(feature = "serde", serde(with = "crate::serde_util::fixed_array_f64"))]
271 pub per_feature_magnitude: [f64; D],
272 pub max_magnitude: f64,
274 pub alerts: Vec<PerFeatureCusumAlert>,
277}
278
279#[derive(Debug, Clone)]
297#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
298pub struct PerFeatureCusum<const D: usize> {
299 #[cfg_attr(feature = "serde", serde(with = "serde_accumulators"))]
301 accumulators: [PerFeatureCusumAccumulator; D],
302 config: PerFeatureCusumConfig,
304 total_samples: u64,
306}
307
308impl<const D: usize> PerFeatureCusum<D> {
309 #[must_use]
311 pub const fn new(config: PerFeatureCusumConfig) -> Self {
312 Self {
313 accumulators: [PerFeatureCusumAccumulator::new(); D],
314 config,
315 total_samples: 0,
316 }
317 }
318
319 #[must_use]
321 pub const fn config(&self) -> &PerFeatureCusumConfig {
322 &self.config
323 }
324
325 #[must_use]
327 pub const fn total_samples(&self) -> u64 {
328 self.total_samples
329 }
330
331 #[must_use]
333 pub const fn accumulators(&self) -> &[PerFeatureCusumAccumulator; D] {
334 &self.accumulators
335 }
336
337 #[must_use]
340 pub fn active_drifts(&self) -> usize {
341 self.accumulators
342 .iter()
343 .filter(|a| a.drift_samples > 0)
344 .count()
345 }
346
347 pub fn set_reference(&mut self, means: &[f64; D]) {
351 for (acc, &mean) in self.accumulators.iter_mut().zip(means.iter()) {
352 acc.reference = mean;
353 acc.reference_set = true;
354 }
355 }
356
357 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
360 pub fn observe(&mut self, input: &[f64; D]) -> PerFeatureCusumResult<D> {
361 let mut per_feature_magnitude = [0.0_f64; D];
362 let mut alerts: Vec<PerFeatureCusumAlert> = Vec::new();
363
364 for (i, &value) in input.iter().enumerate() {
365 let pre_magnitude = self.accumulators[i].magnitude();
366 per_feature_magnitude[i] = pre_magnitude;
367
368 if let Some(alert) =
369 self.accumulators[i].update(value, self.config.slack, self.config.threshold, i)
370 {
371 per_feature_magnitude[i] = alert.magnitude;
372 alerts.push(alert);
373 }
374 }
375
376 self.total_samples += 1;
377 let max_magnitude = per_feature_magnitude
378 .iter()
379 .copied()
380 .fold(0.0_f64, f64::max);
381
382 PerFeatureCusumResult {
383 per_feature_magnitude,
384 max_magnitude,
385 alerts,
386 }
387 }
388
389 pub fn reset(&mut self) {
391 for acc in &mut self.accumulators {
392 acc.reset();
393 }
394 self.total_samples = 0;
395 }
396}
397
398#[cfg(feature = "serde")]
399mod serde_accumulators {
400 use super::PerFeatureCusumAccumulator;
403 use alloc::vec::Vec;
404 use serde::{Deserialize, Deserializer, Serialize, Serializer};
405
406 pub fn serialize<S: Serializer, const D: usize>(
408 accs: &[PerFeatureCusumAccumulator; D],
409 s: S,
410 ) -> Result<S::Ok, S::Error> {
411 accs.as_slice().serialize(s)
412 }
413
414 pub fn deserialize<'de, DSer: Deserializer<'de>, const D: usize>(
416 d: DSer,
417 ) -> Result<[PerFeatureCusumAccumulator; D], DSer::Error> {
418 let v: Vec<PerFeatureCusumAccumulator> = Vec::deserialize(d)?;
419 if v.len() != D {
420 return Err(serde::de::Error::invalid_length(
421 v.len(),
422 &"expected D accumulators",
423 ));
424 }
425 let mut out = [PerFeatureCusumAccumulator::new(); D];
426 for (slot, acc) in out.iter_mut().zip(v) {
427 *slot = acc;
428 }
429 Ok(out)
430 }
431}
432
433#[cfg(test)]
434#[allow(clippy::float_cmp)]
435mod tests {
436 use super::*;
437
438 #[test]
439 fn first_observation_seeds_reference() {
440 let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
441 slack: 0.5,
442 threshold: 5.0,
443 });
444 let out = det.observe(&[100.0]);
445 assert!(out.alerts.is_empty());
446 assert!(det.accumulators()[0].reference_set);
447 assert_eq!(det.accumulators()[0].reference, 100.0);
448 }
449
450 #[test]
451 fn no_alert_on_stable_signal() {
452 let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
453 slack: 0.5,
454 threshold: 5.0,
455 });
456 for _ in 0..100 {
457 let out = det.observe(&[100.0]);
458 assert!(out.alerts.is_empty());
459 }
460 }
461
462 #[test]
463 fn detects_upward_ramp() {
464 let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
465 slack: 0.5,
466 threshold: 5.0,
467 });
468 let _ = det.observe(&[100.0]);
469 let mut alerted = false;
470 for _ in 0..20 {
471 let out = det.observe(&[105.0]);
472 if let Some(alert) = out.alerts.first() {
473 assert_eq!(alert.direction, DriftDirection::Increase);
474 assert_eq!(alert.feature_index, 0);
475 alerted = true;
476 break;
477 }
478 }
479 assert!(alerted);
480 }
481
482 #[test]
483 fn detects_downward_ramp() {
484 let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
485 slack: 0.5,
486 threshold: 5.0,
487 });
488 let _ = det.observe(&[100.0]);
489 let mut alerted = false;
490 for _ in 0..20 {
491 let out = det.observe(&[95.0]);
492 if let Some(alert) = out.alerts.first() {
493 assert_eq!(alert.direction, DriftDirection::Decrease);
494 alerted = true;
495 break;
496 }
497 }
498 assert!(alerted);
499 }
500
501 #[test]
502 fn drift_samples_counter_grows_then_resets() {
503 let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
504 slack: 0.5,
505 threshold: 5.0,
506 });
507 let _ = det.observe(&[100.0]);
508 for _ in 0..20 {
509 let _ = det.observe(&[105.0]);
510 }
511 assert!(det.accumulators()[0].drift_samples > 0);
512 for _ in 0..250 {
516 let _ = det.observe(&[100.0]);
517 }
518 assert_eq!(det.accumulators()[0].drift_samples, 0);
519 }
520
521 #[test]
522 fn set_reference_overrides_auto_learn() {
523 let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
524 slack: 0.5,
525 threshold: 5.0,
526 });
527 det.set_reference(&[50.0, 100.0]);
528 assert!(det.accumulators()[0].reference_set);
529 assert_eq!(det.accumulators()[0].reference, 50.0);
530
531 for _ in 0..50 {
533 let out = det.observe(&[50.0, 100.0]);
534 assert!(out.alerts.is_empty());
535 }
536 }
537
538 #[test]
539 fn max_magnitude_picks_largest_feature() {
540 let mut det = PerFeatureCusum::<3>::new(PerFeatureCusumConfig {
541 slack: 0.5,
542 threshold: 5.0,
543 });
544 let _ = det.observe(&[0.0, 0.0, 0.0]);
545 for _ in 0..20 {
546 let _ = det.observe(&[0.0, 10.0, 0.0]);
547 }
548 let out = det.observe(&[0.0, 10.0, 0.0]);
549 assert_eq!(out.max_magnitude, out.per_feature_magnitude[1]);
550 assert!(out.per_feature_magnitude[1] > out.per_feature_magnitude[0]);
551 assert!(out.per_feature_magnitude[1] > out.per_feature_magnitude[2]);
552 }
553
554 #[test]
555 fn reset_clears_state() {
556 let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
557 slack: 0.5,
558 threshold: 5.0,
559 });
560 let _ = det.observe(&[100.0, 200.0]);
561 for _ in 0..20 {
562 let _ = det.observe(&[110.0, 220.0]);
563 }
564 assert!(det.active_drifts() > 0);
565 det.reset();
566 assert_eq!(det.total_samples(), 0);
567 assert_eq!(det.active_drifts(), 0);
568 for acc in det.accumulators() {
569 assert!(!acc.reference_set);
570 assert_eq!(acc.s_pos, 0.0);
571 assert_eq!(acc.s_neg, 0.0);
572 }
573 }
574
575 #[test]
576 fn active_drifts_counts_per_feature() {
577 let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
578 slack: 0.5,
579 threshold: 5.0,
580 });
581 let _ = det.observe(&[100.0, 100.0]);
582 for _ in 0..20 {
583 let _ = det.observe(&[110.0, 100.0]);
584 }
585 assert_eq!(det.active_drifts(), 1);
587 }
588
589 #[cfg(all(feature = "serde", feature = "postcard"))]
590 #[test]
591 fn postcard_roundtrip_preserves_state() {
592 let mut det = PerFeatureCusum::<3>::new(PerFeatureCusumConfig {
593 slack: 0.5,
594 threshold: 5.0,
595 });
596 let _ = det.observe(&[100.0, 200.0, 300.0]);
597 for _ in 0..10 {
598 let _ = det.observe(&[105.0, 200.0, 300.0]);
599 }
600 let bytes = postcard::to_allocvec(&det).expect("serde ok");
601 let back: PerFeatureCusum<3> = postcard::from_bytes(&bytes).expect("serde ok");
602 assert_eq!(back.total_samples(), det.total_samples());
603 assert_eq!(back.accumulators()[0].s_pos, det.accumulators()[0].s_pos);
604 assert_eq!(
605 back.accumulators()[0].reference,
606 det.accumulators()[0].reference
607 );
608 }
609
610 #[cfg(all(feature = "serde", feature = "postcard"))]
611 #[test]
612 fn deserialize_rejects_nan_in_accumulator() {
613 let bad = PerFeatureCusumAccumulatorShadow {
614 s_pos: f64::NAN,
615 s_neg: 0.0,
616 reference: 0.0,
617 reference_set: true,
618 drift_samples: 0,
619 };
620 let bytes = postcard::to_allocvec(&bad).unwrap();
621 let back: Result<PerFeatureCusumAccumulator, _> = postcard::from_bytes(&bytes);
622 assert!(back.is_err());
623 }
624
625 #[cfg(all(feature = "serde", feature = "postcard"))]
626 #[test]
627 fn deserialize_rejects_negative_cumsum() {
628 let bad = PerFeatureCusumAccumulatorShadow {
629 s_pos: -1.0,
630 s_neg: 0.0,
631 reference: 0.0,
632 reference_set: true,
633 drift_samples: 0,
634 };
635 let bytes = postcard::to_allocvec(&bad).unwrap();
636 let back: Result<PerFeatureCusumAccumulator, _> = postcard::from_bytes(&bytes);
637 assert!(back.is_err());
638 }
639
640 #[cfg(all(feature = "serde", feature = "postcard"))]
641 #[test]
642 fn deserialize_rejects_non_positive_threshold() {
643 let bad = PerFeatureCusumConfigShadow {
644 slack: 0.5,
645 threshold: 0.0,
646 };
647 let bytes = postcard::to_allocvec(&bad).unwrap();
648 let back: Result<PerFeatureCusumConfig, _> = postcard::from_bytes(&bytes);
649 assert!(back.is_err());
650 }
651
652 #[cfg(all(feature = "serde", feature = "postcard"))]
653 #[test]
654 fn deserialize_rejects_nan_threshold() {
655 let bad = PerFeatureCusumConfigShadow {
656 slack: 0.5,
657 threshold: f64::NAN,
658 };
659 let bytes = postcard::to_allocvec(&bad).unwrap();
660 let back: Result<PerFeatureCusumConfig, _> = postcard::from_bytes(&bytes);
661 assert!(back.is_err());
662 }
663}