Skip to main content

scouter_drift/spc/
monitor.rs

1use crate::error::DriftError;
2use crate::spc::types::{SpcDriftMap, SpcFeatureDrift};
3use crate::utils::CategoricalFeatureHelpers;
4use chrono::Utc;
5use indicatif::ProgressBar;
6use ndarray::prelude::*;
7use ndarray::Axis;
8use num_traits::{Float, FromPrimitive, Num};
9use rayon::prelude::*;
10use scouter_types::{
11    spc::{SpcDriftConfig, SpcDriftProfile, SpcFeatureDriftProfile},
12    ServerRecord, ServerRecords, SpcRecord,
13};
14use std::collections::HashMap;
15use std::fmt::Debug;
16
17pub struct SpcMonitor {}
18
19impl CategoricalFeatureHelpers for SpcMonitor {}
20
21impl SpcMonitor {
22    pub fn new() -> Self {
23        SpcMonitor {}
24    }
25
26    /// Compute c4 for control limits
27    ///
28    /// # Arguments
29    ///
30    /// * `number` - The sample size
31    ///
32    /// # Returns
33    ///
34    /// The c4 value
35    fn compute_c4(&self, number: usize) -> f32 {
36        //c4 is asymptotically equivalent to (4n-4)/(4n-3)
37        let n = number as f32;
38        let left = 4.0 * n - 4.0;
39        let right = 4.0 * n - 3.0;
40        left / right
41    }
42
43    /// Set the sample size based on the shape of the array
44    ///
45    /// # Arguments
46    ///
47    /// * `shape` - The shape of the array
48    ///
49    /// # Returns
50    ///
51    /// The sample size
52    fn set_sample_size(&self, shape: usize) -> usize {
53        if shape < 1000 {
54            25
55        } else if (1000..10000).contains(&shape) {
56            100
57        } else if (10000..100000).contains(&shape) {
58            1000
59        } else if (100000..1000000).contains(&shape) {
60            10000
61        } else if shape >= 1000000 {
62            100000
63        } else {
64            25
65        }
66    }
67
68    /// Compute the mean for a 2D array
69    ///
70    /// # Arguments
71    ///
72    /// * `x` - A 2D array of f64 values
73    ///
74    /// # Returns
75    ///
76    /// A 1D array of f64 values
77    pub fn compute_array_mean<F>(&self, x: &ArrayView2<F>) -> Result<Array1<F>, DriftError>
78    where
79        F: Float
80            + Sync
81            + FromPrimitive
82            + Send
83            + Num
84            + Debug
85            + num_traits::Zero
86            + ndarray::ScalarOperand,
87        F: Into<f64>,
88    {
89        x.mean_axis(Axis(0)).ok_or(DriftError::ComputeMeanError)
90    }
91
92    // Computes control limits for a 2D array of data
93    // Control limits are calculated as per NIST standards
94    // https://www.itl.nist.gov/div898/handbook/pmc/section3/pmc32.htm
95    //
96    // # Arguments
97    //
98    // * `sample_size` - The sample size
99    // * `sample_data` - A 2D array of f64 values
100    // * `num_features` - The number of features
101    // * `features` - A vector of feature names
102    // * `drift_config` - A monitor config
103    fn compute_control_limits<F>(
104        &self,
105        sample_size: usize,
106        sample_data: &ArrayView2<F>,
107        num_features: usize,
108        features: &[String],
109        drift_config: &SpcDriftConfig,
110    ) -> Result<SpcDriftProfile, DriftError>
111    where
112        F: FromPrimitive + Num + Clone + Float + Debug + Sync + Send + ndarray::ScalarOperand,
113
114        F: Into<f64>,
115    {
116        let c4 = self.compute_c4(sample_size);
117        let sample_mean = self.compute_array_mean(sample_data)?;
118
119        let means = sample_mean.slice(s![0..num_features]);
120        let stdev = sample_mean.slice(s![num_features..]);
121        // calculate control limit arrays
122
123        let base = &stdev / F::from(c4).unwrap();
124        let one_sigma = &base * F::from(1.0).unwrap();
125        let two_sigma = &base * F::from(2.0).unwrap();
126        let three_sigma = &base * F::from(3.0).unwrap();
127
128        // calculate control limits for each zone
129        let one_lcl = &means - &one_sigma;
130        let one_ucl = &means + &one_sigma;
131
132        let two_lcl = &means - &two_sigma;
133        let two_ucl = &means + &two_sigma;
134
135        let three_lcl = &means - &three_sigma;
136        let three_ucl = &means + &three_sigma;
137        let center = &means;
138
139        // create monitor profile
140        let mut feat_profile = HashMap::new();
141
142        for (i, feature) in features.iter().enumerate() {
143            feat_profile.insert(
144                feature.to_string(),
145                SpcFeatureDriftProfile {
146                    id: feature.to_string(),
147                    center: center[i].into(),
148                    one_ucl: one_ucl[i].into(),
149                    one_lcl: one_lcl[i].into(),
150                    two_ucl: two_ucl[i].into(),
151                    two_lcl: two_lcl[i].into(),
152                    three_ucl: three_ucl[i].into(),
153                    three_lcl: three_lcl[i].into(),
154                    timestamp: Utc::now(),
155                },
156            );
157        }
158
159        Ok(SpcDriftProfile::new(feat_profile, drift_config.clone()))
160    }
161
162    /// Create a 2D monitor profile
163    ///
164    /// # Arguments
165    ///
166    /// * `features` - A vector of feature names
167    /// * `array` - A 2D array of f64 values
168    ///
169    /// # Returns
170    ///
171    /// A monitor profile
172    pub fn create_2d_drift_profile<F>(
173        &self,
174        features: &[String],
175        array: &ArrayView2<F>,
176        drift_config: &SpcDriftConfig,
177    ) -> Result<SpcDriftProfile, DriftError>
178    where
179        F: Float
180            + Sync
181            + FromPrimitive
182            + Send
183            + Num
184            + Debug
185            + num_traits::Zero
186            + ndarray::ScalarOperand,
187        F: Into<f64>,
188    {
189        let shape = array.shape()[0];
190        let num_features = features.len();
191        let sample_size = self.set_sample_size(shape);
192
193        let nbr_chunks = shape / sample_size;
194        let pb = ProgressBar::new(nbr_chunks as u64);
195
196        // iterate through each feature
197        let sample_vec = array
198            .axis_chunks_iter(Axis(0), sample_size)
199            .into_par_iter()
200            .map(|x| {
201                let mean = x.mean_axis(Axis(0)).unwrap();
202                let stddev = x.std_axis(Axis(0), F::from(1.0).unwrap());
203
204                // append stddev to mean
205                let combined = ndarray::concatenate![Axis(0), mean, stddev];
206                //mean.remove_axis(Axis(1));
207                pb.inc(1);
208
209                combined.to_vec()
210            })
211            .collect::<Vec<_>>();
212
213        // reshape vec to 2D array
214        let sample_data =
215            Array::from_shape_vec((sample_vec.len(), features.len() * 2), sample_vec.concat())?;
216
217        let drift_profile = self.compute_control_limits(
218            sample_size,
219            &sample_data.view(),
220            num_features,
221            features,
222            drift_config,
223        )?;
224
225        Ok(drift_profile)
226    }
227
228    // Samples data for drift detection
229    //
230    // # Arguments
231    //
232    // * `array` - A 2D array of f64 values
233    // * `sample_size` - The sample size
234    // * `columns` - The number of columns
235    //
236    // # Returns
237    // A 2D array of f64 values
238    fn _sample_data<F>(
239        &self,
240        array: &ArrayView2<F>,
241        sample_size: usize,
242        columns: usize,
243    ) -> Result<Array2<f64>, DriftError>
244    where
245        F: Float
246            + Sync
247            + FromPrimitive
248            + Send
249            + Num
250            + Debug
251            + num_traits::Zero
252            + ndarray::ScalarOperand,
253        F: Into<f64>,
254    {
255        let sample_vec: Vec<Vec<f64>> = array
256            .axis_chunks_iter(Axis(0), sample_size)
257            .into_par_iter()
258            .map(|x| {
259                let mean = x.mean_axis(Axis(0)).unwrap();
260                // convert to f64
261                let mean = mean.mapv(|x| x.into());
262                mean.to_vec()
263            })
264            .collect::<Vec<_>>();
265
266        // reshape vec to 2D array
267        let sample_data = Array::from_shape_vec((sample_vec.len(), columns), sample_vec.concat())?;
268        Ok(sample_data)
269    }
270
271    pub fn set_control_drift_value(
272        &self,
273        array: ArrayView1<f64>,
274        num_features: usize,
275        drift_profile: &SpcDriftProfile,
276        features: &[String],
277    ) -> Result<Vec<f64>, DriftError> {
278        let mut drift: Vec<f64> = vec![0.0; num_features];
279        for (i, feature) in features.iter().enumerate() {
280            // check if feature exists
281            if !drift_profile.features.contains_key(feature) {
282                continue;
283            }
284
285            let feature_profile = drift_profile
286                .features
287                .get(feature)
288                .ok_or(DriftError::FeatureNotExistError)?;
289
290            let value = array[i];
291
292            if value > feature_profile.three_ucl {
293                // insert into zero array
294                drift[i] = 4.0;
295            } else if value < feature_profile.three_lcl {
296                drift[i] = -4.0;
297            } else if value < feature_profile.three_ucl && value >= feature_profile.two_ucl {
298                drift[i] = 3.0;
299            } else if value < feature_profile.two_ucl && value >= feature_profile.one_ucl {
300                drift[i] = 2.0;
301            } else if value < feature_profile.one_ucl && value > feature_profile.center {
302                drift[i] = 1.0;
303            } else if value > feature_profile.three_lcl && value <= feature_profile.two_lcl {
304                drift[i] = -3.0;
305            } else if value > feature_profile.two_lcl && value <= feature_profile.one_lcl {
306                drift[i] = -2.0;
307            } else if value > feature_profile.one_lcl && value < feature_profile.center {
308                drift[i] = -1.0;
309            }
310        }
311
312        Ok(drift)
313    }
314
315    // Computes drift on a  2D array of data. Typically of n size >= sample_size
316    //
317    // # Arguments
318    //
319    // * `array` - A 2D array of f64 values
320    // * `features` - A vector of feature names that is mapped to the array (order of features in the order in the array)
321    // * `drift_profile` - A monitor profile
322    //
323
324    pub fn compute_drift<F>(
325        &self,
326        features: &[String],
327        array: &ArrayView2<F>, // n x m data array (features and predictions)
328        drift_profile: &SpcDriftProfile,
329    ) -> Result<SpcDriftMap, DriftError>
330    where
331        F: Float
332            + Sync
333            + FromPrimitive
334            + Send
335            + Num
336            + Debug
337            + num_traits::Zero
338            + ndarray::ScalarOperand,
339        F: Into<f64>,
340    {
341        let num_features = drift_profile.features.len();
342
343        // iterate through each feature
344        let sample_data =
345            self._sample_data(array, drift_profile.config.sample_size, num_features)?;
346
347        // iterate through each row of samples
348        let drift_array = sample_data
349            .axis_iter(Axis(0))
350            .into_par_iter()
351            .map(|x| {
352                // match AlertRules enum
353
354                let drift =
355                    self.set_control_drift_value(x, num_features, drift_profile, features)?;
356
357                Ok(drift)
358            })
359            .collect::<Result<Vec<_>, DriftError>>()?;
360
361        // check for errors
362
363        // convert drift array to 2D array
364        let drift_array =
365            Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
366
367        let mut drift_map = SpcDriftMap::new(
368            drift_profile.config.name.clone(),
369            drift_profile.config.space.clone(),
370            drift_profile.config.version.clone(),
371        );
372
373        for (i, feature) in features.iter().enumerate() {
374            let drift = drift_array.column(i);
375            let sample = sample_data.column(i);
376
377            let feature_drift = SpcFeatureDrift {
378                samples: sample.to_vec(),
379                drift: drift.to_vec(),
380            };
381
382            drift_map.add_feature(feature.to_string(), feature_drift);
383        }
384
385        Ok(drift_map)
386    }
387
388    // Samples data for drift detection and returns a vector of DriftServerRecord to send to scouter server
389    //
390    // # Arguments
391    //
392    // * `array` - A 2D array of f64 values
393    // * `features` - A vector of feature names that is mapped to the array (order of features in the order in the array)
394    // * `drift_profile` - A monitor profile
395    //
396    pub fn sample_data<F>(
397        &self,
398        features: &[String],
399        array: &ArrayView2<F>, // n x m data array (features and predictions)
400        drift_profile: &SpcDriftProfile,
401    ) -> Result<ServerRecords, DriftError>
402    where
403        F: Float
404            + Sync
405            + FromPrimitive
406            + Send
407            + Num
408            + Debug
409            + num_traits::Zero
410            + ndarray::ScalarOperand,
411        F: Into<f64>,
412    {
413        let num_features = features.len();
414
415        // iterate through each feature
416        let sample_data =
417            self._sample_data(array, drift_profile.config.sample_size, num_features)?; // n x m data array (features and predictions)
418
419        let mut records = Vec::new();
420
421        for (i, feature) in features.iter().enumerate() {
422            if !drift_profile.features.contains_key(feature) {
423                continue;
424            }
425
426            let sample = sample_data.column(i);
427
428            sample.iter().for_each(|value| {
429                let record = SpcRecord::new(
430                    drift_profile.config.uid.clone(),
431                    feature.to_string(),
432                    *value,
433                );
434
435                records.push(ServerRecord::Spc(record));
436            });
437        }
438        Ok(ServerRecords::new(records))
439    }
440
441    pub fn calculate_drift_from_sample(
442        &self,
443        features: &[String],
444        sample_array: &ArrayView2<f64>, // n x m data array (features and predictions)
445        drift_profile: &SpcDriftProfile,
446    ) -> Result<Array2<f64>, DriftError> {
447        // iterate through each row of samples
448        let num_features = features.len();
449        let drift_array = sample_array
450            .axis_iter(Axis(0))
451            .into_par_iter()
452            .map(|x| {
453                // match AlertRules enum
454
455                let drift =
456                    self.set_control_drift_value(x, num_features, drift_profile, features)?;
457                Ok(drift)
458            })
459            .collect::<Result<Vec<_>, DriftError>>()?;
460
461        // convert drift array to 2D array
462        let drift_array =
463            Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
464
465        Ok(drift_array)
466    }
467}
468
// `SpcMonitor` carries no state, so `Default` simply delegates to `SpcMonitor::new`.
470
471impl Default for SpcMonitor {
472    fn default() -> Self {
473        SpcMonitor::new()
474    }
475}
476
#[cfg(test)]
mod tests {
    use scouter_types::drift::DriftProfile;
    use scouter_types::spc::SpcAlertConfig;
    use scouter_types::util::ProfileBaseArgs;
    use scouter_types::DriftType;

