scouter_events/queue/custom/
feature_queue.rs1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_types::MessageRecord;
5use scouter_types::Metric;
6use scouter_types::QueueExt;
7use scouter_types::{
8 custom::CustomDriftProfile, CustomMetricServerRecord, ServerRecord, ServerRecords,
9};
10use std::collections::HashMap;
11use tracing::{error, instrument};
12pub struct CustomMetricFeatureQueue {
13 drift_profile: CustomDriftProfile,
14 empty_queue: HashMap<String, Vec<f64>>,
15 metric_names: Vec<String>,
16}
17
18impl CustomMetricFeatureQueue {
19 pub fn new(drift_profile: CustomDriftProfile) -> Self {
20 let empty_queue: HashMap<String, Vec<f64>> = drift_profile
21 .metrics
22 .keys()
23 .map(|metric| (metric.clone(), Vec::new()))
24 .collect();
25
26 let metric_names = empty_queue.keys().cloned().collect();
27
28 CustomMetricFeatureQueue {
29 drift_profile,
30 empty_queue,
31 metric_names,
32 }
33 }
34
35 #[instrument(skip_all, name = "insert_custom")]
45 fn insert(
46 &self,
47 metrics: &Vec<Metric>,
48 queue: &mut HashMap<String, Vec<f64>>,
49 ) -> Result<(), FeatureQueueError> {
50 for metric in metrics {
51 if !self.metric_names.contains(&metric.name) {
52 error!("Custom metric {} not found in drift profile", metric.name);
53 continue;
54 }
55 if let Some(queue) = queue.get_mut(&metric.name) {
56 queue.push(metric.value);
57 }
58 }
59 Ok(())
60 }
61
62 fn create_drift_records(
63 &self,
64 queue: HashMap<String, Vec<f64>>,
65 ) -> Result<ServerRecords, FeatureQueueError> {
66 let averages = queue
67 .iter()
68 .filter(|(_, values)| !values.is_empty())
70 .map(|(key, values)| {
71 let avg = values.iter().sum::<f64>() / values.len() as f64;
72 ServerRecord::Custom(CustomMetricServerRecord::new(
73 self.drift_profile.config.space.clone(),
74 self.drift_profile.config.name.clone(),
75 self.drift_profile.config.version.clone(),
76 key.clone(),
77 avg,
78 ))
79 })
80 .collect::<Vec<ServerRecord>>();
81
82 Ok(ServerRecords::new(averages))
83 }
84}
85
86impl FeatureQueue for CustomMetricFeatureQueue {
87 fn create_drift_records_from_batch<T: QueueExt>(
88 &self,
89 batch: Vec<T>,
90 ) -> Result<MessageRecord, FeatureQueueError> {
91 let mut queue = self.empty_queue.clone();
93
94 for elem in batch {
95 self.insert(elem.metrics(), &mut queue)?;
96 }
97
98 Ok(MessageRecord::ServerRecords(
99 self.create_drift_records(queue)?,
100 ))
101 }
102}
103
104#[cfg(test)]
105mod tests {
106
107 use super::*;
108 use scouter_types::custom::{CustomMetric, CustomMetricAlertConfig, CustomMetricDriftConfig};
109 use scouter_types::{AlertThreshold, EntityType};
110 use scouter_types::{Metric, Metrics};
111
112 #[test]
113 fn test_feature_queue_custom_insert_metric() {
114 let metric1 = CustomMetric::new("mae", 10.0, AlertThreshold::Above, None).unwrap();
115
116 let metric2 = CustomMetric::new("mape", 10.0, AlertThreshold::Above, None).unwrap();
117
118 let metric3 = CustomMetric::new("empty", 10.0, AlertThreshold::Above, None).unwrap();
119
120 let custom_config = CustomMetricDriftConfig::new(
121 "test",
122 "test",
123 "0.1.0",
124 25,
125 CustomMetricAlertConfig::default(),
126 None,
127 )
128 .unwrap();
129 let profile =
130 CustomDriftProfile::new(custom_config, vec![metric1, metric2, metric3]).unwrap();
131 let feature_queue = CustomMetricFeatureQueue::new(profile);
132
133 assert_eq!(feature_queue.empty_queue.len(), 3);
134
135 let mut metric_batch = Vec::new();
136 for i in 0..25 {
137 let one = Metric::new_rs("mae".to_string(), i as f64);
138 let two = Metric::new_rs("mape".to_string(), i as f64);
139
140 let metrics = Metrics {
141 metrics: vec![one, two],
142 entity_type: EntityType::Metric,
143 };
144
145 metric_batch.push(metrics);
146 }
147
148 let records = feature_queue
149 .create_drift_records_from_batch(metric_batch)
150 .unwrap();
151
152 assert_eq!(records.len(), 2);
154
155 for record in records.iter_server_records().unwrap() {
157 if let ServerRecord::Custom(custom_record) = record {
158 assert!(custom_record.metric.contains("ma"));
159 assert_eq!(custom_record.value, 12.0);
160 }
161 }
162 }
163}