scouter_events/queue/spc/feature_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use ndarray::prelude::*;
5use ndarray::Array2;
6use scouter_drift::spc::monitor::SpcMonitor;
7use scouter_types::spc::SpcDriftProfile;
8use scouter_types::QueueExt;
9use scouter_types::{Feature, MessageRecord, ServerRecords};
10use std::collections::HashMap;
11use tracing::instrument;
12use tracing::{debug, error};
13
14pub struct SpcFeatureQueue {
15    pub drift_profile: SpcDriftProfile,
16    pub empty_queue: HashMap<String, Vec<f64>>,
17    pub monitor: SpcMonitor,
18    pub feature_names: Vec<String>,
19}
20
21impl SpcFeatureQueue {
22    #[instrument(skip(drift_profile))]
23    pub fn new(drift_profile: SpcDriftProfile) -> Self {
24        let empty_queue: HashMap<String, Vec<f64>> = drift_profile
25            .config
26            .alert_config
27            .features_to_monitor
28            .iter()
29            .map(|feature| (feature.clone(), Vec::new()))
30            .collect();
31
32        let feature_names = empty_queue.keys().cloned().collect();
33
34        SpcFeatureQueue {
35            drift_profile,
36            empty_queue,
37            monitor: SpcMonitor::new(),
38            feature_names,
39        }
40    }
41
42    #[instrument(skip(self, features), name = "insert_spc")]
43    pub fn insert(
44        &self,
45        features: &[Feature],
46        queue: &mut HashMap<String, Vec<f64>>,
47    ) -> Result<(), FeatureQueueError> {
48        let feat_map = &self.drift_profile.config.feature_map;
49
50        debug!("Inserting features into queue");
51        features.iter().for_each(|feature| {
52            let name = feature.name().to_string();
53
54            if self.feature_names.contains(&name) {
55                if let Some(queue) = queue.get_mut(&name) {
56                    if let Ok(value) = feature.to_float(feat_map) {
57                        queue.push(value);
58                    }
59                }
60            } else {
61                error!("Feature {} not found in drift profile", name);
62            }
63        });
64
65        Ok(())
66    }
67
68    // Create drift records from queue items
69    //
70    // returns: DriftServerRecords
71    #[instrument(skip(self), name = "Create Server Records")]
72    pub fn create_drift_records(
73        &self,
74        queue: HashMap<String, Vec<f64>>,
75    ) -> Result<ServerRecords, FeatureQueueError> {
76        // filter out empty queues
77        let (arrays, feature_names): (Vec<_>, Vec<_>) = queue
78            .iter()
79            .filter(|(_, values)| !values.is_empty())
80            .map(|(feature, values)| {
81                (
82                    Array2::from_shape_vec((values.len(), 1), values.clone()).unwrap(),
83                    feature.clone(),
84                )
85            })
86            .unzip();
87        let n = arrays[0].dim().0;
88        if arrays.iter().any(|array| array.dim().0 != n) {
89            error!("Shape mismatch");
90            return Err(FeatureQueueError::DriftRecordError(
91                "Shape mismatch".to_string(),
92            ));
93        }
94
95        let concatenated = ndarray::concatenate(
96            Axis(1),
97            &arrays.iter().map(|a| a.view()).collect::<Vec<_>>(),
98        )
99        .map_err(|e| {
100            error!("Failed to concatenate arrays: {:?}", e);
101            FeatureQueueError::DriftRecordError(format!("Failed to concatenate arrays: {e:?}"))
102        })?;
103
104        let records = self
105            .monitor
106            .sample_data(&feature_names, &concatenated.view(), &self.drift_profile)
107            .map_err(|e| {
108                error!("Failed to create drift record: {:?}", e);
109                FeatureQueueError::DriftRecordError(format!("Failed to create drift record: {e:?}"))
110            })?;
111
112        Ok(records)
113    }
114}
115
116impl FeatureQueue for SpcFeatureQueue {
117    fn create_drift_records_from_batch<T: QueueExt>(
118        &self,
119        batch: Vec<T>,
120    ) -> Result<MessageRecord, FeatureQueueError> {
121        // clones the empty map (so we don't need to recreate it on each call)
122        let mut queue = self.empty_queue.clone();
123
124        for elem in batch {
125            self.insert(elem.features(), &mut queue)?;
126        }
127
128        Ok(MessageRecord::ServerRecords(
129            self.create_drift_records(queue)?,
130        ))
131    }
132}
133
#[cfg(test)]
mod tests {

    use scouter_types::spc::{SpcAlertConfig, SpcDriftConfig};
    use scouter_types::Features;

    use super::*;
    use ndarray::Array;
    use ndarray_rand::rand_distr::Uniform;
    use ndarray_rand::RandomExt;

    /// Builds `count` identical records carrying integer values for the three test features.
    fn make_batch(count: usize) -> Vec<Features> {
        (0..count)
            .map(|_| Features {
                features: vec![
                    Feature::int("feature_1".to_string(), 1),
                    Feature::int("feature_2".to_string(), 2),
                    Feature::int("feature_3".to_string(), 3),
                ],
                entity_type: scouter_types::EntityType::Feature,
            })
            .collect()
    }

    #[test]
    fn test_feature_queue_new() {
        // Random data (1030 rows, 3 columns) used to fit the drift profile.
        let training_data = Array::random((1030, 3), Uniform::new(0., 10.));

        let feature_names: Vec<String> = ["feature_1", "feature_2", "feature_3"]
            .iter()
            .map(|s| s.to_string())
            .collect();

        let monitor = SpcMonitor::new();
        let alert_config = SpcAlertConfig {
            features_to_monitor: feature_names.clone(),
            ..Default::default()
        };
        let drift_config = SpcDriftConfig::new(
            Some("name".to_string()),
            Some("repo".to_string()),
            None,
            None,
            None,
            Some(alert_config),
            None,
        )
        .unwrap();

        let profile = monitor
            .create_2d_drift_profile(&feature_names, &training_data.view(), &drift_config)
            .unwrap();
        assert_eq!(profile.features.len(), 3);

        let feature_queue = SpcFeatureQueue::new(profile);
        assert_eq!(feature_queue.empty_queue.len(), 3);

        let batch_features = make_batch(9);

        // Manual insertion: every buffer should receive one value per record.
        let mut queue = feature_queue.empty_queue.clone();
        for record in &batch_features {
            feature_queue.insert(&record.features, &mut queue).unwrap();
        }
        for name in &feature_names {
            assert_eq!(queue[name].len(), 9);
        }

        let records = feature_queue
            .create_drift_records_from_batch(batch_features)
            .unwrap();
        assert_eq!(records.len(), 3);

        // Round-trip through JSON, then through raw bytes.
        let json_records = records.model_dump_json();
        assert!(!json_records.is_empty());

        let decoded: ServerRecords = serde_json::from_str(&json_records).unwrap();
        assert_eq!(decoded.records.len(), 3);

        let reloaded = ServerRecords::load_from_bytes(json_records.as_bytes()).unwrap();
        assert_eq!(reloaded.records.len(), 3);
    }
}