1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_drift::psi::monitor::PsiMonitor;
5use scouter_types::{
6 psi::{Bin, BinType, PsiDriftProfile},
7 Feature, MessageRecord, PsiServerRecord, QueueExt, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{debug, error, info, instrument};
11
/// Accumulates per-bin observation counts for the monitored features of a PSI drift
/// profile and converts them into server records.
pub struct PsiFeatureQueue {
    /// PSI drift profile whose per-feature bins, bin types and feature map drive binning.
    pub drift_profile: PsiDriftProfile,
    /// Template queue: monitored feature name -> (bin id -> count), with every count at zero.
    /// Callers clone it to obtain a fresh queue for each batch.
    pub empty_queue: HashMap<String, HashMap<usize, usize>>,
    /// PSI monitor instance. NOTE(review): not read by any method in this file.
    pub monitor: PsiMonitor,
    /// Names of the monitored features (the keys of `empty_queue` when built via `new`).
    pub feature_names: Vec<String>,
}
18
19impl PsiFeatureQueue {
20 #[instrument(skip_all)]
21 fn find_numeric_bin_given_scaler(
22 value: f64,
23 bins: &[Bin],
24 ) -> Result<&usize, FeatureQueueError> {
25 let bin = bins
26 .iter()
27 .find(|bin| value > bin.lower_limit.unwrap() && value <= bin.upper_limit.unwrap())
28 .map(|bin| &bin.id);
29
30 match bin {
31 Some(bin) => Ok(bin),
32 None => {
33 error!("Failed to find bin for value: {}", value);
34 Err(FeatureQueueError::GetBinError)
35 }
36 }
37 }
38
39 #[instrument(skip_all)]
40 fn process_numeric_queue(
41 queue: &mut HashMap<usize, usize>,
42 value: f64,
43 bins: &[Bin],
44 ) -> Result<(), FeatureQueueError> {
45 let bin_id = Self::find_numeric_bin_given_scaler(value, bins)?;
46 let count = queue
47 .get_mut(bin_id)
48 .ok_or(FeatureQueueError::GetBinError)
49 .map_err(|e| {
50 error!("Error processing numeric queue: {:?}", e);
51 e
52 })?;
53 *count += 1;
54
55 Ok(())
56 }
57
58 #[instrument(skip_all)]
59 fn process_categorical_queue(
60 queue: &mut HashMap<usize, usize>,
61 value: &usize,
62 ) -> Result<(), FeatureQueueError> {
63 let count = queue
64 .get_mut(value)
65 .ok_or(FeatureQueueError::GetBinError)
66 .inspect_err(|e| {
67 error!("Error processing categorical queue: {:?}", e);
68 })?;
69 *count += 1;
70 Ok(())
71 }
72
73 pub fn new(drift_profile: PsiDriftProfile) -> Self {
74 let features_to_monitor = drift_profile
75 .config
76 .alert_config
77 .features_to_monitor
78 .clone();
79
80 let empty_queue: HashMap<String, HashMap<usize, usize>> = drift_profile
81 .features
82 .iter()
83 .filter(|(feature_name, _)| features_to_monitor.contains(feature_name))
84 .map(|(feature_name, feature_drift_profile)| {
85 let inner_map: HashMap<usize, usize> = feature_drift_profile
86 .bins
87 .iter()
88 .map(|bin| (bin.id, 0))
89 .collect();
90 (feature_name.clone(), inner_map)
91 })
92 .collect();
93
94 let feature_names = empty_queue.keys().cloned().collect();
95
96 PsiFeatureQueue {
97 drift_profile,
98 empty_queue,
99 monitor: PsiMonitor::new(),
100 feature_names,
101 }
102 }
103
104 #[instrument(skip_all, name = "insert_psi")]
105 pub fn insert(
106 &self,
107 features: &[Feature],
108 queue: &mut HashMap<String, HashMap<usize, usize>>,
109 ) -> Result<(), FeatureQueueError> {
110 let feat_map = &self.drift_profile.config.feature_map;
111 for feature in features.iter() {
112 if let Some(feature_drift_profile) = self.drift_profile.features.get(feature.name()) {
113 let name = feature.name().to_string();
114
115 if !self.feature_names.contains(&name) {
117 error!(
118 "Feature {} not in features to monitor, skipping",
119 feature.name()
120 );
121 continue;
122 }
123
124 let bins = &feature_drift_profile.bins;
125 let queue = queue
126 .get_mut(&name)
127 .ok_or(FeatureQueueError::GetFeatureError)?;
128
129 match feature_drift_profile.bin_type {
130 BinType::Numeric => {
131 let value = feature.to_float(feat_map).map_err(|e| {
132 error!("Error converting feature to float: {:?}", e);
133 FeatureQueueError::InvalidValueError(
134 feature.name().to_string(),
135 e.to_string(),
136 )
137 })?;
138
139 if !value.is_finite() {
140 info!(
141 "Non finite value detected for {}, value will not be inserted into queue",
142 feature.name()
143 );
144 continue;
145 }
146
147 Self::process_numeric_queue(queue, value, bins)?
148 }
149 BinType::Category => {
150 let value = feature.to_usize(feat_map).map_err(|e| {
151 error!("Error converting feature to usize: {:?}", e);
152 FeatureQueueError::InvalidValueError(
153 feature.name().to_string(),
154 e.to_string(),
155 )
156 })?;
157
158 Self::process_categorical_queue(queue, &value)?
159 }
160 }
161 }
162 }
163 Ok(())
164 }
165
166 #[instrument(skip_all)]
167 pub fn create_drift_records(
168 &self,
169 queue: HashMap<String, HashMap<usize, usize>>,
170 ) -> Result<ServerRecords, FeatureQueueError> {
171 let filtered_queue = queue
175 .iter()
176 .filter(|(_, bin_map)| bin_map.iter().any(|(_, count)| *count > 0))
177 .collect::<HashMap<_, _>>();
178
179 debug!("Filtered queue count: {:?}", filtered_queue.len());
180
181 let records = filtered_queue
182 .iter()
183 .flat_map(|(feature_name, bin_map)| {
184 bin_map.iter().map(move |(bin_id, count)| {
185 ServerRecord::Psi(PsiServerRecord::new(
186 self.drift_profile.config.space.clone(),
187 self.drift_profile.config.name.clone(),
188 self.drift_profile.config.version.clone(),
189 feature_name.to_string(),
190 *bin_id,
191 *count,
192 ))
193 })
194 })
195 .collect::<Vec<ServerRecord>>();
196
197 Ok(ServerRecords::new(records))
198 }
199}
200
201impl FeatureQueue for PsiFeatureQueue {
202 fn create_drift_records_from_batch<T: QueueExt>(
203 &self,
204 batch: Vec<T>,
205 ) -> Result<MessageRecord, FeatureQueueError> {
206 let mut queue = self.empty_queue.clone();
208
209 for elem in batch {
210 self.insert(elem.features(), &mut queue)?;
211 }
212
213 Ok(MessageRecord::ServerRecords(
214 self.create_drift_records(queue)?,
215 ))
216 }
217}
218
#[cfg(test)]
mod tests {
    //! Tests for `PsiFeatureQueue`: bin assignment for numeric, numeric-categorical and string
    //! features, non-finite handling, unmonitored features, and drift-record creation.

    use super::*;
    use ndarray::{Array, Axis};
    use ndarray_rand::rand::distributions::Bernoulli;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    use scouter_drift::utils::CategoricalFeatureHelpers;
    use scouter_types::psi::PsiAlertConfig;
    use scouter_types::psi::PsiDriftConfig;
    use scouter_types::EntityType;
    use scouter_types::{Features, DEFAULT_VERSION};

    /// Converts string literals into owned `String`s.
    fn strings(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| value.to_string()).collect()
    }

    /// Wraps `features` in a `Features` record tagged as `EntityType::Feature`.
    fn record(features: Vec<Feature>) -> Features {
        Features {
            features,
            entity_type: EntityType::Feature,
        }
    }

    /// Inserts every record of `batch` into a fresh copy of the template queue and returns it.
    fn insert_all(
        feature_queue: &PsiFeatureQueue,
        batch: &[Features],
    ) -> HashMap<String, HashMap<usize, usize>> {
        let mut queue = feature_queue.empty_queue.clone();
        for features in batch {
            feature_queue.insert(&features.features, &mut queue).unwrap();
        }
        queue
    }

    /// Builds a queue over three monitored numeric features sampled uniformly from
    /// `[min, max)`. The first two rows are pinned to exactly `min` and `max` so both extremes
    /// appear in the profiling data.
    fn numeric_feature_queue(min: f64, max: f64) -> PsiFeatureQueue {
        let mut array = Array::random((1030, 3), Uniform::new(min, max));
        for col in 0..3 {
            array[[0, col]] = min;
            array[[1, col]] = max;
        }

        let features = strings(&["feature_1", "feature_2", "feature_3"]);

        let alert_config = PsiAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };

        let config = PsiDriftConfig {
            space: "name".to_string(),
            name: "repo".to_string(),
            version: DEFAULT_VERSION.to_string(),
            alert_config,
            ..Default::default()
        };

        let profile = PsiMonitor::new()
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 3);
        feature_queue
    }

    /// Two string features with five observations each: `feature_1` has five distinct values,
    /// `feature_2` only two.
    fn string_data() -> (Vec<String>, Vec<Vec<String>>) {
        let string_features = strings(&["feature_1", "feature_2"]);
        let string_vec = vec![
            strings(&["a", "b", "c", "d", "e"]),
            strings(&["a", "a", "a", "b", "b"]),
        ];
        (string_features, string_vec)
    }

    #[test]
    fn test_feature_queue_insert_numeric() {
        let (min, max) = (1.0, 87.0);
        let feature_queue = numeric_feature_queue(min, max);

        let batch: Vec<Features> = (0..9)
            .map(|_| {
                record(vec![
                    Feature::float("feature_1".to_string(), min),
                    Feature::float("feature_2".to_string(), min),
                    Feature::float("feature_3".to_string(), max),
                ])
            })
            .collect();

        let queue = insert_all(&feature_queue, &batch);

        // The sample minimum is expected in the first bin (id 1), the maximum in the last (10).
        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_numeric_categorical() {
        let numeric_cat_column = Array::random((100, 1), Bernoulli::new(0.5).unwrap())
            .mapv(|x| if x { 1.0 } else { 0.0 });
        let uniform_column = Array::random((100, 1), Uniform::new(0.0, 20.0));
        let array = ndarray::concatenate![Axis(1), numeric_cat_column, uniform_column];

        let features = strings(&["feature_1", "feature_2"]);

        let drift_config = PsiDriftConfig {
            categorical_features: Some(features.clone()),
            ..Default::default()
        };

        let mut profile = PsiMonitor::new()
            .create_2d_drift_profile(&features, &array.view(), &drift_config)
            .unwrap();
        profile.config.alert_config.features_to_monitor = features;
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);

        let batch: Vec<Features> = (0..9)
            .map(|_| {
                record(vec![
                    Feature::float("feature_1".to_string(), 0.0),
                    Feature::float("feature_2".to_string(), 1.0),
                ])
            })
            .collect();

        let queue = insert_all(&feature_queue, &batch);

        assert_eq!(*queue.get("feature_1").unwrap().get(&0).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_categorical() {
        let psi_monitor = PsiMonitor::default();
        let (string_features, string_vec) = string_data();

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();
        assert_eq!(feature_map.features.len(), 2);

        let mut config = PsiDriftConfig {
            feature_map: feature_map.clone(),
            categorical_features: Some(string_features.clone()),
            ..Default::default()
        };
        config.alert_config.features_to_monitor = string_features.clone();

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();
        assert_eq!(array.shape(), &[5, 2]);

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);

        let batch: Vec<Features> = (0..9)
            .map(|_| {
                record(vec![
                    Feature::string("feature_1".to_string(), "c".to_string()),
                    Feature::string("feature_2".to_string(), "a".to_string()),
                ])
            })
            .collect();

        let queue = insert_all(&feature_queue, &batch);

        assert_eq!(*queue.get("feature_1").unwrap().get(&2).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&0).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_is_empty() {
        let psi_monitor = PsiMonitor::default();
        let (string_features, string_vec) = string_data();

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();
        assert_eq!(feature_map.features.len(), 2);

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();
        assert_eq!(array.shape(), &[5, 2]);

        // NOTE: unlike the categorical test above, `categorical_features` is not set here.
        let mut config = PsiDriftConfig {
            feature_map,
            ..Default::default()
        };
        config.alert_config.features_to_monitor = string_features.clone();

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);

        let batch: Vec<Features> = (0..9)
            .map(|_| {
                record(vec![
                    Feature::string("feature_1".to_string(), "c".to_string()),
                    Feature::string("feature_2".to_string(), "a".to_string()),
                ])
            })
            .collect();

        let queue = insert_all(&feature_queue, &batch);

        let is_empty = !queue
            .values()
            .any(|bin_map| bin_map.values().any(|count| *count > 0));

        // Nine records were inserted, so the queue must now hold non-zero counts.
        assert!(!is_empty, "queue should hold counts after inserting a batch");
    }

    #[test]
    fn test_feature_queue_create_drift_records() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = strings(&["feature_1", "feature_2", "feature_3"]);

        let mut profile = PsiMonitor::new()
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();
        profile.config.alert_config.features_to_monitor = features;
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 3);

        let batch: Vec<Features> = (0..9)
            .map(|_| {
                record(vec![
                    Feature::float("feature_1".to_string(), 1.0),
                    Feature::float("feature_2".to_string(), 10.0),
                    // Far outside the sampled range; still expected to be accepted by a bin.
                    Feature::float("feature_3".to_string(), 10000.0),
                ])
            })
            .collect();

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch)
            .unwrap();

        // Every bin of every feature with a non-zero count is emitted: 3 features x 10 bins.
        assert_eq!(drift_records.len(), 30);
    }

    #[test]
    fn test_feature_queue_insert_numeric_non_finite() {
        let (min, max) = (1.0, 87.0);
        let feature_queue = numeric_feature_queue(min, max);

        let non_finite_values = [f64::INFINITY, f64::NEG_INFINITY, f64::NAN];
        let batch: Vec<Features> = (0..9)
            .map(|i| {
                record(vec![
                    Feature::float("feature_1".to_string(), min),
                    Feature::float("feature_2".to_string(), non_finite_values[i % 3]),
                    Feature::float("feature_3".to_string(), max),
                ])
            })
            .collect();

        let queue = insert_all(&feature_queue, &batch);

        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        // Non-finite values are dropped, so no bin of feature_2 is incremented.
        assert!((1..=10).all(|bin| *queue.get("feature_2").unwrap().get(&bin).unwrap() == 0));
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_skips_unmonitored_feature() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = strings(&["feature_1", "feature_2", "feature_3"]);

        let mut profile = PsiMonitor::new()
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();
        // feature_3 stays in the profile but is not monitored.
        profile.config.alert_config.features_to_monitor = features[..2].to_vec();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);
        assert!(!feature_queue.empty_queue.contains_key("feature_3"));

        let batch: Vec<Features> = (0..9)
            .map(|_| {
                record(vec![
                    Feature::float("feature_1".to_string(), 1.0),
                    Feature::float("feature_3".to_string(), 10.0),
                ])
            })
            .collect();

        // Inserting an unmonitored feature is not an error; it is silently skipped.
        let queue = insert_all(&feature_queue, &batch);

        assert_eq!(queue.get("feature_1").unwrap().values().sum::<usize>(), 9);
        assert!(!queue.contains_key("feature_3"));
    }
}