scouter_drift/spc/
monitor.rs

1use crate::error::DriftError;
2use crate::spc::types::{SpcDriftMap, SpcFeatureDrift};
3use crate::utils::CategoricalFeatureHelpers;
4use chrono::Utc;
5use indicatif::ProgressBar;
6use ndarray::prelude::*;
7use ndarray::Axis;
8use num_traits::{Float, FromPrimitive, Num};
9use rayon::prelude::*;
10use scouter_types::{
11    spc::{SpcDriftConfig, SpcDriftProfile, SpcFeatureDriftProfile},
12    ServerRecord, ServerRecords, SpcServerRecord,
13};
14use std::collections::HashMap;
15use std::fmt::Debug;
16
/// SPC (control-limit based) drift monitor.
///
/// The struct has no fields; all functionality is provided by its methods.
pub struct SpcMonitor {}

// Opts `SpcMonitor` into `CategoricalFeatureHelpers`; the impl body is empty,
// so no trait items are overridden here.
impl CategoricalFeatureHelpers for SpcMonitor {}
20
21impl SpcMonitor {
22    pub fn new() -> Self {
23        SpcMonitor {}
24    }
25
26    /// Compute c4 for control limits
27    ///
28    /// # Arguments
29    ///
30    /// * `number` - The sample size
31    ///
32    /// # Returns
33    ///
34    /// The c4 value
35    fn compute_c4(&self, number: usize) -> f32 {
36        //c4 is asymptotically equivalent to (4n-4)/(4n-3)
37        let n = number as f32;
38        let left = 4.0 * n - 4.0;
39        let right = 4.0 * n - 3.0;
40        left / right
41    }
42
43    /// Set the sample size based on the shape of the array
44    ///
45    /// # Arguments
46    ///
47    /// * `shape` - The shape of the array
48    ///
49    /// # Returns
50    ///
51    /// The sample size
52    fn set_sample_size(&self, shape: usize) -> usize {
53        if shape < 1000 {
54            25
55        } else if (1000..10000).contains(&shape) {
56            100
57        } else if (10000..100000).contains(&shape) {
58            1000
59        } else if (100000..1000000).contains(&shape) {
60            10000
61        } else if shape >= 1000000 {
62            100000
63        } else {
64            25
65        }
66    }
67
68    /// Compute the mean for a 2D array
69    ///
70    /// # Arguments
71    ///
72    /// * `x` - A 2D array of f64 values
73    ///
74    /// # Returns
75    ///
76    /// A 1D array of f64 values
77    pub fn compute_array_mean<F>(&self, x: &ArrayView2<F>) -> Result<Array1<F>, DriftError>
78    where
79        F: Float
80            + Sync
81            + FromPrimitive
82            + Send
83            + Num
84            + Debug
85            + num_traits::Zero
86            + ndarray::ScalarOperand,
87        F: Into<f64>,
88    {
89        x.mean_axis(Axis(0)).ok_or(DriftError::ComputeMeanError)
90    }
91
92    // Computes control limits for a 2D array of data
93    // Control limits are calculated as per NIST standards
94    // https://www.itl.nist.gov/div898/handbook/pmc/section3/pmc32.htm
95    //
96    // # Arguments
97    //
98    // * `sample_size` - The sample size
99    // * `sample_data` - A 2D array of f64 values
100    // * `num_features` - The number of features
101    // * `features` - A vector of feature names
102    // * `drift_config` - A monitor config
103    fn compute_control_limits<F>(
104        &self,
105        sample_size: usize,
106        sample_data: &ArrayView2<F>,
107        num_features: usize,
108        features: &[String],
109        drift_config: &SpcDriftConfig,
110    ) -> Result<SpcDriftProfile, DriftError>
111    where
112        F: FromPrimitive + Num + Clone + Float + Debug + Sync + Send + ndarray::ScalarOperand,
113
114        F: Into<f64>,
115    {
116        let c4 = self.compute_c4(sample_size);
117        let sample_mean = self.compute_array_mean(sample_data)?;
118
119        let means = sample_mean.slice(s![0..num_features]);
120        let stdev = sample_mean.slice(s![num_features..]);
121        // calculate control limit arrays
122
123        let base = &stdev / F::from(c4).unwrap();
124        let one_sigma = &base * F::from(1.0).unwrap();
125        let two_sigma = &base * F::from(2.0).unwrap();
126        let three_sigma = &base * F::from(3.0).unwrap();
127
128        // calculate control limits for each zone
129        let one_lcl = &means - &one_sigma;
130        let one_ucl = &means + &one_sigma;
131
132        let two_lcl = &means - &two_sigma;
133        let two_ucl = &means + &two_sigma;
134
135        let three_lcl = &means - &three_sigma;
136        let three_ucl = &means + &three_sigma;
137        let center = &means;
138
139        // create monitor profile
140        let mut feat_profile = HashMap::new();
141
142        for (i, feature) in features.iter().enumerate() {
143            feat_profile.insert(
144                feature.to_string(),
145                SpcFeatureDriftProfile {
146                    id: feature.to_string(),
147                    center: center[i].into(),
148                    one_ucl: one_ucl[i].into(),
149                    one_lcl: one_lcl[i].into(),
150                    two_ucl: two_ucl[i].into(),
151                    two_lcl: two_lcl[i].into(),
152                    three_ucl: three_ucl[i].into(),
153                    three_lcl: three_lcl[i].into(),
154                    timestamp: Utc::now(),
155                },
156            );
157        }
158
159        Ok(SpcDriftProfile::new(
160            feat_profile,
161            drift_config.clone(),
162            None,
163        ))
164    }
165
166    /// Create a 2D monitor profile
167    ///
168    /// # Arguments
169    ///
170    /// * `features` - A vector of feature names
171    /// * `array` - A 2D array of f64 values
172    ///
173    /// # Returns
174    ///
175    /// A monitor profile
176    pub fn create_2d_drift_profile<F>(
177        &self,
178        features: &[String],
179        array: &ArrayView2<F>,
180        drift_config: &SpcDriftConfig,
181    ) -> Result<SpcDriftProfile, DriftError>
182    where
183        F: Float
184            + Sync
185            + FromPrimitive
186            + Send
187            + Num
188            + Debug
189            + num_traits::Zero
190            + ndarray::ScalarOperand,
191        F: Into<f64>,
192    {
193        let shape = array.shape()[0];
194        let num_features = features.len();
195        let sample_size = self.set_sample_size(shape);
196
197        let nbr_chunks = shape / sample_size;
198        let pb = ProgressBar::new(nbr_chunks as u64);
199
200        // iterate through each feature
201        let sample_vec = array
202            .axis_chunks_iter(Axis(0), sample_size)
203            .into_par_iter()
204            .map(|x| {
205                let mean = x.mean_axis(Axis(0)).unwrap();
206                let stddev = x.std_axis(Axis(0), F::from(1.0).unwrap());
207
208                // append stddev to mean
209                let combined = ndarray::concatenate![Axis(0), mean, stddev];
210                //mean.remove_axis(Axis(1));
211                pb.inc(1);
212
213                combined.to_vec()
214            })
215            .collect::<Vec<_>>();
216
217        // reshape vec to 2D array
218        let sample_data =
219            Array::from_shape_vec((sample_vec.len(), features.len() * 2), sample_vec.concat())?;
220
221        let drift_profile = self.compute_control_limits(
222            sample_size,
223            &sample_data.view(),
224            num_features,
225            features,
226            drift_config,
227        )?;
228
229        Ok(drift_profile)
230    }
231
232    // Samples data for drift detection
233    //
234    // # Arguments
235    //
236    // * `array` - A 2D array of f64 values
237    // * `sample_size` - The sample size
238    // * `columns` - The number of columns
239    //
240    // # Returns
241    // A 2D array of f64 values
242    fn _sample_data<F>(
243        &self,
244        array: &ArrayView2<F>,
245        sample_size: usize,
246        columns: usize,
247    ) -> Result<Array2<f64>, DriftError>
248    where
249        F: Float
250            + Sync
251            + FromPrimitive
252            + Send
253            + Num
254            + Debug
255            + num_traits::Zero
256            + ndarray::ScalarOperand,
257        F: Into<f64>,
258    {
259        let sample_vec: Vec<Vec<f64>> = array
260            .axis_chunks_iter(Axis(0), sample_size)
261            .into_par_iter()
262            .map(|x| {
263                let mean = x.mean_axis(Axis(0)).unwrap();
264                // convert to f64
265                let mean = mean.mapv(|x| x.into());
266                mean.to_vec()
267            })
268            .collect::<Vec<_>>();
269
270        // reshape vec to 2D array
271        let sample_data = Array::from_shape_vec((sample_vec.len(), columns), sample_vec.concat())?;
272        Ok(sample_data)
273    }
274
275    pub fn set_control_drift_value(
276        &self,
277        array: ArrayView1<f64>,
278        num_features: usize,
279        drift_profile: &SpcDriftProfile,
280        features: &[String],
281    ) -> Result<Vec<f64>, DriftError> {
282        let mut drift: Vec<f64> = vec![0.0; num_features];
283        for (i, feature) in features.iter().enumerate() {
284            // check if feature exists
285            if !drift_profile.features.contains_key(feature) {
286                continue;
287            }
288
289            let feature_profile = drift_profile
290                .features
291                .get(feature)
292                .ok_or(DriftError::FeatureNotExistError)?;
293
294            let value = array[i];
295
296            if value > feature_profile.three_ucl {
297                // insert into zero array
298                drift[i] = 4.0;
299            } else if value < feature_profile.three_lcl {
300                drift[i] = -4.0;
301            } else if value < feature_profile.three_ucl && value >= feature_profile.two_ucl {
302                drift[i] = 3.0;
303            } else if value < feature_profile.two_ucl && value >= feature_profile.one_ucl {
304                drift[i] = 2.0;
305            } else if value < feature_profile.one_ucl && value > feature_profile.center {
306                drift[i] = 1.0;
307            } else if value > feature_profile.three_lcl && value <= feature_profile.two_lcl {
308                drift[i] = -3.0;
309            } else if value > feature_profile.two_lcl && value <= feature_profile.one_lcl {
310                drift[i] = -2.0;
311            } else if value > feature_profile.one_lcl && value < feature_profile.center {
312                drift[i] = -1.0;
313            }
314        }
315
316        Ok(drift)
317    }
318
319    // Computes drift on a  2D array of data. Typically of n size >= sample_size
320    //
321    // # Arguments
322    //
323    // * `array` - A 2D array of f64 values
324    // * `features` - A vector of feature names that is mapped to the array (order of features in the order in the array)
325    // * `drift_profile` - A monitor profile
326    //
327
328    pub fn compute_drift<F>(
329        &self,
330        features: &[String],
331        array: &ArrayView2<F>, // n x m data array (features and predictions)
332        drift_profile: &SpcDriftProfile,
333    ) -> Result<SpcDriftMap, DriftError>
334    where
335        F: Float
336            + Sync
337            + FromPrimitive
338            + Send
339            + Num
340            + Debug
341            + num_traits::Zero
342            + ndarray::ScalarOperand,
343        F: Into<f64>,
344    {
345        let num_features = drift_profile.features.len();
346
347        // iterate through each feature
348        let sample_data =
349            self._sample_data(array, drift_profile.config.sample_size, num_features)?;
350
351        // iterate through each row of samples
352        let drift_array = sample_data
353            .axis_iter(Axis(0))
354            .into_par_iter()
355            .map(|x| {
356                // match AlertRules enum
357
358                let drift =
359                    self.set_control_drift_value(x, num_features, drift_profile, features)?;
360
361                Ok(drift)
362            })
363            .collect::<Result<Vec<_>, DriftError>>()?;
364
365        // check for errors
366
367        // convert drift array to 2D array
368        let drift_array =
369            Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
370
371        let mut drift_map = SpcDriftMap::new(
372            drift_profile.config.name.clone(),
373            drift_profile.config.space.clone(),
374            drift_profile.config.version.clone(),
375        );
376
377        for (i, feature) in features.iter().enumerate() {
378            let drift = drift_array.column(i);
379            let sample = sample_data.column(i);
380
381            let feature_drift = SpcFeatureDrift {
382                samples: sample.to_vec(),
383                drift: drift.to_vec(),
384            };
385
386            drift_map.add_feature(feature.to_string(), feature_drift);
387        }
388
389        Ok(drift_map)
390    }
391
392    // Samples data for drift detection and returns a vector of DriftServerRecord to send to scouter server
393    //
394    // # Arguments
395    //
396    // * `array` - A 2D array of f64 values
397    // * `features` - A vector of feature names that is mapped to the array (order of features in the order in the array)
398    // * `drift_profile` - A monitor profile
399    //
400    pub fn sample_data<F>(
401        &self,
402        features: &[String],
403        array: &ArrayView2<F>, // n x m data array (features and predictions)
404        drift_profile: &SpcDriftProfile,
405    ) -> Result<ServerRecords, DriftError>
406    where
407        F: Float
408            + Sync
409            + FromPrimitive
410            + Send
411            + Num
412            + Debug
413            + num_traits::Zero
414            + ndarray::ScalarOperand,
415        F: Into<f64>,
416    {
417        let num_features = drift_profile.config.alert_config.features_to_monitor.len();
418
419        // iterate through each feature
420        let sample_data =
421            self._sample_data(array, drift_profile.config.sample_size, num_features)?; // n x m data array (features and predictions)
422
423        let mut records = Vec::new();
424
425        for (i, feature) in features.iter().enumerate() {
426            let sample = sample_data.column(i);
427
428            sample.iter().for_each(|value| {
429                let record = SpcServerRecord::new(
430                    drift_profile.config.space.clone(),
431                    drift_profile.config.name.clone(),
432                    drift_profile.config.version.clone(),
433                    feature.to_string(),
434                    *value,
435                );
436
437                records.push(ServerRecord::Spc(record));
438            });
439        }
440
441        Ok(ServerRecords::new(records))
442    }
443
444    pub fn calculate_drift_from_sample(
445        &self,
446        features: &[String],
447        sample_array: &ArrayView2<f64>, // n x m data array (features and predictions)
448        drift_profile: &SpcDriftProfile,
449    ) -> Result<Array2<f64>, DriftError> {
450        // iterate through each row of samples
451        let num_features = features.len();
452        let drift_array = sample_array
453            .axis_iter(Axis(0))
454            .into_par_iter()
455            .map(|x| {
456                // match AlertRules enum
457
458                let drift =
459                    self.set_control_drift_value(x, num_features, drift_profile, features)?;
460                Ok(drift)
461            })
462            .collect::<Result<Vec<_>, DriftError>>()?;
463
464        // convert drift array to 2D array
465        let drift_array =
466            Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
467
468        Ok(drift_array)
469    }
470}
471
472// convert drift array to 2D array
473
474impl Default for SpcMonitor {
475    fn default() -> Self {
476        SpcMonitor::new()
477    }
478}
479
480#[cfg(test)]
481mod tests {
482
483    // use crate::core::drift::base::DriftProfile;
484    use scouter_types::drift::DriftProfile;
485    use scouter_types::spc::SpcAlertConfig;
486    use scouter_types::util::ProfileBaseArgs;
487    use scouter_types::DriftType;
488
489    use super::*;
490    use approx::relative_eq;
491    use ndarray::Array;
492    use ndarray_rand::rand_distr::Uniform;
493    use ndarray_rand::RandomExt;
494    #[test]
495    fn test_create_2d_drift_profile_f32() {
496        // create 2d array
497        let array = Array::random((1030, 3), Uniform::new(0., 10.));
498
499        // cast array to f32
500        let array = array.mapv(|x| x as f32);
501
502        let features = vec![
503            "feature_1".to_string(),
504            "feature_2".to_string(),
505            "feature_3".to_string(),
506        ];
507
508        let alert_config = SpcAlertConfig::default();
509        let monitor = SpcMonitor::new();
510        let config = SpcDriftConfig::new(
511            Some("name".to_string()),
512            Some("repo".to_string()),
513            None,
514            None,
515            None,
516            Some(alert_config),
517            None,
518        );
519
520        let profile = monitor
521            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
522            .unwrap();
523        assert_eq!(profile.features.len(), 3);
524
525        // test extra funcs that are used in python
526        profile.__str__();
527        let model_string = profile.model_dump_json();
528
529        let mut loaded_profile = SpcDriftProfile::model_validate_json(model_string);
530        assert_eq!(loaded_profile.features.len(), 3);
531
532        // test update args
533        loaded_profile
534            .update_config_args(
535                Some("updated".to_string()),
536                Some("updated".to_string()),
537                Some("1.0.0".to_string()),
538                Some(loaded_profile.config.sample),
539                Some(loaded_profile.config.sample_size),
540                Some(loaded_profile.config.alert_config.clone()),
541            )
542            .unwrap();
543
544        assert_eq!(loaded_profile.config.name, "updated");
545        assert_eq!(loaded_profile.config.space, "updated");
546        assert_eq!(loaded_profile.config.version, "1.0.0");
547    }
548
549    #[test]
550    fn test_create_2d_drift_profile_f64() {
551        // create 2d array
552        let array = Array::random((1030, 3), Uniform::new(0., 10.));
553
554        let features = vec![
555            "feature_1".to_string(),
556            "feature_2".to_string(),
557            "feature_3".to_string(),
558        ];
559
560        let monitor = SpcMonitor::new();
561        let alert_config = SpcAlertConfig::default();
562        let config = SpcDriftConfig::new(
563            Some("repo".to_string()),
564            Some("name".to_string()),
565            None,
566            None,
567            None,
568            Some(alert_config),
569            None,
570        );
571
572        let profile = monitor
573            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
574            .unwrap();
575        assert_eq!(profile.features.len(), 3);
576
577        let args = profile.get_base_args();
578        assert_eq!(args.name, "name");
579        assert_eq!(args.space, "repo");
580        assert_eq!(args.version, "0.1.0");
581        assert_eq!(args.schedule, "0 0 0 * * *");
582
583        let value = profile.to_value();
584
585        // test DriftProfile
586        let profile = DriftProfile::from_value(value).unwrap();
587        let new_args = profile.get_base_args();
588
589        assert_eq!(new_args, args);
590
591        let profile_str = profile.to_value().to_string();
592        DriftProfile::from_str(DriftType::Spc, profile_str).unwrap();
593    }
594
595    #[test]
596    fn test_drift_detect_process() {
597        // create 2d array
598        let array = Array::random((1030, 3), Uniform::new(0., 10.));
599
600        let features = vec![
601            "feature_1".to_string(),
602            "feature_2".to_string(),
603            "feature_3".to_string(),
604        ];
605        let alert_config = SpcAlertConfig::default();
606        let config = SpcDriftConfig::new(
607            Some("name".to_string()),
608            Some("repo".to_string()),
609            None,
610            None,
611            None,
612            Some(alert_config),
613            None,
614        );
615
616        let monitor = SpcMonitor::new();
617
618        let profile = monitor
619            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
620            .unwrap();
621        assert_eq!(profile.features.len(), 3);
622
623        // change first 100 rows to 100 at index 1
624        let mut array = array.to_owned();
625        array.slice_mut(s![0..200, 1]).fill(100.0);
626
627        let drift_map = monitor
628            .compute_drift(&features, &array.view(), &profile)
629            .unwrap();
630
631        // assert relative
632        let feature_1 = drift_map.features.get("feature_2").unwrap();
633        assert!(relative_eq!(feature_1.samples[0], 100.0, epsilon = 2.0));
634
635        // convert profile to json and load it back
636        let _ = drift_map.model_dump_json();
637
638        // create server records
639    }
640
641    #[test]
642    fn test_sample_data() {
643        // create 2d array
644        let array = Array::random((1030, 3), Uniform::new(0., 10.));
645
646        let features = vec![
647            "feature_1".to_string(),
648            "feature_2".to_string(),
649            "feature_3".to_string(),
650        ];
651        let alert_config = SpcAlertConfig {
652            features_to_monitor: features.clone(),
653            ..Default::default()
654        };
655        let config = SpcDriftConfig::new(
656            Some("name".to_string()),
657            Some("repo".to_string()),
658            None,
659            None,
660            None,
661            Some(alert_config),
662            None,
663        );
664
665        let monitor = SpcMonitor::new();
666
667        let profile = monitor
668            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
669            .unwrap();
670        assert_eq!(profile.features.len(), 3);
671
672        let server_records = monitor
673            .sample_data(&features, &array.view(), &profile)
674            .unwrap();
675
676        assert_eq!(server_records.records.len(), 126);
677
678        // create server records
679    }
680
681    #[test]
682    fn test_calculate_drift_from_sample() {
683        let array = Array::random((1030, 3), Uniform::new(0., 10.));
684
685        let features = vec![
686            "feature_1".to_string(),
687            "feature_2".to_string(),
688            "feature_3".to_string(),
689        ];
690
691        let alert_config = SpcAlertConfig::default();
692        let config = SpcDriftConfig::new(
693            Some("name".to_string()),
694            Some("repo".to_string()),
695            None,
696            None,
697            None,
698            Some(alert_config),
699            None,
700        );
701
702        let monitor = SpcMonitor::new();
703
704        let profile = monitor
705            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
706            .unwrap();
707        assert_eq!(profile.features.len(), 3);
708
709        // change first 100 rows to 100 at index 1
710        let mut array = array.to_owned();
711        array.slice_mut(s![0..200, 1]).fill(100.0);
712
713        let drift_array = monitor
714            .calculate_drift_from_sample(&features, &array.view(), &profile)
715            .unwrap();
716
717        // assert relative
718        let feature_1 = drift_array.column(1);
719        assert!(relative_eq!(feature_1[0], 4.0, epsilon = 2.0));
720    }
721}