scouter_sql/sql/traits/spc.rs

use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::SpcFeatureResult;
use crate::sql::utils::split_custom_interval;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::multiunzip;
use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
use scouter_settings::ObjectStorageSettings;
use scouter_types::{
    spc::{SpcDriftFeature, SpcDriftFeatures},
    DriftRequest, RecordType, ServiceInfo, SpcServerRecord,
};
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::BTreeMap;
use tracing::{debug, instrument};

#[async_trait]
pub trait SpcSqlLogic {
    /// Inserts a batch of SPC drift records into the database
    ///
    /// # Arguments
    /// * `pool` - The database connection pool
    /// * `records` - The SPC drift records to insert
    ///
    /// # Returns
    /// * A result containing the query result or an error
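    ///
    /// # Example
    ///
    /// Illustrative sketch only (not compiled as a doctest); `PgDb` is a
    /// hypothetical implementor of this trait, and the record fields are
    /// assumed to mirror the columns bound below.
    ///
    /// ```ignore
    /// let record = SpcServerRecord {
    ///     created_at: Utc::now(),
    ///     space: "ml-platform".to_string(),
    ///     name: "fraud-model".to_string(),
    ///     version: "1.0.0".to_string(),
    ///     feature: "transaction_amount".to_string(),
    ///     value: 0.42,
    /// };
    /// let result = PgDb::insert_spc_drift_records_batch(&pool, &[record]).await?;
    /// ```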
    async fn insert_spc_drift_records_batch(
        pool: &Pool<Postgres>,
        records: &[SpcServerRecord],
    ) -> Result<PgQueryResult, SqlError> {
        if records.is_empty() {
            return Err(SqlError::EmptyBatchError);
        }

        let query = Queries::InsertSpcDriftRecordBatch.get_query();

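        // Unzip the records into parallel column vectors; each Vec is bound as a
        // single Postgres array parameter so the whole batch is inserted in one
        // statement (the insert query is expected to expand the arrays, e.g. via UNNEST).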
        let (created_ats, names, spaces, versions, features, values): (
            Vec<DateTime<Utc>>,
            Vec<&str>,
            Vec<&str>,
            Vec<&str>,
            Vec<&str>,
            Vec<f64>,
        ) = multiunzip(records.iter().map(|r| {
            (
                r.created_at,
                r.name.as_str(),
                r.space.as_str(),
                r.version.as_str(),
                r.feature.as_str(),
                r.value,
            )
        }));

        sqlx::query(&query.sql)
            .bind(created_ats)
            .bind(names)
            .bind(spaces)
            .bind(versions)
            .bind(features)
            .bind(values)
            .execute(pool)
            .await
            .map_err(SqlError::SqlxError)
    }

    // Queries the database for all features under a service
    // Private method that'll be used to run drift retrieval in parallel
    async fn get_spc_features(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
    ) -> Result<Vec<String>, SqlError> {
        let query = Queries::GetSpcFeatures.get_query();

        Ok(sqlx::query(&query.sql)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .fetch_all(pool)
            .await
            .map(|result| {
                result
                    .iter()
                    .map(|row| row.get("feature"))
                    .collect::<Vec<String>>()
            })?)
    }

    /// Get SPC drift records for a service, optionally filtered to a set of features
    ///
    /// # Arguments
    ///
    /// * `pool` - The database connection pool
    /// * `service_info` - The service to get drift records for
    /// * `limit_datetime` - The datetime cutoff for the query
    /// * `features_to_monitor` - The features to monitor; if empty, all features are returned
    ///
    /// # Returns
    /// * `SpcDriftFeatures` keyed by feature name
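    ///
    /// # Example
    ///
    /// Illustrative sketch only (not compiled as a doctest); `PgDb` is a
    /// hypothetical implementor of this trait and `cutoff` is some
    /// `DateTime<Utc>` value.
    ///
    /// ```ignore
    /// let service_info = ServiceInfo {
    ///     space: "ml-platform".to_string(),
    ///     name: "fraud-model".to_string(),
    ///     version: "1.0.0".to_string(),
    /// };
    /// let drift = PgDb::get_spc_drift_records(&pool, &service_info, &cutoff, &[]).await?;
    /// ```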
    async fn get_spc_drift_records(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
        limit_datetime: &DateTime<Utc>,
        features_to_monitor: &[String],
    ) -> Result<SpcDriftFeatures, SqlError> {
        let mut features = Self::get_spc_features(pool, service_info).await?;

        if !features_to_monitor.is_empty() {
            features.retain(|feature| features_to_monitor.contains(feature));
        }

        let query = Queries::GetSpcFeatureValues.get_query();

        let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
            .bind(limit_datetime)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .bind(features)
            .fetch_all(pool)
            .await?;

        let feature_drift = records
            .into_iter()
            .map(|record| {
                let feature = SpcDriftFeature {
                    created_at: record.created_at,
                    values: record.values,
                };
                (record.feature.clone(), feature)
            })
            .collect::<BTreeMap<String, SpcDriftFeature>>();

        Ok(SpcDriftFeatures {
            features: feature_drift,
        })
    }

    /// Queries the database for SPC drift records based on a time window
    /// and aggregation.
    ///
    /// # Arguments
    /// * `pool` - The database connection pool
    /// * `params` - The drift request parameters
    /// * `minutes` - The size of the time window, in minutes
    ///
    /// # Returns
    /// * `SpcDriftFeatures`
    async fn get_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        minutes: i32,
    ) -> Result<SpcDriftFeatures, SqlError> {
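        // Bin width in minutes: divide the requested time window by the maximum
        // number of data points so the result is downsampled to at most
        // `max_data_points` bins.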
        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;

        let query = Queries::GetBinnedSpcFeatureValues.get_query();

        let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
            .bind(bin)
            .bind(minutes)
            .bind(&params.name)
            .bind(&params.space)
            .bind(&params.version)
            .fetch_all(pool)
            .await?;

        let feature_drift = records
            .into_iter()
            .map(|record| {
                let feature = SpcDriftFeature {
                    created_at: record.created_at,
                    values: record.values,
                };
                (record.feature.clone(), feature)
            })
            .collect::<BTreeMap<String, SpcDriftFeature>>();

        Ok(SpcDriftFeatures {
            features: feature_drift,
        })
    }

    /// Helper for merging current and archived binned SPC drift records.
    fn merge_feature_results(
        results: SpcDriftFeatures,
        map: &mut SpcDriftFeatures,
    ) -> Result<(), SqlError> {
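        // For each feature, append the incoming timestamps/values to an existing
        // entry, or insert the feature as-is if it has not been seen yet. The clone
        // lets `and_modify` and `or_insert` each take ownership of the data.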
        for (feature_name, feature) in results.features {
            let feature_clone = feature.clone();
            map.features
                .entry(feature_name)
                .and_modify(|existing| {
                    existing.created_at.extend(feature_clone.created_at);
                    existing.values.extend(feature_clone.values);
                })
                .or_insert(feature);
        }

        Ok(())
    }

    /// DataFusion implementation for getting SPC drift records from archived data.
    ///
    /// # Arguments
    /// * `params` - The drift request parameters
    /// * `begin` - The start time of the time window
    /// * `end` - The end time of the time window
    /// * `minutes` - The length of the time window in minutes, used to compute the bin width
    /// * `storage_settings` - The object storage settings
    ///
    /// # Returns
    /// * `SpcDriftFeatures` keyed by feature name
    async fn get_archived_records(
        params: &DriftRequest,
        begin: DateTime<Utc>,
        end: DateTime<Utc>,
        minutes: i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<SpcDriftFeatures, SqlError> {
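        // Archived SPC records are expected to live under the
        // `{space}/{name}/{version}/spc` prefix in object storage.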
        let path = format!("{}/{}/{}/spc", params.space, params.name, params.version);
        let bin = minutes as f64 / params.max_data_points as f64;

        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
            .get_binned_metrics(
                &path,
                &bin,
                &begin,
                &end,
                &params.space,
                &params.name,
                &params.version,
            )
            .await?;

        Ok(dataframe_to_spc_drift_features(archived_df).await?)
    }

    /// Queries the database (and, for custom intervals that extend past the
    /// retention period, archived object storage) for binned SPC drift records
    /// based on a time window and aggregation.
    ///
    /// # Arguments
    ///
    /// * `pool` - The database connection pool
    /// * `params` - The drift request parameters
    /// * `retention_period` - The database retention period, used to split custom
    ///   intervals between current and archived data
    /// * `storage_settings` - The object storage settings for archived data
    ///
    /// # Returns
    ///
    /// * `SpcDriftFeatures` keyed by feature name
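    ///
    /// # Example
    ///
    /// Illustrative sketch only (not compiled as a doctest); `PgDb` is a
    /// hypothetical implementor of this trait and `request` is a populated
    /// `DriftRequest`.
    ///
    /// ```ignore
    /// let features = PgDb::get_binned_spc_drift_records(
    ///     &pool,
    ///     &request,
    ///     &retention_period,
    ///     &storage_settings,
    /// )
    /// .await?;
    /// ```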
    #[instrument(skip_all)]
    async fn get_binned_spc_drift_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        retention_period: &i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<SpcDriftFeatures, SqlError> {
        debug!("Getting binned SPC drift records for {:?}", params);

        if !params.has_custom_interval() {
            debug!("No custom interval provided, using default");
            let minutes = params.time_interval.to_minutes();
            return Self::get_records(pool, params, minutes).await;
        }

        debug!("Custom interval provided, using custom interval");
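        // `has_custom_interval()` was checked above, so converting to a custom
        // interval is expected to succeed. The interval is then split at the
        // retention boundary into a "current" window served from Postgres and an
        // "archived" window served from object storage.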
        let interval = params.clone().to_custom_interval().unwrap();
        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
        let mut spc_feature_map = SpcDriftFeatures::default();

        // get data from postgres
        if let Some(minutes) = timestamps.current_minutes {
            let current_results = Self::get_records(pool, params, minutes).await?;
            Self::merge_feature_results(current_results, &mut spc_feature_map)?;
        }

        // get archived data
        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
            if let Some(archived_minutes) = timestamps.archived_minutes {
                let archived_results = Self::get_archived_records(
                    params,
                    archive_begin,
                    archive_end,
                    archived_minutes,
                    storage_settings,
                )
                .await?;

                Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
            }
        }

        Ok(spc_feature_map)
    }
}