scouter_drift/spc/drift.rs

1#[cfg(feature = "sql")]
2pub mod spc_drifter {
3    use crate::error::DriftError;
4    use crate::spc::alert::generate_alerts;
5    use crate::spc::monitor::SpcMonitor;
6    use chrono::{DateTime, Utc};
7    use ndarray::Array2;
8    use ndarray::ArrayView2;
9    use scouter_dispatch::AlertDispatcher;
10    use scouter_sql::sql::traits::SpcSqlLogic;
11    use scouter_sql::PostgresClient;
12    use scouter_types::contracts::ServiceInfo;
13    use scouter_types::spc::{SpcDriftFeatures, SpcDriftProfile, TaskAlerts};
14    use sqlx::{Pool, Postgres};
15    use std::collections::BTreeMap;
16    use tracing::error;
17    use tracing::info;
18
19    #[derive(Debug, Clone)]
20    pub struct SpcDriftArray {
21        pub features: Vec<String>,
22        pub array: Array2<f64>,
23    }
24
25    impl SpcDriftArray {
26        pub fn new(records: SpcDriftFeatures) -> Result<Self, DriftError> {
27            let mut features = Vec::new();
28            let mut flattened = Vec::new();
29
30            // check if records is empty. If so return Array2::default()
31            if records.is_empty() {
32                return Ok(SpcDriftArray {
33                    features,
34                    array: Array2::default((0, 0)),
35                });
36            }
37
38            for (feature, drift) in records.features.into_iter() {
39                features.push(feature);
40                flattened.extend(drift.values);
41            }
42
43            let rows = features.len();
44            let cols = flattened.len() / rows;
45            let array = Array2::from_shape_vec((rows, cols), flattened)?;
46
47            Ok(SpcDriftArray { features, array })
48        }
49    }
50
51    // Defines the SpcDrifter struct
52    // This is used to process drift alerts for spc style profiles
53    pub struct SpcDrifter {
54        service_info: ServiceInfo,
55        profile: SpcDriftProfile,
56    }
57
58    impl SpcDrifter {
59        pub fn new(profile: SpcDriftProfile) -> Self {
60            Self {
61                service_info: ServiceInfo {
62                    name: profile.config.name.clone(),
63                    space: profile.config.space.clone(),
64                    version: profile.config.version.clone(),
65                },
66                profile,
67            }
68        }
69
70        /// Get drift features for a given drift profile
71        ///
72        /// # Arguments
73        ///
74        /// * `db_client` - Postgres client to use for querying feature data
75        /// * `limit_datetime` - Limit timestamp for drift computation (this is the previous_run timestamp)
76        /// * `features_to_monitor` - Features to monitor for drift
77        ///
78        /// # Returns
79        ///
80        /// * `Result<QueryResult>` - Query result
81        async fn get_drift_features(
82            &self,
83            db_pool: &Pool<Postgres>,
84            limit_datetime: &DateTime<Utc>,
85            features_to_monitor: &[String],
86        ) -> Result<SpcDriftArray, DriftError> {
87            let records = PostgresClient::get_spc_drift_records(
88                db_pool,
89                &self.service_info,
90                limit_datetime,
91                features_to_monitor,
92            )
93            .await?;
94            SpcDriftArray::new(records)
95        }
96
97        /// Compute drift for a given drift profile
98        ///
99        /// # Arguments
100        ///
101        /// * `limit_datetime` - Limit timestamp for drift computation (this is the previous_run timestamp)
102        /// * `db_client` - Postgres client to use for querying feature data
103        ///     
104        /// # Returns
105        ///
106        /// * `Result<Array2<f64>>` - Drift array
107        pub async fn compute_drift(
108            &self,
109            limit_datetime: &DateTime<Utc>,
110            db_pool: &Pool<Postgres>,
111        ) -> Result<(Array2<f64>, Vec<String>), DriftError> {
112            let drift_features = self
113                .get_drift_features(
114                    db_pool,
115                    limit_datetime,
116                    &self.profile.config.alert_config.features_to_monitor,
117                )
118                .await?;
119
120            // if drift features is empty, return early
121            if drift_features.array.is_empty() {
122                info!("No features to process returning early");
123                return Ok((drift_features.array, drift_features.features));
124            }
125
126            let drift = SpcMonitor::new().calculate_drift_from_sample(
127                &drift_features.features,
128                &drift_features.array.t().view(), // need to transpose because calculation is done at the row level across each feature
129                &self.profile,
130            )?;
131
132            Ok((drift, drift_features.features))
133        }
134
135        /// Generate alerts for a given drift profile
136        ///
137        /// # Arguments
138        ///
139        /// * `array` - Drift array
140        /// * `features` - Features to monitor for drift
141        ///
142        /// # Returns
143        ///
144        /// * `Result<Option<TaskAlerts>>` - Task alerts
145        pub async fn generate_alerts(
146            &self,
147            array: &ArrayView2<'_, f64>,
148            features: &[String],
149        ) -> Result<Option<TaskAlerts>, DriftError> {
150            let mut task_alerts = TaskAlerts::default();
151            // Get alerts
152            // keys are the feature names that match the order of the drift array columns
153            let alert_rule = self.profile.config.alert_config.rule.clone();
154            let alerts = generate_alerts(array, features, &alert_rule)?;
155
156            // Get dispatcher, will default to console if env vars are not found for 3rd party service
157            // TODO: Add ability to pass hashmap of kwargs to dispatcher (from drift profile)
158            // This would be for things like opsgenie team, feature priority, slack channel, etc.
159            let alert_dispatcher = AlertDispatcher::new(&self.profile.config).inspect_err(|e| {
160                error!(
161                    "Error creating alert dispatcher for {}/{}/{}: {}",
162                    self.service_info.space, self.service_info.name, self.service_info.version, e
163                );
164            })?;
165
166            if alerts.has_alerts {
167                alert_dispatcher
168                    .process_alerts(&alerts)
169                    .await
170                    .inspect_err(|e| {
171                        error!(
172                            "Error processing alerts for {}/{}/{}: {}",
173                            self.service_info.space,
174                            self.service_info.name,
175                            self.service_info.version,
176                            e
177                        );
178                    })?;
179                task_alerts.alerts = alerts;
180                return Ok(Some(task_alerts));
181            } else {
182                info!(
183                    "No alerts to process for {}/{}/{}",
184                    self.service_info.space, self.service_info.name, self.service_info.version
185                );
186            }
187
188            Ok(None)
189        }
190
191        /// organize alerts so that each alert is mapped to a single entry and feature
192        /// Some features may produce multiple alerts
193        ///
194        /// # Arguments
195        ///
196        /// * `alerts` - TaskAlerts to organize
197        ///
198        /// # Returns
199        ///
200        fn organize_alerts(&self, mut alerts: TaskAlerts) -> Vec<BTreeMap<String, String>> {
201            let mut tasks = Vec::new();
202            alerts.alerts.features.iter_mut().for_each(|(_, feature)| {
203                feature.alerts.iter().for_each(|alert| {
204                    let alert_map = {
205                        let mut alert_map = BTreeMap::new();
206                        alert_map.insert("zone".to_string(), alert.zone.clone().to_string());
207                        alert_map.insert("kind".to_string(), alert.kind.clone().to_string());
208                        alert_map.insert("entity_name".to_string(), feature.feature.clone());
209                        alert_map
210                    };
211                    tasks.push(alert_map);
212                });
213            });
214
215            tasks
216        }
217
218        /// Process a single drift computation task
219        ///
220        /// # Arguments
221        /// * `previous_run` - Previous run timestamp
222        pub async fn check_for_alerts(
223            &self,
224            db_client: &Pool<Postgres>,
225            previous_run: DateTime<Utc>,
226        ) -> Result<Option<Vec<BTreeMap<String, String>>>, DriftError> {
227            // Compute drift
228            let (drift_array, keys) = self.compute_drift(&previous_run, db_client).await?;
229
230            // if drift array is empty, return early
231            if drift_array.is_empty() {
232                return Ok(None);
233            }
234
235            // Generate alerts (if any)
236            let alerts = self
237                .generate_alerts(&drift_array.view(), &keys)
238                .await
239                .inspect_err(|e| {
240                    error!(
241                        "Error generating alerts for {}/{}/{}: {}",
242                        self.service_info.space,
243                        self.service_info.name,
244                        self.service_info.version,
245                        e
246                    );
247                })?;
248
249            match alerts {
250                Some(alerts) => {
251                    let organized_alerts = self.organize_alerts(alerts);
252                    Ok(Some(organized_alerts))
253                }
254                None => Ok(None),
255            }
256        }
257    }
258}