scouter_events/queue/psi/feature_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_drift::psi::monitor::PsiMonitor;
5use scouter_types::{
6    psi::{Bin, BinType, PsiDriftProfile},
7    Feature, PsiServerRecord, QueueExt, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{debug, error, instrument};
11
12pub struct PsiFeatureQueue {
13    pub drift_profile: PsiDriftProfile,
14    pub empty_queue: HashMap<String, HashMap<usize, usize>>,
15    pub monitor: PsiMonitor,
16    pub feature_names: Vec<String>,
17}
18
19impl PsiFeatureQueue {
20    #[instrument(skip_all)]
21    fn find_numeric_bin_given_scaler(
22        value: f64,
23        bins: &[Bin],
24    ) -> Result<&usize, FeatureQueueError> {
25        let bin = bins
26            .iter()
27            .find(|bin| value > bin.lower_limit.unwrap() && value <= bin.upper_limit.unwrap())
28            .map(|bin| &bin.id);
29
30        match bin {
31            Some(bin) => Ok(bin),
32            None => {
33                error!("Failed to find bin for value: {}", value);
34                Err(FeatureQueueError::GetBinError)
35            }
36        }
37    }
38
39    #[instrument(skip_all)]
40    fn process_numeric_queue(
41        queue: &mut HashMap<usize, usize>,
42        value: f64,
43        bins: &[Bin],
44    ) -> Result<(), FeatureQueueError> {
45        let bin_id = Self::find_numeric_bin_given_scaler(value, bins)?;
46        let count = queue
47            .get_mut(bin_id)
48            .ok_or(FeatureQueueError::GetBinError)
49            .map_err(|e| {
50                error!("Error processing numeric queue: {:?}", e);
51                e
52            })?;
53        *count += 1;
54
55        Ok(())
56    }
57
58    #[instrument(skip_all)]
59    fn process_categorical_queue(
60        queue: &mut HashMap<usize, usize>,
61        value: &usize,
62    ) -> Result<(), FeatureQueueError> {
63        let count = queue
64            .get_mut(value)
65            .ok_or(FeatureQueueError::GetBinError)
66            .inspect_err(|e| {
67                error!("Error processing categorical queue: {:?}", e);
68            })?;
69        *count += 1;
70        Ok(())
71    }
72
73    pub fn new(drift_profile: PsiDriftProfile) -> Self {
74        let features_to_monitor = drift_profile
75            .config
76            .alert_config
77            .features_to_monitor
78            .clone();
79
80        let empty_queue: HashMap<String, HashMap<usize, usize>> = drift_profile
81            .features
82            .iter()
83            .filter(|(feature_name, _)| features_to_monitor.contains(feature_name))
84            .map(|(feature_name, feature_drift_profile)| {
85                let inner_map: HashMap<usize, usize> = feature_drift_profile
86                    .bins
87                    .iter()
88                    .map(|bin| (bin.id, 0))
89                    .collect();
90                (feature_name.clone(), inner_map)
91            })
92            .collect();
93
94        let feature_names = empty_queue.keys().cloned().collect();
95
96        PsiFeatureQueue {
97            drift_profile,
98            empty_queue,
99            monitor: PsiMonitor::new(),
100            feature_names,
101        }
102    }
103
104    #[instrument(skip_all, name = "insert_psi")]
105    pub fn insert(
106        &self,
107        features: &[Feature],
108        queue: &mut HashMap<String, HashMap<usize, usize>>,
109    ) -> Result<(), FeatureQueueError> {
110        let feat_map = &self.drift_profile.config.feature_map;
111        for feature in features.iter() {
112            if let Some(feature_drift_profile) = self.drift_profile.features.get(feature.name()) {
113                let name = feature.name().to_string();
114
115                // if feature not in features_to_monitor, skip
116                if !self.feature_names.contains(&name) {
117                    error!(
118                        "Feature {} not in features to monitor, skipping",
119                        feature.name()
120                    );
121                    continue;
122                }
123
124                let bins = &feature_drift_profile.bins;
125                let queue = queue
126                    .get_mut(&name)
127                    .ok_or(FeatureQueueError::GetFeatureError)?;
128
129                match feature_drift_profile.bin_type {
130                    BinType::Numeric => {
131                        let value = feature.to_float(feat_map).map_err(|e| {
132                            error!("Error converting feature to float: {:?}", e);
133                            FeatureQueueError::InvalidValueError(
134                                feature.name().to_string(),
135                                e.to_string(),
136                            )
137                        })?;
138
139                        Self::process_numeric_queue(queue, value, bins)?
140                    }
141                    BinType::Category => {
142                        let value = feature.to_usize(feat_map).map_err(|e| {
143                            error!("Error converting feature to usize: {:?}", e);
144                            FeatureQueueError::InvalidValueError(
145                                feature.name().to_string(),
146                                e.to_string(),
147                            )
148                        })?;
149
150                        Self::process_categorical_queue(queue, &value)?
151                    }
152                }
153            }
154        }
155        Ok(())
156    }
157
158    #[instrument(skip_all)]
159    pub fn create_drift_records(
160        &self,
161        queue: HashMap<String, HashMap<usize, usize>>,
162    ) -> Result<ServerRecords, FeatureQueueError> {
163        // filter out any feature thats not in features_to_monitor
164        // Keep feature if any value in the bin map is greater than 0
165
166        let filtered_queue = queue
167            .iter()
168            .filter(|(_, bin_map)| bin_map.iter().any(|(_, count)| *count > 0))
169            .collect::<HashMap<_, _>>();
170
171        debug!("Filtered queue count: {:?}", filtered_queue.len());
172
173        let records = filtered_queue
174            .iter()
175            .flat_map(|(feature_name, bin_map)| {
176                bin_map.iter().map(move |(bin_id, count)| {
177                    ServerRecord::Psi(PsiServerRecord::new(
178                        self.drift_profile.config.space.clone(),
179                        self.drift_profile.config.name.clone(),
180                        self.drift_profile.config.version.clone(),
181                        feature_name.to_string(),
182                        *bin_id,
183                        *count,
184                    ))
185                })
186            })
187            .collect::<Vec<ServerRecord>>();
188
189        Ok(ServerRecords::new(records))
190    }
191}
192
193impl FeatureQueue for PsiFeatureQueue {
194    fn create_drift_records_from_batch<T: QueueExt>(
195        &self,
196        batch: Vec<T>,
197    ) -> Result<ServerRecords, FeatureQueueError> {
198        // clones the empty map (so we don't need to recreate it on each call)
199        let mut queue = self.empty_queue.clone();
200
201        for elem in batch {
202            self.insert(elem.features(), &mut queue)?;
203        }
204
205        self.create_drift_records(queue)
206    }
207}
208
#[cfg(test)]
mod tests {

    use super::*;
    use ndarray::{Array, Axis};
    use ndarray_rand::rand::distributions::Bernoulli;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    use scouter_drift::utils::CategoricalFeatureHelpers;
    use scouter_types::psi::PsiAlertConfig;
    use scouter_types::psi::PsiDriftConfig;
    use scouter_types::EntityType;
    use scouter_types::{Features, DEFAULT_VERSION};

    /// Values equal to the column min/max are counted in bin 1 and bin 10 respectively.
    #[test]
    fn test_feature_queue_insert_numeric() {
        let min = 1.0;
        let max = 87.0;
        let mut array = Array::random((1030, 3), Uniform::new(min, max));

        // Ensure that each column has at least one `1.0` and one `87.0`
        for col in 0..3 {
            array[[0, col]] = min;
            array[[1, col]] = max;
        }

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let alert_config = PsiAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };
        let config =
            PsiDriftConfig::new("name", "repo", DEFAULT_VERSION, alert_config, None, None)
                .unwrap();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), min);
            let two = Feature::float("feature_2".to_string(), min);
            let three = Feature::float("feature_3".to_string(), max);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }

    /// Numeric columns declared categorical are binned by their integer value.
    #[test]
    fn test_feature_queue_insert_numeric_categorical() {
        let numeric_cat_column =
            Array::random((100, 1), Bernoulli::new(0.5).unwrap())
                .mapv(|x| if x { 1.0 } else { 0.0 });
        let uniform_column = Array::random((100, 1), Uniform::new(0.0, 20.0));
        let array = ndarray::concatenate![Axis(1), numeric_cat_column, uniform_column];

        let features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let monitor = PsiMonitor::new();

        let drift_config = PsiDriftConfig {
            categorical_features: Some(features.clone()),
            ..Default::default()
        };

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &drift_config)
            .unwrap();
        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 0.0);
            let two = Feature::float("feature_2".to_string(), 1.0);

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&0).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
    }

    /// String features are mapped through the feature map before categorical binning.
    #[test]
    fn test_feature_queue_insert_categorical() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let mut config = PsiDriftConfig {
            feature_map: feature_map.clone(),
            categorical_features: Some(string_features.clone()),
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };
            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&2).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&0).unwrap(), 9);
    }

    /// After inserting a batch, at least one bin holds a non-zero count.
    #[test]
    fn test_feature_queue_is_empty() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let mut config = PsiDriftConfig {
            feature_map,
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        let has_counts = queue
            .values()
            .any(|bin_map| bin_map.values().any(|count| *count > 0));

        assert!(has_counts, "queue should not be empty after inserting features");
    }

    /// A batch yields one record per (feature, bin) pair for every feature with data.
    #[test]
    fn test_feature_queue_create_drift_records() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();

        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 1.0);
            let two = Feature::float("feature_2".to_string(), 10.0);
            let three = Feature::float("feature_3".to_string(), 10000.0);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch_features)
            .unwrap();

        // All 3 features are numeric, so each gets 10 bins under the current decile approach.
        // Each record describes one (feature, bin) pair, so we expect 3 * 10 = 30 records.
        assert_eq!(drift_records.records.len(), 30);
    }
}