    use super::*;
    use approx::relative_eq;
    use ndarray::Array;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;

    /// Feature names shared by every test, mapped in order onto the columns of
    /// `random_array()`.
    fn feature_names() -> Vec<String> {
        vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ]
    }

    /// A 1030 x 3 array of values drawn uniformly between 0 and 10.
    fn random_array() -> Array2<f64> {
        Array::random((1030, 3), Uniform::new(0., 10.).unwrap())
    }

    /// The drift config used throughout these tests.
    fn drift_config(alert_config: SpcAlertConfig) -> SpcDriftConfig {
        SpcDriftConfig::new(
            "space",
            "name",
            "1.0.0",
            None,
            None,
            Some(alert_config),
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_create_2d_drift_profile_f32() {
        let array = random_array().mapv(|x| x as f32);
        let features = feature_names();
        let monitor = SpcMonitor::new();
        let config = drift_config(SpcAlertConfig::default());

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        // exercise the helpers that are exposed to Python
        profile.__str__();
        let model_string = profile.model_dump_json();

        let mut loaded_profile = SpcDriftProfile::model_validate_json(model_string);
        assert_eq!(loaded_profile.features.len(), 3);

        // update the config args on the reloaded profile
        loaded_profile
            .update_config_args(
                Some("updated".to_string()),
                Some("updated".to_string()),
                Some("1.0.0".to_string()),
                Some(loaded_profile.config.uid.clone()),
                Some(loaded_profile.config.sample),
                Some(loaded_profile.config.sample_size),
                Some(loaded_profile.config.alert_config.clone()),
            )
            .unwrap();

        assert_eq!(loaded_profile.config.name, "updated");
        assert_eq!(loaded_profile.config.space, "updated");
        assert_eq!(loaded_profile.config.version, "1.0.0");
    }

    #[test]
    fn test_create_2d_drift_profile_f64() {
        let array = random_array();
        let features = feature_names();
        let monitor = SpcMonitor::new();
        let config = drift_config(SpcAlertConfig::default());

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let args = profile.get_base_args();
        assert_eq!(args.name, "name");
        assert_eq!(args.space, "space");
        assert_eq!(args.version, Some("1.0.0".to_string()));
        assert_eq!(args.schedule, "0 0 0 * * *");

        // round-trip through the generic DriftProfile wrapper
        let value = profile.to_value();
        let profile = DriftProfile::from_value(value).unwrap();
        let new_args = profile.get_base_args();
        assert_eq!(new_args, args);

        let profile_str = profile.to_value().to_string();
        DriftProfile::from_str(&DriftType::Spc, &profile_str).unwrap();
    }

    #[test]
    fn test_drift_detect_process() {
        let array = random_array();
        let features = feature_names();
        let monitor = SpcMonitor::new();
        let config = drift_config(SpcAlertConfig::default());

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        // overwrite the first 200 rows of feature_2 (column 1) with 100.0, far above
        // anything seen while building the profile
        let mut array = array;
        array.slice_mut(s![0..200, 1]).fill(100.0);

        let drift_map = monitor
            .compute_drift(&features, &array.view(), &profile)
            .unwrap();

        // the first sample of feature_2 is drawn entirely from the overwritten rows
        let feature_2 = drift_map.features.get("feature_2").unwrap();
        assert!(relative_eq!(feature_2.samples[0], 100.0, epsilon = 1e-9));
        assert_eq!(feature_2.drift[0], 4.0);

        // serialization should succeed
        let _ = drift_map.model_dump_json();
    }

    #[test]
    fn test_sample_data() {
        let array = random_array();
        let features = feature_names();
        let alert_config = SpcAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };
        let config = drift_config(alert_config);
        let monitor = SpcMonitor::new();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let server_records = monitor
            .sample_data(&features, &array.view(), &profile)
            .unwrap();

        // 42 samples per feature x 3 features
        assert_eq!(server_records.records.len(), 126);
    }

    #[test]
    fn test_calculate_drift_from_sample() {
        let array = random_array();
        let features = feature_names();
        let config = drift_config(SpcAlertConfig::default());
        let monitor = SpcMonitor::new();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        // overwrite the first 200 rows of feature_2 (column 1) with 100.0
        let mut array = array;
        array.slice_mut(s![0..200, 1]).fill(100.0);

        // every row is treated as an already-computed sample here
        let drift_array = monitor
            .calculate_drift_from_sample(&features, &array.view(), &profile)
            .unwrap();

        let feature_2 = drift_array.column(1);
        assert_eq!(feature_2[0], 4.0);
    }
}