// scouter_sql/sql/traits/spc.rs
use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::SpcFeatureResult;
use crate::sql::utils::split_custom_interval;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
use scouter_settings::ObjectStorageSettings;
use scouter_types::{
    spc::{SpcDriftFeature, SpcDriftFeatures},
    DriftRequest, RecordType, ServiceInfo, SpcServerRecord,
};

use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::BTreeMap;
use tracing::{debug, instrument};

#[async_trait]
pub trait SpcSqlLogic {
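    /// Inserts a single SPC drift record into the database.
    ///
    /// Binds the record's timestamp, identifiers (name, space, version),
    /// feature name, and observed value to the `InsertDriftRecord` query.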
    async fn insert_spc_drift_record(
        pool: &Pool<Postgres>,
        record: &SpcServerRecord,
    ) -> Result<PgQueryResult, SqlError> {
        let query = Queries::InsertDriftRecord.get_query();

        Ok(sqlx::query(&query.sql)
            .bind(record.created_at)
            .bind(&record.name)
            .bind(&record.space)
            .bind(&record.version)
            .bind(&record.feature)
            .bind(record.value)
            .execute(pool)
            .await?)
    }

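    /// Returns the distinct feature names recorded for the service
    /// identified by `service_info` (name, space, version).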
    async fn get_spc_features(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
    ) -> Result<Vec<String>, SqlError> {
        let query = Queries::GetSpcFeatures.get_query();

        Ok(sqlx::query(&query.sql)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .fetch_all(pool)
            .await
            .map(|result| {
                result
                    .iter()
                    .map(|row| row.get("feature"))
                    .collect::<Vec<String>>()
            })?)
    }

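    /// Fetches raw SPC drift values recorded after `limit_datetime`,
    /// keyed by feature name.
    ///
    /// If `features_to_monitor` is non-empty, results are restricted to
    /// those features; otherwise all features known for the service are
    /// returned.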
    async fn get_spc_drift_records(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
        limit_datetime: &DateTime<Utc>,
        features_to_monitor: &[String],
    ) -> Result<SpcDriftFeatures, SqlError> {
        let mut features = Self::get_spc_features(pool, service_info).await?;

        if !features_to_monitor.is_empty() {
            features.retain(|feature| features_to_monitor.contains(feature));
        }

        let query = Queries::GetSpcFeatureValues.get_query();

        let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
            .bind(limit_datetime)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .bind(features)
            .fetch_all(pool)
            .await?;

        let feature_drift = records
            .into_iter()
            .map(|record| {
                let feature = SpcDriftFeature {
                    created_at: record.created_at,
                    values: record.values,
                };
                (record.feature.clone(), feature)
            })
            .collect::<BTreeMap<String, SpcDriftFeature>>();

        Ok(SpcDriftFeatures {
            features: feature_drift,
        })
    }

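    /// Fetches time-binned SPC drift values from Postgres for a look-back
    /// window of `minutes` minutes.
    ///
    /// The bin width is the requested time interval divided by
    /// `max_data_points`, which caps the number of points returned per
    /// feature.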
    async fn get_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        minutes: i32,
    ) -> Result<SpcDriftFeatures, SqlError> {
        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;

        let query = Queries::GetBinnedSpcFeatureValues.get_query();

        let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
            .bind(bin)
            .bind(minutes)
            .bind(&params.name)
            .bind(&params.space)
            .bind(&params.version)
            .fetch_all(pool)
            .await?;

        let feature_drift = records
            .into_iter()
            .map(|record| {
                let feature = SpcDriftFeature {
                    created_at: record.created_at,
                    values: record.values,
                };
                (record.feature.clone(), feature)
            })
            .collect::<BTreeMap<String, SpcDriftFeature>>();

        Ok(SpcDriftFeatures {
            features: feature_drift,
        })
    }

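    /// Merges one batch of feature results into an accumulator.
    ///
    /// Existing features have their timestamps and values extended;
    /// new features are inserted as-is.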
    fn merge_feature_results(
        results: SpcDriftFeatures,
        map: &mut SpcDriftFeatures,
    ) -> Result<(), SqlError> {
        for (feature_name, feature) in results.features {
            // Clone up front: the entry API needs one copy for `and_modify`
            // and the original for `or_insert`.
            let feature_clone = feature.clone();
            map.features
                .entry(feature_name)
                .and_modify(|existing| {
                    existing.created_at.extend(feature_clone.created_at);
                    existing.values.extend(feature_clone.values);
                })
                .or_insert(feature);
        }

        Ok(())
    }

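    /// Fetches binned SPC drift values for the given time window from
    /// archived parquet files in object storage.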
    async fn get_archived_records(
        params: &DriftRequest,
        begin: DateTime<Utc>,
        end: DateTime<Utc>,
        minutes: i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<SpcDriftFeatures, SqlError> {
        let path = format!("{}/{}/{}/spc", params.space, params.name, params.version);
        let bin = minutes as f64 / params.max_data_points as f64;

        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
            .get_binned_metrics(
                &path,
                &bin,
                &begin,
                &end,
                &params.space,
                &params.name,
                &params.version,
            )
            .await?;

        Ok(dataframe_to_spc_drift_features(archived_df).await?)
    }

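    /// Retrieves binned SPC drift records, transparently spanning hot and
    /// archived storage.
    ///
    /// Without a custom interval the request is served entirely from
    /// Postgres. With one, the interval is split against the retention
    /// period: the recent portion is read from Postgres, the older portion
    /// from archived parquet data, and the two result sets are merged.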
    #[instrument(skip_all)]
    async fn get_binned_spc_drift_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        retention_period: &i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<SpcDriftFeatures, SqlError> {
        debug!("Getting binned SPC drift records for {:?}", params);

        if !params.has_custom_interval() {
            debug!("No custom interval provided, using default");
            let minutes = params.time_interval.to_minutes();
            return Self::get_records(pool, params, minutes).await;
        }

        debug!("Custom interval provided, using custom interval");
        // Safe to unwrap: has_custom_interval() was checked above.
        let interval = params.clone().to_custom_interval().unwrap();
        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
        let mut spc_feature_map = SpcDriftFeatures::default();

        // Data still within the retention period lives in Postgres.
        if let Some(minutes) = timestamps.current_minutes {
            let current_results = Self::get_records(pool, params, minutes).await?;
            Self::merge_feature_results(current_results, &mut spc_feature_map)?;
        }

        // Data older than the retention period lives in object storage.
        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
            if let Some(archived_minutes) = timestamps.archived_minutes {
                let archived_results = Self::get_archived_records(
                    params,
                    archive_begin,
                    archive_end,
                    archived_minutes,
                    storage_settings,
                )
                .await?;

                Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
            }
        }

        Ok(spc_feature_map)
    }
}
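
// A minimal usage sketch (illustrative, not from the original file). Every
// method has a default implementation, so a unit struct can opt in to the
// trait; the `PgClient` name and the surrounding setup are assumptions made
// for the example.
//
// struct PgClient;
// impl SpcSqlLogic for PgClient {}
//
// async fn example(
//     pool: &Pool<Postgres>,
//     request: &DriftRequest,
//     storage: &ObjectStorageSettings,
// ) -> Result<(), SqlError> {
//     let retention_days = 30;
//     let drift = PgClient::get_binned_spc_drift_records(
//         pool, request, &retention_days, storage,
//     )
//     .await?;
//     for (name, feature) in &drift.features {
//         println!("{name}: {} points", feature.values.len());
//     }
//     Ok(())
// }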