scouter_drift/spc/
monitor.rs

1use crate::error::DriftError;
2use crate::spc::types::{SpcDriftMap, SpcFeatureDrift};
3use crate::utils::CategoricalFeatureHelpers;
4use chrono::Utc;
5use indicatif::ProgressBar;
6use ndarray::prelude::*;
7use ndarray::Axis;
8use num_traits::{Float, FromPrimitive, Num};
9use rayon::prelude::*;
10use scouter_types::{
11    spc::{SpcDriftConfig, SpcDriftProfile, SpcFeatureDriftProfile},
12    ServerRecord, ServerRecords, SpcServerRecord,
13};
14use std::collections::HashMap;
15use std::fmt::Debug;
16
/// Statistical process control (SPC) drift monitor.
///
/// The monitor carries no state: it builds control-limit profiles from reference data and
/// scores new data against an existing profile.
#[derive(Debug, Clone)]
pub struct SpcMonitor {}
18
// Opts `SpcMonitor` into `CategoricalFeatureHelpers`. The body is empty, so only the
// trait's provided (default) methods are used; nothing is overridden here.
impl CategoricalFeatureHelpers for SpcMonitor {}
20
21impl SpcMonitor {
22    pub fn new() -> Self {
23        SpcMonitor {}
24    }
25
26    /// Compute c4 for control limits
27    ///
28    /// # Arguments
29    ///
30    /// * `number` - The sample size
31    ///
32    /// # Returns
33    ///
34    /// The c4 value
35    fn compute_c4(&self, number: usize) -> f32 {
36        //c4 is asymptotically equivalent to (4n-4)/(4n-3)
37        let n = number as f32;
38        let left = 4.0 * n - 4.0;
39        let right = 4.0 * n - 3.0;
40        left / right
41    }
42
43    /// Set the sample size based on the shape of the array
44    ///
45    /// # Arguments
46    ///
47    /// * `shape` - The shape of the array
48    ///
49    /// # Returns
50    ///
51    /// The sample size
52    fn set_sample_size(&self, shape: usize) -> usize {
53        if shape < 1000 {
54            25
55        } else if (1000..10000).contains(&shape) {
56            100
57        } else if (10000..100000).contains(&shape) {
58            1000
59        } else if (100000..1000000).contains(&shape) {
60            10000
61        } else if shape >= 1000000 {
62            100000
63        } else {
64            25
65        }
66    }
67
68    /// Compute the mean for a 2D array
69    ///
70    /// # Arguments
71    ///
72    /// * `x` - A 2D array of f64 values
73    ///
74    /// # Returns
75    ///
76    /// A 1D array of f64 values
77    pub fn compute_array_mean<F>(&self, x: &ArrayView2<F>) -> Result<Array1<F>, DriftError>
78    where
79        F: Float
80            + Sync
81            + FromPrimitive
82            + Send
83            + Num
84            + Debug
85            + num_traits::Zero
86            + ndarray::ScalarOperand,
87        F: Into<f64>,
88    {
89        x.mean_axis(Axis(0)).ok_or(DriftError::ComputeMeanError)
90    }
91
92    // Computes control limits for a 2D array of data
93    // Control limits are calculated as per NIST standards
94    // https://www.itl.nist.gov/div898/handbook/pmc/section3/pmc32.htm
95    //
96    // # Arguments
97    //
98    // * `sample_size` - The sample size
99    // * `sample_data` - A 2D array of f64 values
100    // * `num_features` - The number of features
101    // * `features` - A vector of feature names
102    // * `drift_config` - A monitor config
103    fn compute_control_limits<F>(
104        &self,
105        sample_size: usize,
106        sample_data: &ArrayView2<F>,
107        num_features: usize,
108        features: &[String],
109        drift_config: &SpcDriftConfig,
110    ) -> Result<SpcDriftProfile, DriftError>
111    where
112        F: FromPrimitive + Num + Clone + Float + Debug + Sync + Send + ndarray::ScalarOperand,
113
114        F: Into<f64>,
115    {
116        let c4 = self.compute_c4(sample_size);
117        let sample_mean = self.compute_array_mean(sample_data)?;
118
119        let means = sample_mean.slice(s![0..num_features]);
120        let stdev = sample_mean.slice(s![num_features..]);
121        // calculate control limit arrays
122
123        let base = &stdev / F::from(c4).unwrap();
124        let one_sigma = &base * F::from(1.0).unwrap();
125        let two_sigma = &base * F::from(2.0).unwrap();
126        let three_sigma = &base * F::from(3.0).unwrap();
127
128        // calculate control limits for each zone
129        let one_lcl = &means - &one_sigma;
130        let one_ucl = &means + &one_sigma;
131
132        let two_lcl = &means - &two_sigma;
133        let two_ucl = &means + &two_sigma;
134
135        let three_lcl = &means - &three_sigma;
136        let three_ucl = &means + &three_sigma;
137        let center = &means;
138
139        // create monitor profile
140        let mut feat_profile = HashMap::new();
141
142        for (i, feature) in features.iter().enumerate() {
143            feat_profile.insert(
144                feature.to_string(),
145                SpcFeatureDriftProfile {
146                    id: feature.to_string(),
147                    center: center[i].into(),
148                    one_ucl: one_ucl[i].into(),
149                    one_lcl: one_lcl[i].into(),
150                    two_ucl: two_ucl[i].into(),
151                    two_lcl: two_lcl[i].into(),
152                    three_ucl: three_ucl[i].into(),
153                    three_lcl: three_lcl[i].into(),
154                    timestamp: Utc::now(),
155                },
156            );
157        }
158
159        Ok(SpcDriftProfile::new(feat_profile, drift_config.clone()))
160    }
161
162    /// Create a 2D monitor profile
163    ///
164    /// # Arguments
165    ///
166    /// * `features` - A vector of feature names
167    /// * `array` - A 2D array of f64 values
168    ///
169    /// # Returns
170    ///
171    /// A monitor profile
172    pub fn create_2d_drift_profile<F>(
173        &self,
174        features: &[String],
175        array: &ArrayView2<F>,
176        drift_config: &SpcDriftConfig,
177    ) -> Result<SpcDriftProfile, DriftError>
178    where
179        F: Float
180            + Sync
181            + FromPrimitive
182            + Send
183            + Num
184            + Debug
185            + num_traits::Zero
186            + ndarray::ScalarOperand,
187        F: Into<f64>,
188    {
189        let shape = array.shape()[0];
190        let num_features = features.len();
191        let sample_size = self.set_sample_size(shape);
192
193        let nbr_chunks = shape / sample_size;
194        let pb = ProgressBar::new(nbr_chunks as u64);
195
196        // iterate through each feature
197        let sample_vec = array
198            .axis_chunks_iter(Axis(0), sample_size)
199            .into_par_iter()
200            .map(|x| {
201                let mean = x.mean_axis(Axis(0)).unwrap();
202                let stddev = x.std_axis(Axis(0), F::from(1.0).unwrap());
203
204                // append stddev to mean
205                let combined = ndarray::concatenate![Axis(0), mean, stddev];
206                //mean.remove_axis(Axis(1));
207                pb.inc(1);
208
209                combined.to_vec()
210            })
211            .collect::<Vec<_>>();
212
213        // reshape vec to 2D array
214        let sample_data =
215            Array::from_shape_vec((sample_vec.len(), features.len() * 2), sample_vec.concat())?;
216
217        let drift_profile = self.compute_control_limits(
218            sample_size,
219            &sample_data.view(),
220            num_features,
221            features,
222            drift_config,
223        )?;
224
225        Ok(drift_profile)
226    }
227
228    // Samples data for drift detection
229    //
230    // # Arguments
231    //
232    // * `array` - A 2D array of f64 values
233    // * `sample_size` - The sample size
234    // * `columns` - The number of columns
235    //
236    // # Returns
237    // A 2D array of f64 values
238    fn _sample_data<F>(
239        &self,
240        array: &ArrayView2<F>,
241        sample_size: usize,
242        columns: usize,
243    ) -> Result<Array2<f64>, DriftError>
244    where
245        F: Float
246            + Sync
247            + FromPrimitive
248            + Send
249            + Num
250            + Debug
251            + num_traits::Zero
252            + ndarray::ScalarOperand,
253        F: Into<f64>,
254    {
255        let sample_vec: Vec<Vec<f64>> = array
256            .axis_chunks_iter(Axis(0), sample_size)
257            .into_par_iter()
258            .map(|x| {
259                let mean = x.mean_axis(Axis(0)).unwrap();
260                // convert to f64
261                let mean = mean.mapv(|x| x.into());
262                mean.to_vec()
263            })
264            .collect::<Vec<_>>();
265
266        // reshape vec to 2D array
267        let sample_data = Array::from_shape_vec((sample_vec.len(), columns), sample_vec.concat())?;
268        Ok(sample_data)
269    }
270
271    pub fn set_control_drift_value(
272        &self,
273        array: ArrayView1<f64>,
274        num_features: usize,
275        drift_profile: &SpcDriftProfile,
276        features: &[String],
277    ) -> Result<Vec<f64>, DriftError> {
278        let mut drift: Vec<f64> = vec![0.0; num_features];
279        for (i, feature) in features.iter().enumerate() {
280            // check if feature exists
281            if !drift_profile.features.contains_key(feature) {
282                continue;
283            }
284
285            let feature_profile = drift_profile
286                .features
287                .get(feature)
288                .ok_or(DriftError::FeatureNotExistError)?;
289
290            let value = array[i];
291
292            if value > feature_profile.three_ucl {
293                // insert into zero array
294                drift[i] = 4.0;
295            } else if value < feature_profile.three_lcl {
296                drift[i] = -4.0;
297            } else if value < feature_profile.three_ucl && value >= feature_profile.two_ucl {
298                drift[i] = 3.0;
299            } else if value < feature_profile.two_ucl && value >= feature_profile.one_ucl {
300                drift[i] = 2.0;
301            } else if value < feature_profile.one_ucl && value > feature_profile.center {
302                drift[i] = 1.0;
303            } else if value > feature_profile.three_lcl && value <= feature_profile.two_lcl {
304                drift[i] = -3.0;
305            } else if value > feature_profile.two_lcl && value <= feature_profile.one_lcl {
306                drift[i] = -2.0;
307            } else if value > feature_profile.one_lcl && value < feature_profile.center {
308                drift[i] = -1.0;
309            }
310        }
311
312        Ok(drift)
313    }
314
315    // Computes drift on a  2D array of data. Typically of n size >= sample_size
316    //
317    // # Arguments
318    //
319    // * `array` - A 2D array of f64 values
320    // * `features` - A vector of feature names that is mapped to the array (order of features in the order in the array)
321    // * `drift_profile` - A monitor profile
322    //
323
324    pub fn compute_drift<F>(
325        &self,
326        features: &[String],
327        array: &ArrayView2<F>, // n x m data array (features and predictions)
328        drift_profile: &SpcDriftProfile,
329    ) -> Result<SpcDriftMap, DriftError>
330    where
331        F: Float
332            + Sync
333            + FromPrimitive
334            + Send
335            + Num
336            + Debug
337            + num_traits::Zero
338            + ndarray::ScalarOperand,
339        F: Into<f64>,
340    {
341        let num_features = drift_profile.features.len();
342
343        // iterate through each feature
344        let sample_data =
345            self._sample_data(array, drift_profile.config.sample_size, num_features)?;
346
347        // iterate through each row of samples
348        let drift_array = sample_data
349            .axis_iter(Axis(0))
350            .into_par_iter()
351            .map(|x| {
352                // match AlertRules enum
353
354                let drift =
355                    self.set_control_drift_value(x, num_features, drift_profile, features)?;
356
357                Ok(drift)
358            })
359            .collect::<Result<Vec<_>, DriftError>>()?;
360
361        // check for errors
362
363        // convert drift array to 2D array
364        let drift_array =
365            Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
366
367        let mut drift_map = SpcDriftMap::new(
368            drift_profile.config.name.clone(),
369            drift_profile.config.space.clone(),
370            drift_profile.config.version.clone(),
371        );
372
373        for (i, feature) in features.iter().enumerate() {
374            let drift = drift_array.column(i);
375            let sample = sample_data.column(i);
376
377            let feature_drift = SpcFeatureDrift {
378                samples: sample.to_vec(),
379                drift: drift.to_vec(),
380            };
381
382            drift_map.add_feature(feature.to_string(), feature_drift);
383        }
384
385        Ok(drift_map)
386    }
387
388    // Samples data for drift detection and returns a vector of DriftServerRecord to send to scouter server
389    //
390    // # Arguments
391    //
392    // * `array` - A 2D array of f64 values
393    // * `features` - A vector of feature names that is mapped to the array (order of features in the order in the array)
394    // * `drift_profile` - A monitor profile
395    //
396    pub fn sample_data<F>(
397        &self,
398        features: &[String],
399        array: &ArrayView2<F>, // n x m data array (features and predictions)
400        drift_profile: &SpcDriftProfile,
401    ) -> Result<ServerRecords, DriftError>
402    where
403        F: Float
404            + Sync
405            + FromPrimitive
406            + Send
407            + Num
408            + Debug
409            + num_traits::Zero
410            + ndarray::ScalarOperand,
411        F: Into<f64>,
412    {
413        let num_features = drift_profile.config.alert_config.features_to_monitor.len();
414
415        // iterate through each feature
416        let sample_data =
417            self._sample_data(array, drift_profile.config.sample_size, num_features)?; // n x m data array (features and predictions)
418
419        let mut records = Vec::new();
420
421        for (i, feature) in features.iter().enumerate() {
422            let sample = sample_data.column(i);
423
424            sample.iter().for_each(|value| {
425                let record = SpcServerRecord::new(
426                    drift_profile.config.space.clone(),
427                    drift_profile.config.name.clone(),
428                    drift_profile.config.version.clone(),
429                    feature.to_string(),
430                    *value,
431                );
432
433                records.push(ServerRecord::Spc(record));
434            });
435        }
436
437        Ok(ServerRecords::new(records))
438    }
439
440    pub fn calculate_drift_from_sample(
441        &self,
442        features: &[String],
443        sample_array: &ArrayView2<f64>, // n x m data array (features and predictions)
444        drift_profile: &SpcDriftProfile,
445    ) -> Result<Array2<f64>, DriftError> {
446        // iterate through each row of samples
447        let num_features = features.len();
448        let drift_array = sample_array
449            .axis_iter(Axis(0))
450            .into_par_iter()
451            .map(|x| {
452                // match AlertRules enum
453
454                let drift =
455                    self.set_control_drift_value(x, num_features, drift_profile, features)?;
456                Ok(drift)
457            })
458            .collect::<Result<Vec<_>, DriftError>>()?;
459
460        // convert drift array to 2D array
461        let drift_array =
462            Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
463
464        Ok(drift_array)
465    }
466}
467
468// convert drift array to 2D array
469
470impl Default for SpcMonitor {
471    fn default() -> Self {
472        SpcMonitor::new()
473    }
474}
475
#[cfg(test)]
mod tests {
    // Unit tests for SpcMonitor: profile creation, serialization round-trips, drift
    // detection, and server-record sampling. Inputs are random uniform [0, 10) arrays.
    use scouter_types::drift::DriftProfile;
    use scouter_types::spc::SpcAlertConfig;
    use scouter_types::util::ProfileBaseArgs;
    use scouter_types::DriftType;

