1use crate::error::DriftError;
2use crate::spc::types::{SpcDriftMap, SpcFeatureDrift};
3use crate::utils::CategoricalFeatureHelpers;
4use chrono::Utc;
5use indicatif::ProgressBar;
6use ndarray::prelude::*;
7use ndarray::Axis;
8use num_traits::{Float, FromPrimitive, Num};
9use rayon::prelude::*;
10use scouter_types::{
11 spc::{SpcDriftConfig, SpcDriftProfile, SpcFeatureDriftProfile},
12 ServerRecord, ServerRecords, SpcServerRecord,
13};
14use std::collections::HashMap;
15use std::fmt::Debug;
16
/// Field-less monitor type whose methods build SPC drift profiles and score
/// data against them; it carries no state of its own.
pub struct SpcMonitor {}
18
// Empty impl: `SpcMonitor` takes every `CategoricalFeatureHelpers` method as
// provided by the trait and overrides none of them here.
impl CategoricalFeatureHelpers for SpcMonitor {}
20
21impl SpcMonitor {
22 pub fn new() -> Self {
23 SpcMonitor {}
24 }
25
26 fn compute_c4(&self, number: usize) -> f32 {
36 let n = number as f32;
38 let left = 4.0 * n - 4.0;
39 let right = 4.0 * n - 3.0;
40 left / right
41 }
42
43 fn set_sample_size(&self, shape: usize) -> usize {
53 if shape < 1000 {
54 25
55 } else if (1000..10000).contains(&shape) {
56 100
57 } else if (10000..100000).contains(&shape) {
58 1000
59 } else if (100000..1000000).contains(&shape) {
60 10000
61 } else if shape >= 1000000 {
62 100000
63 } else {
64 25
65 }
66 }
67
68 pub fn compute_array_mean<F>(&self, x: &ArrayView2<F>) -> Result<Array1<F>, DriftError>
78 where
79 F: Float
80 + Sync
81 + FromPrimitive
82 + Send
83 + Num
84 + Debug
85 + num_traits::Zero
86 + ndarray::ScalarOperand,
87 F: Into<f64>,
88 {
89 x.mean_axis(Axis(0)).ok_or(DriftError::ComputeMeanError)
90 }
91
92 fn compute_control_limits<F>(
104 &self,
105 sample_size: usize,
106 sample_data: &ArrayView2<F>,
107 num_features: usize,
108 features: &[String],
109 drift_config: &SpcDriftConfig,
110 ) -> Result<SpcDriftProfile, DriftError>
111 where
112 F: FromPrimitive + Num + Clone + Float + Debug + Sync + Send + ndarray::ScalarOperand,
113
114 F: Into<f64>,
115 {
116 let c4 = self.compute_c4(sample_size);
117 let sample_mean = self.compute_array_mean(sample_data)?;
118
119 let means = sample_mean.slice(s![0..num_features]);
120 let stdev = sample_mean.slice(s![num_features..]);
121 let base = &stdev / F::from(c4).unwrap();
124 let one_sigma = &base * F::from(1.0).unwrap();
125 let two_sigma = &base * F::from(2.0).unwrap();
126 let three_sigma = &base * F::from(3.0).unwrap();
127
128 let one_lcl = &means - &one_sigma;
130 let one_ucl = &means + &one_sigma;
131
132 let two_lcl = &means - &two_sigma;
133 let two_ucl = &means + &two_sigma;
134
135 let three_lcl = &means - &three_sigma;
136 let three_ucl = &means + &three_sigma;
137 let center = &means;
138
139 let mut feat_profile = HashMap::new();
141
142 for (i, feature) in features.iter().enumerate() {
143 feat_profile.insert(
144 feature.to_string(),
145 SpcFeatureDriftProfile {
146 id: feature.to_string(),
147 center: center[i].into(),
148 one_ucl: one_ucl[i].into(),
149 one_lcl: one_lcl[i].into(),
150 two_ucl: two_ucl[i].into(),
151 two_lcl: two_lcl[i].into(),
152 three_ucl: three_ucl[i].into(),
153 three_lcl: three_lcl[i].into(),
154 timestamp: Utc::now(),
155 },
156 );
157 }
158
159 Ok(SpcDriftProfile::new(feat_profile, drift_config.clone()))
160 }
161
162 pub fn create_2d_drift_profile<F>(
173 &self,
174 features: &[String],
175 array: &ArrayView2<F>,
176 drift_config: &SpcDriftConfig,
177 ) -> Result<SpcDriftProfile, DriftError>
178 where
179 F: Float
180 + Sync
181 + FromPrimitive
182 + Send
183 + Num
184 + Debug
185 + num_traits::Zero
186 + ndarray::ScalarOperand,
187 F: Into<f64>,
188 {
189 let shape = array.shape()[0];
190 let num_features = features.len();
191 let sample_size = self.set_sample_size(shape);
192
193 let nbr_chunks = shape / sample_size;
194 let pb = ProgressBar::new(nbr_chunks as u64);
195
196 let sample_vec = array
198 .axis_chunks_iter(Axis(0), sample_size)
199 .into_par_iter()
200 .map(|x| {
201 let mean = x.mean_axis(Axis(0)).unwrap();
202 let stddev = x.std_axis(Axis(0), F::from(1.0).unwrap());
203
204 let combined = ndarray::concatenate![Axis(0), mean, stddev];
206 pb.inc(1);
208
209 combined.to_vec()
210 })
211 .collect::<Vec<_>>();
212
213 let sample_data =
215 Array::from_shape_vec((sample_vec.len(), features.len() * 2), sample_vec.concat())?;
216
217 let drift_profile = self.compute_control_limits(
218 sample_size,
219 &sample_data.view(),
220 num_features,
221 features,
222 drift_config,
223 )?;
224
225 Ok(drift_profile)
226 }
227
228 fn _sample_data<F>(
239 &self,
240 array: &ArrayView2<F>,
241 sample_size: usize,
242 columns: usize,
243 ) -> Result<Array2<f64>, DriftError>
244 where
245 F: Float
246 + Sync
247 + FromPrimitive
248 + Send
249 + Num
250 + Debug
251 + num_traits::Zero
252 + ndarray::ScalarOperand,
253 F: Into<f64>,
254 {
255 let sample_vec: Vec<Vec<f64>> = array
256 .axis_chunks_iter(Axis(0), sample_size)
257 .into_par_iter()
258 .map(|x| {
259 let mean = x.mean_axis(Axis(0)).unwrap();
260 let mean = mean.mapv(|x| x.into());
262 mean.to_vec()
263 })
264 .collect::<Vec<_>>();
265
266 let sample_data = Array::from_shape_vec((sample_vec.len(), columns), sample_vec.concat())?;
268 Ok(sample_data)
269 }
270
271 pub fn set_control_drift_value(
272 &self,
273 array: ArrayView1<f64>,
274 num_features: usize,
275 drift_profile: &SpcDriftProfile,
276 features: &[String],
277 ) -> Result<Vec<f64>, DriftError> {
278 let mut drift: Vec<f64> = vec![0.0; num_features];
279 for (i, feature) in features.iter().enumerate() {
280 if !drift_profile.features.contains_key(feature) {
282 continue;
283 }
284
285 let feature_profile = drift_profile
286 .features
287 .get(feature)
288 .ok_or(DriftError::FeatureNotExistError)?;
289
290 let value = array[i];
291
292 if value > feature_profile.three_ucl {
293 drift[i] = 4.0;
295 } else if value < feature_profile.three_lcl {
296 drift[i] = -4.0;
297 } else if value < feature_profile.three_ucl && value >= feature_profile.two_ucl {
298 drift[i] = 3.0;
299 } else if value < feature_profile.two_ucl && value >= feature_profile.one_ucl {
300 drift[i] = 2.0;
301 } else if value < feature_profile.one_ucl && value > feature_profile.center {
302 drift[i] = 1.0;
303 } else if value > feature_profile.three_lcl && value <= feature_profile.two_lcl {
304 drift[i] = -3.0;
305 } else if value > feature_profile.two_lcl && value <= feature_profile.one_lcl {
306 drift[i] = -2.0;
307 } else if value > feature_profile.one_lcl && value < feature_profile.center {
308 drift[i] = -1.0;
309 }
310 }
311
312 Ok(drift)
313 }
314
315 pub fn compute_drift<F>(
325 &self,
326 features: &[String],
327 array: &ArrayView2<F>, drift_profile: &SpcDriftProfile,
329 ) -> Result<SpcDriftMap, DriftError>
330 where
331 F: Float
332 + Sync
333 + FromPrimitive
334 + Send
335 + Num
336 + Debug
337 + num_traits::Zero
338 + ndarray::ScalarOperand,
339 F: Into<f64>,
340 {
341 let num_features = drift_profile.features.len();
342
343 let sample_data =
345 self._sample_data(array, drift_profile.config.sample_size, num_features)?;
346
347 let drift_array = sample_data
349 .axis_iter(Axis(0))
350 .into_par_iter()
351 .map(|x| {
352 let drift =
355 self.set_control_drift_value(x, num_features, drift_profile, features)?;
356
357 Ok(drift)
358 })
359 .collect::<Result<Vec<_>, DriftError>>()?;
360
361 let drift_array =
365 Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
366
367 let mut drift_map = SpcDriftMap::new(
368 drift_profile.config.name.clone(),
369 drift_profile.config.space.clone(),
370 drift_profile.config.version.clone(),
371 );
372
373 for (i, feature) in features.iter().enumerate() {
374 let drift = drift_array.column(i);
375 let sample = sample_data.column(i);
376
377 let feature_drift = SpcFeatureDrift {
378 samples: sample.to_vec(),
379 drift: drift.to_vec(),
380 };
381
382 drift_map.add_feature(feature.to_string(), feature_drift);
383 }
384
385 Ok(drift_map)
386 }
387
388 pub fn sample_data<F>(
397 &self,
398 features: &[String],
399 array: &ArrayView2<F>, drift_profile: &SpcDriftProfile,
401 ) -> Result<ServerRecords, DriftError>
402 where
403 F: Float
404 + Sync
405 + FromPrimitive
406 + Send
407 + Num
408 + Debug
409 + num_traits::Zero
410 + ndarray::ScalarOperand,
411 F: Into<f64>,
412 {
413 let num_features = drift_profile.config.alert_config.features_to_monitor.len();
414
415 let sample_data =
417 self._sample_data(array, drift_profile.config.sample_size, num_features)?; let mut records = Vec::new();
420
421 for (i, feature) in features.iter().enumerate() {
422 let sample = sample_data.column(i);
423
424 sample.iter().for_each(|value| {
425 let record = SpcServerRecord::new(
426 drift_profile.config.space.clone(),
427 drift_profile.config.name.clone(),
428 drift_profile.config.version.clone(),
429 feature.to_string(),
430 *value,
431 );
432
433 records.push(ServerRecord::Spc(record));
434 });
435 }
436
437 Ok(ServerRecords::new(records))
438 }
439
440 pub fn calculate_drift_from_sample(
441 &self,
442 features: &[String],
443 sample_array: &ArrayView2<f64>, drift_profile: &SpcDriftProfile,
445 ) -> Result<Array2<f64>, DriftError> {
446 let num_features = features.len();
448 let drift_array = sample_array
449 .axis_iter(Axis(0))
450 .into_par_iter()
451 .map(|x| {
452 let drift =
455 self.set_control_drift_value(x, num_features, drift_profile, features)?;
456 Ok(drift)
457 })
458 .collect::<Result<Vec<_>, DriftError>>()?;
459
460 let drift_array =
462 Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
463
464 Ok(drift_array)
465 }
466}
467
468impl Default for SpcMonitor {
471 fn default() -> Self {
472 SpcMonitor::new()
473 }
474}
475
476#[cfg(test)]
477mod tests {
478
479 use scouter_types::drift::DriftProfile;
481 use scouter_types::spc::SpcAlertConfig;
482 use scouter_types::util::ProfileBaseArgs;
483 use scouter_types::DriftType;
484
485 use super::*;
486 use approx::relative_eq;
487 use ndarray::Array;
488 use ndarray_rand::rand_distr::Uniform;
489 use ndarray_rand::RandomExt;
490 #[test]
491 fn test_create_2d_drift_profile_f32() {
492 let array = Array::random((1030, 3), Uniform::new(0., 10.));
494
495 let array = array.mapv(|x| x as f32);
497
498 let features = vec![
499 "feature_1".to_string(),
500 "feature_2".to_string(),
501 "feature_3".to_string(),
502 ];
503
504 let alert_config = SpcAlertConfig::default();
505 let monitor = SpcMonitor::new();
506 let config = SpcDriftConfig::new(
507 Some("name".to_string()),
508 Some("repo".to_string()),
509 None,
510 None,
511 None,
512 Some(alert_config),
513 None,
514 );
515
516 let profile = monitor
517 .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
518 .unwrap();
519 assert_eq!(profile.features.len(), 3);
520
521 profile.__str__();
523 let model_string = profile.model_dump_json();
524
525 let mut loaded_profile = SpcDriftProfile::model_validate_json(model_string);
526 assert_eq!(loaded_profile.features.len(), 3);
527
528 loaded_profile
530 .update_config_args(
531 Some("updated".to_string()),
532 Some("updated".to_string()),
533 Some("1.0.0".to_string()),
534 Some(loaded_profile.config.sample),
535 Some(loaded_profile.config.sample_size),
536 Some(loaded_profile.config.alert_config.clone()),
537 )
538 .unwrap();
539
540 assert_eq!(loaded_profile.config.name, "updated");
541 assert_eq!(loaded_profile.config.space, "updated");
542 assert_eq!(loaded_profile.config.version, "1.0.0");
543 }
544
545 #[test]
546 fn test_create_2d_drift_profile_f64() {
547 let array = Array::random((1030, 3), Uniform::new(0., 10.));
549
550 let features = vec![
551 "feature_1".to_string(),
552 "feature_2".to_string(),
553 "feature_3".to_string(),
554 ];
555
556 let monitor = SpcMonitor::new();
557 let alert_config = SpcAlertConfig::default();
558 let config = SpcDriftConfig::new(
559 Some("repo".to_string()),
560 Some("name".to_string()),
561 None,
562 None,
563 None,
564 Some(alert_config),
565 None,
566 );
567
568 let profile = monitor
569 .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
570 .unwrap();
571 assert_eq!(profile.features.len(), 3);
572
573 let args = profile.get_base_args();
574 assert_eq!(args.name, "name");
575 assert_eq!(args.space, "repo");
576 assert_eq!(args.version, Some("0.1.0".to_string()));
577 assert_eq!(args.schedule, "0 0 0 * * *");
578
579 let value = profile.to_value();
580
581 let profile = DriftProfile::from_value(value).unwrap();
583 let new_args = profile.get_base_args();
584
585 assert_eq!(new_args, args);
586
587 let profile_str = profile.to_value().to_string();
588 DriftProfile::from_str(DriftType::Spc, profile_str).unwrap();
589 }
590
591 #[test]
592 fn test_drift_detect_process() {
593 let array = Array::random((1030, 3), Uniform::new(0., 10.));
595
596 let features = vec![
597 "feature_1".to_string(),
598 "feature_2".to_string(),
599 "feature_3".to_string(),
600 ];
601 let alert_config = SpcAlertConfig::default();
602 let config = SpcDriftConfig::new(
603 Some("name".to_string()),
604 Some("repo".to_string()),
605 None,
606 None,
607 None,
608 Some(alert_config),
609 None,
610 );
611
612 let monitor = SpcMonitor::new();
613
614 let profile = monitor
615 .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
616 .unwrap();
617 assert_eq!(profile.features.len(), 3);
618
619 let mut array = array.to_owned();
621 array.slice_mut(s![0..200, 1]).fill(100.0);
622
623 let drift_map = monitor
624 .compute_drift(&features, &array.view(), &profile)
625 .unwrap();
626
627 let feature_1 = drift_map.features.get("feature_2").unwrap();
629 assert!(relative_eq!(feature_1.samples[0], 100.0, epsilon = 2.0));
630
631 let _ = drift_map.model_dump_json();
633
634 }
636
637 #[test]
638 fn test_sample_data() {
639 let array = Array::random((1030, 3), Uniform::new(0., 10.));
641
642 let features = vec![
643 "feature_1".to_string(),
644 "feature_2".to_string(),
645 "feature_3".to_string(),
646 ];
647 let alert_config = SpcAlertConfig {
648 features_to_monitor: features.clone(),
649 ..Default::default()
650 };
651 let config = SpcDriftConfig::new(
652 Some("name".to_string()),
653 Some("repo".to_string()),
654 None,
655 None,
656 None,
657 Some(alert_config),
658 None,
659 );
660
661 let monitor = SpcMonitor::new();
662
663 let profile = monitor
664 .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
665 .unwrap();
666 assert_eq!(profile.features.len(), 3);
667
668 let server_records = monitor
669 .sample_data(&features, &array.view(), &profile)
670 .unwrap();
671
672 assert_eq!(server_records.records.len(), 126);
673
674 }
676
677 #[test]
678 fn test_calculate_drift_from_sample() {
679 let array = Array::random((1030, 3), Uniform::new(0., 10.));
680
681 let features = vec![
682 "feature_1".to_string(),
683 "feature_2".to_string(),
684 "feature_3".to_string(),
685 ];
686
687 let alert_config = SpcAlertConfig::default();
688 let config = SpcDriftConfig::new(
689 Some("name".to_string()),
690 Some("repo".to_string()),
691 None,
692 None,
693 None,
694 Some(alert_config),
695 None,
696 );
697
698 let monitor = SpcMonitor::new();
699
700 let profile = monitor
701 .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
702 .unwrap();
703 assert_eq!(profile.features.len(), 3);
704
705 let mut array = array.to_owned();
707 array.slice_mut(s![0..200, 1]).fill(100.0);
708
709 let drift_array = monitor
710 .calculate_drift_from_sample(&features, &array.view(), &profile)
711 .unwrap();
712
713 let feature_1 = drift_array.column(1);
715 assert!(relative_eq!(feature_1[0], 4.0, epsilon = 2.0));
716 }
717}