1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_drift::psi::monitor::PsiMonitor;
5use scouter_types::{
6 psi::{Bin, BinType, PsiDriftProfile},
7 Feature, PsiServerRecord, QueueExt, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{debug, error, instrument};
11
12pub struct PsiFeatureQueue {
13 pub drift_profile: PsiDriftProfile,
14 pub empty_queue: HashMap<String, HashMap<usize, usize>>,
15 pub monitor: PsiMonitor,
16 pub feature_names: Vec<String>,
17}
18
19impl PsiFeatureQueue {
20 #[instrument(skip_all)]
21 fn find_numeric_bin_given_scaler(
22 value: f64,
23 bins: &[Bin],
24 ) -> Result<&usize, FeatureQueueError> {
25 let bin = bins
26 .iter()
27 .find(|bin| value > bin.lower_limit.unwrap() && value <= bin.upper_limit.unwrap())
28 .map(|bin| &bin.id);
29
30 match bin {
31 Some(bin) => Ok(bin),
32 None => {
33 error!("Failed to find bin for value: {}", value);
34 Err(FeatureQueueError::GetBinError)
35 }
36 }
37 }
38
39 #[instrument(skip_all)]
40 fn process_numeric_queue(
41 queue: &mut HashMap<usize, usize>,
42 value: f64,
43 bins: &[Bin],
44 ) -> Result<(), FeatureQueueError> {
45 let bin_id = Self::find_numeric_bin_given_scaler(value, bins)?;
46 let count = queue
47 .get_mut(bin_id)
48 .ok_or(FeatureQueueError::GetBinError)
49 .map_err(|e| {
50 error!("Error processing numeric queue: {:?}", e);
51 e
52 })?;
53 *count += 1;
54
55 Ok(())
56 }
57
58 #[instrument(skip_all)]
59 fn process_categorical_queue(
60 queue: &mut HashMap<usize, usize>,
61 value: &usize,
62 ) -> Result<(), FeatureQueueError> {
63 let count = queue
64 .get_mut(value)
65 .ok_or(FeatureQueueError::GetBinError)
66 .inspect_err(|e| {
67 error!("Error processing categorical queue: {:?}", e);
68 })?;
69 *count += 1;
70 Ok(())
71 }
72
73 pub fn new(drift_profile: PsiDriftProfile) -> Self {
74 let features_to_monitor = drift_profile
75 .config
76 .alert_config
77 .features_to_monitor
78 .clone();
79
80 let empty_queue: HashMap<String, HashMap<usize, usize>> = drift_profile
81 .features
82 .iter()
83 .filter(|(feature_name, _)| features_to_monitor.contains(feature_name))
84 .map(|(feature_name, feature_drift_profile)| {
85 let inner_map: HashMap<usize, usize> = feature_drift_profile
86 .bins
87 .iter()
88 .map(|bin| (bin.id, 0))
89 .collect();
90 (feature_name.clone(), inner_map)
91 })
92 .collect();
93
94 let feature_names = empty_queue.keys().cloned().collect();
95
96 PsiFeatureQueue {
97 drift_profile,
98 empty_queue,
99 monitor: PsiMonitor::new(),
100 feature_names,
101 }
102 }
103
104 #[instrument(skip_all, name = "insert_psi")]
105 pub fn insert(
106 &self,
107 features: &[Feature],
108 queue: &mut HashMap<String, HashMap<usize, usize>>,
109 ) -> Result<(), FeatureQueueError> {
110 let feat_map = &self.drift_profile.config.feature_map;
111 for feature in features.iter() {
112 if let Some(feature_drift_profile) = self.drift_profile.features.get(feature.name()) {
113 let name = feature.name().to_string();
114
115 if !self.feature_names.contains(&name) {
117 continue;
118 }
119
120 let bins = &feature_drift_profile.bins;
121 let queue = queue
122 .get_mut(&name)
123 .ok_or(FeatureQueueError::GetFeatureError)?;
124
125 match feature_drift_profile.bin_type {
126 BinType::Numeric => {
127 let value = feature.to_float(feat_map).map_err(|e| {
128 error!("Error converting feature to float: {:?}", e);
129 FeatureQueueError::InvalidValueError(
130 feature.name().to_string(),
131 e.to_string(),
132 )
133 })?;
134
135 Self::process_numeric_queue(queue, value, bins)?
136 }
137 BinType::Category => {
138 let value = feature.to_usize(feat_map).map_err(|e| {
139 error!("Error converting feature to usize: {:?}", e);
140 FeatureQueueError::InvalidValueError(
141 feature.name().to_string(),
142 e.to_string(),
143 )
144 })?;
145
146 Self::process_categorical_queue(queue, &value)?
147 }
148 }
149 }
150 }
151 Ok(())
152 }
153
154 #[instrument(skip_all)]
155 pub fn create_drift_records(
156 &self,
157 queue: HashMap<String, HashMap<usize, usize>>,
158 ) -> Result<ServerRecords, FeatureQueueError> {
159 let filtered_queue = queue
163 .iter()
164 .filter(|(_, bin_map)| bin_map.iter().any(|(_, count)| *count > 0))
165 .collect::<HashMap<_, _>>();
166
167 debug!("Filtered queue count: {:?}", filtered_queue.len());
168
169 let records = filtered_queue
170 .iter()
171 .flat_map(|(feature_name, bin_map)| {
172 bin_map.iter().map(move |(bin_id, count)| {
173 ServerRecord::Psi(PsiServerRecord::new(
174 self.drift_profile.config.space.clone(),
175 self.drift_profile.config.name.clone(),
176 self.drift_profile.config.version.clone(),
177 feature_name.to_string(),
178 *bin_id,
179 *count,
180 ))
181 })
182 })
183 .collect::<Vec<ServerRecord>>();
184
185 Ok(ServerRecords::new(records))
186 }
187}
188
189impl FeatureQueue for PsiFeatureQueue {
190 fn create_drift_records_from_batch<T: QueueExt>(
191 &self,
192 batch: Vec<T>,
193 ) -> Result<ServerRecords, FeatureQueueError> {
194 let mut queue = self.empty_queue.clone();
196
197 for elem in batch {
198 self.insert(elem.features(), &mut queue)?;
199 }
200
201 self.create_drift_records(queue)
202 }
203}
204
#[cfg(test)]
mod tests {

    use super::*;
    use ndarray::{Array, Axis};
    use ndarray_rand::rand::distributions::Bernoulli;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;
    use scouter_drift::utils::CategoricalFeatureHelpers;
    use scouter_types::psi::PsiAlertConfig;
    use scouter_types::psi::PsiDriftConfig;
    use scouter_types::EntityType;
    use scouter_types::{Features, DEFAULT_VERSION};

    /// Shape of the counting state used throughout the tests.
    type Queue = HashMap<String, HashMap<usize, usize>>;

    /// Number of identical records every test pushes through the queue.
    const BATCH_SIZE: usize = 9;

    /// Returns `feature_1 ..= feature_<n>` as owned names.
    fn feature_names(n: usize) -> Vec<String> {
        (1..=n).map(|i| format!("feature_{i}")).collect()
    }

    /// Two string columns of five rows each, used by the categorical tests.
    fn categorical_columns() -> Vec<Vec<String>> {
        [["a", "b", "c", "d", "e"], ["a", "a", "a", "b", "b"]]
            .iter()
            .map(|column| column.iter().map(|value| value.to_string()).collect())
            .collect()
    }

    /// Builds `BATCH_SIZE` records, each holding the features produced by `make_row`.
    fn repeated_batch(make_row: impl Fn() -> Vec<Feature>) -> Vec<Features> {
        (0..BATCH_SIZE)
            .map(|_| Features {
                features: make_row(),
                entity_type: EntityType::Feature,
            })
            .collect()
    }

    /// Inserts every record of `batch` into a fresh copy of the zeroed template.
    fn fill_queue(feature_queue: &PsiFeatureQueue, batch: Vec<Features>) -> Queue {
        let mut queue = feature_queue.empty_queue.clone();
        for record in batch {
            feature_queue.insert(&record.features, &mut queue).unwrap();
        }
        queue
    }

    /// Reads the count stored for `bin_id` of `feature`, panicking if either is missing.
    fn bin_count(queue: &Queue, feature: &str, bin_id: usize) -> usize {
        *queue.get(feature).unwrap().get(&bin_id).unwrap()
    }

    #[test]
    fn test_feature_queue_insert_numeric() {
        let (min, max) = (1.0, 87.0);
        let mut array = Array::random((1030, 3), Uniform::new(min, max));

        // Make sure both extremes are present in every column of the training data.
        for col in 0..3 {
            array[[0, col]] = min;
            array[[1, col]] = max;
        }

        let features = feature_names(3);
        let monitor = PsiMonitor::new();

        let alert_config = PsiAlertConfig {
            features_to_monitor: features.clone(),
            ..Default::default()
        };
        let config =
            PsiDriftConfig::new("name", "repo", DEFAULT_VERSION, alert_config, None, None)
                .unwrap();

        let profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 3);

        let batch = repeated_batch(|| {
            vec![
                Feature::float("feature_1".to_string(), min),
                Feature::float("feature_2".to_string(), min),
                Feature::float("feature_3".to_string(), max),
            ]
        });
        let queue = fill_queue(&feature_queue, batch);

        // The minimum lands in bin 1 and the maximum in bin 10.
        assert_eq!(bin_count(&queue, "feature_1", 1), 9);
        assert_eq!(bin_count(&queue, "feature_2", 1), 9);
        assert_eq!(bin_count(&queue, "feature_3", 10), 9);
    }

    #[test]
    fn test_feature_queue_insert_numeric_categorical() {
        let binary_column = Array::random((100, 1), Bernoulli::new(0.5).unwrap())
            .mapv(|x| if x { 1.0 } else { 0.0 });
        let continuous_column = Array::random((100, 1), Uniform::new(0.0, 20.0));
        let array = ndarray::concatenate![Axis(1), binary_column, continuous_column];

        let features = feature_names(2);
        let monitor = PsiMonitor::new();

        // Both columns are declared categorical, including the continuous one.
        let drift_config = PsiDriftConfig {
            categorical_features: Some(features.clone()),
            ..Default::default()
        };

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &drift_config)
            .unwrap();
        profile.config.alert_config.features_to_monitor = features.clone();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);

        let batch = repeated_batch(|| {
            vec![
                Feature::float("feature_1".to_string(), 0.0),
                Feature::float("feature_2".to_string(), 1.0),
            ]
        });
        let queue = fill_queue(&feature_queue, batch);

        assert_eq!(bin_count(&queue, "feature_1", 0), 9);
        assert_eq!(bin_count(&queue, "feature_2", 1), 9);
    }

    #[test]
    fn test_feature_queue_insert_categorical() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = categorical_columns();
        let string_features = feature_names(2);

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();
        assert_eq!(feature_map.features.len(), 2);

        let mut config = PsiDriftConfig {
            feature_map: feature_map.clone(),
            categorical_features: Some(string_features.clone()),
            ..Default::default()
        };
        config.alert_config.features_to_monitor = string_features.clone();

        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();
        assert_eq!(array.shape(), &[5, 2]);

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);

        let batch = repeated_batch(|| {
            vec![
                Feature::string("feature_1".to_string(), "c".to_string()),
                Feature::string("feature_2".to_string(), "a".to_string()),
            ]
        });
        let queue = fill_queue(&feature_queue, batch);

        assert_eq!(bin_count(&queue, "feature_1", 2), 9);
        assert_eq!(bin_count(&queue, "feature_2", 0), 9);
    }

    #[test]
    fn test_feature_queue_is_empty() {
        let psi_monitor = PsiMonitor::default();
        let string_vec = categorical_columns();
        let string_features = feature_names(2);

        let feature_map = psi_monitor
            .create_feature_map(&string_features, &string_vec)
            .unwrap();
        assert_eq!(feature_map.features.len(), 2);

        // Convert before building the config: the config takes ownership of the map.
        let array = psi_monitor
            .convert_strings_to_ndarray_f64(&string_features, &string_vec, &feature_map)
            .unwrap();
        assert_eq!(array.shape(), &[5, 2]);

        let mut config = PsiDriftConfig {
            feature_map,
            ..Default::default()
        };
        config.alert_config.features_to_monitor = string_features.clone();

        let profile = psi_monitor
            .create_2d_drift_profile(&string_features, &array.view(), &config)
            .unwrap();
        assert_eq!(profile.features.len(), 2);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 2);

        let batch = repeated_batch(|| {
            vec![
                Feature::string("feature_1".to_string(), "c".to_string()),
                Feature::string("feature_2".to_string(), "a".to_string()),
            ]
        });
        let queue = fill_queue(&feature_queue, batch);

        // After inserting, at least one bin must hold a non-zero count.
        let is_empty = queue
            .values()
            .all(|bin_map| bin_map.values().all(|count| *count == 0));

        assert_eq!(u8::from(is_empty), 0);
    }

    #[test]
    fn test_feature_queue_create_drift_records() {
        let array = Array::random((1030, 3), Uniform::new(1.0, 100.0));
        let features = feature_names(3);

        let monitor = PsiMonitor::new();

        let mut profile = monitor
            .create_2d_drift_profile(&features, &array.view(), &PsiDriftConfig::default())
            .unwrap();
        profile.config.alert_config.features_to_monitor = features.clone();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = PsiFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 3);

        let batch_features = repeated_batch(|| {
            vec![
                Feature::float("feature_1".to_string(), 1.0),
                Feature::float("feature_2".to_string(), 10.0),
                Feature::float("feature_3".to_string(), 10000.0),
            ]
        });

        let drift_records = feature_queue
            .create_drift_records_from_batch(batch_features)
            .unwrap();

        // One record per bin of every feature that received data
        // (presumably 3 features x 10 bins - TODO confirm the default bin count).
        assert_eq!(drift_records.records.len(), 30);
    }
}