scouter_drift/spc/
drift.rs1#[cfg(feature = "sql")]
2pub mod spc_drifter {
3 use crate::error::DriftError;
4 use crate::spc::alert::generate_alerts;
5 use crate::spc::monitor::SpcMonitor;
6 use chrono::{DateTime, Utc};
7 use ndarray::Array2;
8 use ndarray::ArrayView2;
9 use scouter_dispatch::AlertDispatcher;
10 use scouter_sql::sql::traits::SpcSqlLogic;
11 use scouter_sql::PostgresClient;
12 use scouter_types::contracts::ServiceInfo;
13 use scouter_types::spc::{SpcDriftFeatures, SpcDriftProfile, TaskAlerts};
14 use sqlx::{Pool, Postgres};
15 use std::collections::BTreeMap;
16 use tracing::error;
17 use tracing::info;
18
19 #[derive(Debug, Clone)]
20 pub struct SpcDriftArray {
21 pub features: Vec<String>,
22 pub array: Array2<f64>,
23 }
24
25 impl SpcDriftArray {
26 pub fn new(records: SpcDriftFeatures) -> Result<Self, DriftError> {
27 let mut features = Vec::new();
28 let mut flattened = Vec::new();
29
30 if records.is_empty() {
32 return Ok(SpcDriftArray {
33 features,
34 array: Array2::default((0, 0)),
35 });
36 }
37
38 for (feature, drift) in records.features.into_iter() {
39 features.push(feature);
40 flattened.extend(drift.values);
41 }
42
43 let rows = features.len();
44 let cols = flattened.len() / rows;
45 let array = Array2::from_shape_vec((rows, cols), flattened)?;
46
47 Ok(SpcDriftArray { features, array })
48 }
49 }
50
51 pub struct SpcDrifter {
54 service_info: ServiceInfo,
55 profile: SpcDriftProfile,
56 }
57
58 impl SpcDrifter {
59 pub fn new(profile: SpcDriftProfile) -> Self {
60 Self {
61 service_info: ServiceInfo {
62 name: profile.config.name.clone(),
63 space: profile.config.space.clone(),
64 version: profile.config.version.clone(),
65 },
66 profile,
67 }
68 }
69
70 async fn get_drift_features(
82 &self,
83 db_pool: &Pool<Postgres>,
84 limit_datetime: &DateTime<Utc>,
85 features_to_monitor: &[String],
86 ) -> Result<SpcDriftArray, DriftError> {
87 let records = PostgresClient::get_spc_drift_records(
88 db_pool,
89 &self.service_info,
90 limit_datetime,
91 features_to_monitor,
92 )
93 .await?;
94 SpcDriftArray::new(records)
95 }
96
97 pub async fn compute_drift(
108 &self,
109 limit_datetime: &DateTime<Utc>,
110 db_pool: &Pool<Postgres>,
111 ) -> Result<(Array2<f64>, Vec<String>), DriftError> {
112 let drift_features = self
113 .get_drift_features(
114 db_pool,
115 limit_datetime,
116 &self.profile.config.alert_config.features_to_monitor,
117 )
118 .await?;
119
120 if drift_features.array.is_empty() {
122 info!("No features to process returning early");
123 return Ok((drift_features.array, drift_features.features));
124 }
125
126 let drift = SpcMonitor::new().calculate_drift_from_sample(
127 &drift_features.features,
128 &drift_features.array.t().view(), &self.profile,
130 )?;
131
132 Ok((drift, drift_features.features))
133 }
134
135 pub async fn generate_alerts(
146 &self,
147 array: &ArrayView2<'_, f64>,
148 features: &[String],
149 ) -> Result<Option<TaskAlerts>, DriftError> {
150 let mut task_alerts = TaskAlerts::default();
151 let alert_rule = self.profile.config.alert_config.rule.clone();
154 let alerts = generate_alerts(array, features, &alert_rule)?;
155
156 let alert_dispatcher = AlertDispatcher::new(&self.profile.config).inspect_err(|e| {
160 error!(
161 "Error creating alert dispatcher for {}/{}/{}: {}",
162 self.service_info.space, self.service_info.name, self.service_info.version, e
163 );
164 })?;
165
166 if alerts.has_alerts {
167 alert_dispatcher
168 .process_alerts(&alerts)
169 .await
170 .inspect_err(|e| {
171 error!(
172 "Error processing alerts for {}/{}/{}: {}",
173 self.service_info.space,
174 self.service_info.name,
175 self.service_info.version,
176 e
177 );
178 })?;
179 task_alerts.alerts = alerts;
180 return Ok(Some(task_alerts));
181 } else {
182 info!(
183 "No alerts to process for {}/{}/{}",
184 self.service_info.space, self.service_info.name, self.service_info.version
185 );
186 }
187
188 Ok(None)
189 }
190
191 fn organize_alerts(&self, mut alerts: TaskAlerts) -> Vec<BTreeMap<String, String>> {
201 let mut tasks = Vec::new();
202 alerts.alerts.features.iter_mut().for_each(|(_, feature)| {
203 feature.alerts.iter().for_each(|alert| {
204 let alert_map = {
205 let mut alert_map = BTreeMap::new();
206 alert_map.insert("zone".to_string(), alert.zone.clone().to_string());
207 alert_map.insert("kind".to_string(), alert.kind.clone().to_string());
208 alert_map.insert("entity_name".to_string(), feature.feature.clone());
209 alert_map
210 };
211 tasks.push(alert_map);
212 });
213 });
214
215 tasks
216 }
217
218 pub async fn check_for_alerts(
223 &self,
224 db_client: &Pool<Postgres>,
225 previous_run: DateTime<Utc>,
226 ) -> Result<Option<Vec<BTreeMap<String, String>>>, DriftError> {
227 let (drift_array, keys) = self.compute_drift(&previous_run, db_client).await?;
229
230 if drift_array.is_empty() {
232 return Ok(None);
233 }
234
235 let alerts = self
237 .generate_alerts(&drift_array.view(), &keys)
238 .await
239 .inspect_err(|e| {
240 error!(
241 "Error generating alerts for {}/{}/{}: {}",
242 self.service_info.space,
243 self.service_info.name,
244 self.service_info.version,
245 e
246 );
247 })?;
248
249 match alerts {
250 Some(alerts) => {
251 let organized_alerts = self.organize_alerts(alerts);
252 Ok(Some(organized_alerts))
253 }
254 None => Ok(None),
255 }
256 }
257 }
258}