scouter_events/queue/custom/
feature_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_types::MessageRecord;
5use scouter_types::Metric;
6use scouter_types::QueueExt;
7use scouter_types::{
8    custom::CustomDriftProfile, CustomMetricServerRecord, ServerRecord, ServerRecords,
9};
10use std::collections::HashMap;
11use tracing::{error, instrument};
12pub struct CustomMetricFeatureQueue {
13    drift_profile: CustomDriftProfile,
14    empty_queue: HashMap<String, Vec<f64>>,
15    metric_names: Vec<String>,
16}
17
18impl CustomMetricFeatureQueue {
19    pub fn new(drift_profile: CustomDriftProfile) -> Self {
20        let empty_queue: HashMap<String, Vec<f64>> = drift_profile
21            .metrics
22            .keys()
23            .map(|metric| (metric.clone(), Vec::new()))
24            .collect();
25
26        let metric_names = empty_queue.keys().cloned().collect();
27
28        CustomMetricFeatureQueue {
29            drift_profile,
30            empty_queue,
31            metric_names,
32        }
33    }
34
35    /// Insert metrics into the feature queue
36    ///
37    /// # Arguments
38    ///
39    /// * `metrics` - A vector of metrics to insert into the feature queue
40    ///
41    /// # Returns
42    ///
43    /// * `Result<(), FeatureQueueError>` - A result indicating success or failure
44    #[instrument(skip_all, name = "insert_custom")]
45    fn insert(
46        &self,
47        metrics: &Vec<Metric>,
48        queue: &mut HashMap<String, Vec<f64>>,
49    ) -> Result<(), FeatureQueueError> {
50        for metric in metrics {
51            if !self.metric_names.contains(&metric.name) {
52                error!("Custom metric {} not found in drift profile", metric.name);
53                continue;
54            }
55            if let Some(queue) = queue.get_mut(&metric.name) {
56                queue.push(metric.value);
57            }
58        }
59        Ok(())
60    }
61
62    fn create_drift_records(
63        &self,
64        queue: HashMap<String, Vec<f64>>,
65    ) -> Result<ServerRecords, FeatureQueueError> {
66        let averages = queue
67            .iter()
68            // filter out empty values
69            .filter(|(_, values)| !values.is_empty())
70            .map(|(key, values)| {
71                let avg = values.iter().sum::<f64>() / values.len() as f64;
72                ServerRecord::Custom(CustomMetricServerRecord::new(
73                    self.drift_profile.config.space.clone(),
74                    self.drift_profile.config.name.clone(),
75                    self.drift_profile.config.version.clone(),
76                    key.clone(),
77                    avg,
78                ))
79            })
80            .collect::<Vec<ServerRecord>>();
81
82        Ok(ServerRecords::new(averages))
83    }
84}
85
86impl FeatureQueue for CustomMetricFeatureQueue {
87    fn create_drift_records_from_batch<T: QueueExt>(
88        &self,
89        batch: Vec<T>,
90    ) -> Result<MessageRecord, FeatureQueueError> {
91        // clones the empty map (so we don't need to recreate it on each call)
92        let mut queue = self.empty_queue.clone();
93
94        for elem in batch {
95            self.insert(elem.metrics(), &mut queue)?;
96        }
97
98        Ok(MessageRecord::ServerRecords(
99            self.create_drift_records(queue)?,
100        ))
101    }
102}
103
104#[cfg(test)]
105mod tests {
106
107    use super::*;
108    use scouter_types::custom::{CustomMetric, CustomMetricAlertConfig, CustomMetricDriftConfig};
109    use scouter_types::{AlertThreshold, EntityType};
110    use scouter_types::{Metric, Metrics};
111
112    #[test]
113    fn test_feature_queue_custom_insert_metric() {
114        let metric1 = CustomMetric::new("mae", 10.0, AlertThreshold::Above, None).unwrap();
115
116        let metric2 = CustomMetric::new("mape", 10.0, AlertThreshold::Above, None).unwrap();
117
118        let metric3 = CustomMetric::new("empty", 10.0, AlertThreshold::Above, None).unwrap();
119
120        let custom_config = CustomMetricDriftConfig::new(
121            "test",
122            "test",
123            "0.1.0",
124            25,
125            CustomMetricAlertConfig::default(),
126            None,
127        )
128        .unwrap();
129        let profile =
130            CustomDriftProfile::new(custom_config, vec![metric1, metric2, metric3]).unwrap();
131        let feature_queue = CustomMetricFeatureQueue::new(profile);
132
133        assert_eq!(feature_queue.empty_queue.len(), 3);
134
135        let mut metric_batch = Vec::new();
136        for i in 0..25 {
137            let one = Metric::new_rs("mae".to_string(), i as f64);
138            let two = Metric::new_rs("mape".to_string(), i as f64);
139
140            let metrics = Metrics {
141                metrics: vec![one, two],
142                entity_type: EntityType::Metric,
143            };
144
145            metric_batch.push(metrics);
146        }
147
148        let records = feature_queue
149            .create_drift_records_from_batch(metric_batch)
150            .unwrap();
151
152        // empty should be excluded
153        assert_eq!(records.len(), 2);
154
155        // check average of mae
156        for record in records.iter_server_records().unwrap() {
157            if let ServerRecord::Custom(custom_record) = record {
158                assert!(custom_record.metric.contains("ma"));
159                assert_eq!(custom_record.value, 12.0);
160            }
161        }
162    }
163}