scouter_events/queue/custom/
feature_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_types::Metric;
5use scouter_types::QueueExt;
6use scouter_types::{
7    custom::CustomDriftProfile, CustomMetricServerRecord, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{error, instrument};
11pub struct CustomMetricFeatureQueue {
12    drift_profile: CustomDriftProfile,
13    empty_queue: HashMap<String, Vec<f64>>,
14    metric_names: Vec<String>,
15}
16
17impl CustomMetricFeatureQueue {
18    pub fn new(drift_profile: CustomDriftProfile) -> Self {
19        let empty_queue: HashMap<String, Vec<f64>> = drift_profile
20            .metrics
21            .keys()
22            .map(|metric| (metric.clone(), Vec::new()))
23            .collect();
24
25        let metric_names = empty_queue.keys().cloned().collect();
26
27        CustomMetricFeatureQueue {
28            drift_profile,
29            empty_queue,
30            metric_names,
31        }
32    }
33
34    /// Insert metrics into the feature queue
35    ///
36    /// # Arguments
37    ///
38    /// * `metrics` - A vector of metrics to insert into the feature queue
39    ///
40    /// # Returns
41    ///
42    /// * `Result<(), FeatureQueueError>` - A result indicating success or failure
43    #[instrument(skip_all, name = "insert_custom")]
44    fn insert(
45        &self,
46        metrics: &Vec<Metric>,
47        queue: &mut HashMap<String, Vec<f64>>,
48    ) -> Result<(), FeatureQueueError> {
49        for metric in metrics {
50            if !self.metric_names.contains(&metric.name) {
51                error!("Custom metric {} not found in drift profile", metric.name);
52                continue;
53            }
54            if let Some(queue) = queue.get_mut(&metric.name) {
55                queue.push(metric.value);
56            }
57        }
58        Ok(())
59    }
60
61    fn create_drift_records(
62        &self,
63        queue: HashMap<String, Vec<f64>>,
64    ) -> Result<ServerRecords, FeatureQueueError> {
65        let averages = queue
66            .iter()
67            // filter out empty values
68            .filter(|(_, values)| !values.is_empty())
69            .map(|(key, values)| {
70                let avg = values.iter().sum::<f64>() / values.len() as f64;
71                ServerRecord::Custom(CustomMetricServerRecord::new(
72                    self.drift_profile.config.space.clone(),
73                    self.drift_profile.config.name.clone(),
74                    self.drift_profile.config.version.clone(),
75                    key.clone(),
76                    avg,
77                ))
78            })
79            .collect::<Vec<ServerRecord>>();
80
81        Ok(ServerRecords::new(averages))
82    }
83}
84
85impl FeatureQueue for CustomMetricFeatureQueue {
86    fn create_drift_records_from_batch<T: QueueExt>(
87        &self,
88        batch: Vec<T>,
89    ) -> Result<ServerRecords, FeatureQueueError> {
90        // clones the empty map (so we don't need to recreate it on each call)
91        let mut queue = self.empty_queue.clone();
92
93        for elem in batch {
94            self.insert(elem.metrics(), &mut queue)?;
95        }
96
97        self.create_drift_records(queue)
98    }
99}
100
101#[cfg(test)]
102mod tests {
103
104    use super::*;
105    use scouter_types::custom::{
106        AlertThreshold, CustomMetric, CustomMetricAlertConfig, CustomMetricDriftConfig,
107    };
108    use scouter_types::EntityType;
109    use scouter_types::{Metric, Metrics};
110
111    #[test]
112    fn test_feature_queue_custom_insert_metric() {
113        let metric1 = CustomMetric::new("mae", 10.0, AlertThreshold::Above, None).unwrap();
114
115        let metric2 = CustomMetric::new("mape", 10.0, AlertThreshold::Above, None).unwrap();
116
117        let metric3 = CustomMetric::new("empty", 10.0, AlertThreshold::Above, None).unwrap();
118
119        let custom_config = CustomMetricDriftConfig::new(
120            "test",
121            "test",
122            "0.1.0",
123            25,
124            CustomMetricAlertConfig::default(),
125            None,
126        )
127        .unwrap();
128        let profile =
129            CustomDriftProfile::new(custom_config, vec![metric1, metric2, metric3], None).unwrap();
130        let feature_queue = CustomMetricFeatureQueue::new(profile);
131
132        assert_eq!(feature_queue.empty_queue.len(), 3);
133
134        let mut metric_batch = Vec::new();
135        for i in 0..25 {
136            let one = Metric::new("mae".to_string(), i as f64);
137            let two = Metric::new("mape".to_string(), i as f64);
138
139            let metrics = Metrics {
140                metrics: vec![one, two],
141                entity_type: EntityType::Metric,
142            };
143
144            metric_batch.push(metrics);
145        }
146
147        let records = feature_queue
148            .create_drift_records_from_batch(metric_batch)
149            .unwrap();
150
151        // empty should be excluded
152        assert_eq!(records.records.len(), 2);
153
154        // check average of mae
155        for record in records.records.iter() {
156            if let ServerRecord::Custom(custom_record) = record {
157                assert!(custom_record.metric.contains("ma"));
158                assert_eq!(custom_record.value, 12.0);
159            }
160        }
161    }
162}