scouter_sql/sql/traits/
spc.rs

1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::SpcFeatureResult;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
8use scouter_settings::ObjectStorageSettings;
9use scouter_types::{
10    spc::{SpcDriftFeature, SpcDriftFeatures},
11    DriftRequest, RecordType, ServiceInfo, SpcServerRecord,
12};
13
14use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
15use std::collections::BTreeMap;
16use tracing::{debug, instrument};
17
18#[async_trait]
19pub trait SpcSqlLogic {
20    /// Inserts a drift record into the database
21    ///
22    /// # Arguments
23    ///
24    /// * `record` - A drift record to insert into the database
25    /// * `table_name` - The name of the table to insert the record into
26    ///
27    async fn insert_spc_drift_record(
28        pool: &Pool<Postgres>,
29        record: &SpcServerRecord,
30    ) -> Result<PgQueryResult, SqlError> {
31        let query = Queries::InsertDriftRecord.get_query();
32
33        Ok(sqlx::query(&query.sql)
34            .bind(record.created_at)
35            .bind(&record.name)
36            .bind(&record.space)
37            .bind(&record.version)
38            .bind(&record.feature)
39            .bind(record.value)
40            .execute(pool)
41            .await?)
42    }
43
44    // Queries the database for all features under a service
45    // Private method that'll be used to run drift retrieval in parallel
46    async fn get_spc_features(
47        pool: &Pool<Postgres>,
48        service_info: &ServiceInfo,
49    ) -> Result<Vec<String>, SqlError> {
50        let query = Queries::GetSpcFeatures.get_query();
51
52        Ok(sqlx::query(&query.sql)
53            .bind(&service_info.name)
54            .bind(&service_info.space)
55            .bind(&service_info.version)
56            .fetch_all(pool)
57            .await
58            .map(|result| {
59                result
60                    .iter()
61                    .map(|row| row.get("feature"))
62                    .collect::<Vec<String>>()
63            })?)
64    }
65
66    /// Get SPC drift records
67    ///
68    /// # Arguments
69    ///
70    /// * `service_info` - The service to get drift records for
71    /// * `limit_datetime` - The limit datetime to get drift records for
72    /// * `features_to_monitor` - The features to monitor
73    async fn get_spc_drift_records(
74        pool: &Pool<Postgres>,
75        service_info: &ServiceInfo,
76        limit_datetime: &DateTime<Utc>,
77        features_to_monitor: &[String],
78    ) -> Result<SpcDriftFeatures, SqlError> {
79        let mut features = Self::get_spc_features(pool, service_info).await?;
80
81        if !features_to_monitor.is_empty() {
82            features.retain(|feature| features_to_monitor.contains(feature));
83        }
84
85        let query = Queries::GetSpcFeatureValues.get_query();
86
87        let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
88            .bind(limit_datetime)
89            .bind(&service_info.name)
90            .bind(&service_info.space)
91            .bind(&service_info.version)
92            .bind(features)
93            .fetch_all(pool)
94            .await?;
95
96        let feature_drift = records
97            .into_iter()
98            .map(|record| {
99                let feature = SpcDriftFeature {
100                    created_at: record.created_at,
101                    values: record.values,
102                };
103                (record.feature.clone(), feature)
104            })
105            .collect::<BTreeMap<String, SpcDriftFeature>>();
106
107        Ok(SpcDriftFeatures {
108            features: feature_drift,
109        })
110    }
111
112    /// Queries the database for SPC drift records based on a time window
113    /// and aggregation.
114    ///
115    /// # Arguments
116    /// * `pool` - The database connection pool
117    /// * `params` - The drift request parameters
118    ///
119    /// # Returns
120    /// * SpcDriftFeatures
121    async fn get_records(
122        pool: &Pool<Postgres>,
123        params: &DriftRequest,
124        minutes: i32,
125    ) -> Result<SpcDriftFeatures, SqlError> {
126        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;
127
128        let query = Queries::GetBinnedSpcFeatureValues.get_query();
129
130        let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
131            .bind(bin)
132            .bind(minutes)
133            .bind(&params.name)
134            .bind(&params.space)
135            .bind(&params.version)
136            .fetch_all(pool)
137            .await?;
138
139        let feature_drift = records
140            .into_iter()
141            .map(|record| {
142                let feature = SpcDriftFeature {
143                    created_at: record.created_at,
144                    values: record.values,
145                };
146                (record.feature.clone(), feature)
147            })
148            .collect::<BTreeMap<String, SpcDriftFeature>>();
149
150        Ok(SpcDriftFeatures {
151            features: feature_drift,
152        })
153    }
154
155    /// Helper for merging current and archived binned spc drift records.
156    fn merge_feature_results(
157        results: SpcDriftFeatures,
158        map: &mut SpcDriftFeatures,
159    ) -> Result<(), SqlError> {
160        for (feature_name, feature) in results.features {
161            let feature_clone = feature.clone();
162            map.features
163                .entry(feature_name)
164                .and_modify(|existing| {
165                    existing.created_at.extend(feature_clone.created_at);
166                    existing.values.extend(feature_clone.values);
167                })
168                .or_insert(feature);
169        }
170
171        Ok(())
172    }
173
174    /// DataFusion implementation for getting spc drift records from archived data.
175    ///
176    /// # Arguments
177    /// * `params` - The drift request parameters
178    /// * `begin` - The start time of the time window
179    /// * `end` - The end time of the time window
180    /// * `minutes` - The number of minutes to bin the data
181    /// * `storage_settings` - The object storage settings
182    ///
183    /// # Returns
184    /// * A vector of drift records
185    async fn get_archived_records(
186        params: &DriftRequest,
187        begin: DateTime<Utc>,
188        end: DateTime<Utc>,
189        minutes: i32,
190        storage_settings: &ObjectStorageSettings,
191    ) -> Result<SpcDriftFeatures, SqlError> {
192        let path = format!("{}/{}/{}/spc", params.space, params.name, params.version);
193        let bin = minutes as f64 / params.max_data_points as f64;
194
195        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
196            .get_binned_metrics(
197                &path,
198                &bin,
199                &begin,
200                &end,
201                &params.space,
202                &params.name,
203                &params.version,
204            )
205            .await?;
206
207        Ok(dataframe_to_spc_drift_features(archived_df).await?)
208    }
209
210    // Queries the database for drift records based on a time window and aggregation
211    //
212    // # Arguments
213    //
214    // * `name` - The name of the service to query drift records for
215    // * `space` - The name of the space to query drift records for
216    // * `feature` - The name of the feature to query drift records for
217    // * `aggregation` - The aggregation to use for the query
218    // * `time_interval` - The time window to query drift records for
219    //
220    // # Returns
221    //
222    // * A vector of drift records
223    #[instrument(skip_all)]
224    async fn get_binned_spc_drift_records(
225        pool: &Pool<Postgres>,
226        params: &DriftRequest,
227        retention_period: &i32,
228        storage_settings: &ObjectStorageSettings,
229    ) -> Result<SpcDriftFeatures, SqlError> {
230        debug!("Getting binned SPC drift records for {:?}", params);
231
232        if !params.has_custom_interval() {
233            debug!("No custom interval provided, using default");
234            let minutes = params.time_interval.to_minutes();
235            return Self::get_records(pool, params, minutes).await;
236        }
237
238        debug!("Custom interval provided, using custom interval");
239        let interval = params.clone().to_custom_interval().unwrap();
240        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
241        let mut spc_feature_map = SpcDriftFeatures::default();
242
243        // get data from postgres
244        if let Some(minutes) = timestamps.current_minutes {
245            let current_results = Self::get_records(pool, params, minutes).await?;
246            Self::merge_feature_results(current_results, &mut spc_feature_map)?;
247        }
248
249        // get archived data
250        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
251            if let Some(archived_minutes) = timestamps.archived_minutes {
252                let archived_results = Self::get_archived_records(
253                    params,
254                    archive_begin,
255                    archive_end,
256                    archived_minutes,
257                    storage_settings,
258                )
259                .await?;
260
261                Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
262            }
263        }
264
265        Ok(spc_feature_map)
266    }
267}