scouter_drift/spc/
drift.rs1#[cfg(feature = "sql")]
2pub mod spc_drifter {
3 use crate::error::DriftError;
4 use crate::spc::alert::generate_alerts;
5 use crate::spc::monitor::SpcMonitor;
6 use chrono::{DateTime, Utc};
7 use ndarray::Array2;
8 use ndarray::ArrayView2;
9 use scouter_dispatch::AlertDispatcher;
10 use scouter_sql::sql::traits::SpcSqlLogic;
11 use scouter_sql::{sql::cache::entity_cache, PostgresClient};
12 use scouter_types::spc::{SpcDriftFeatures, SpcDriftProfile, TaskAlerts};
13 use scouter_types::AlertMap;
14 use scouter_types::ProfileBaseArgs;
15 use sqlx::{Pool, Postgres};
16 use tracing::error;
17 use tracing::info;
18
19 #[derive(Debug, Clone)]
20 pub struct SpcDriftArray {
21 pub features: Vec<String>,
22 pub array: Array2<f64>,
23 }
24
25 impl SpcDriftArray {
26 pub fn new(records: SpcDriftFeatures) -> Result<Self, DriftError> {
27 let mut features = Vec::new();
28 let mut flattened = Vec::new();
29
30 if records.is_empty() {
32 return Ok(SpcDriftArray {
33 features,
34 array: Array2::default((0, 0)),
35 });
36 }
37
38 for (feature, drift) in records.features.into_iter() {
39 features.push(feature);
40 flattened.extend(drift.values);
41 }
42
43 let rows = features.len();
44 let cols = flattened.len() / rows;
45 let array = Array2::from_shape_vec((rows, cols), flattened)?;
46
47 Ok(SpcDriftArray { features, array })
48 }
49 }
50
51 pub struct SpcDrifter {
54 profile: SpcDriftProfile,
55 }
56
57 impl SpcDrifter {
58 pub fn new(profile: SpcDriftProfile) -> Self {
59 Self { profile }
60 }
61
62 async fn get_drift_features(
74 &self,
75 db_pool: &Pool<Postgres>,
76 limit_datetime: &DateTime<Utc>,
77 features_to_monitor: &[String],
78 ) -> Result<SpcDriftArray, DriftError> {
79 let entity_id = entity_cache()
80 .get_entity_id_from_uid(db_pool, &self.profile.config.uid)
81 .await?;
82
83 let records = PostgresClient::get_spc_drift_records(
84 db_pool,
85 limit_datetime,
86 features_to_monitor,
87 &entity_id,
88 )
89 .await?;
90 SpcDriftArray::new(records)
91 }
92
93 pub async fn compute_drift(
104 &self,
105 limit_datetime: &DateTime<Utc>,
106 db_pool: &Pool<Postgres>,
107 ) -> Result<(Array2<f64>, Vec<String>), DriftError> {
108 let drift_features = self
109 .get_drift_features(
110 db_pool,
111 limit_datetime,
112 &self.profile.config.alert_config.features_to_monitor,
113 )
114 .await?;
115
116 if drift_features.array.is_empty() {
118 info!("No features to process returning early");
119 return Ok((drift_features.array, drift_features.features));
120 }
121
122 let drift = SpcMonitor::new().calculate_drift_from_sample(
123 &drift_features.features,
124 &drift_features.array.t().view(), &self.profile,
126 )?;
127
128 Ok((drift, drift_features.features))
129 }
130
131 pub async fn generate_alerts(
142 &self,
143 array: &ArrayView2<'_, f64>,
144 features: &[String],
145 ) -> Result<Option<TaskAlerts>, DriftError> {
146 let mut task_alerts = TaskAlerts::default();
147 let alert_rule = self.profile.config.alert_config.rule.clone();
150 let alerts = generate_alerts(array, features, &alert_rule)?;
151
152 let alert_dispatcher = AlertDispatcher::new(&self.profile.config).inspect_err(|e| {
156 error!(
157 "Error creating alert dispatcher for {}/{}/{}: {}",
158 self.profile.space(),
159 self.profile.name(),
160 self.profile.version(),
161 e
162 );
163 })?;
164
165 if alerts.has_alerts {
166 alert_dispatcher
167 .process_alerts(&alerts)
168 .await
169 .inspect_err(|e| {
170 error!(
171 "Error processing alerts for {}/{}/{}: {}",
172 self.profile.space(),
173 self.profile.name(),
174 self.profile.version(),
175 e
176 );
177 })?;
178 task_alerts.alerts = alerts;
179 return Ok(Some(task_alerts));
180 } else {
181 info!(
182 "No alerts to process for {}/{}/{}",
183 self.profile.space(),
184 self.profile.name(),
185 self.profile.version(),
186 );
187 }
188
189 Ok(None)
190 }
191
192 fn organize_alerts(&self, alerts: TaskAlerts) -> Vec<AlertMap> {
202 let mut tasks = Vec::new();
203 alerts.alerts.features.iter().for_each(|(_, feature)| {
204 let entry_vec = feature.to_alert_map();
205 tasks.extend(entry_vec);
206 });
207
208 tasks
209 }
210
211 pub async fn check_for_alerts(
216 &self,
217 db_client: &Pool<Postgres>,
218 previous_run: &DateTime<Utc>,
219 ) -> Result<Option<Vec<AlertMap>>, DriftError> {
220 let (drift_array, keys) = self.compute_drift(previous_run, db_client).await?;
222
223 if drift_array.is_empty() {
225 return Ok(None);
226 }
227
228 let alerts = self
230 .generate_alerts(&drift_array.view(), &keys)
231 .await
232 .inspect_err(|e| {
233 error!(
234 "Error generating alerts for {}/{}/{}: {}",
235 self.profile.space(),
236 self.profile.name(),
237 self.profile.version(),
238 e
239 );
240 })?;
241
242 match alerts {
243 Some(alerts) => {
244 let organized_alerts = self.organize_alerts(alerts);
245 Ok(Some(organized_alerts))
246 }
247 None => Ok(None),
248 }
249 }
250 }
251}