    use super::*;
    use approx::relative_eq;
    use ndarray::Array;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    #[test]
    fn test_create_2d_drift_profile_f32() {
        // create 2d array
        let array = Array::random((1030, 3), Uniform::new(0., 10.));

        // cast array to f32
        let array = array.mapv(|x| x as f32);

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let alert_config = SpcAlertConfig::default();
        let monitor = SpcMonitor::new();
        let config = SpcDriftConfig::new(
            Some("name".to_string()),
            Some("repo".to_string()),
            None,
            None,
            None,
            Some(alert_config),
            None,
        );

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        // test extra funcs that are used in python
        profile.__str__();
        let model_string = profile.model_dump_json();

        let mut loaded_profile = SpcDriftProfile::model_validate_json(model_string);
        assert_eq!(loaded_profile.features.len(), 3);

        // test update args
        loaded_profile
            .update_config_args(
                Some("updated".to_string()),
                Some("updated".to_string()),
                Some("1.0.0".to_string()),
                Some(loaded_profile.config.sample),
                Some(loaded_profile.config.sample_size),
                Some(loaded_profile.config.alert_config.clone()),
            )
            .unwrap();

        assert_eq!(loaded_profile.config.name, "updated");
        assert_eq!(loaded_profile.config.space, "updated");
        assert_eq!(loaded_profile.config.version, "1.0.0");
    }

