Skip to main content

irithyll_core/drift/
ddm.rs

//! DDM (Drift Detection Method) detector.
//!
//! DDM monitors the running mean and standard deviation of a stream of
//! error values and remembers the lowest `mean + std` observed so far,
//! together with the standard deviation at that point (`min_s`). When the
//! current `mean + std` exceeds that minimum by `warning_level * min_s`, the
//! detector signals [`DriftSignal::Warning`]; when it exceeds it by
//! `drift_level * min_s`, the detector signals [`DriftSignal::Drift`].
//!
//! Originally designed for binary error indicators (0/1), this implementation
//! generalises to continuous error magnitudes by using Welford's online
//! algorithm for the running mean and population standard deviation.
//!
//! # References
//!
//! Gama, J., Medas, P., Castillo, G., & Rodrigues, P. (2004).
//! *Learning with drift detection.* In Advances in Artificial Intelligence --
//! SBIA 2004, pp. 286--295.
18
19use super::DriftSignal;
20
/// Streaming concept-drift detector implementing DDM (Drift Detection Method).
///
/// The detector keeps a running mean and population standard deviation of the
/// values it is fed, remembers the lowest `mean + std` recorded so far, and
/// compares the current `mean + std` against that minimum. With the default
/// multipliers the warning band sits `2 * min_s` above the minimum and the
/// drift band `3 * min_s` above it.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Ddm {
    // --- Configuration ---
    /// Multiplier of `min_s` that defines the warning band (default 2.0).
    warning_level: f64,
    /// Multiplier of `min_s` that defines the drift band (default 3.0).
    drift_level: f64,
    /// Observations that must be seen before any signal other than `Stable`
    /// can be produced (default 30).
    min_instances: u64,

    // --- Welford running statistics ---
    /// Mean of the values observed since the last reset.
    mean: f64,
    /// Welford's M2 aggregate (sum of squared deviations from the mean).
    m2: f64,
    /// How many values contributed to `mean` and `m2`.
    count: u64,

    // --- Minimum tracking ---
    /// Lowest `mean + std` recorded so far; `f64::MAX` until one is recorded.
    min_p_plus_s: f64,
    /// Standard deviation captured together with `min_p_plus_s`.
    min_s: f64,
}
49
50impl Ddm {
51    /// Create a new DDM detector with default parameters.
52    ///
53    /// Defaults: `warning_level = 2.0`, `drift_level = 3.0`,
54    /// `min_instances = 30`.
55    pub fn new() -> Self {
56        Self::with_params(2.0, 3.0, 30)
57    }
58
59    /// Create a DDM detector with custom parameters.
60    ///
61    /// # Arguments
62    ///
63    /// * `warning_level` -- multiplier of `min_s` above `min_p_plus_s` to
64    ///   trigger a warning (typically 2.0).
65    /// * `drift_level` -- multiplier of `min_s` above `min_p_plus_s` to
66    ///   trigger drift (typically 3.0).
67    /// * `min_instances` -- number of observations required before the
68    ///   detector can emit Warning or Drift signals.
69    pub fn with_params(warning_level: f64, drift_level: f64, min_instances: u64) -> Self {
70        Self {
71            warning_level,
72            drift_level,
73            min_instances,
74            mean: 0.0,
75            m2: 0.0,
76            count: 0,
77            min_p_plus_s: f64::MAX,
78            min_s: f64::MAX,
79        }
80    }
81
82    /// Return the current warning-level multiplier.
83    #[inline]
84    pub fn warning_level(&self) -> f64 {
85        self.warning_level
86    }
87
88    /// Return the current drift-level multiplier.
89    #[inline]
90    pub fn drift_level(&self) -> f64 {
91        self.drift_level
92    }
93
94    /// Return the minimum-instances warmup threshold.
95    #[inline]
96    pub fn min_instances(&self) -> u64 {
97        self.min_instances
98    }
99
100    /// Current population standard deviation of the observed stream.
101    #[inline]
102    pub fn std_dev(&self) -> f64 {
103        if self.count == 0 {
104            0.0
105        } else {
106            crate::math::sqrt(self.m2 / self.count as f64)
107        }
108    }
109
110    /// Minimum `mean + std` observed so far.
111    #[inline]
112    pub fn min_p_plus_s(&self) -> f64 {
113        self.min_p_plus_s
114    }
115
116    /// Reset the running statistics (mean, m2, count) while preserving the
117    /// minimum tracking state. Called internally after drift is confirmed.
118    fn reset_running_stats(&mut self) {
119        self.mean = 0.0;
120        self.m2 = 0.0;
121        self.count = 0;
122    }
123}
124
125impl Default for Ddm {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131impl core::fmt::Display for Ddm {
132    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
133        write!(
134            f,
135            "Ddm(warn={}, drift={}, min_inst={}, count={})",
136            self.warning_level, self.drift_level, self.min_instances, self.count
137        )
138    }
139}
140
141// -- Inherent methods (always available, no alloc needed) --
142
143impl Ddm {
144    /// Feed a new value and get the current drift signal.
145    ///
146    /// This is the core algorithm, usable without the `alloc` feature.
147    /// When `alloc` is enabled, the `DriftDetector::update` trait method delegates here.
148    pub fn update(&mut self, value: f64) -> DriftSignal {
149        self.count += 1;
150        let n = self.count as f64;
151
152        let delta = value - self.mean;
153        self.mean += delta / n;
154        let delta2 = value - self.mean;
155        self.m2 += delta * delta2;
156
157        let std = crate::math::sqrt(self.m2 / n);
158        let p_plus_s = self.mean + std;
159
160        if self.count <= self.min_instances {
161            return DriftSignal::Stable;
162        }
163
164        if p_plus_s < self.min_p_plus_s {
165            self.min_p_plus_s = p_plus_s;
166            self.min_s = std;
167        }
168
169        if p_plus_s >= self.min_p_plus_s + self.drift_level * self.min_s {
170            self.reset_running_stats();
171            return DriftSignal::Drift;
172        }
173
174        if p_plus_s >= self.min_p_plus_s + self.warning_level * self.min_s {
175            return DriftSignal::Warning;
176        }
177
178        DriftSignal::Stable
179    }
180
181    /// Reset to initial state.
182    pub fn reset(&mut self) {
183        self.mean = 0.0;
184        self.m2 = 0.0;
185        self.count = 0;
186        self.min_p_plus_s = f64::MAX;
187        self.min_s = f64::MAX;
188    }
189
190    /// Current estimated mean of the monitored stream.
191    pub fn estimated_mean(&self) -> f64 {
192        self.mean
193    }
194}
195
196// -- DriftDetector trait impl (requires alloc for Box) --
197
#[cfg(feature = "alloc")]
impl super::DriftDetector for Ddm {
    /// Forward to the inherent [`Ddm::update`].
    fn update(&mut self, value: f64) -> DriftSignal {
        Ddm::update(self, value)
    }

