1use super::DriftSignal;
20
21#[derive(Debug, Clone)]
26#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
27pub struct Ddm {
28 warning_level: f64,
30 drift_level: f64,
32 min_instances: u64,
34
35 mean: f64,
38 m2: f64,
40 count: u64,
42
43 min_p_plus_s: f64,
46 min_s: f64,
48}
49
50impl Ddm {
51 pub fn new() -> Self {
56 Self::with_params(2.0, 3.0, 30)
57 }
58
59 pub fn with_params(warning_level: f64, drift_level: f64, min_instances: u64) -> Self {
70 Self {
71 warning_level,
72 drift_level,
73 min_instances,
74 mean: 0.0,
75 m2: 0.0,
76 count: 0,
77 min_p_plus_s: f64::MAX,
78 min_s: f64::MAX,
79 }
80 }
81
82 #[inline]
84 pub fn warning_level(&self) -> f64 {
85 self.warning_level
86 }
87
88 #[inline]
90 pub fn drift_level(&self) -> f64 {
91 self.drift_level
92 }
93
94 #[inline]
96 pub fn min_instances(&self) -> u64 {
97 self.min_instances
98 }
99
100 #[inline]
102 pub fn std_dev(&self) -> f64 {
103 if self.count == 0 {
104 0.0
105 } else {
106 crate::math::sqrt(self.m2 / self.count as f64)
107 }
108 }
109
110 #[inline]
112 pub fn min_p_plus_s(&self) -> f64 {
113 self.min_p_plus_s
114 }
115
116 fn reset_running_stats(&mut self) {
119 self.mean = 0.0;
120 self.m2 = 0.0;
121 self.count = 0;
122 }
123}
124
125impl Default for Ddm {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131impl core::fmt::Display for Ddm {
132 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
133 write!(
134 f,
135 "Ddm(warn={}, drift={}, min_inst={}, count={})",
136 self.warning_level, self.drift_level, self.min_instances, self.count
137 )
138 }
139}
140
141impl Ddm {
144 pub fn update(&mut self, value: f64) -> DriftSignal {
149 self.count += 1;
150 let n = self.count as f64;
151
152 let delta = value - self.mean;
153 self.mean += delta / n;
154 let delta2 = value - self.mean;
155 self.m2 += delta * delta2;
156
157 let std = crate::math::sqrt(self.m2 / n);
158 let p_plus_s = self.mean + std;
159
160 if self.count <= self.min_instances {
161 return DriftSignal::Stable;
162 }
163
164 if p_plus_s < self.min_p_plus_s {
165 self.min_p_plus_s = p_plus_s;
166 self.min_s = std;
167 }
168
169 if p_plus_s >= self.min_p_plus_s + self.drift_level * self.min_s {
170 self.reset_running_stats();
171 return DriftSignal::Drift;
172 }
173
174 if p_plus_s >= self.min_p_plus_s + self.warning_level * self.min_s {
175 return DriftSignal::Warning;
176 }
177
178 DriftSignal::Stable
179 }
180
181 pub fn reset(&mut self) {
183 self.mean = 0.0;
184 self.m2 = 0.0;
185 self.count = 0;
186 self.min_p_plus_s = f64::MAX;
187 self.min_s = f64::MAX;
188 }
189
190 pub fn estimated_mean(&self) -> f64 {
192 self.mean
193 }
194}
195
196#[cfg(feature = "alloc")]
199impl super::DriftDetector for Ddm {
200 fn update(&mut self, value: f64) -> DriftSignal {
201 Ddm::update(self, value)
202 }
203
204 fn reset(&mut self) {
205 Ddm::reset(self);
206 }
207
208 fn clone_fresh(&self) -> alloc::boxed::Box<dyn super::DriftDetector> {
209 alloc::boxed::Box::new(Self::with_params(
210 self.warning_level,
211 self.drift_level,
212 self.min_instances,
213 ))
214 }
215
216 fn clone_boxed(&self) -> alloc::boxed::Box<dyn super::DriftDetector> {
217 alloc::boxed::Box::new(self.clone())
218 }
219
220 fn estimated_mean(&self) -> f64 {
221 Ddm::estimated_mean(self)
222 }
223
224 fn serialize_state(&self) -> Option<super::DriftDetectorState> {
225 Some(super::DriftDetectorState::Ddm {
226 mean: self.mean,
227 m2: self.m2,
228 count: self.count,
229 min_p_plus_s: self.min_p_plus_s,
230 min_s: self.min_s,
231 })
232 }
233
234 fn restore_state(&mut self, state: &super::DriftDetectorState) -> bool {
235 if let super::DriftDetectorState::Ddm {
236 mean,
237 m2,
238 count,
239 min_p_plus_s,
240 min_s,
241 } = state
242 {
243 self.mean = *mean;
244 self.m2 = *m2;
245 self.count = *count;
246 self.min_p_plus_s = *min_p_plus_s;
247 self.min_s = *min_s;
248 true
249 } else {
250 false
251 }
252 }
253}
254
255#[cfg(test)]
260mod tests {
261 extern crate alloc;
262 use alloc::vec::Vec;
263
264 #[cfg(feature = "alloc")]
265 use super::super::DriftDetector;
266 use super::super::DriftSignal;
267 use super::*;
268
269 fn feed(ddm: &mut Ddm, values: &[f64]) -> Vec<DriftSignal> {
271 values.iter().map(|&v| ddm.update(v)).collect()
272 }
273
274 fn generate_values(centre: f64, jitter: f64, n: usize) -> Vec<f64> {
277 (0..n)
278 .map(|i| {
279 let t = i as f64;
281 centre + jitter * crate::math::sin(t * 0.7)
282 })
283 .collect()
284 }
285
286 #[test]
288 fn stationary_low_error_no_drift() {
289 let mut ddm = Ddm::new();
290 let values = generate_values(0.1, 0.02, 5000);
291 let signals = feed(&mut ddm, &values);
292
293 let drift_count = signals.iter().filter(|&&s| s == DriftSignal::Drift).count();
294 assert_eq!(
295 drift_count, 0,
296 "stationary low error should produce no drift"
297 );
298 }
299
300 #[test]
302 fn error_rate_increase_detects_drift() {
303 let mut ddm = Ddm::new();
304
305 let low = generate_values(0.1, 0.01, 2000);
307 feed(&mut ddm, &low);
308
309 let high = generate_values(0.8, 0.01, 2000);
311 let signals = feed(&mut ddm, &high);
312
313 let drift_count = signals.iter().filter(|&&s| s == DriftSignal::Drift).count();
314 assert!(
315 drift_count >= 1,
316 "sudden error increase should trigger at least one drift, got {}",
317 drift_count
318 );
319 }
320
321 #[test]
323 fn warning_before_drift() {
324 let mut ddm = Ddm::new();
325
326 let baseline = generate_values(0.05, 0.005, 200);
328 feed(&mut ddm, &baseline);
329
330 let ramp: Vec<f64> = (0..3000)
332 .map(|i| {
333 let t = i as f64 / 3000.0;
334 0.05 + 0.85 * t
336 })
337 .collect();
338
339 let signals = feed(&mut ddm, &ramp);
340
341 let first_warning = signals.iter().position(|&s| s == DriftSignal::Warning);
342 let first_drift = signals.iter().position(|&s| s == DriftSignal::Drift);
343
344 assert!(
345 first_warning.is_some(),
346 "gradual increase should trigger at least one warning"
347 );
348 assert!(
349 first_drift.is_some(),
350 "gradual increase should eventually trigger drift"
351 );
352 assert!(
353 first_warning.unwrap() < first_drift.unwrap(),
354 "warning (idx {}) should fire before drift (idx {})",
355 first_warning.unwrap(),
356 first_drift.unwrap()
357 );
358 }
359
360 #[test]
362 fn minimum_tracking() {
363 let mut ddm = Ddm::new();
364
365 for _ in 0..5 {
367 ddm.update(0.5);
368 }
369 let early_min = ddm.min_p_plus_s();
370
371 let low = generate_values(0.05, 0.005, 200);
373 feed(&mut ddm, &low);
374 let later_min = ddm.min_p_plus_s();
375
376 assert!(
377 later_min < early_min,
378 "min_p_plus_s should decrease with low errors: early={}, later={}",
379 early_min,
380 later_min
381 );
382 }
383
384 #[test]
386 fn estimated_mean_tracks_correctly() {
387 let mut ddm = Ddm::with_params(2.0, 3.0, 100_000);
389
390 for _ in 0..1000 {
392 ddm.update(0.3);
393 }
394 let mean = ddm.estimated_mean();
395 assert!(
396 (mean - 0.3).abs() < 1e-10,
397 "mean should be ~0.3, got {}",
398 mean
399 );
400
401 for _ in 0..1000 {
403 ddm.update(0.7);
404 }
405 let mean2 = ddm.estimated_mean();
406 assert!(
407 (mean2 - 0.5).abs() < 1e-10,
408 "mean should be ~0.5, got {}",
409 mean2
410 );
411
412 for _ in 0..1000 {
414 ddm.update(0.2);
415 }
416 let mean3 = ddm.estimated_mean();
417 assert!(
418 (mean3 - 0.4).abs() < 1e-10,
419 "mean should be ~0.4, got {}",
420 mean3
421 );
422 }
423
424 #[test]
426 fn reset_clears_state() {
427 let mut ddm = Ddm::new();
428
429 let vals = generate_values(0.4, 0.05, 500);
431 feed(&mut ddm, &vals);
432 assert!(ddm.count > 0);
433
434 ddm.reset();
435
436 assert_eq!(ddm.count, 0);
437 assert_eq!(ddm.mean, 0.0);
438 assert_eq!(ddm.m2, 0.0);
439 assert_eq!(ddm.min_p_plus_s, f64::MAX);
440 assert_eq!(ddm.min_s, f64::MAX);
441 }
442
443 #[cfg(feature = "alloc")]
445 #[test]
446 fn clone_fresh_same_params() {
447 let ddm = Ddm::with_params(1.5, 2.5, 50);
448
449 let mut dirty = ddm.clone();
451 let vals = generate_values(0.3, 0.02, 200);
452 feed(&mut dirty, &vals);
453
454 let mut fresh = dirty.clone_fresh();
455
456 assert_eq!(fresh.estimated_mean(), 0.0);
458
459 let mut manual_fresh = Ddm::with_params(1.5, 2.5, 50);
462 let test_vals = generate_values(0.2, 0.01, 100);
463
464 let signals_a: Vec<DriftSignal> = test_vals.iter().map(|&v| fresh.update(v)).collect();
465 let signals_b: Vec<DriftSignal> =
466 test_vals.iter().map(|&v| manual_fresh.update(v)).collect();
467
468 assert_eq!(
469 signals_a, signals_b,
470 "clone_fresh should behave identically to a new instance with same params"
471 );
472
473 assert!(
475 (fresh.estimated_mean() - manual_fresh.estimated_mean()).abs() < 1e-12,
476 "means should match: {} vs {}",
477 fresh.estimated_mean(),
478 manual_fresh.estimated_mean()
479 );
480 }
481
482 #[test]
484 fn warmup_no_drift() {
485 let min_inst = 50u64;
486 let mut ddm = Ddm::with_params(2.0, 3.0, min_inst);
487
488 for i in 0..min_inst {
490 let value = if i % 2 == 0 { 0.0 } else { 1.0 };
491 let signal = ddm.update(value);
492 assert_eq!(
493 signal,
494 DriftSignal::Stable,
495 "during warmup (i={}), signal should be Stable, got {:?}",
496 i,
497 signal
498 );
499 }
500 }
501
502 #[test]
504 fn custom_params() {
505 let ddm = Ddm::with_params(1.0, 4.0, 100);
506 assert_eq!(ddm.warning_level(), 1.0);
507 assert_eq!(ddm.drift_level(), 4.0);
508 assert_eq!(ddm.min_instances(), 100);
509 }
510
511 #[test]
513 fn default_matches_new() {
514 let a = Ddm::new();
515 let b = Ddm::default();
516 assert_eq!(a.warning_level, b.warning_level);
517 assert_eq!(a.drift_level, b.drift_level);
518 assert_eq!(a.min_instances, b.min_instances);
519 }
520
521 #[test]
523 fn std_dev_zero_initially() {
524 let ddm = Ddm::new();
525 assert_eq!(ddm.std_dev(), 0.0);
526 }
527
528 #[test]
530 fn std_dev_zero_for_constant() {
531 let mut ddm = Ddm::new();
532 for _ in 0..100 {
533 ddm.update(0.5);
534 }
535 assert!(
536 ddm.std_dev().abs() < 1e-12,
537 "std of constant stream should be ~0, got {}",
538 ddm.std_dev()
539 );
540 }
541
542 #[test]
544 fn drift_resets_running_keeps_mins() {
545 let mut ddm = Ddm::with_params(2.0, 3.0, 10);
546
547 for _ in 0..100 {
549 ddm.update(0.05);
550 }
551 let min_before = ddm.min_p_plus_s();
552
553 let mut got_drift = false;
555 for _ in 0..500 {
556 if ddm.update(0.95) == DriftSignal::Drift {
557 got_drift = true;
558 break;
559 }
560 }
561 assert!(got_drift, "should have triggered drift");
562
563 assert_eq!(ddm.count, 0, "count should be 0 after drift reset");
565 assert_eq!(ddm.mean, 0.0, "mean should be 0 after drift reset");
566 assert_eq!(
567 ddm.min_p_plus_s(),
568 min_before,
569 "min_p_plus_s should be preserved after drift reset"
570 );
571 }
572}