scouter_events/queue/psi/
feature_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_drift::psi::monitor::PsiMonitor;
5use scouter_types::{
6    psi::{Bin, BinType, PsiDriftProfile},
7    Feature, PsiServerRecord, QueueExt, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{debug, error, instrument};
11
12pub struct PsiFeatureQueue {
13    pub drift_profile: PsiDriftProfile,
14    pub empty_queue: HashMap<String, HashMap<usize, usize>>,
15    pub monitor: PsiMonitor,
16    pub feature_names: Vec<String>,
17}
18
19impl PsiFeatureQueue {
20    #[instrument(skip_all)]
21    fn find_numeric_bin_given_scaler(
22        value: f64,
23        bins: &[Bin],
24    ) -> Result<&usize, FeatureQueueError> {
25        let bin = bins
26            .iter()
27            .find(|bin| value > bin.lower_limit.unwrap() && value <= bin.upper_limit.unwrap())
28            .map(|bin| &bin.id);
29
30        match bin {
31            Some(bin) => Ok(bin),
32            None => {
33                error!("Failed to find bin for value: {}", value);
34                Err(FeatureQueueError::GetBinError)
35            }
36        }
37    }
38
39    #[instrument(skip_all)]
40    fn process_numeric_queue(
41        queue: &mut HashMap<usize, usize>,
42        value: f64,
43        bins: &[Bin],
44    ) -> Result<(), FeatureQueueError> {
45        let bin_id = Self::find_numeric_bin_given_scaler(value, bins)?;
46        let count = queue
47            .get_mut(bin_id)
48            .ok_or(FeatureQueueError::GetBinError)
49            .map_err(|e| {
50                error!("Error processing numeric queue: {:?}", e);
51                e
52            })?;
53        *count += 1;
54
55        Ok(())
56    }
57
58    #[instrument(skip_all)]
59    fn process_categorical_queue(
60        queue: &mut HashMap<usize, usize>,
61        value: &usize,
62    ) -> Result<(), FeatureQueueError> {
63        let count = queue
64            .get_mut(value)
65            .ok_or(FeatureQueueError::GetBinError)
66            .inspect_err(|e| {
67                error!("Error processing categorical queue: {:?}", e);
68            })?;
69        *count += 1;
70        Ok(())
71    }
72
73    pub fn new(drift_profile: PsiDriftProfile) -> Self {
74        let features_to_monitor = drift_profile
75            .config
76            .alert_config
77            .features_to_monitor
78            .clone();
79
80        let empty_queue: HashMap<String, HashMap<usize, usize>> = drift_profile
81            .features
82            .iter()
83            .filter(|(feature_name, _)| features_to_monitor.contains(feature_name))
84            .map(|(feature_name, feature_drift_profile)| {
85                let inner_map: HashMap<usize, usize> = feature_drift_profile
86                    .bins
87                    .iter()
88                    .map(|bin| (bin.id, 0))
89                    .collect();
90                (feature_name.clone(), inner_map)
91            })
92            .collect();
93
94        let feature_names = empty_queue.keys().cloned().collect();
95
96        PsiFeatureQueue {
97            drift_profile,
98            empty_queue,
99            monitor: PsiMonitor::new(),
100            feature_names,
101        }
102    }
103
104    #[instrument(skip_all, name = "insert_psi")]
105    pub fn insert(
106        &self,
107        features: &[Feature],
108        queue: &mut HashMap<String, HashMap<usize, usize>>,
109    ) -> Result<(), FeatureQueueError> {
110        let feat_map = &self.drift_profile.config.feature_map;
111        for feature in features.iter() {
112            if let Some(feature_drift_profile) = self.drift_profile.features.get(feature.name()) {
113                let name = feature.name().to_string();
114
115                // if feature not in features_to_monitor, skip
116                if !self.feature_names.contains(&name) {
117                    continue;
118                }
119
120                let bins = &feature_drift_profile.bins;
121                let queue = queue
122                    .get_mut(&name)
123                    .ok_or(FeatureQueueError::GetFeatureError)?;
124
125                match feature_drift_profile.bin_type {
126                    BinType::Numeric => {
127                        let value = feature.to_float(feat_map).map_err(|e| {
128                            error!("Error converting feature to float: {:?}", e);
129                            FeatureQueueError::InvalidValueError(
130                                feature.name().to_string(),
131                                e.to_string(),
132                            )
133                        })?;
134
135                        Self::process_numeric_queue(queue, value, bins)?
136                    }
137                    BinType::Category => {
138                        let value = feature.to_usize(feat_map).map_err(|e| {
139                            error!("Error converting feature to usize: {:?}", e);
140                            FeatureQueueError::InvalidValueError(
141                                feature.name().to_string(),
142                                e.to_string(),
143                            )
144                        })?;
145
146                        Self::process_categorical_queue(queue, &value)?
147                    }
148                }
149            }
150        }
151        Ok(())
152    }
153
154    #[instrument(skip_all)]
155    pub fn create_drift_records(
156        &self,
157        queue: HashMap<String, HashMap<usize, usize>>,
158    ) -> Result<ServerRecords, FeatureQueueError> {
159        // filter out any feature thats not in features_to_monitor
160        // Keep feature if any value in the bin map is greater than 0
161
162        let filtered_queue = queue
163            .iter()
164            .filter(|(_, bin_map)| bin_map.iter().any(|(_, count)| *count > 0))
165            .collect::<HashMap<_, _>>();
166
167        debug!("Filtered queue count: {:?}", filtered_queue.len());
168
169        let records = filtered_queue
170            .iter()
171            .flat_map(|(feature_name, bin_map)| {
172                bin_map.iter().map(move |(bin_id, count)| {
173                    ServerRecord::Psi(PsiServerRecord::new(
174                        self.drift_profile.config.space.clone(),
175                        self.drift_profile.config.name.clone(),
176                        self.drift_profile.config.version.clone(),
177                        feature_name.to_string(),
178                        *bin_id,
179                        *count,
180                    ))
181                })
182            })
183            .collect::<Vec<ServerRecord>>();
184
185        Ok(ServerRecords::new(records))
186    }
187}
188
189impl FeatureQueue for PsiFeatureQueue {
190    fn create_drift_records_from_batch<T: QueueExt>(
191        &self,
192        batch: Vec<T>,
193    ) -> Result<ServerRecords, FeatureQueueError> {
194        // clones the empty map (so we don't need to recreate it on each call)
195        let mut queue = self.empty_queue.clone();
196
197        for elem in batch {
198            self.insert(elem.features(), &mut queue)?;
199        }
200
201        self.create_drift_records(queue)
202    }
203}
204
#[cfg(test)]
mod tests {

    use super::*;
    use ndarray::{Array, Axis};
    use ndarray_rand::rand::distributions::Bernoulli;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    use scouter_drift::utils::CategoricalFeatureHelpers;
    use scouter_types::psi::PsiAlertConfig;
    use scouter_types::psi::PsiDriftConfig;
    use scouter_types::EntityType;
    use scouter_types::{Features, DEFAULT_VERSION};

    #[test]
    fn test_feature_queue_insert_numeric() {
        let min = 1.0;
        let max = 87.0;
        let mut array = Array::random((1030, 3), Uniform::new(min, max));

        // Ensure that each column contains at least one `min` (1.0) and one `max` (87.0).
        for col in 0..3 {
            array[[0, col]] = min;
            array[[1, col]] = max;
        }

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let alert_config = PsiAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };
        let config =
            PsiDriftConfig::new("name", "repo", DEFAULT_VERSION, alert_config, None, None)
                .unwrap();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), min);
            let two = Feature::float("feature_2".to_string(), min);
            let three = Feature::float("feature_3".to_string(), max);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_numeric_categorical() {
        let numeric_cat_column =
            Array::random((100, 1), Bernoulli::new(0.5).unwrap())
                .mapv(|x| if x { 1.0 } else { 0.0 });
        let uniform_column = Array::random((100, 1), Uniform::new(0.0, 20.0));
        let array = ndarray::concatenate![Axis(1), numeric_cat_column, uniform_column];

        let features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let monitor = PsiMonitor::new();

        let drift_config = PsiDriftConfig {
            categorical_features: Some(features.clone()),
            ..Default::default()
        };

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &drift_config)
            .unwrap();
        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 0.0);
            let two = Feature::float("feature_2".to_string(), 1.0);

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&0).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_categorical() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let mut config = PsiDriftConfig {
            feature_map: feature_map.clone(),
            categorical_features: Some(string_features.clone()),
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };
            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&2).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&0).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_is_empty() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let mut config = PsiDriftConfig {
            feature_map,
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        // The template queue starts with every bin count at zero.
        assert!(feature_queue
            .empty_queue
            .values()
            .all(|bin_map| bin_map.values().all(|count| *count == 0)));

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        // After inserting the batch, at least one bin must hold a non-zero count.
        let is_empty = !queue
            .values()
            .any(|bin_map| bin_map.values().any(|count| *count > 0));

        assert!(!is_empty);
    }

    #[test]
    fn test_feature_queue_create_drift_records() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();

        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 1.0);
            let two = Feature::float("feature_2".to_string(), 10.0);
            let three = Feature::float("feature_3".to_string(), 10000.0);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch_features)
            .unwrap();

        // All three features are numeric, so each gets 10 bins under the current decile approach.
        // Each record covers one (feature, bin) pair, zero-count bins included,
        // so we expect 3 * 10 = 30 records.
        assert_eq!(drift_records.records.len(), 30);
    }
}