    /// Forward to the inherent [`Ddm::reset`].
    fn reset(&mut self) {
        Ddm::reset(self)
    }

    /// Box a brand-new detector that shares this one's configuration
    /// (thresholds and warmup length) but none of its accumulated state.
    fn clone_fresh(&self) -> alloc::boxed::Box<dyn super::DriftDetector> {
        let pristine = Self::with_params(self.warning_level, self.drift_level, self.min_instances);
        alloc::boxed::Box::new(pristine)
    }

    /// Box an exact copy of this detector, accumulated state included.
    fn clone_boxed(&self) -> alloc::boxed::Box<dyn super::DriftDetector> {
        let copy = self.clone();
        alloc::boxed::Box::new(copy)
    }

    /// Forward to the inherent [`Ddm::estimated_mean`].
    fn estimated_mean(&self) -> f64 {
        Ddm::estimated_mean(self)
    }

    /// Snapshot the running statistics and the minimum tracking.
    ///
    /// The configuration (`warning_level`, `drift_level`, `min_instances`)
    /// is not part of the snapshot.
    fn serialize_state(&self) -> Option<super::DriftDetectorState> {
        let snapshot = super::DriftDetectorState::Ddm {
            mean: self.mean,
            m2: self.m2,
            count: self.count,
            min_p_plus_s: self.min_p_plus_s,
            min_s: self.min_s,
        };
        Some(snapshot)
    }

