scouter_events/queue/custom/
feature_queue.rs1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_types::Metric;
5use scouter_types::QueueExt;
6use scouter_types::{
7 custom::CustomDriftProfile, CustomMetricServerRecord, ServerRecord, ServerRecords,
8};
9use std::collections::HashMap;
10use tracing::{error, instrument};
/// Feature queue that aggregates custom metric observations for one drift profile.
pub struct CustomMetricFeatureQueue {
    /// Drift profile whose config supplies the space, name and version written
    /// onto every emitted record.
    drift_profile: CustomDriftProfile,
    /// Template map holding one empty value buffer per profile metric; it is
    /// cloned at the start of every batch rather than mutated.
    empty_queue: HashMap<String, Vec<f64>>,
    /// Metric names accepted by the queue (collected from the keys of `empty_queue`).
    metric_names: Vec<String>,
}
16
17impl CustomMetricFeatureQueue {
18 pub fn new(drift_profile: CustomDriftProfile) -> Self {
19 let empty_queue: HashMap<String, Vec<f64>> = drift_profile
20 .metrics
21 .keys()
22 .map(|metric| (metric.clone(), Vec::new()))
23 .collect();
24
25 let metric_names = empty_queue.keys().cloned().collect();
26
27 CustomMetricFeatureQueue {
28 drift_profile,
29 empty_queue,
30 metric_names,
31 }
32 }
33
34 #[instrument(skip_all, name = "insert_custom")]
44 fn insert(
45 &self,
46 metrics: &Vec<Metric>,
47 queue: &mut HashMap<String, Vec<f64>>,
48 ) -> Result<(), FeatureQueueError> {
49 for metric in metrics {
50 if !self.metric_names.contains(&metric.name) {
51 error!("Custom metric {} not found in drift profile", metric.name);
52 continue;
53 }
54 if let Some(queue) = queue.get_mut(&metric.name) {
55 queue.push(metric.value);
56 }
57 }
58 Ok(())
59 }
60
61 fn create_drift_records(
62 &self,
63 queue: HashMap<String, Vec<f64>>,
64 ) -> Result<ServerRecords, FeatureQueueError> {
65 let averages = queue
66 .iter()
67 .filter(|(_, values)| !values.is_empty())
69 .map(|(key, values)| {
70 let avg = values.iter().sum::<f64>() / values.len() as f64;
71 ServerRecord::Custom(CustomMetricServerRecord::new(
72 self.drift_profile.config.space.clone(),
73 self.drift_profile.config.name.clone(),
74 self.drift_profile.config.version.clone(),
75 key.clone(),
76 avg,
77 ))
78 })
79 .collect::<Vec<ServerRecord>>();
80
81 Ok(ServerRecords::new(averages))
82 }
83}
84
85impl FeatureQueue for CustomMetricFeatureQueue {
86 fn create_drift_records_from_batch<T: QueueExt>(
87 &self,
88 batch: Vec<T>,
89 ) -> Result<ServerRecords, FeatureQueueError> {
90 let mut queue = self.empty_queue.clone();
92
93 for elem in batch {
94 self.insert(elem.metrics(), &mut queue)?;
95 }
96
97 self.create_drift_records(queue)
98 }
99}
100
#[cfg(test)]
mod tests {

    use super::*;
    use scouter_types::custom::{
        AlertThreshold, CustomMetric, CustomMetricAlertConfig, CustomMetricDriftConfig,
    };
    use scouter_types::EntityType;
    use scouter_types::{Metric, Metrics};

    /// Feeds 25 observations for two of the three profiled metrics and checks
    /// that exactly those two metrics produce an averaged record, while the
    /// metric that received no values ("empty") produces none.
    #[test]
    fn test_feature_queue_custom_insert_metric() {
        let metric1 = CustomMetric::new("mae", 10.0, AlertThreshold::Above, None).unwrap();
        let metric2 = CustomMetric::new("mape", 10.0, AlertThreshold::Above, None).unwrap();
        let metric3 = CustomMetric::new("empty", 10.0, AlertThreshold::Above, None).unwrap();

        let custom_config = CustomMetricDriftConfig::new(
            "test",
            "test",
            "0.1.0",
            25,
            CustomMetricAlertConfig::default(),
            None,
        )
        .unwrap();
        let profile =
            CustomDriftProfile::new(custom_config, vec![metric1, metric2, metric3], None).unwrap();
        let feature_queue = CustomMetricFeatureQueue::new(profile);

        assert_eq!(feature_queue.empty_queue.len(), 3);

        // Values 0..25 for both metrics, so each mean is exactly 12.0.
        let metric_batch: Vec<Metrics> = (0..25)
            .map(|i| Metrics {
                metrics: vec![
                    Metric::new("mae".to_string(), i as f64),
                    Metric::new("mape".to_string(), i as f64),
                ],
                entity_type: EntityType::Metric,
            })
            .collect();

        let records = feature_queue
            .create_drift_records_from_batch(metric_batch)
            .unwrap();

        // "empty" never received a value, so only two records are expected.
        assert_eq!(records.records.len(), 2);

        for record in records.records.iter() {
            // Fail loudly on an unexpected variant instead of skipping the
            // assertions, which would let the test pass vacuously.
            match record {
                ServerRecord::Custom(custom_record) => {
                    assert!(custom_record.metric.contains("ma"));
                    assert_eq!(custom_record.value, 12.0);
                }
                _ => panic!("expected a ServerRecord::Custom record"),
            }
        }
    }
}