    #[test]
    fn test_create_2d_drift_profile_f64() {
        // create 2d array
        let array = Array::random((1030, 3), Uniform::new(0., 10.));

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = SpcMonitor::new();
        let alert_config = SpcAlertConfig::default();
        // NOTE(review): judging by the assertions below, the first two arguments are
        // (space, name); other tests in this module pass them as ("name", "repo").
        let config = SpcDriftConfig::new(
            Some("repo".to_string()),
            Some("name".to_string()),
            None,
            None,
            None,
            Some(alert_config),
            None,
        );

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let args = profile.get_base_args();
        assert_eq!(args.name, "name");
        assert_eq!(args.space, "repo");
        assert_eq!(args.version, Some("0.1.0".to_string()));
        assert_eq!(args.schedule, "0 0 0 * * *");

        let value = profile.to_value();

        // round-trip through the generic DriftProfile and check the base args survive
        let profile = DriftProfile::from_value(value).unwrap();
        let new_args = profile.get_base_args();

        assert_eq!(new_args, args);

        let profile_str = profile.to_value().to_string();
        DriftProfile::from_str(DriftType::Spc, profile_str).unwrap();
    }

    #[test]
    fn test_drift_detect_process() {
        // create 2d array
        let array = Array::random((1030, 3), Uniform::new(0., 10.));

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];
        let alert_config = SpcAlertConfig::default();
        let config = SpcDriftConfig::new(
            Some("name".to_string()),
            Some("repo".to_string()),
            None,
            None,
            None,
            Some(alert_config),
            None,
        );