    /// Load a snapshot produced by `serialize_state`.
    ///
    /// Returns `true` when `state` is the `Ddm` variant and has been applied;
    /// returns `false` and leaves the detector untouched for any other
    /// variant. The configuration is never modified.
    fn restore_state(&mut self, state: &super::DriftDetectorState) -> bool {
        match state {
            super::DriftDetectorState::Ddm {
                mean,
                m2,
                count,
                min_p_plus_s,
                min_s,
            } => {
                self.mean = *mean;
                self.m2 = *m2;
                self.count = *count;
                self.min_p_plus_s = *min_p_plus_s;
                self.min_s = *min_s;
                true
            }
            _ => false,
        }
    }
}
254
255// ---------------------------------------------------------------------------
256// Tests
257// ---------------------------------------------------------------------------
258
#[cfg(test)]
mod tests {
    extern crate alloc;
    use alloc::vec::Vec;

    #[cfg(feature = "alloc")]
    use super::super::DriftDetector;
    use super::super::DriftSignal;
    use super::*;

    /// Helper: feed a slice of values, collect all signals.
    fn feed(ddm: &mut Ddm, values: &[f64]) -> Vec<DriftSignal> {
        values.iter().map(|&v| ddm.update(v)).collect()
    }

    /// Helper: generate `n` values centred around `centre` with deterministic
    /// jitter derived from the index (no randomness needed).
    fn generate_values(centre: f64, jitter: f64, n: usize) -> Vec<f64> {
        (0..n)
            .map(|i| {
                // Deterministic oscillation: sin-based spread.
                let t = i as f64;
                centre + jitter * crate::math::sin(t * 0.7)
            })
            .collect()
    }

    // 1. Stationary low error -- no drift expected.
    #[test]
    fn stationary_low_error_no_drift() {
        let mut ddm = Ddm::new();
        let values = generate_values(0.1, 0.02, 5000);
        let signals = feed(&mut ddm, &values);

        let drift_count = signals.iter().filter(|&&s| s == DriftSignal::Drift).count();
        assert_eq!(
            drift_count, 0,
            "stationary low error should produce no drift"
        );
    }

    // 2. Error rate increase -- drift detected.
    #[test]
    fn error_rate_increase_detects_drift() {
        let mut ddm = Ddm::new();

        // Phase 1: low error rate.
        let low = generate_values(0.1, 0.01, 2000);
        feed(&mut ddm, &low);

        // Phase 2: high error rate -- should trigger drift.
        let high = generate_values(0.8, 0.01, 2000);
        let signals = feed(&mut ddm, &high);

        let drift_count = signals.iter().filter(|&&s| s == DriftSignal::Drift).count();
        assert!(
            drift_count >= 1,
            "sudden error increase should trigger at least one drift, got {}",
            drift_count
        );
    }

