1use crate::error::DriftError;
2use crate::spc::types::{SpcDriftMap, SpcFeatureDrift};
3use crate::utils::CategoricalFeatureHelpers;
4use chrono::Utc;
5use indicatif::ProgressBar;
6use ndarray::prelude::*;
7use ndarray::Axis;
8use num_traits::{Float, FromPrimitive, Num};
9use rayon::prelude::*;
10use scouter_types::{
11 spc::{SpcDriftConfig, SpcDriftProfile, SpcFeatureDriftProfile},
12 ServerRecord, ServerRecords, SpcRecord,
13};
14use std::collections::HashMap;
15use std::fmt::Debug;
16
/// Entry point for statistical process control (SPC) drift monitoring.
///
/// The struct has no fields; its methods operate only on the arguments passed to them.
pub struct SpcMonitor {}
18
19impl CategoricalFeatureHelpers for SpcMonitor {}
20
21impl SpcMonitor {
22 pub fn new() -> Self {
23 SpcMonitor {}
24 }
25
26 fn compute_c4(&self, number: usize) -> f32 {
36 let n = number as f32;
38 let left = 4.0 * n - 4.0;
39 let right = 4.0 * n - 3.0;
40 left / right
41 }
42
43 fn set_sample_size(&self, shape: usize) -> usize {
53 if shape < 1000 {
54 25
55 } else if (1000..10000).contains(&shape) {
56 100
57 } else if (10000..100000).contains(&shape) {
58 1000
59 } else if (100000..1000000).contains(&shape) {
60 10000
61 } else if shape >= 1000000 {
62 100000
63 } else {
64 25
65 }
66 }
67
68 pub fn compute_array_mean<F>(&self, x: &ArrayView2<F>) -> Result<Array1<F>, DriftError>
78 where
79 F: Float
80 + Sync
81 + FromPrimitive
82 + Send
83 + Num
84 + Debug
85 + num_traits::Zero
86 + ndarray::ScalarOperand,
87 F: Into<f64>,
88 {
89 x.mean_axis(Axis(0)).ok_or(DriftError::ComputeMeanError)
90 }
91
92 fn compute_control_limits<F>(
104 &self,
105 sample_size: usize,
106 sample_data: &ArrayView2<F>,
107 num_features: usize,
108 features: &[String],
109 drift_config: &SpcDriftConfig,
110 ) -> Result<SpcDriftProfile, DriftError>
111 where
112 F: FromPrimitive + Num + Clone + Float + Debug + Sync + Send + ndarray::ScalarOperand,
113
114 F: Into<f64>,
115 {
116 let c4 = self.compute_c4(sample_size);
117 let sample_mean = self.compute_array_mean(sample_data)?;
118
119 let means = sample_mean.slice(s![0..num_features]);
120 let stdev = sample_mean.slice(s![num_features..]);
121 let base = &stdev / F::from(c4).unwrap();
124 let one_sigma = &base * F::from(1.0).unwrap();
125 let two_sigma = &base * F::from(2.0).unwrap();
126 let three_sigma = &base * F::from(3.0).unwrap();
127
128 let one_lcl = &means - &one_sigma;
130 let one_ucl = &means + &one_sigma;
131
132 let two_lcl = &means - &two_sigma;
133 let two_ucl = &means + &two_sigma;
134
135 let three_lcl = &means - &three_sigma;
136 let three_ucl = &means + &three_sigma;
137 let center = &means;
138
139 let mut feat_profile = HashMap::new();
141
142 for (i, feature) in features.iter().enumerate() {
143 feat_profile.insert(
144 feature.to_string(),
145 SpcFeatureDriftProfile {
146 id: feature.to_string(),
147 center: center[i].into(),
148 one_ucl: one_ucl[i].into(),
149 one_lcl: one_lcl[i].into(),
150 two_ucl: two_ucl[i].into(),
151 two_lcl: two_lcl[i].into(),
152 three_ucl: three_ucl[i].into(),
153 three_lcl: three_lcl[i].into(),
154 timestamp: Utc::now(),
155 },
156 );
157 }
158
159 Ok(SpcDriftProfile::new(feat_profile, drift_config.clone()))
160 }
161
162 pub fn create_2d_drift_profile<F>(
173 &self,
174 features: &[String],
175 array: &ArrayView2<F>,
176 drift_config: &SpcDriftConfig,
177 ) -> Result<SpcDriftProfile, DriftError>
178 where
179 F: Float
180 + Sync
181 + FromPrimitive
182 + Send
183 + Num
184 + Debug
185 + num_traits::Zero
186 + ndarray::ScalarOperand,
187 F: Into<f64>,
188 {
189 let shape = array.shape()[0];
190 let num_features = features.len();
191 let sample_size = self.set_sample_size(shape);
192
193 let nbr_chunks = shape / sample_size;
194 let pb = ProgressBar::new(nbr_chunks as u64);
195
196 let sample_vec = array
198 .axis_chunks_iter(Axis(0), sample_size)
199 .into_par_iter()
200 .map(|x| {
201 let mean = x.mean_axis(Axis(0)).unwrap();
202 let stddev = x.std_axis(Axis(0), F::from(1.0).unwrap());
203
204 let combined = ndarray::concatenate![Axis(0), mean, stddev];
206 pb.inc(1);
208
209 combined.to_vec()
210 })
211 .collect::<Vec<_>>();
212
213 let sample_data =
215 Array::from_shape_vec((sample_vec.len(), features.len() * 2), sample_vec.concat())?;
216
217 let drift_profile = self.compute_control_limits(
218 sample_size,
219 &sample_data.view(),
220 num_features,
221 features,
222 drift_config,
223 )?;
224
225 Ok(drift_profile)
226 }
227
228 fn _sample_data<F>(
239 &self,
240 array: &ArrayView2<F>,
241 sample_size: usize,
242 columns: usize,
243 ) -> Result<Array2<f64>, DriftError>
244 where
245 F: Float
246 + Sync
247 + FromPrimitive
248 + Send
249 + Num
250 + Debug
251 + num_traits::Zero
252 + ndarray::ScalarOperand,
253 F: Into<f64>,
254 {
255 let sample_vec: Vec<Vec<f64>> = array
256 .axis_chunks_iter(Axis(0), sample_size)
257 .into_par_iter()
258 .map(|x| {
259 let mean = x.mean_axis(Axis(0)).unwrap();
260 let mean = mean.mapv(|x| x.into());
262 mean.to_vec()
263 })
264 .collect::<Vec<_>>();
265
266 let sample_data = Array::from_shape_vec((sample_vec.len(), columns), sample_vec.concat())?;
268 Ok(sample_data)
269 }
270
271 pub fn set_control_drift_value(
272 &self,
273 array: ArrayView1<f64>,
274 num_features: usize,
275 drift_profile: &SpcDriftProfile,
276 features: &[String],
277 ) -> Result<Vec<f64>, DriftError> {
278 let mut drift: Vec<f64> = vec![0.0; num_features];
279 for (i, feature) in features.iter().enumerate() {
280 if !drift_profile.features.contains_key(feature) {
282 continue;
283 }
284
285 let feature_profile = drift_profile
286 .features
287 .get(feature)
288 .ok_or(DriftError::FeatureNotExistError)?;
289
290 let value = array[i];
291
292 if value > feature_profile.three_ucl {
293 drift[i] = 4.0;
295 } else if value < feature_profile.three_lcl {
296 drift[i] = -4.0;
297 } else if value < feature_profile.three_ucl && value >= feature_profile.two_ucl {
298 drift[i] = 3.0;
299 } else if value < feature_profile.two_ucl && value >= feature_profile.one_ucl {
300 drift[i] = 2.0;
301 } else if value < feature_profile.one_ucl && value > feature_profile.center {
302 drift[i] = 1.0;
303 } else if value > feature_profile.three_lcl && value <= feature_profile.two_lcl {
304 drift[i] = -3.0;
305 } else if value > feature_profile.two_lcl && value <= feature_profile.one_lcl {
306 drift[i] = -2.0;
307 } else if value > feature_profile.one_lcl && value < feature_profile.center {
308 drift[i] = -1.0;
309 }
310 }
311
312 Ok(drift)
313 }
314
315 pub fn compute_drift<F>(
325 &self,
326 features: &[String],
327 array: &ArrayView2<F>, drift_profile: &SpcDriftProfile,
329 ) -> Result<SpcDriftMap, DriftError>
330 where
331 F: Float
332 + Sync
333 + FromPrimitive
334 + Send
335 + Num
336 + Debug
337 + num_traits::Zero
338 + ndarray::ScalarOperand,
339 F: Into<f64>,
340 {
341 let num_features = drift_profile.features.len();
342
343 let sample_data =
345 self._sample_data(array, drift_profile.config.sample_size, num_features)?;
346
347 let drift_array = sample_data
349 .axis_iter(Axis(0))
350 .into_par_iter()
351 .map(|x| {
352 let drift =
355 self.set_control_drift_value(x, num_features, drift_profile, features)?;
356
357 Ok(drift)
358 })
359 .collect::<Result<Vec<_>, DriftError>>()?;
360
361 let drift_array =
365 Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
366
367 let mut drift_map = SpcDriftMap::new(
368 drift_profile.config.name.clone(),
369 drift_profile.config.space.clone(),
370 drift_profile.config.version.clone(),
371 );
372
373 for (i, feature) in features.iter().enumerate() {
374 let drift = drift_array.column(i);
375 let sample = sample_data.column(i);
376
377 let feature_drift = SpcFeatureDrift {
378 samples: sample.to_vec(),
379 drift: drift.to_vec(),
380 };
381
382 drift_map.add_feature(feature.to_string(), feature_drift);
383 }
384
385 Ok(drift_map)
386 }
387
388 pub fn sample_data<F>(
397 &self,
398 features: &[String],
399 array: &ArrayView2<F>, drift_profile: &SpcDriftProfile,
401 ) -> Result<ServerRecords, DriftError>
402 where
403 F: Float
404 + Sync
405 + FromPrimitive
406 + Send
407 + Num
408 + Debug
409 + num_traits::Zero
410 + ndarray::ScalarOperand,
411 F: Into<f64>,
412 {
413 let num_features = features.len();
414
415 let sample_data =
417 self._sample_data(array, drift_profile.config.sample_size, num_features)?; let mut records = Vec::new();
420
421 for (i, feature) in features.iter().enumerate() {
422 if !drift_profile.features.contains_key(feature) {
423 continue;
424 }
425
426 let sample = sample_data.column(i);
427
428 sample.iter().for_each(|value| {
429 let record = SpcRecord::new(
430 drift_profile.config.uid.clone(),
431 feature.to_string(),
432 *value,
433 );
434
435 records.push(ServerRecord::Spc(record));
436 });
437 }
438 Ok(ServerRecords::new(records))
439 }
440
441 pub fn calculate_drift_from_sample(
442 &self,
443 features: &[String],
444 sample_array: &ArrayView2<f64>, drift_profile: &SpcDriftProfile,
446 ) -> Result<Array2<f64>, DriftError> {
447 let num_features = features.len();
449 let drift_array = sample_array
450 .axis_iter(Axis(0))
451 .into_par_iter()
452 .map(|x| {
453 let drift =
456 self.set_control_drift_value(x, num_features, drift_profile, features)?;
457 Ok(drift)
458 })
459 .collect::<Result<Vec<_>, DriftError>>()?;
460
461 let drift_array =
463 Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
464
465 Ok(drift_array)
466 }
467}
468
469impl Default for SpcMonitor {
472 fn default() -> Self {
473 SpcMonitor::new()
474 }
475}
476
477#[cfg(test)]
478mod tests {
479
480 use scouter_types::drift::DriftProfile;
482 use scouter_types::spc::SpcAlertConfig;
483 use scouter_types::util::ProfileBaseArgs;
484 use scouter_types::DriftType;
485
486 use super::*;
487 use approx::relative_eq;
488 use ndarray::Array;
489 use ndarray_rand::rand_distr::Uniform;
490 use ndarray_rand::RandomExt;
#[test]
fn test_create_2d_drift_profile_f32() {
    // Draw f64 samples and narrow them so the f32 code path is exercised.
    let data =
        Array::random((1030, 3), Uniform::new(0., 10.).unwrap()).mapv(|x: f64| x as f32);

    let features: Vec<String> = ["feature_1", "feature_2", "feature_3"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    let config = SpcDriftConfig::new(
        "space",
        "name",
        "1.0.0",
        None,
        None,
        Some(SpcAlertConfig::default()),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();
    let profile = monitor
        .create_2d_drift_profile(&features, &data.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // Round-trip the profile through its JSON representation.
    profile.__str__();
    let serialized = profile.model_dump_json();
    let mut restored = SpcDriftProfile::model_validate_json(serialized);
    assert_eq!(restored.features.len(), 3);

    // Rewrite name and space while passing the remaining settings back unchanged.
    let uid = restored.config.uid.clone();
    let sample = restored.config.sample;
    let sample_size = restored.config.sample_size;
    let alert_config = restored.config.alert_config.clone();
    restored
        .update_config_args(
            Some("updated".to_string()),
            Some("updated".to_string()),
            Some("1.0.0".to_string()),
            Some(uid),
            Some(sample),
            Some(sample_size),
            Some(alert_config),
        )
        .unwrap();

    assert_eq!(restored.config.name, "updated");
    assert_eq!(restored.config.space, "updated");
    assert_eq!(restored.config.version, "1.0.0");
}
546
#[test]
fn test_create_2d_drift_profile_f64() {
    let data = Array::random((1030, 3), Uniform::new(0., 10.).unwrap());

    let features: Vec<String> = ["feature_1", "feature_2", "feature_3"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    let config = SpcDriftConfig::new(
        "space",
        "name",
        "1.0.0",
        None,
        None,
        Some(SpcAlertConfig::default()),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();
    let profile = monitor
        .create_2d_drift_profile(&features, &data.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // The base arguments mirror the values passed to the config constructor.
    let base_args = profile.get_base_args();
    assert_eq!(base_args.name, "name");
    assert_eq!(base_args.space, "space");
    assert_eq!(base_args.version, Some("1.0.0".to_string()));
    assert_eq!(base_args.schedule, "0 0 0 * * *");

    // Converting to a generic value and back must keep the base arguments intact.
    let round_tripped = DriftProfile::from_value(profile.to_value()).unwrap();
    assert_eq!(round_tripped.get_base_args(), base_args);

    // The string form of the value must be loadable as an SPC profile.
    let as_string = round_tripped.to_value().to_string();
    DriftProfile::from_str(&DriftType::Spc, &as_string).unwrap();
}
592
#[test]
fn test_drift_detect_process() {
    let baseline = Array::random((1030, 3), Uniform::new(0., 10.).unwrap());

    let features: Vec<String> = ["feature_1", "feature_2", "feature_3"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    let config = SpcDriftConfig::new(
        "space",
        "name",
        "1.0.0",
        None,
        None,
        Some(SpcAlertConfig::default()),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();
    let profile = monitor
        .create_2d_drift_profile(&features, &baseline.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // Push the first 200 rows of the second column far away from the baseline.
    let mut shifted = baseline.to_owned();
    shifted.slice_mut(s![0..200, 1]).fill(100.0);

    let drift_map = monitor
        .compute_drift(&features, &shifted.view(), &profile)
        .unwrap();

    // The first sampled value of the shifted column must sit at the injected level.
    let shifted_feature = drift_map.features.get("feature_2").unwrap();
    assert!(relative_eq!(
        shifted_feature.samples[0],
        100.0,
        epsilon = 2.0
    ));

    // Serialisation only has to run; its output is not inspected.
    let _ = drift_map.model_dump_json();
}
638
#[test]
fn test_sample_data() {
    let data = Array::random((1030, 3), Uniform::new(0., 10.).unwrap());

    let features: Vec<String> = ["feature_1", "feature_2", "feature_3"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    // Monitor every feature explicitly.
    let alert_config = SpcAlertConfig {
        features_to_monitor: features.clone(),
        ..Default::default()
    };
    let config = SpcDriftConfig::new(
        "space",
        "name",
        "1.0.0",
        None,
        None,
        Some(alert_config),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();
    let profile = monitor
        .create_2d_drift_profile(&features, &data.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    let server_records = monitor
        .sample_data(&features, &data.view(), &profile)
        .unwrap();

    // Expected total across the three features for this 1030-row input.
    assert_eq!(server_records.records.len(), 126);
}
678
#[test]
fn test_calculate_drift_from_sample() {
    let array = Array::random((1030, 3), Uniform::new(0., 10.).unwrap());

    let features = vec![
        "feature_1".to_string(),
        "feature_2".to_string(),
        "feature_3".to_string(),
    ];

    let alert_config = SpcAlertConfig::default();
    let config = SpcDriftConfig::new(
        "space",
        "name",
        "1.0.0",
        None,
        None,
        Some(alert_config),
        None,
    );

    let monitor = SpcMonitor::new();

    let profile = monitor
        .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // Push the first 200 rows of the second column far beyond the baseline range
    // (the baseline is drawn from [0, 10)).
    let mut array = array.to_owned();
    array.slice_mut(s![0..200, 1]).fill(100.0);

    let drift_array = monitor
        .calculate_drift_from_sample(&features, &array.view(), &profile)
        .unwrap();

    // The zone code is a discrete value, so compare it exactly: a tolerance of 2.0
    // would also accept the in-control zones 2.0 and 3.0.
    let feature_2 = drift_array.column(1);
    assert_eq!(feature_2[0], 4.0);
}
719}