1use crate::error::DriftError;
2use crate::spc::types::{SpcDriftMap, SpcFeatureDrift};
3use crate::utils::CategoricalFeatureHelpers;
4use chrono::Utc;
5use indicatif::ProgressBar;
6use ndarray::prelude::*;
7use ndarray::Axis;
8use num_traits::{Float, FromPrimitive, Num};
9use rayon::prelude::*;
10use scouter_types::{
11 spc::{SpcDriftConfig, SpcDriftProfile, SpcFeatureDriftProfile},
12 ServerRecord, ServerRecords, SpcServerRecord,
13};
14use std::collections::HashMap;
15use std::fmt::Debug;
16
/// Statistical process control (SPC) drift monitor.
///
/// The type has no fields; all of its behaviour lives in the methods of the
/// `impl SpcMonitor` block in this file.
pub struct SpcMonitor {}

// Empty impl: `SpcMonitor` adopts `CategoricalFeatureHelpers` without overriding anything.
// The trait is declared in `crate::utils`; what it provides is not visible from this file.
impl CategoricalFeatureHelpers for SpcMonitor {}
20
21impl SpcMonitor {
22 pub fn new() -> Self {
23 SpcMonitor {}
24 }
25
26 fn compute_c4(&self, number: usize) -> f32 {
36 let n = number as f32;
38 let left = 4.0 * n - 4.0;
39 let right = 4.0 * n - 3.0;
40 left / right
41 }
42
43 fn set_sample_size(&self, shape: usize) -> usize {
53 if shape < 1000 {
54 25
55 } else if (1000..10000).contains(&shape) {
56 100
57 } else if (10000..100000).contains(&shape) {
58 1000
59 } else if (100000..1000000).contains(&shape) {
60 10000
61 } else if shape >= 1000000 {
62 100000
63 } else {
64 25
65 }
66 }
67
68 pub fn compute_array_mean<F>(&self, x: &ArrayView2<F>) -> Result<Array1<F>, DriftError>
78 where
79 F: Float
80 + Sync
81 + FromPrimitive
82 + Send
83 + Num
84 + Debug
85 + num_traits::Zero
86 + ndarray::ScalarOperand,
87 F: Into<f64>,
88 {
89 x.mean_axis(Axis(0)).ok_or(DriftError::ComputeMeanError)
90 }
91
92 fn compute_control_limits<F>(
104 &self,
105 sample_size: usize,
106 sample_data: &ArrayView2<F>,
107 num_features: usize,
108 features: &[String],
109 drift_config: &SpcDriftConfig,
110 ) -> Result<SpcDriftProfile, DriftError>
111 where
112 F: FromPrimitive + Num + Clone + Float + Debug + Sync + Send + ndarray::ScalarOperand,
113
114 F: Into<f64>,
115 {
116 let c4 = self.compute_c4(sample_size);
117 let sample_mean = self.compute_array_mean(sample_data)?;
118
119 let means = sample_mean.slice(s![0..num_features]);
120 let stdev = sample_mean.slice(s![num_features..]);
121 let base = &stdev / F::from(c4).unwrap();
124 let one_sigma = &base * F::from(1.0).unwrap();
125 let two_sigma = &base * F::from(2.0).unwrap();
126 let three_sigma = &base * F::from(3.0).unwrap();
127
128 let one_lcl = &means - &one_sigma;
130 let one_ucl = &means + &one_sigma;
131
132 let two_lcl = &means - &two_sigma;
133 let two_ucl = &means + &two_sigma;
134
135 let three_lcl = &means - &three_sigma;
136 let three_ucl = &means + &three_sigma;
137 let center = &means;
138
139 let mut feat_profile = HashMap::new();
141
142 for (i, feature) in features.iter().enumerate() {
143 feat_profile.insert(
144 feature.to_string(),
145 SpcFeatureDriftProfile {
146 id: feature.to_string(),
147 center: center[i].into(),
148 one_ucl: one_ucl[i].into(),
149 one_lcl: one_lcl[i].into(),
150 two_ucl: two_ucl[i].into(),
151 two_lcl: two_lcl[i].into(),
152 three_ucl: three_ucl[i].into(),
153 three_lcl: three_lcl[i].into(),
154 timestamp: Utc::now(),
155 },
156 );
157 }
158
159 Ok(SpcDriftProfile::new(
160 feat_profile,
161 drift_config.clone(),
162 None,
163 ))
164 }
165
166 pub fn create_2d_drift_profile<F>(
177 &self,
178 features: &[String],
179 array: &ArrayView2<F>,
180 drift_config: &SpcDriftConfig,
181 ) -> Result<SpcDriftProfile, DriftError>
182 where
183 F: Float
184 + Sync
185 + FromPrimitive
186 + Send
187 + Num
188 + Debug
189 + num_traits::Zero
190 + ndarray::ScalarOperand,
191 F: Into<f64>,
192 {
193 let shape = array.shape()[0];
194 let num_features = features.len();
195 let sample_size = self.set_sample_size(shape);
196
197 let nbr_chunks = shape / sample_size;
198 let pb = ProgressBar::new(nbr_chunks as u64);
199
200 let sample_vec = array
202 .axis_chunks_iter(Axis(0), sample_size)
203 .into_par_iter()
204 .map(|x| {
205 let mean = x.mean_axis(Axis(0)).unwrap();
206 let stddev = x.std_axis(Axis(0), F::from(1.0).unwrap());
207
208 let combined = ndarray::concatenate![Axis(0), mean, stddev];
210 pb.inc(1);
212
213 combined.to_vec()
214 })
215 .collect::<Vec<_>>();
216
217 let sample_data =
219 Array::from_shape_vec((sample_vec.len(), features.len() * 2), sample_vec.concat())?;
220
221 let drift_profile = self.compute_control_limits(
222 sample_size,
223 &sample_data.view(),
224 num_features,
225 features,
226 drift_config,
227 )?;
228
229 Ok(drift_profile)
230 }
231
232 fn _sample_data<F>(
243 &self,
244 array: &ArrayView2<F>,
245 sample_size: usize,
246 columns: usize,
247 ) -> Result<Array2<f64>, DriftError>
248 where
249 F: Float
250 + Sync
251 + FromPrimitive
252 + Send
253 + Num
254 + Debug
255 + num_traits::Zero
256 + ndarray::ScalarOperand,
257 F: Into<f64>,
258 {
259 let sample_vec: Vec<Vec<f64>> = array
260 .axis_chunks_iter(Axis(0), sample_size)
261 .into_par_iter()
262 .map(|x| {
263 let mean = x.mean_axis(Axis(0)).unwrap();
264 let mean = mean.mapv(|x| x.into());
266 mean.to_vec()
267 })
268 .collect::<Vec<_>>();
269
270 let sample_data = Array::from_shape_vec((sample_vec.len(), columns), sample_vec.concat())?;
272 Ok(sample_data)
273 }
274
275 pub fn set_control_drift_value(
276 &self,
277 array: ArrayView1<f64>,
278 num_features: usize,
279 drift_profile: &SpcDriftProfile,
280 features: &[String],
281 ) -> Result<Vec<f64>, DriftError> {
282 let mut drift: Vec<f64> = vec![0.0; num_features];
283 for (i, feature) in features.iter().enumerate() {
284 if !drift_profile.features.contains_key(feature) {
286 continue;
287 }
288
289 let feature_profile = drift_profile
290 .features
291 .get(feature)
292 .ok_or(DriftError::FeatureNotExistError)?;
293
294 let value = array[i];
295
296 if value > feature_profile.three_ucl {
297 drift[i] = 4.0;
299 } else if value < feature_profile.three_lcl {
300 drift[i] = -4.0;
301 } else if value < feature_profile.three_ucl && value >= feature_profile.two_ucl {
302 drift[i] = 3.0;
303 } else if value < feature_profile.two_ucl && value >= feature_profile.one_ucl {
304 drift[i] = 2.0;
305 } else if value < feature_profile.one_ucl && value > feature_profile.center {
306 drift[i] = 1.0;
307 } else if value > feature_profile.three_lcl && value <= feature_profile.two_lcl {
308 drift[i] = -3.0;
309 } else if value > feature_profile.two_lcl && value <= feature_profile.one_lcl {
310 drift[i] = -2.0;
311 } else if value > feature_profile.one_lcl && value < feature_profile.center {
312 drift[i] = -1.0;
313 }
314 }
315
316 Ok(drift)
317 }
318
319 pub fn compute_drift<F>(
329 &self,
330 features: &[String],
331 array: &ArrayView2<F>, drift_profile: &SpcDriftProfile,
333 ) -> Result<SpcDriftMap, DriftError>
334 where
335 F: Float
336 + Sync
337 + FromPrimitive
338 + Send
339 + Num
340 + Debug
341 + num_traits::Zero
342 + ndarray::ScalarOperand,
343 F: Into<f64>,
344 {
345 let num_features = drift_profile.features.len();
346
347 let sample_data =
349 self._sample_data(array, drift_profile.config.sample_size, num_features)?;
350
351 let drift_array = sample_data
353 .axis_iter(Axis(0))
354 .into_par_iter()
355 .map(|x| {
356 let drift =
359 self.set_control_drift_value(x, num_features, drift_profile, features)?;
360
361 Ok(drift)
362 })
363 .collect::<Result<Vec<_>, DriftError>>()?;
364
365 let drift_array =
369 Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
370
371 let mut drift_map = SpcDriftMap::new(
372 drift_profile.config.name.clone(),
373 drift_profile.config.space.clone(),
374 drift_profile.config.version.clone(),
375 );
376
377 for (i, feature) in features.iter().enumerate() {
378 let drift = drift_array.column(i);
379 let sample = sample_data.column(i);
380
381 let feature_drift = SpcFeatureDrift {
382 samples: sample.to_vec(),
383 drift: drift.to_vec(),
384 };
385
386 drift_map.add_feature(feature.to_string(), feature_drift);
387 }
388
389 Ok(drift_map)
390 }
391
392 pub fn sample_data<F>(
401 &self,
402 features: &[String],
403 array: &ArrayView2<F>, drift_profile: &SpcDriftProfile,
405 ) -> Result<ServerRecords, DriftError>
406 where
407 F: Float
408 + Sync
409 + FromPrimitive
410 + Send
411 + Num
412 + Debug
413 + num_traits::Zero
414 + ndarray::ScalarOperand,
415 F: Into<f64>,
416 {
417 let num_features = drift_profile.config.alert_config.features_to_monitor.len();
418
419 let sample_data =
421 self._sample_data(array, drift_profile.config.sample_size, num_features)?; let mut records = Vec::new();
424
425 for (i, feature) in features.iter().enumerate() {
426 let sample = sample_data.column(i);
427
428 sample.iter().for_each(|value| {
429 let record = SpcServerRecord::new(
430 drift_profile.config.space.clone(),
431 drift_profile.config.name.clone(),
432 drift_profile.config.version.clone(),
433 feature.to_string(),
434 *value,
435 );
436
437 records.push(ServerRecord::Spc(record));
438 });
439 }
440
441 Ok(ServerRecords::new(records))
442 }
443
444 pub fn calculate_drift_from_sample(
445 &self,
446 features: &[String],
447 sample_array: &ArrayView2<f64>, drift_profile: &SpcDriftProfile,
449 ) -> Result<Array2<f64>, DriftError> {
450 let num_features = features.len();
452 let drift_array = sample_array
453 .axis_iter(Axis(0))
454 .into_par_iter()
455 .map(|x| {
456 let drift =
459 self.set_control_drift_value(x, num_features, drift_profile, features)?;
460 Ok(drift)
461 })
462 .collect::<Result<Vec<_>, DriftError>>()?;
463
464 let drift_array =
466 Array::from_shape_vec((drift_array.len(), num_features), drift_array.concat())?;
467
468 Ok(drift_array)
469 }
470}
471
472impl Default for SpcMonitor {
475 fn default() -> Self {
476 SpcMonitor::new()
477 }
478}
479
480#[cfg(test)]
481mod tests {
482
483 use scouter_types::drift::DriftProfile;
485 use scouter_types::spc::SpcAlertConfig;
486 use scouter_types::util::ProfileBaseArgs;
487 use scouter_types::DriftType;
488
489 use super::*;
490 use approx::relative_eq;
491 use ndarray::Array;
492 use ndarray_rand::rand_distr::Uniform;
493 use ndarray_rand::RandomExt;
/// Builds a profile from `f32` data, round-trips it through JSON and rewrites its config.
#[test]
fn test_create_2d_drift_profile_f32() {
    let raw = Array::random((1030, 3), Uniform::new(0., 10.));

    // Exercise the generic code path with `f32` input.
    let data = raw.mapv(|x| x as f32);

    let features = vec![
        String::from("feature_1"),
        String::from("feature_2"),
        String::from("feature_3"),
    ];

    let alert_config = SpcAlertConfig::default();
    let monitor = SpcMonitor::new();
    let config = SpcDriftConfig::new(
        Some("name".to_string()),
        Some("repo".to_string()),
        None,
        None,
        None,
        Some(alert_config),
        None,
    )
    .unwrap();

    let profile = monitor
        .create_2d_drift_profile(&features, &data.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // Only checks that the string conversion runs; the result is discarded.
    profile.__str__();
    let serialized = profile.model_dump_json();

    let mut reloaded = SpcDriftProfile::model_validate_json(serialized);
    assert_eq!(reloaded.features.len(), 3);

    // Reuse the current sampling settings so that only space/name/version change.
    let sample = reloaded.config.sample;
    let sample_size = reloaded.config.sample_size;
    let alert_config = reloaded.config.alert_config.clone();

    reloaded
        .update_config_args(
            Some("updated".to_string()),
            Some("updated".to_string()),
            Some("1.0.0".to_string()),
            Some(sample),
            Some(sample_size),
            Some(alert_config),
        )
        .unwrap();

    assert_eq!(reloaded.config.name, "updated");
    assert_eq!(reloaded.config.space, "updated");
    assert_eq!(reloaded.config.version, "1.0.0");
}
548
/// Builds a profile from `f64` data and checks that its base args survive a value round-trip.
#[test]
fn test_create_2d_drift_profile_f64() {
    let data = Array::random((1030, 3), Uniform::new(0., 10.));

    let features = vec![
        String::from("feature_1"),
        String::from("feature_2"),
        String::from("feature_3"),
    ];

    let monitor = SpcMonitor::new();
    let alert_config = SpcAlertConfig::default();
    let config = SpcDriftConfig::new(
        Some("repo".to_string()),
        Some("name".to_string()),
        None,
        None,
        None,
        Some(alert_config),
        None,
    )
    .unwrap();

    let profile = monitor
        .create_2d_drift_profile(&features, &data.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    let args = profile.get_base_args();
    assert_eq!(args.name, "name");
    assert_eq!(args.space, "repo");
    assert_eq!(args.version, "0.1.0");
    assert_eq!(args.schedule, "0 0 0 * * *");

    // Round-trip through the generic `DriftProfile` wrapper.
    let restored = DriftProfile::from_value(profile.to_value()).unwrap();
    let restored_args = restored.get_base_args();

    assert_eq!(restored_args, args);

    // The string form must parse back as an SPC profile too.
    let profile_str = restored.to_value().to_string();
    DriftProfile::from_str(DriftType::Spc, profile_str).unwrap();
}
594
/// Shifts one feature far from its baseline and checks the first sampled mean that
/// `compute_drift` reports for it.
#[test]
fn test_drift_detect_process() {
    let baseline = Array::random((1030, 3), Uniform::new(0., 10.));

    let features = vec![
        String::from("feature_1"),
        String::from("feature_2"),
        String::from("feature_3"),
    ];
    let alert_config = SpcAlertConfig::default();
    let config = SpcDriftConfig::new(
        Some("name".to_string()),
        Some("repo".to_string()),
        None,
        None,
        None,
        Some(alert_config),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();

    let profile = monitor
        .create_2d_drift_profile(&features, &baseline.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // Overwrite the first 200 rows of the second column with a constant 100.0.
    let mut shifted = baseline.to_owned();
    shifted.slice_mut(s![0..200, 1]).fill(100.0);

    let drift_map = monitor
        .compute_drift(&features, &shifted.view(), &profile)
        .unwrap();

    let shifted_feature = drift_map.features.get("feature_2").unwrap();
    assert!(relative_eq!(
        shifted_feature.samples[0],
        100.0,
        epsilon = 2.0
    ));

    // Serialization only has to succeed; the output is not inspected.
    let _ = drift_map.model_dump_json();
}
640
/// Checks how many server records `sample_data` emits for a 1030 x 3 array.
#[test]
fn test_sample_data() {
    let data = Array::random((1030, 3), Uniform::new(0., 10.));

    let features = vec![
        String::from("feature_1"),
        String::from("feature_2"),
        String::from("feature_3"),
    ];
    // `sample_data` sizes its output from `features_to_monitor`, so list every feature.
    let alert_config = SpcAlertConfig {
        features_to_monitor: features.clone(),
        ..Default::default()
    };
    let config = SpcDriftConfig::new(
        Some("name".to_string()),
        Some("repo".to_string()),
        None,
        None,
        None,
        Some(alert_config),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();

    let profile = monitor
        .create_2d_drift_profile(&features, &data.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    let server_records = monitor
        .sample_data(&features, &data.view(), &profile)
        .unwrap();

    // 126 = 42 x 3; presumably 42 subgroups per feature at the config's default sample
    // size - TODO confirm against `SpcDriftConfig`.
    assert_eq!(server_records.records.len(), 126);
}
680
/// Scores raw rows directly with `calculate_drift_from_sample` after shifting one feature.
#[test]
fn test_calculate_drift_from_sample() {
    let baseline = Array::random((1030, 3), Uniform::new(0., 10.));

    let features = vec![
        String::from("feature_1"),
        String::from("feature_2"),
        String::from("feature_3"),
    ];

    let alert_config = SpcAlertConfig::default();
    let config = SpcDriftConfig::new(
        Some("name".to_string()),
        Some("repo".to_string()),
        None,
        None,
        None,
        Some(alert_config),
        None,
    )
    .unwrap();

    let monitor = SpcMonitor::new();

    let profile = monitor
        .create_2d_drift_profile(&features, &baseline.view(), &config)
        .unwrap();
    assert_eq!(profile.features.len(), 3);

    // Overwrite the first 200 rows of the second column with a constant 100.0.
    let mut shifted = baseline.to_owned();
    shifted.slice_mut(s![0..200, 1]).fill(100.0);

    let drift_array = monitor
        .calculate_drift_from_sample(&features, &shifted.view(), &profile)
        .unwrap();

    // Row 0 of the shifted column is checked against the score 4.0 with a wide tolerance.
    let shifted_scores = drift_array.column(1);
    assert!(relative_eq!(shifted_scores[0], 4.0, epsilon = 2.0));
}
721}