//! SPC drift SQL logic — scouter_sql/sql/traits/spc.rs
1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::SpcFeatureResult;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use itertools::multiunzip;
8use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
9use scouter_settings::ObjectStorageSettings;
10use scouter_types::{
11    spc::{SpcDriftFeature, SpcDriftFeatures},
12    DriftRequest, RecordType, SpcRecord,
13};
14use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
15use std::collections::BTreeMap;
16use tracing::{debug, error, instrument};
17
18#[async_trait]
19pub trait SpcSqlLogic {
20    /// Inserts a batch of SPC drift records into the database
21    /// # Arguments
22    /// * `pool` - The database connection pool
23    /// * `records` - The SPC drift records to insert
24    /// * `entity_id` - The entity ID associated with the records
25    /// # Returns
26    /// * A result containing the query result or an error
27    async fn insert_spc_drift_records_batch(
28        pool: &Pool<Postgres>,
29        records: &[SpcRecord],
30        entity_id: &i32,
31    ) -> Result<PgQueryResult, SqlError> {
32        if records.is_empty() {
33            return Err(SqlError::EmptyBatchError);
34        }
35
36        let query = Queries::InsertSpcDriftRecordBatch.get_query();
37
38        let (created_ats, entity_ids, features, values): (
39            Vec<DateTime<Utc>>,
40            Vec<i32>,
41            Vec<&str>,
42            Vec<f64>,
43        ) = multiunzip(
44            records
45                .iter()
46                .map(|r| (r.created_at, entity_id, r.feature.as_str(), r.value)),
47        );
48
49        sqlx::query(query)
50            .bind(created_ats)
51            .bind(entity_ids)
52            .bind(features)
53            .bind(values)
54            .execute(pool)
55            .await
56            .map_err(SqlError::SqlxError)
57    }
58
59    // Queries the database for all features under an entity
60    /// # Arguments
61    /// * `pool` - The database connection pool
62    /// * `entity_id` - The entity ID to filter features
63    /// # Returns
64    /// * A vector of feature names
65    async fn get_spc_features(
66        pool: &Pool<Postgres>,
67        entity_id: &i32,
68    ) -> Result<Vec<String>, SqlError> {
69        let query = Queries::GetSpcFeatures.get_query();
70
71        Ok(sqlx::query(query)
72            .bind(entity_id)
73            .fetch_all(pool)
74            .await
75            .inspect_err(|e| {
76                error!("Error fetching SPC features: {:?}", e);
77            })
78            .map(|result| {
79                result
80                    .iter()
81                    .map(|row| row.get("feature"))
82                    .collect::<Vec<String>>()
83            })?)
84    }
85
86    /// Get SPC drift records
87    ///
88    /// # Arguments
89    /// * `limit_datetime` - The limit datetime to get drift records for
90    /// * `features_to_monitor` - The features to monitor
91    /// * `entity_id` - The entity ID to filter records
92    async fn get_spc_drift_records(
93        pool: &Pool<Postgres>,
94        limit_datetime: &DateTime<Utc>,
95        features_to_monitor: &[String],
96        entity_id: &i32,
97    ) -> Result<SpcDriftFeatures, SqlError> {
98        let mut features = Self::get_spc_features(pool, entity_id).await?;
99
100        if !features_to_monitor.is_empty() {
101            features.retain(|feature| features_to_monitor.contains(feature));
102        }
103
104        let query = Queries::GetSpcFeatureValues.get_query();
105
106        let records: Vec<SpcFeatureResult> = sqlx::query_as(query)
107            .bind(limit_datetime)
108            .bind(entity_id)
109            .bind(features)
110            .fetch_all(pool)
111            .await
112            .inspect_err(|e| {
113                error!("Error fetching SPC drift records: {:?}", e);
114            })?;
115
116        let feature_drift = records
117            .into_iter()
118            .map(|record| {
119                let feature = SpcDriftFeature {
120                    created_at: record.created_at,
121                    values: record.values,
122                };
123                (record.feature.clone(), feature)
124            })
125            .collect::<BTreeMap<String, SpcDriftFeature>>();
126
127        Ok(SpcDriftFeatures {
128            features: feature_drift,
129        })
130    }
131
132    /// Queries the database for SPC drift records based on a time window
133    /// and aggregation.
134    ///
135    /// # Arguments
136    /// * `pool` - The database connection pool
137    /// * `params` - The drift request parameters
138    /// * `minutes` - The number of minutes to bin the data
139    /// * `entity_id` - The entity ID to filter records
140    ///
141    /// # Returns
142    /// * SpcDriftFeatures
143    async fn get_records(
144        pool: &Pool<Postgres>,
145        params: &DriftRequest,
146        start_dt: DateTime<Utc>,
147        end_dt: DateTime<Utc>,
148        entity_id: &i32,
149    ) -> Result<SpcDriftFeatures, SqlError> {
150        let minutes = end_dt.signed_duration_since(start_dt).num_minutes() as f64;
151        let bin = minutes / params.max_data_points as f64;
152
153        let query = Queries::GetBinnedSpcFeatureValues.get_query();
154
155        let records: Vec<SpcFeatureResult> = sqlx::query_as(query)
156            .bind(bin)
157            .bind(start_dt)
158            .bind(end_dt)
159            .bind(entity_id)
160            .fetch_all(pool)
161            .await?;
162
163        let feature_drift = records
164            .into_iter()
165            .map(|record| {
166                let feature = SpcDriftFeature {
167                    created_at: record.created_at,
168                    values: record.values,
169                };
170                (record.feature.clone(), feature)
171            })
172            .collect::<BTreeMap<String, SpcDriftFeature>>();
173
174        Ok(SpcDriftFeatures {
175            features: feature_drift,
176        })
177    }
178
179    /// Helper for merging current and archived binned spc drift records.
180    fn merge_feature_results(
181        results: SpcDriftFeatures,
182        map: &mut SpcDriftFeatures,
183    ) -> Result<(), SqlError> {
184        for (feature_name, feature) in results.features {
185            let feature_clone = feature.clone();
186            map.features
187                .entry(feature_name)
188                .and_modify(|existing| {
189                    existing.created_at.extend(feature_clone.created_at);
190                    existing.values.extend(feature_clone.values);
191                })
192                .or_insert(feature);
193        }
194
195        Ok(())
196    }
197
198    /// DataFusion implementation for getting spc drift records from archived data.
199    ///
200    /// # Arguments
201    /// * `params` - The drift request parameters
202    /// * `begin` - The start time of the time window
203    /// * `end` - The end time of the time window
204    /// * `minutes` - The number of minutes to bin the data
205    /// * `storage_settings` - The object storage settings
206    /// * `entity_id` - The entity ID to filter records
207    ///
208    /// # Returns
209    /// * A vector of drift records
210    async fn get_archived_records(
211        params: &DriftRequest,
212        begin: DateTime<Utc>,
213        end: DateTime<Utc>,
214        minutes: i32,
215        storage_settings: &ObjectStorageSettings,
216        entity_id: &i32,
217    ) -> Result<SpcDriftFeatures, SqlError> {
218        let path = format!("{}/spc", params.uid);
219        let bin = minutes as f64 / params.max_data_points as f64;
220
221        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
222            .get_binned_metrics(&path, &bin, &begin, &end, entity_id)
223            .await?;
224
225        Ok(dataframe_to_spc_drift_features(archived_df).await?)
226    }
227
228    // Queries the database for drift records based on a time window and aggregation
229    //
230    // # Arguments
231    //
232    // * `name` - The name of the service to query drift records for
233    // * `space` - The name of the space to query drift records for
234    // * `feature` - The name of the feature to query drift records for
235    // * `aggregation` - The aggregation to use for the query
236    // * `time_interval` - The time window to query drift records for
237    //
238    // # Returns
239    //
240    // * A vector of drift records
241    #[instrument(skip_all)]
242    async fn get_binned_spc_drift_records(
243        pool: &Pool<Postgres>,
244        params: &DriftRequest,
245        retention_period: &i32,
246        storage_settings: &ObjectStorageSettings,
247        entity_id: &i32,
248    ) -> Result<SpcDriftFeatures, SqlError> {
249        debug!("Getting binned SPC drift records for {:?}", params);
250
251        if !params.has_custom_interval() {
252            debug!("No custom interval provided, using default");
253            let (begin_utc, end_utc) = params.time_interval.to_begin_end_times()?;
254            return Self::get_records(pool, params, begin_utc, end_utc, entity_id).await;
255        }
256
257        debug!("Custom interval provided, using custom interval");
258        let interval = params.clone().to_custom_interval().unwrap();
259        let timestamps = split_custom_interval(interval.begin, interval.end, retention_period)?;
260        let mut spc_feature_map = SpcDriftFeatures::default();
261
262        // get data from postgres
263        if let Some((active_begin, active_end)) = timestamps.active_range {
264            let current_results =
265                Self::get_records(pool, params, active_begin, active_end, entity_id).await?;
266            Self::merge_feature_results(current_results, &mut spc_feature_map)?;
267        }
268
269        // get archived data
270        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
271            if let Some(archived_minutes) = timestamps.archived_minutes {
272                let archived_results = Self::get_archived_records(
273                    params,
274                    archive_begin,
275                    archive_end,
276                    archived_minutes,
277                    storage_settings,
278                    entity_id,
279                )
280                .await?;
281
282                Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
283            }
284        }
285
286        Ok(spc_feature_map)
287    }
288}