1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_drift::psi::monitor::PsiMonitor;
5use scouter_types::{
6 psi::{Bin, BinType, PsiDriftProfile},
7 Feature, PsiServerRecord, QueueExt, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{debug, error, instrument};
11
/// Accumulates per-bin observation counts for the monitored features of a PSI
/// drift profile and turns them into [`ServerRecords`].
pub struct PsiFeatureQueue {
    /// Profile whose per-feature bins and config drive counting and record creation.
    pub drift_profile: PsiDriftProfile,
    /// Zeroed count template: feature name -> (bin id -> count). Built in `new`
    /// from the profile's features that are also listed in `features_to_monitor`.
    pub empty_queue: HashMap<String, HashMap<usize, usize>>,
    /// Monitor created with `PsiMonitor::new()`; not used by the methods in this file.
    pub monitor: PsiMonitor,
    /// Names of the monitored features (the keys of `empty_queue`).
    pub feature_names: Vec<String>,
}
18
19impl PsiFeatureQueue {
20 #[instrument(skip_all)]
21 fn find_numeric_bin_given_scaler(
22 value: f64,
23 bins: &[Bin],
24 ) -> Result<&usize, FeatureQueueError> {
25 let bin = bins
26 .iter()
27 .find(|bin| value > bin.lower_limit.unwrap() && value <= bin.upper_limit.unwrap())
28 .map(|bin| &bin.id);
29
30 match bin {
31 Some(bin) => Ok(bin),
32 None => {
33 error!("Failed to find bin for value: {}", value);
34 Err(FeatureQueueError::GetBinError)
35 }
36 }
37 }
38
39 #[instrument(skip_all)]
40 fn process_numeric_queue(
41 queue: &mut HashMap<usize, usize>,
42 value: f64,
43 bins: &[Bin],
44 ) -> Result<(), FeatureQueueError> {
45 let bin_id = Self::find_numeric_bin_given_scaler(value, bins)?;
46 let count = queue
47 .get_mut(bin_id)
48 .ok_or(FeatureQueueError::GetBinError)
49 .map_err(|e| {
50 error!("Error processing numeric queue: {:?}", e);
51 e
52 })?;
53 *count += 1;
54
55 Ok(())
56 }
57
58 #[instrument(skip_all)]
59 fn process_categorical_queue(
60 queue: &mut HashMap<usize, usize>,
61 value: &usize,
62 ) -> Result<(), FeatureQueueError> {
63 let count = queue
64 .get_mut(value)
65 .ok_or(FeatureQueueError::GetBinError)
66 .inspect_err(|e| {
67 error!("Error processing categorical queue: {:?}", e);
68 })?;
69 *count += 1;
70 Ok(())
71 }
72
73 pub fn new(drift_profile: PsiDriftProfile) -> Self {
74 let features_to_monitor = drift_profile
75 .config
76 .alert_config
77 .features_to_monitor
78 .clone();
79
80 let empty_queue: HashMap<String, HashMap<usize, usize>> = drift_profile
81 .features
82 .iter()
83 .filter(|(feature_name, _)| features_to_monitor.contains(feature_name))
84 .map(|(feature_name, feature_drift_profile)| {
85 let inner_map: HashMap<usize, usize> = feature_drift_profile
86 .bins
87 .iter()
88 .map(|bin| (bin.id, 0))
89 .collect();
90 (feature_name.clone(), inner_map)
91 })
92 .collect();
93
94 let feature_names = empty_queue.keys().cloned().collect();
95
96 PsiFeatureQueue {
97 drift_profile,
98 empty_queue,
99 monitor: PsiMonitor::new(),
100 feature_names,
101 }
102 }
103
104 #[instrument(skip_all, name = "insert_psi")]
105 pub fn insert(
106 &self,
107 features: &[Feature],
108 queue: &mut HashMap<String, HashMap<usize, usize>>,
109 ) -> Result<(), FeatureQueueError> {
110 let feat_map = &self.drift_profile.config.feature_map;
111 for feature in features.iter() {
112 if let Some(feature_drift_profile) = self.drift_profile.features.get(feature.name()) {
113 let name = feature.name().to_string();
114
115 if !self.feature_names.contains(&name) {
117 error!(
118 "Feature {} not in features to monitor, skipping",
119 feature.name()
120 );
121 continue;
122 }
123
124 let bins = &feature_drift_profile.bins;
125 let queue = queue
126 .get_mut(&name)
127 .ok_or(FeatureQueueError::GetFeatureError)?;
128
129 match feature_drift_profile.bin_type {
130 BinType::Numeric => {
131 let value = feature.to_float(feat_map).map_err(|e| {
132 error!("Error converting feature to float: {:?}", e);
133 FeatureQueueError::InvalidValueError(
134 feature.name().to_string(),
135 e.to_string(),
136 )
137 })?;
138
139 Self::process_numeric_queue(queue, value, bins)?
140 }
141 BinType::Category => {
142 let value = feature.to_usize(feat_map).map_err(|e| {
143 error!("Error converting feature to usize: {:?}", e);
144 FeatureQueueError::InvalidValueError(
145 feature.name().to_string(),
146 e.to_string(),
147 )
148 })?;
149
150 Self::process_categorical_queue(queue, &value)?
151 }
152 }
153 }
154 }
155 Ok(())
156 }
157
158 #[instrument(skip_all)]
159 pub fn create_drift_records(
160 &self,
161 queue: HashMap<String, HashMap<usize, usize>>,
162 ) -> Result<ServerRecords, FeatureQueueError> {
163 let filtered_queue = queue
167 .iter()
168 .filter(|(_, bin_map)| bin_map.iter().any(|(_, count)| *count > 0))
169 .collect::<HashMap<_, _>>();
170
171 debug!("Filtered queue count: {:?}", filtered_queue.len());
172
173 let records = filtered_queue
174 .iter()
175 .flat_map(|(feature_name, bin_map)| {
176 bin_map.iter().map(move |(bin_id, count)| {
177 ServerRecord::Psi(PsiServerRecord::new(
178 self.drift_profile.config.space.clone(),
179 self.drift_profile.config.name.clone(),
180 self.drift_profile.config.version.clone(),
181 feature_name.to_string(),
182 *bin_id,
183 *count,
184 ))
185 })
186 })
187 .collect::<Vec<ServerRecord>>();
188
189 Ok(ServerRecords::new(records))
190 }
191}
192
193impl FeatureQueue for PsiFeatureQueue {
194 fn create_drift_records_from_batch<T: QueueExt>(
195 &self,
196 batch: Vec<T>,
197 ) -> Result<ServerRecords, FeatureQueueError> {
198 let mut queue = self.empty_queue.clone();
200
201 for elem in batch {
202 self.insert(elem.features(), &mut queue)?;
203 }
204
205 self.create_drift_records(queue)
206 }
207}
208
#[cfg(test)]
mod tests {

    use super::*;
    use ndarray::{Array, Axis};
    use ndarray_rand::rand::distributions::Bernoulli;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    use scouter_drift::utils::CategoricalFeatureHelpers;
    use scouter_types::psi::PsiAlertConfig;
    use scouter_types::psi::PsiDriftConfig;
    use scouter_types::EntityType;
    use scouter_types::{Features, DEFAULT_VERSION};

    #[test]
    fn test_feature_queue_insert_numeric() {
        let min = 1.0;
        let max = 87.0;
        let mut array = Array::random((1030, 3), Uniform::new(min, max));

        // Pin each column's observed extremes to exactly `min` and `max`.
        for col in 0..3 {
            array[[0, col]] = min;
            array[[1, col]] = max;
        }

        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let alert_config = PsiAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };
        let config = PsiDriftConfig::new("name", "repo", DEFAULT_VERSION, alert_config, None, None);

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), min);
            let two = Feature::float("feature_2".to_string(), min);
            let three = Feature::float("feature_3".to_string(), max);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
        assert_eq!(*queue.get("feature_3").unwrap().get(&10).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_numeric_categorical() {
        let numeric_cat_column =
            Array::random((100, 1), Bernoulli::new(0.5).unwrap())
                .mapv(|x| if x { 1.0 } else { 0.0 });
        let uniform_column = Array::random((100, 1), Uniform::new(0.0, 20.0));
        let array = ndarray::concatenate![Axis(1), numeric_cat_column, uniform_column];

        let features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let monitor = PsiMonitor::new();

        let drift_config = PsiDriftConfig {
            categorical_features: Some(features.clone()),
            ..Default::default()
        };

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &drift_config)
            .unwrap();
        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 0.0);
            let two = Feature::float("feature_2".to_string(), 1.0);

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&0).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&1).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_insert_categorical() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let mut config = PsiDriftConfig {
            feature_map: feature_map.clone(),
            categorical_features: Some(string_features.clone()),
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };
            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        assert_eq!(*queue.get("feature_1").unwrap().get(&2).unwrap(), 9);
        assert_eq!(*queue.get("feature_2").unwrap().get(&0).unwrap(), 9);
    }

    #[test]
    fn test_feature_queue_is_empty() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = vec![
            vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
                "e".to_string(),
            ],
            vec![
                "a".to_string(),
                "a".to_string(),
                "a".to_string(),
                "b".to_string(),
                "b".to_string(),
            ],
        ];

        let string_features = vec!["feature_1".to_string(), "feature_2".to_string()];

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();

        assert_eq!(feature_map.features.len(), 2);

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();

        assert_eq!(array.shape(), &[5, 2]);

        let mut config = PsiDriftConfig {
            feature_map,
            ..Default::default()
        };

        config.alert_config.features_to_monitor =
            vec!["feature_1".to_string(), "feature_2".to_string()];

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 2);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::string("feature_1".to_string(), "c".to_string());
            let two = Feature::string("feature_2".to_string(), "a".to_string());

            let features = Features {
                features: vec![one, two],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let mut queue = feature_queue.empty_queue.clone();
        for feature in batch_features {
            feature_queue.insert(&feature.features, &mut queue).unwrap();
        }

        // After inserting the batch, at least one bin must hold a non-zero count.
        let has_counts = queue
            .values()
            .any(|bin_map| bin_map.values().any(|count| *count > 0));

        assert!(has_counts, "queue should hold counts after inserting the batch");
    }

    #[test]
    fn test_feature_queue_create_drift_records() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = vec![
            "feature_1".to_string(),
            "feature_2".to_string(),
            "feature_3".to_string(),
        ];

        let monitor = PsiMonitor::new();

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();

        profile.config.alert_config.features_to_monitor = features.clone();

        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        let mut batch_features = Vec::new();
        for _ in 0..9 {
            let one = Feature::float("feature_1".to_string(), 1.0);
            let two = Feature::float("feature_2".to_string(), 10.0);
            let three = Feature::float("feature_3".to_string(), 10000.0);

            let features = Features {
                features: vec![one, two, three],
                entity_type: EntityType::Feature,
            };

            batch_features.push(features);
        }

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch_features)
            .unwrap();

        // Every bin of each active feature is emitted, including zero-count bins
        // (presumably 10 bins per feature here -> 30 records).
        assert_eq!(drift_records.records.len(), 30);
    }
}