    // 3. Warning fires before drift on gradual increase.
    #[test]
    fn warning_before_drift() {
        let mut ddm = Ddm::new();

        // Stable baseline.
        let baseline = generate_values(0.05, 0.005, 200);
        feed(&mut ddm, &baseline);

        // Gradual increase toward high error.
        let ramp: Vec<f64> = (0..3000)
            .map(|i| {
                let t = i as f64 / 3000.0;
                // Ramp from 0.05 to 0.9 linearly.
                0.05 + 0.85 * t
            })
            .collect();

        let signals = feed(&mut ddm, &ramp);

        let first_warning = signals.iter().position(|&s| s == DriftSignal::Warning);
        let first_drift = signals.iter().position(|&s| s == DriftSignal::Drift);

        assert!(
            first_warning.is_some(),
            "gradual increase should trigger at least one warning"
        );
        assert!(
            first_drift.is_some(),
            "gradual increase should eventually trigger drift"
        );
        assert!(
            first_warning.unwrap() < first_drift.unwrap(),
            "warning (idx {}) should fire before drift (idx {})",
            first_warning.unwrap(),
            first_drift.unwrap()
        );
    }

    // 4. Minimum tracking -- min_p_plus_s decreases as low errors flow in.
    #[test]
    fn minimum_tracking() {
        let mut ddm = Ddm::new();

        // Feed high-ish values past the 30-observation warmup. The minimum is
        // only recorded once the warmup has elapsed, so a shorter prefix would
        // leave it at the `f64::MAX` sentinel and make the comparison below
        // trivially true.
        let high = generate_values(0.5, 0.05, 40);
        feed(&mut ddm, &high);
        let early_min = ddm.min_p_plus_s();
        assert!(
            early_min < f64::MAX,
            "a minimum should have been recorded after warmup, got {}",
            early_min
        );

        // Feed many low values -- min should decrease.
        let low = generate_values(0.05, 0.005, 200);
        feed(&mut ddm, &low);
        let later_min = ddm.min_p_plus_s();

        assert!(
            later_min < early_min,
            "min_p_plus_s should decrease with low errors: early={}, later={}",
            early_min,
            later_min
        );
    }

    // 5. estimated_mean tracks running mean correctly.
    #[test]
    fn estimated_mean_tracks_correctly() {
        // Use a very long warmup so drift never fires during our test.
        let mut ddm = Ddm::with_params(2.0, 3.0, 100_000);

        // Feed 1000 values of exactly 0.3.
        for _ in 0..1000 {
            ddm.update(0.3);
        }
        let mean = ddm.estimated_mean();
        assert!(
            (mean - 0.3).abs() < 1e-10,
            "mean should be ~0.3, got {}",
            mean
        );

        // Feed 1000 more of exactly 0.7 => overall mean = 0.5.
        for _ in 0..1000 {
            ddm.update(0.7);
        }
        let mean2 = ddm.estimated_mean();
        assert!(
            (mean2 - 0.5).abs() < 1e-10,
            "mean should be ~0.5, got {}",
            mean2
        );

        // Feed 1000 more of exactly 0.2 => overall mean = (300+700+200)/3000 = 0.4.
        for _ in 0..1000 {
            ddm.update(0.2);
        }
        let mean3 = ddm.estimated_mean();
        assert!(
            (mean3 - 0.4).abs() < 1e-10,
            "mean should be ~0.4, got {}",
            mean3
        );
    }

    // 6. Reset clears all state.
    #[test]
    fn reset_clears_state() {
        let mut ddm = Ddm::new();

        // Feed some data.
        let vals = generate_values(0.4, 0.05, 500);
        feed(&mut ddm, &vals);
        assert!(ddm.count > 0);

        ddm.reset();

        assert_eq!(ddm.count, 0);
        assert_eq!(ddm.mean, 0.0);
        assert_eq!(ddm.m2, 0.0);
        assert_eq!(ddm.min_p_plus_s, f64::MAX);
        assert_eq!(ddm.min_s, f64::MAX);
    }

