1use alloc::format;
58
59use crate::error::{RcfError, RcfResult};
60use crate::thresholded::EmaStats;
61
/// Default value of [`CusumConfig::allowance_k`].
///
/// The allowance is expressed as a multiple of the reference standard
/// deviation; it is subtracted from every deviation before accumulation.
pub const DEFAULT_ALLOWANCE_K: f64 = 0.5;

/// Default value of [`CusumConfig::threshold_h`].
///
/// The threshold is expressed as a multiple of the reference standard
/// deviation; an accumulator must exceed it for a drift to be reported.
pub const DEFAULT_THRESHOLD_H: f64 = 5.0;

/// Default value of [`CusumConfig::min_observations`].
pub const DEFAULT_MIN_OBSERVATIONS: u64 = 32;

/// Default value of [`CusumConfig::decay`], the decay handed to the running
/// statistics.
pub const DEFAULT_DECAY: f64 = 0.01;
72
/// Direction of a drift reported by the detector.
///
/// The enum is `#[non_exhaustive]`, so downstream crates must keep a wildcard
/// arm when matching on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum DriftKind {
    /// The high-side accumulator exceeded the threshold: scores have been
    /// running above the reference mean.
    Upward,
    /// The low-side accumulator exceeded the threshold: scores have been
    /// running below the reference mean.
    Downward,
}
87
/// Tuning parameters for the CUSUM drift detector.
///
/// Use [`CusumConfig::validate`] to check a hand-built value before use.
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CusumConfig {
    /// Allowance (slack) per observation, as a multiple of the reference
    /// standard deviation. Must be finite and `>= 0`.
    pub allowance_k: f64,
    /// Decision threshold, as a multiple of the reference standard
    /// deviation. Must be finite and `> 0`.
    pub threshold_h: f64,
    /// Number of observations the running statistics must already hold
    /// before a verdict can be marked ready.
    pub min_observations: u64,
    /// Decay handed to the running statistics. Must lie in `(0.0, 1.0]`.
    pub decay: f64,
}
102
103impl Default for CusumConfig {
104 fn default() -> Self {
105 Self {
106 allowance_k: DEFAULT_ALLOWANCE_K,
107 threshold_h: DEFAULT_THRESHOLD_H,
108 min_observations: DEFAULT_MIN_OBSERVATIONS,
109 decay: DEFAULT_DECAY,
110 }
111 }
112}
113
114impl CusumConfig {
115 pub fn validate(&self) -> RcfResult<()> {
124 if !self.allowance_k.is_finite() || self.allowance_k < 0.0 {
125 return Err(RcfError::InvalidConfig(
126 format!(
127 "allowance_k must be finite and >= 0, got {}",
128 self.allowance_k
129 )
130 .into(),
131 ));
132 }
133 if !self.threshold_h.is_finite() || self.threshold_h <= 0.0 {
134 return Err(RcfError::InvalidConfig(
135 format!(
136 "threshold_h must be finite and > 0, got {}",
137 self.threshold_h
138 )
139 .into(),
140 ));
141 }
142 if !self.decay.is_finite() || self.decay <= 0.0 || self.decay > 1.0 {
143 return Err(RcfError::InvalidConfig(
144 format!("decay must be in (0.0, 1.0], got {}", self.decay).into(),
145 ));
146 }
147 Ok(())
148 }
149}
150
151#[derive(Debug, Clone, Copy, PartialEq)]
153#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
154pub struct DriftVerdict {
155 pub s_high: f64,
157 pub s_low: f64,
159 pub threshold: f64,
161 pub mean: f64,
164 pub stddev: f64,
166 pub ready: bool,
170 pub drift: Option<DriftKind>,
173}
174
175#[derive(Debug, Clone)]
183#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
184pub struct MetaDriftDetector {
185 config: CusumConfig,
187 stats: EmaStats,
189 s_high: f64,
191 s_low: f64,
193 #[cfg(feature = "std")]
196 #[cfg_attr(
197 feature = "serde",
198 serde(skip, default = "crate::metrics::default_sink")
199 )]
200 metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
201}
202
203impl MetaDriftDetector {
204 pub fn new(config: CusumConfig) -> RcfResult<Self> {
211 config.validate()?;
212 let stats = EmaStats::new(config.decay)?;
213 Ok(Self {
214 config,
215 stats,
216 s_high: 0.0,
217 s_low: 0.0,
218 #[cfg(feature = "std")]
219 metrics: crate::metrics::default_sink(),
220 })
221 }
222
223 #[cfg(feature = "std")]
227 #[must_use]
228 pub fn with_metrics_sink(
229 mut self,
230 sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
231 ) -> Self {
232 self.metrics = sink;
233 self
234 }
235
236 #[cfg(feature = "std")]
238 #[must_use]
239 pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
240 &self.metrics
241 }
242
243 pub fn with_defaults() -> RcfResult<Self> {
249 Self::new(CusumConfig::default())
250 }
251
252 #[must_use]
254 pub fn config(&self) -> &CusumConfig {
255 &self.config
256 }
257
258 #[must_use]
260 pub fn stats(&self) -> &EmaStats {
261 &self.stats
262 }
263
264 #[must_use]
266 pub fn s_high(&self) -> f64 {
267 self.s_high
268 }
269
270 #[must_use]
272 pub fn s_low(&self) -> f64 {
273 self.s_low
274 }
275
276 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
281 pub fn observe(&mut self, score: f64) -> DriftVerdict {
282 if !score.is_finite() {
283 return DriftVerdict {
284 s_high: self.s_high,
285 s_low: self.s_low,
286 threshold: 0.0,
287 mean: self.stats.mean(),
288 stddev: self.stats.stddev(),
289 ready: false,
290 drift: None,
291 };
292 }
293
294 let prev_mean = self.stats.mean();
295 let prev_stddev = self.stats.stddev();
296 let prev_observations = self.stats.observations();
297 self.stats.update(score);
298
299 let ready = prev_observations >= self.config.min_observations && prev_stddev > 0.0;
300 if !ready {
301 return DriftVerdict {
302 s_high: self.s_high,
303 s_low: self.s_low,
304 threshold: 0.0,
305 mean: prev_mean,
306 stddev: prev_stddev,
307 ready: false,
308 drift: None,
309 };
310 }
311
312 let k = self.config.allowance_k * prev_stddev;
313 let h = self.config.threshold_h * prev_stddev;
314 let dev = score - prev_mean;
315
316 self.s_high = (self.s_high + dev - k).max(0.0);
317 self.s_low = (self.s_low - dev - k).max(0.0);
318
319 let drift = if self.s_high > h {
320 Some(DriftKind::Upward)
321 } else if self.s_low > h {
322 Some(DriftKind::Downward)
323 } else {
324 None
325 };
326
327 #[cfg(feature = "std")]
328 {
329 use crate::metrics::names;
330 self.metrics
331 .observe_histogram(names::DRIFT_S_HIGH, self.s_high);
332 self.metrics
333 .observe_histogram(names::DRIFT_S_LOW, self.s_low);
334 match drift {
335 Some(DriftKind::Upward) => {
336 self.metrics.inc_counter(names::DRIFT_FIRES_TOTAL, 1);
337 self.metrics.inc_counter(names::DRIFT_UP_TOTAL, 1);
338 }
339 Some(DriftKind::Downward) => {
340 self.metrics.inc_counter(names::DRIFT_FIRES_TOTAL, 1);
341 self.metrics.inc_counter(names::DRIFT_DOWN_TOTAL, 1);
342 }
343 None => {}
344 }
345 }
346
347 DriftVerdict {
348 s_high: self.s_high,
349 s_low: self.s_low,
350 threshold: h,
351 mean: prev_mean,
352 stddev: prev_stddev,
353 ready: true,
354 drift,
355 }
356 }
357
358 pub fn reset(&mut self) {
362 self.s_high = 0.0;
363 self.s_low = 0.0;
364 }
365
366 pub fn reset_stats(&mut self) {
369 self.s_high = 0.0;
370 self.s_low = 0.0;
371 self.stats.reset();
372 }
373}
374
#[cfg(test)]
// Exact float comparisons are intentional: the tests compare against values
// that were assigned verbatim (config constants) or clamped to exactly 0.0.
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;

    /// Positional shorthand for building a [`CusumConfig`].
    fn cfg(k: f64, h: f64, min_obs: u64, decay: f64) -> CusumConfig {
        CusumConfig {
            allowance_k: k,
            threshold_h: h,
            min_observations: min_obs,
            decay,
        }
    }

    /// Detector with a short warm-up and a fast decay, parameterised by `h`.
    fn detector(h: f64) -> MetaDriftDetector {
        MetaDriftDetector::new(cfg(0.5, h, 8, 0.1)).unwrap()
    }

    /// Feeds `count` samples alternating `first, second, first, ...`.
    fn feed_alternating(d: &mut MetaDriftDetector, first: f64, second: f64, count: usize) {
        for sample in [first, second].iter().cycle().take(count) {
            let _ = d.observe(*sample);
        }
    }

    /// Feeds the same `value` `count` times, discarding the verdicts.
    fn feed_constant(d: &mut MetaDriftDetector, value: f64, count: usize) {
        for _ in 0..count {
            let _ = d.observe(value);
        }
    }

    #[test]
    fn default_config_validates() {
        assert!(CusumConfig::default().validate().is_ok());
    }

    #[test]
    fn validate_rejects_negative_allowance_k() {
        let outcome = cfg(-0.1, DEFAULT_THRESHOLD_H, 8, DEFAULT_DECAY).validate();
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_rejects_zero_threshold_h() {
        let outcome = cfg(DEFAULT_ALLOWANCE_K, 0.0, 8, DEFAULT_DECAY).validate();
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_rejects_decay_outside_range() {
        for &bad_decay in &[0.0, 1.5, f64::NAN] {
            let outcome = cfg(DEFAULT_ALLOWANCE_K, DEFAULT_THRESHOLD_H, 8, bad_decay).validate();
            assert!(outcome.is_err());
        }
    }

    #[test]
    fn warmup_never_fires() {
        let mut d = detector(5.0);
        for _ in 0..8 {
            let verdict = d.observe(1.0);
            assert!(!verdict.ready);
            assert!(verdict.drift.is_none());
        }
    }

    #[test]
    fn constant_stream_does_not_fire() {
        let mut d = detector(5.0);
        let fired = (0..200)
            .filter(|_| d.observe(1.0).drift.is_some())
            .count();
        assert_eq!(fired, 0);
        assert_eq!(d.s_high(), 0.0);
        assert_eq!(d.s_low(), 0.0);
    }

    #[test]
    fn upward_shift_fires_upward() {
        let mut d = detector(3.0);
        // Slightly noisy baseline around 1.0 so the reference stddev is positive.
        feed_alternating(&mut d, 0.95, 1.05, 64);

        let saw_upward =
            (0..100).any(|_| matches!(d.observe(5.0).drift, Some(DriftKind::Upward)));
        assert!(saw_upward, "CUSUM should fire upward on sustained shift");
    }

    #[test]
    fn downward_shift_fires_downward() {
        let mut d = detector(3.0);
        feed_alternating(&mut d, 4.95, 5.05, 64);

        let saw_downward =
            (0..100).any(|_| matches!(d.observe(1.0).drift, Some(DriftKind::Downward)));
        assert!(
            saw_downward,
            "CUSUM should fire downward on sustained shift"
        );
    }

    #[test]
    fn non_finite_input_ignored() {
        let mut d = detector(3.0);
        feed_constant(&mut d, 1.0, 16);

        let observations_before = d.stats().observations();
        let nan_verdict = d.observe(f64::NAN);
        let inf_verdict = d.observe(f64::INFINITY);

        assert!(nan_verdict.drift.is_none());
        assert!(inf_verdict.drift.is_none());
        assert_eq!(d.stats().observations(), observations_before);
    }

    #[test]
    fn reset_clears_accumulators_but_keeps_stats() {
        let mut d = detector(3.0);
        feed_alternating(&mut d, 0.95, 1.05, 64);
        feed_constant(&mut d, 5.0, 50);
        assert!(d.s_high() > 0.0);

        let observations_before = d.stats().observations();
        d.reset();

        assert_eq!(d.s_high(), 0.0);
        assert_eq!(d.s_low(), 0.0);
        assert_eq!(
            d.stats().observations(),
            observations_before,
            "reset() must keep the EMA reference"
        );
    }

    #[test]
    fn reset_stats_clears_everything() {
        let mut d = detector(3.0);
        feed_constant(&mut d, 1.0, 64);

        d.reset_stats();

        assert_eq!(d.s_high(), 0.0);
        assert_eq!(d.s_low(), 0.0);
        assert_eq!(d.stats().observations(), 0);
    }

    #[test]
    fn verdict_exposes_reference_mean_and_stddev() {
        let mut d = detector(5.0);
        feed_constant(&mut d, 2.0, 32);

        let verdict = d.observe(2.5);

        assert!((verdict.mean - 2.0).abs() < 0.5);
        assert!(verdict.stddev >= 0.0);
    }

    #[test]
    fn with_defaults_builds() {
        let d = MetaDriftDetector::with_defaults().unwrap();
        let config = d.config();
        assert_eq!(config.allowance_k, DEFAULT_ALLOWANCE_K);
        assert_eq!(config.threshold_h, DEFAULT_THRESHOLD_H);
    }
}