scouter_events/queue/psi/feature_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_drift::psi::monitor::PsiMonitor;
5use scouter_types::{
6    psi::{Bin, BinType, PsiDriftProfile},
7    Feature, MessageRecord, PsiServerRecord, QueueExt, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{debug, error, info, instrument};
11
12pub struct PsiFeatureQueue {
13    pub drift_profile: PsiDriftProfile,
14    pub empty_queue: HashMap<String, HashMap<usize, usize>>,
15    pub monitor: PsiMonitor,
16    pub feature_names: Vec<String>,
17}
18
19impl PsiFeatureQueue {
20    #[instrument(skip_all)]
21    fn find_numeric_bin_given_scaler(
22        value: f64,
23        bins: &[Bin],
24    ) -> Result<&usize, FeatureQueueError> {
25        let bin = bins
26            .iter()
27            .find(|bin| value > bin.lower_limit.unwrap() && value <= bin.upper_limit.unwrap())
28            .map(|bin| &bin.id);
29
30        match bin {
31            Some(bin) => Ok(bin),
32            None => {
33                error!("Failed to find bin for value: {}", value);
34                Err(FeatureQueueError::GetBinError)
35            }
36        }
37    }
38
39    #[instrument(skip_all)]
40    fn process_numeric_queue(
41        queue: &mut HashMap<usize, usize>,
42        value: f64,
43        bins: &[Bin],
44    ) -> Result<(), FeatureQueueError> {
45        let bin_id = Self::find_numeric_bin_given_scaler(value, bins)?;
46        let count = queue
47            .get_mut(bin_id)
48            .ok_or(FeatureQueueError::GetBinError)
49            .map_err(|e| {
50                error!("Error processing numeric queue: {:?}", e);
51                e
52            })?;
53        *count += 1;
54
55        Ok(())
56    }
57
58    #[instrument(skip_all)]
59    fn process_categorical_queue(
60        queue: &mut HashMap<usize, usize>,
61        value: &usize,
62    ) -> Result<(), FeatureQueueError> {
63        let count = queue
64            .get_mut(value)
65            .ok_or(FeatureQueueError::GetBinError)
66            .inspect_err(|e| {
67                error!("Error processing categorical queue: {:?}", e);
68            })?;
69        *count += 1;
70        Ok(())
71    }
72
73    pub fn new(drift_profile: PsiDriftProfile) -> Self {
74        let features_to_monitor = drift_profile
75            .config
76            .alert_config
77            .features_to_monitor
78            .clone();
79
80        let empty_queue: HashMap<String, HashMap<usize, usize>> = drift_profile
81            .features
82            .iter()
83            .filter(|(feature_name, _)| features_to_monitor.contains(feature_name))
84            .map(|(feature_name, feature_drift_profile)| {
85                let inner_map: HashMap<usize, usize> = feature_drift_profile
86                    .bins
87                    .iter()
88                    .map(|bin| (bin.id, 0))
89                    .collect();
90                (feature_name.clone(), inner_map)
91            })
92            .collect();
93
94        let feature_names = empty_queue.keys().cloned().collect();
95
96        PsiFeatureQueue {
97            drift_profile,
98            empty_queue,
99            monitor: PsiMonitor::new(),
100            feature_names,
101        }
102    }
103
104    #[instrument(skip_all, name = "insert_psi")]
105    pub fn insert(
106        &self,
107        features: &[Feature],
108        queue: &mut HashMap<String, HashMap<usize, usize>>,
109    ) -> Result<(), FeatureQueueError> {
110        let feat_map = &self.drift_profile.config.feature_map;
111        for feature in features.iter() {
112            if let Some(feature_drift_profile) = self.drift_profile.features.get(feature.name()) {
113                let name = feature.name().to_string();
114
115                // if feature not in features_to_monitor, skip
116                if !self.feature_names.contains(&name) {
117                    error!(
118                        "Feature {} not in features to monitor, skipping",
119                        feature.name()
120                    );
121                    continue;
122                }
123
124                let bins = &feature_drift_profile.bins;
125                let queue = queue
126                    .get_mut(&name)
127                    .ok_or(FeatureQueueError::GetFeatureError)?;
128
129                match feature_drift_profile.bin_type {
130                    BinType::Numeric => {
131                        let value = feature.to_float(feat_map).map_err(|e| {
132                            error!("Error converting feature to float: {:?}", e);
133                            FeatureQueueError::InvalidValueError(
134                                feature.name().to_string(),
135                                e.to_string(),
136                            )
137                        })?;
138
139                        if !value.is_finite() {
140                            info!(
141                                "Non finite value detected for {}, value will not be inserted into queue",
142                                feature.name()
143                            );
144                            continue;
145                        }
146
147                        Self::process_numeric_queue(queue, value, bins)?
148                    }
149                    BinType::Category => {
150                        let value = feature.to_usize(feat_map).map_err(|e| {
151                            error!("Error converting feature to usize: {:?}", e);
152                            FeatureQueueError::InvalidValueError(
153                                feature.name().to_string(),
154                                e.to_string(),
155                            )
156                        })?;
157
158                        Self::process_categorical_queue(queue, &value)?
159                    }
160                }
161            }
162        }
163        Ok(())
164    }
165
166    #[instrument(skip_all)]
167    pub fn create_drift_records(
168        &self,
169        queue: HashMap<String, HashMap<usize, usize>>,
170    ) -> Result<ServerRecords, FeatureQueueError> {
171        // filter out any feature thats not in features_to_monitor
172        // Keep feature if any value in the bin map is greater than 0
173
174        let filtered_queue = queue
175            .iter()
176            .filter(|(_, bin_map)| bin_map.iter().any(|(_, count)| *count > 0))
177            .collect::<HashMap<_, _>>();
178
179        debug!("Filtered queue count: {:?}", filtered_queue.len());
180
181        let records = filtered_queue
182            .iter()
183            .flat_map(|(feature_name, bin_map)| {
184                bin_map.iter().map(move |(bin_id, count)| {
185                    ServerRecord::Psi(PsiServerRecord::new(
186                        self.drift_profile.config.space.clone(),
187                        self.drift_profile.config.name.clone(),
188                        self.drift_profile.config.version.clone(),
189                        feature_name.to_string(),
190                        *bin_id,
191                        *count,
192                    ))
193                })
194            })
195            .collect::<Vec<ServerRecord>>();
196
197        Ok(ServerRecords::new(records))
198    }
199}
200
201impl FeatureQueue for PsiFeatureQueue {
202    fn create_drift_records_from_batch<T: QueueExt>(
203        &self,
204        batch: Vec<T>,
205    ) -> Result<MessageRecord, FeatureQueueError> {
206        // clones the empty map (so we don't need to recreate it on each call)
207        let mut queue = self.empty_queue.clone();
208
209        for elem in batch {
210            self.insert(elem.features(), &mut queue)?;
211        }
212
213        Ok(MessageRecord::ServerRecords(
214            self.create_drift_records(queue)?,
215        ))
216    }
217}
218
#[cfg(test)]
mod tests {

    use super::*;
    use ndarray::{Array, Axis};
    use ndarray_rand::rand::distributions::Bernoulli;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    use scouter_drift::utils::CategoricalFeatureHelpers;
    use scouter_types::psi::PsiAlertConfig;
    use scouter_types::psi::PsiDriftConfig;
    use scouter_types::EntityType;
    use scouter_types::{Features, DEFAULT_VERSION};

    /// Lower bound of the uniform data used by `numeric_profile`.
    const MIN: f64 = 1.0;
    /// Upper bound of the uniform data used by `numeric_profile`.
    const MAX: f64 = 87.0;

    /// Builds a numeric PSI profile for `feature_1`..`feature_3` from 1030 uniform samples in
    /// `[MIN, MAX)`, with each column forced to contain `MIN` and `MAX`, monitoring only
    /// `features_to_monitor`.
    fn numeric_profile(features_to_monitor: &[&str]) -> PsiDriftProfile {
        let mut array = Array::random((1030, 3), Uniform::new(MIN, MAX));

        // Ensure that each column has at least one `MIN` and one `MAX`
        for col in 0..3 {
            array[[0, col]] = MIN;
            array[[1, col]] = MAX;
        }

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let alert_config = PsiAlertConfig {
            features_to_monitor: features_to_monitor
                .iter()
                .map(|name| name.to_string())
                .collect(),
            ..Default::default()
        };

        let config = PsiDriftConfig {
            space: "name".to_string(),
            name: "repo".to_string(),
            version: DEFAULT_VERSION.to_string(),
            alert_config,
            ..Default::default()
        };

        let profile = PsiMonitor::new()
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);
        profile
    }

    #[test]
    fn test_feature_queue_insert_numeric() {
        let profile = numeric_profile(&["feature_1", "feature_2", "feature_3"]);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), MIN);
            let two = Feature::float("feature_2".to_string(), MIN);
            let three = Feature::float("feature_3".to_string(), MAX);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_numeric_categorical() {
        let numeric_cat_column =
            Array::random((100, 1), Bernoulli::new(0.5).unwrap())
                .mapv(|x| if x { 1.0 } else { 0.0 });
        let uniform_column = Array::random((100, 1), Uniform::new(0.0, 20.0));
        let array = ndarray::concatenate![Axis(1), numeric_cat_column, uniform_column];

        let features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let monitor = PsiMonitor::new();

        let drift_config = PsiDriftConfig {
            categorical_features: Some(features.clone()),
            ..Default::default()
        };

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &drift_config)
            .unwrap();
        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 0.0);
            let two = Feature::float("feature_2".to_string(), 1.0);

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&0).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_categorical() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let mut config = PsiDriftConfig {
            feature_map: feature_map.clone(),
            categorical_features: Some(string_features.clone()),
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };
            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&2).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&0).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_is_empty() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let mut config = PsiDriftConfig {
            feature_map,
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        let is_empty = !queue
            .values()
            .any(|bin_map| bin_map.values().any(|count| *count > 0));

        assert!(!is_empty, "queue should hold counts after inserting a batch");
    }

    #[test]
    fn test_feature_queue_create_drift_records() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();

        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 1.0);
            let two = Feature::float("feature_2".to_string(), 10.0);
            let three = Feature::float("feature_3".to_string(), 10000.0);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch_features)
            .unwrap();

        // We have 3 features, the 3 features are numeric in nature and thus should have 10 bins assigned per due to our current decile approach.
        // Each record contains information for a given feature bin pair and this we should see a vec of len 30
        assert_eq!(drift_records.len(), 30);
    }

    #[test]
    fn test_feature_queue_create_drift_records_skips_empty_features() {
        let feature_queue =
            PsiFeatureQueue::new(numeric_profile(&["feature_1", "feature_2", "feature_3"]));

        let batch = vec![Features {
            features: vec![Feature::float("feature_1".to_string(), MIN)],
            entity_type: EntityType::Feature,
        }];

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch)
            .unwrap();

        // Only feature_1 received data, so only its bins produce records.
        let feature_1_bins = feature_queue.empty_queue["feature_1"].len();
        assert_eq!(drift_records.len(), feature_1_bins);
    }

    #[test]
    fn test_feature_queue_insert_skips_unmonitored_feature() {
        let feature_queue = PsiFeatureQueue::new(numeric_profile(&["feature_1", "feature_2"]));

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let features = vec![
            Feature::float("feature_1".to_string(), MIN),
            Feature::float("feature_3".to_string(), MAX),
        ];

        let mut queue = feature_queue.empty_queue.clone();
        feature_queue.insert(&features, &mut queue).unwrap();

        assert_eq!(queue["feature_1"][&1], 1);
        assert!(!queue.contains_key("feature_3"));
    }

    #[test]
    fn test_feature_queue_insert_numeric_missing_bin_limits_errors() {
        let mut profile = numeric_profile(&["feature_1", "feature_2", "feature_3"]);
        for bin in profile
            .features
            .get_mut("feature_1")
            .unwrap()
            .bins
            .iter_mut()
        {
            bin.lower_limit = None;
        }

        let feature_queue = PsiFeatureQueue::new(profile);
        let mut queue = feature_queue.empty_queue.clone();
        let features = vec![Feature::float("feature_1".to_string(), MIN)];

        // Malformed bins must surface as an error, not a panic.
        assert!(feature_queue.insert(&features, &mut queue).is_err());
        assert!(queue["feature_1"].values().all(|&count| count == 0));
    }

    #[test]
    fn test_feature_queue_insert_numeric_non_finite() {
        let profile = numeric_profile(&["feature_1", "feature_2", "feature_3"]);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        let non_finite_values = [f64::INFINITY, f64::NEG_INFINITY, f64::NAN];

        for i in 0..9 {
            let one = Feature::float("feature_1".to_string(), MIN);
            // Cycle through the non-finite values for feature_2
            let two = Feature::float("feature_2".to_string(), non_finite_values[i % 3]);
            let three = Feature::float("feature_3".to_string(), MAX);
            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };
            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        assert!((1..=10).all(|bin| *queue.get("feature_2").unwrap().get(&bin).unwrap() == 0));
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }
}