        let monitor = SpcMonitor::new();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        // inject drift: overwrite the first 200 rows of column 1 (feature_2) with 100.0
        let mut array = array.to_owned();
        array.slice_mut(s![0..200, 1]).fill(100.0);

        let drift_map = monitor
            .compute_drift(&features, &array.view(), &profile)
            .unwrap();

        // the first subgroup mean of feature_2 should be ~100 (assuming the configured
        // sample_size is <= 200, so that subgroup only contains overwritten rows).
        // NOTE(review): the variable is named `feature_1` but holds feature_2's drift.
        let feature_1 = drift_map.features.get("feature_2").unwrap();
        assert!(relative_eq!(feature_1.samples[0], 100.0, epsilon = 2.0));

        // serialize the drift map to JSON (smoke test; the result is discarded)
        let _ = drift_map.model_dump_json();
    }

    #[test]
    fn test_sample_data() {
        // create 2d array
        let array = Array::random((1030, 3), Uniform::new(0., 10.));

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];
        // sample_data reads its expected column count from features_to_monitor
        let alert_config = SpcAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };
        let config = SpcDriftConfig::new(
            Some("name".to_string()),
            Some("repo".to_string()),
            None,
            None,
            None,
            Some(alert_config),
            None,
        );

        let monitor = SpcMonitor::new();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let server_records = monitor
            .sample_data(&features, &array.view(), &profile)
            .unwrap();

        // 1030 rows in subgroups of 25 -> 42 subgroups (the last one partial) per feature,
        // x 3 features = 126 records. Assumes the default config sample_size is 25.
        assert_eq!(server_records.records.len(), 126);
    }

    #[test]
    fn test_calculate_drift_from_sample() {
        let array = Array::random((1030, 3), Uniform::new(0., 10.));

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let alert_config = SpcAlertConfig::default();
        let config = SpcDriftConfig::new(
            Some("name".to_string()),
            Some("repo".to_string()),
            None,
            None,
            None,
            Some(alert_config),
            None,
        );

        let monitor = SpcMonitor::new();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        // inject drift: overwrite the first 200 rows of column 1 (feature_2) with 100.0
        let mut array = array.to_owned();
        array.slice_mut(s![0..200, 1]).fill(100.0);

        // rows are scored directly here (no subgroup aggregation)
        let drift_array = monitor
            .calculate_drift_from_sample(&features, &array.view(), &profile)
            .unwrap();

        // row 0 of feature_2 (100.0) lies far above the upper three-sigma limit, so its
        // zone code should be 4; the epsilon makes this a loose check.
        // NOTE(review): the variable is named `feature_1` but holds column 1 (feature_2).
        let feature_1 = drift_array.column(1);
        assert!(relative_eq!(feature_1[0], 4.0, epsilon = 2.0));
    }
}