    // 7. clone_fresh produces a fresh instance with same params.
    #[cfg(feature = "alloc")]
    #[test]
    fn clone_fresh_same_params() {
        let ddm = Ddm::with_params(1.5, 2.5, 50);

        // Feed some data to dirty the original.
        let mut dirty = ddm.clone();
        let vals = generate_values(0.3, 0.02, 200);
        feed(&mut dirty, &vals);

        let mut fresh = dirty.clone_fresh();

        // Fresh detector should have zero mean (no data seen yet).
        assert_eq!(fresh.estimated_mean(), 0.0);

        // Compare behaviour: feed identical streams to the clone_fresh result
        // and a manually-constructed fresh Ddm with the same params.
        let mut manual_fresh = Ddm::with_params(1.5, 2.5, 50);
        let test_vals = generate_values(0.2, 0.01, 100);

        let signals_a: Vec<DriftSignal> = test_vals.iter().map(|&v| fresh.update(v)).collect();
        let signals_b: Vec<DriftSignal> =
            test_vals.iter().map(|&v| manual_fresh.update(v)).collect();

        assert_eq!(
            signals_a, signals_b,
            "clone_fresh should behave identically to a new instance with same params"
        );

        // Means should converge to the same value.
        assert!(
            (fresh.estimated_mean() - manual_fresh.estimated_mean()).abs() < 1e-12,
            "means should match: {} vs {}",
            fresh.estimated_mean(),
            manual_fresh.estimated_mean()
        );
    }

    // 8. Warmup -- no drift during min_instances period.
    #[test]
    fn warmup_no_drift() {
        let min_inst = 50u64;
        let mut ddm = Ddm::with_params(2.0, 3.0, min_inst);

        // Feed wildly changing values during warmup -- should still be Stable.
        for i in 0..min_inst {
            let value = if i % 2 == 0 { 0.0 } else { 1.0 };
            let signal = ddm.update(value);
            assert_eq!(
                signal,
                DriftSignal::Stable,
                "during warmup (i={}), signal should be Stable, got {:?}",
                i,
                signal
            );
        }
    }

    // 9. Custom params stored correctly.
    #[test]
    fn custom_params() {
        let ddm = Ddm::with_params(1.0, 4.0, 100);
        assert_eq!(ddm.warning_level(), 1.0);
        assert_eq!(ddm.drift_level(), 4.0);
        assert_eq!(ddm.min_instances(), 100);
    }

    // Extra: Default trait works.
    #[test]
    fn default_matches_new() {
        let a = Ddm::new();
        let b = Ddm::default();
        assert_eq!(a.warning_level, b.warning_level);
        assert_eq!(a.drift_level, b.drift_level);
        assert_eq!(a.min_instances, b.min_instances);
    }

    // Extra: std_dev is zero with no data.
    #[test]
    fn std_dev_zero_initially() {
        let ddm = Ddm::new();
        assert_eq!(ddm.std_dev(), 0.0);
    }

    // Extra: std_dev is zero for constant stream.
    #[test]
    fn std_dev_zero_for_constant() {
        let mut ddm = Ddm::new();
        for _ in 0..100 {
            ddm.update(0.5);
        }
        assert!(
            ddm.std_dev().abs() < 1e-12,
            "std of constant stream should be ~0, got {}",
            ddm.std_dev()
        );
    }

    // Extra: after drift reset, running stats are cleared but mins are kept.
    #[test]
    fn drift_resets_running_keeps_mins() {
        let mut ddm = Ddm::with_params(2.0, 3.0, 10);

        // Build up a low baseline.
        for _ in 0..100 {
            ddm.update(0.05);
        }
        let min_before = ddm.min_p_plus_s();

        // Trigger drift with sudden high error.
        let mut got_drift = false;
        for _ in 0..500 {
            if ddm.update(0.95) == DriftSignal::Drift {
                got_drift = true;
                break;
            }
        }
        assert!(got_drift, "should have triggered drift");

        // After drift: count is reset, but min_p_plus_s is preserved.
        assert_eq!(ddm.count, 0, "count should be 0 after drift reset");
        assert_eq!(ddm.mean, 0.0, "mean should be 0 after drift reset");
        assert_eq!(
            ddm.min_p_plus_s(),
            min_before,
            "min_p_plus_s should be preserved after drift reset"
        );
    }
}