scouter_events/queue/spc/
feature_queue.rs1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use ndarray::prelude::*;
5use ndarray::Array2;
6use scouter_drift::spc::monitor::SpcMonitor;
7use scouter_types::spc::SpcDriftProfile;
8use scouter_types::QueueExt;
9use scouter_types::{Feature, MessageRecord, ServerRecords};
10use std::collections::HashMap;
11use tracing::instrument;
12use tracing::{debug, error};
13
14pub struct SpcFeatureQueue {
15 pub drift_profile: SpcDriftProfile,
16 pub empty_queue: HashMap<String, Vec<f64>>,
17 pub monitor: SpcMonitor,
18 pub feature_names: Vec<String>,
19}
20
21impl SpcFeatureQueue {
22 #[instrument(skip(drift_profile))]
23 pub fn new(drift_profile: SpcDriftProfile) -> Self {
24 let empty_queue: HashMap<String, Vec<f64>> = drift_profile
25 .config
26 .alert_config
27 .features_to_monitor
28 .iter()
29 .map(|feature| (feature.clone(), Vec::new()))
30 .collect();
31
32 let feature_names = empty_queue.keys().cloned().collect();
33
34 SpcFeatureQueue {
35 drift_profile,
36 empty_queue,
37 monitor: SpcMonitor::new(),
38 feature_names,
39 }
40 }
41
42 #[instrument(skip(self, features), name = "insert_spc")]
43 pub fn insert(
44 &self,
45 features: &[Feature],
46 queue: &mut HashMap<String, Vec<f64>>,
47 ) -> Result<(), FeatureQueueError> {
48 let feat_map = &self.drift_profile.config.feature_map;
49
50 debug!("Inserting features into queue");
51 features.iter().for_each(|feature| {
52 let name = feature.name().to_string();
53
54 if self.feature_names.contains(&name) {
55 if let Some(queue) = queue.get_mut(&name) {
56 if let Ok(value) = feature.to_float(feat_map) {
57 queue.push(value);
58 }
59 }
60 } else {
61 error!("Feature {} not found in drift profile", name);
62 }
63 });
64
65 Ok(())
66 }
67
68 #[instrument(skip(self), name = "Create Server Records")]
72 pub fn create_drift_records(
73 &self,
74 queue: HashMap<String, Vec<f64>>,
75 ) -> Result<ServerRecords, FeatureQueueError> {
76 let (arrays, feature_names): (Vec<_>, Vec<_>) = queue
78 .iter()
79 .filter(|(_, values)| !values.is_empty())
80 .map(|(feature, values)| {
81 (
82 Array2::from_shape_vec((values.len(), 1), values.clone()).unwrap(),
83 feature.clone(),
84 )
85 })
86 .unzip();
87 let n = arrays[0].dim().0;
88 if arrays.iter().any(|array| array.dim().0 != n) {
89 error!("Shape mismatch");
90 return Err(FeatureQueueError::DriftRecordError(
91 "Shape mismatch".to_string(),
92 ));
93 }
94
95 let concatenated = ndarray::concatenate(
96 Axis(1),
97 &arrays.iter().map(|a| a.view()).collect::<Vec<_>>(),
98 )
99 .map_err(|e| {
100 error!("Failed to concatenate arrays: {:?}", e);
101 FeatureQueueError::DriftRecordError(format!("Failed to concatenate arrays: {e:?}"))
102 })?;
103
104 let records = self
105 .monitor
106 .sample_data(&feature_names, &concatenated.view(), &self.drift_profile)
107 .map_err(|e| {
108 error!("Failed to create drift record: {:?}", e);
109 FeatureQueueError::DriftRecordError(format!("Failed to create drift record: {e:?}"))
110 })?;
111
112 Ok(records)
113 }
114}
115
116impl FeatureQueue for SpcFeatureQueue {
117 fn create_drift_records_from_batch<T: QueueExt>(
118 &self,
119 batch: Vec<T>,
120 ) -> Result<MessageRecord, FeatureQueueError> {
121 let mut queue = self.empty_queue.clone();
123
124 for elem in batch {
125 self.insert(elem.features(), &mut queue)?;
126 }
127
128 Ok(MessageRecord::ServerRecords(
129 self.create_drift_records(queue)?,
130 ))
131 }
132}
133
134#[cfg(test)]
135mod tests {
136
137 use scouter_types::spc::{SpcAlertConfig, SpcDriftConfig};
138 use scouter_types::Features;
139
140 use super::*;
141 use ndarray::Array;
142 use ndarray_rand::rand_distr::Uniform;
143 use ndarray_rand::RandomExt;
144
145 #[test]
146 fn test_feature_queue_new() {
147 let array = Array::random((1030, 3), Uniform::new(0., 10.));
148
149 let features = vec![
150 "feature_1".to_string(),
151 "feature_2".to_string(),
152 "feature_3".to_string(),
153 ];
154
155 let monitor = SpcMonitor::new();
156 let alert_config = SpcAlertConfig {
157 features_to_monitor: features.clone(),
158 ..Default::default()
159 };
160 let config = SpcDriftConfig::new(
161 Some("name".to_string()),
162 Some("repo".to_string()),
163 None,
164 None,
165 None,
166 Some(alert_config),
167 None,
168 );
169
170 let profile = monitor
171 .create_2d_drift_profile(&features, &array.view(), &config.unwrap())
172 .unwrap();
173 assert_eq!(profile.features.len(), 3);
174
175 let feature_queue = SpcFeatureQueue::new(profile);
176
177 assert_eq!(feature_queue.empty_queue.len(), 3);
178 let mut batch_features = Vec::new();
179
180 for _ in 0..9 {
181 let one = Feature::int("feature_1".to_string(), 1);
182 let two = Feature::int("feature_2".to_string(), 2);
183 let three = Feature::int("feature_3".to_string(), 3);
184
185 let features = Features {
186 features: vec![one, two, three],
187 entity_type: scouter_types::EntityType::Feature,
188 };
189
190 batch_features.push(features);
191 }
192
193 let mut queue = feature_queue.empty_queue.clone();
194 for feature in batch_features.clone() {
195 feature_queue.insert(&feature.features, &mut queue).unwrap();
196 }
197
198 assert_eq!(queue.get("feature_1").unwrap().len(), 9);
199 assert_eq!(queue.get("feature_2").unwrap().len(), 9);
200 assert_eq!(queue.get("feature_3").unwrap().len(), 9);
201
202 let records = feature_queue
203 .create_drift_records_from_batch(batch_features)
204 .unwrap();
205
206 assert_eq!(records.len(), 3);
207
208 let json_records = records.model_dump_json();
210 assert!(!json_records.is_empty());
211
212 let records: ServerRecords = serde_json::from_str(&json_records).unwrap();
214 assert_eq!(records.records.len(), 3);
215
216 let bytes = json_records.as_bytes();
218
219 let records = ServerRecords::load_from_bytes(bytes).unwrap();
220 assert_eq!(records.records.len(), 3);
221 }
222}