Skip to main content

scouter_drift/spc/
drift.rs

1#[cfg(feature = "sql")]
2pub mod spc_drifter {
3    use crate::error::DriftError;
4    use crate::spc::alert::generate_alerts;
5    use crate::spc::monitor::SpcMonitor;
6    use chrono::{DateTime, Utc};
7    use ndarray::Array2;
8    use ndarray::ArrayView2;
9    use scouter_dispatch::AlertDispatcher;
10    use scouter_sql::sql::traits::SpcSqlLogic;
11    use scouter_sql::{sql::cache::entity_cache, PostgresClient};
12    use scouter_types::spc::{SpcDriftFeatures, SpcDriftProfile, TaskAlerts};
13    use scouter_types::AlertMap;
14    use scouter_types::ProfileBaseArgs;
15    use sqlx::{Pool, Postgres};
16    use tracing::error;
17    use tracing::info;
18
19    #[derive(Debug, Clone)]
20    pub struct SpcDriftArray {
21        pub features: Vec<String>,
22        pub array: Array2<f64>,
23    }
24
25    impl SpcDriftArray {
26        pub fn new(records: SpcDriftFeatures) -> Result<Self, DriftError> {
27            let mut features = Vec::new();
28            let mut flattened = Vec::new();
29
30            // check if records is empty. If so return Array2::default()
31            if records.is_empty() {
32                return Ok(SpcDriftArray {
33                    features,
34                    array: Array2::default((0, 0)),
35                });
36            }
37
38            for (feature, drift) in records.features.into_iter() {
39                features.push(feature);
40                flattened.extend(drift.values);
41            }
42
43            let rows = features.len();
44            let cols = flattened.len() / rows;
45            let array = Array2::from_shape_vec((rows, cols), flattened)?;
46
47            Ok(SpcDriftArray { features, array })
48        }
49    }
50
51    // Defines the SpcDrifter struct
52    // This is used to process drift alerts for spc style profiles
53    pub struct SpcDrifter {
54        profile: SpcDriftProfile,
55    }
56
57    impl SpcDrifter {
58        pub fn new(profile: SpcDriftProfile) -> Self {
59            Self { profile }
60        }
61
62        /// Get drift features for a given drift profile
63        ///
64        /// # Arguments
65        ///
66        /// * `db_client` - Postgres client to use for querying feature data
67        /// * `limit_datetime` - Limit timestamp for drift computation (this is the previous_run timestamp)
68        /// * `features_to_monitor` - Features to monitor for drift
69        ///
70        /// # Returns
71        ///
72        /// * `Result<QueryResult>` - Query result
73        async fn get_drift_features(
74            &self,
75            db_pool: &Pool<Postgres>,
76            limit_datetime: &DateTime<Utc>,
77            features_to_monitor: &[String],
78        ) -> Result<SpcDriftArray, DriftError> {
79            let entity_id = entity_cache()
80                .get_entity_id_from_uid(db_pool, &self.profile.config.uid)
81                .await?;
82
83            let records = PostgresClient::get_spc_drift_records(
84                db_pool,
85                limit_datetime,
86                features_to_monitor,
87                &entity_id,
88            )
89            .await?;
90            SpcDriftArray::new(records)
91        }
92
93        /// Compute drift for a given drift profile
94        ///
95        /// # Arguments
96        ///
97        /// * `limit_datetime` - Limit timestamp for drift computation (this is the previous_run timestamp)
98        /// * `db_client` - Postgres client to use for querying feature data
99        ///     
100        /// # Returns
101        ///
102        /// * `Result<Array2<f64>>` - Drift array
103        pub async fn compute_drift(
104            &self,
105            limit_datetime: &DateTime<Utc>,
106            db_pool: &Pool<Postgres>,
107        ) -> Result<(Array2<f64>, Vec<String>), DriftError> {
108            let drift_features = self
109                .get_drift_features(
110                    db_pool,
111                    limit_datetime,
112                    &self.profile.config.alert_config.features_to_monitor,
113                )
114                .await?;
115
116            // if drift features is empty, return early
117            if drift_features.array.is_empty() {
118                info!("No features to process returning early");
119                return Ok((drift_features.array, drift_features.features));
120            }
121
122            let drift = SpcMonitor::new().calculate_drift_from_sample(
123                &drift_features.features,
124                &drift_features.array.t().view(), // need to transpose because calculation is done at the row level across each feature
125                &self.profile,
126            )?;
127
128            Ok((drift, drift_features.features))
129        }
130
131        /// Generate alerts for a given drift profile
132        ///
133        /// # Arguments
134        ///
135        /// * `array` - Drift array
136        /// * `features` - Features to monitor for drift
137        ///
138        /// # Returns
139        ///
140        /// * `Result<Option<TaskAlerts>>` - Task alerts
141        pub async fn generate_alerts(
142            &self,
143            array: &ArrayView2<'_, f64>,
144            features: &[String],
145        ) -> Result<Option<TaskAlerts>, DriftError> {
146            let mut task_alerts = TaskAlerts::default();
147            // Get alerts
148            // keys are the feature names that match the order of the drift array columns
149            let alert_rule = self.profile.config.alert_config.rule.clone();
150            let alerts = generate_alerts(array, features, &alert_rule)?;
151
152            // Get dispatcher, will default to console if env vars are not found for 3rd party service
153            // TODO: Add ability to pass hashmap of kwargs to dispatcher (from drift profile)
154            // This would be for things like opsgenie team, feature priority, slack channel, etc.
155            let alert_dispatcher = AlertDispatcher::new(&self.profile.config).inspect_err(|e| {
156                error!(
157                    "Error creating alert dispatcher for {}/{}/{}: {}",
158                    self.profile.space(),
159                    self.profile.name(),
160                    self.profile.version(),
161                    e
162                );
163            })?;
164
165            if alerts.has_alerts {
166                alert_dispatcher
167                    .process_alerts(&alerts)
168                    .await
169                    .inspect_err(|e| {
170                        error!(
171                            "Error processing alerts for {}/{}/{}: {}",
172                            self.profile.space(),
173                            self.profile.name(),
174                            self.profile.version(),
175                            e
176                        );
177                    })?;
178                task_alerts.alerts = alerts;
179                return Ok(Some(task_alerts));
180            } else {
181                info!(
182                    "No alerts to process for {}/{}/{}",
183                    self.profile.space(),
184                    self.profile.name(),
185                    self.profile.version(),
186                );
187            }
188
189            Ok(None)
190        }
191
192        /// organize alerts so that each alert is mapped to a single entry and feature
193        /// Some features may produce multiple alerts
194        ///
195        /// # Arguments
196        ///
197        /// * `alerts` - TaskAlerts to organize
198        ///
199        /// # Returns
200        ///
201        fn organize_alerts(&self, alerts: TaskAlerts) -> Vec<AlertMap> {
202            let mut tasks = Vec::new();
203            alerts.alerts.features.iter().for_each(|(_, feature)| {
204                let entry_vec = feature.to_alert_map();
205                tasks.extend(entry_vec);
206            });
207
208            tasks
209        }
210
211        /// Process a single drift computation task
212        ///
213        /// # Arguments
214        /// * `previous_run` - Previous run timestamp
215        pub async fn check_for_alerts(
216            &self,
217            db_client: &Pool<Postgres>,
218            previous_run: &DateTime<Utc>,
219        ) -> Result<Option<Vec<AlertMap>>, DriftError> {
220            // Compute drift
221            let (drift_array, keys) = self.compute_drift(previous_run, db_client).await?;
222
223            // if drift array is empty, return early
224            if drift_array.is_empty() {
225                return Ok(None);
226            }
227
228            // Generate alerts (if any)
229            let alerts = self
230                .generate_alerts(&drift_array.view(), &keys)
231                .await
232                .inspect_err(|e| {
233                    error!(
234                        "Error generating alerts for {}/{}/{}: {}",
235                        self.profile.space(),
236                        self.profile.name(),
237                        self.profile.version(),
238                        e
239                    );
240                })?;
241
242            match alerts {
243                Some(alerts) => {
244                    let organized_alerts = self.organize_alerts(alerts);
245                    Ok(Some(organized_alerts))
246                }
247                None => Ok(None),
248            }
249        }
250    }
251}