1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::SpcFeatureResult;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use itertools::multiunzip;
8use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
9use scouter_settings::ObjectStorageSettings;
10use scouter_types::{
11 spc::{SpcDriftFeature, SpcDriftFeatures},
12 DriftRequest, RecordType, ServiceInfo, SpcServerRecord,
13};
14use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
15use std::collections::BTreeMap;
16use tracing::{debug, instrument};
17
18#[async_trait]
19pub trait SpcSqlLogic {
20 async fn insert_spc_drift_records_batch(
27 pool: &Pool<Postgres>,
28 records: &[SpcServerRecord],
29 ) -> Result<PgQueryResult, SqlError> {
30 if records.is_empty() {
31 return Err(SqlError::EmptyBatchError);
32 }
33
34 let query = Queries::InsertSpcDriftRecordBatch.get_query();
35
36 let (created_ats, names, spaces, versions, features, values): (
37 Vec<DateTime<Utc>>,
38 Vec<&str>,
39 Vec<&str>,
40 Vec<&str>,
41 Vec<&str>,
42 Vec<f64>,
43 ) = multiunzip(records.iter().map(|r| {
44 (
45 r.created_at,
46 r.name.as_str(),
47 r.space.as_str(),
48 r.version.as_str(),
49 r.feature.as_str(),
50 r.value,
51 )
52 }));
53
54 sqlx::query(&query.sql)
55 .bind(created_ats)
56 .bind(names)
57 .bind(spaces)
58 .bind(versions)
59 .bind(features)
60 .bind(values)
61 .execute(pool)
62 .await
63 .map_err(SqlError::SqlxError)
64 }
65
66 async fn get_spc_features(
69 pool: &Pool<Postgres>,
70 service_info: &ServiceInfo,
71 ) -> Result<Vec<String>, SqlError> {
72 let query = Queries::GetSpcFeatures.get_query();
73
74 Ok(sqlx::query(&query.sql)
75 .bind(&service_info.name)
76 .bind(&service_info.space)
77 .bind(&service_info.version)
78 .fetch_all(pool)
79 .await
80 .map(|result| {
81 result
82 .iter()
83 .map(|row| row.get("feature"))
84 .collect::<Vec<String>>()
85 })?)
86 }
87
88 async fn get_spc_drift_records(
96 pool: &Pool<Postgres>,
97 service_info: &ServiceInfo,
98 limit_datetime: &DateTime<Utc>,
99 features_to_monitor: &[String],
100 ) -> Result<SpcDriftFeatures, SqlError> {
101 let mut features = Self::get_spc_features(pool, service_info).await?;
102
103 if !features_to_monitor.is_empty() {
104 features.retain(|feature| features_to_monitor.contains(feature));
105 }
106
107 let query = Queries::GetSpcFeatureValues.get_query();
108
109 let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
110 .bind(limit_datetime)
111 .bind(&service_info.name)
112 .bind(&service_info.space)
113 .bind(&service_info.version)
114 .bind(features)
115 .fetch_all(pool)
116 .await?;
117
118 let feature_drift = records
119 .into_iter()
120 .map(|record| {
121 let feature = SpcDriftFeature {
122 created_at: record.created_at,
123 values: record.values,
124 };
125 (record.feature.clone(), feature)
126 })
127 .collect::<BTreeMap<String, SpcDriftFeature>>();
128
129 Ok(SpcDriftFeatures {
130 features: feature_drift,
131 })
132 }
133
134 async fn get_records(
144 pool: &Pool<Postgres>,
145 params: &DriftRequest,
146 minutes: i32,
147 ) -> Result<SpcDriftFeatures, SqlError> {
148 let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;
149
150 let query = Queries::GetBinnedSpcFeatureValues.get_query();
151
152 let records: Vec<SpcFeatureResult> = sqlx::query_as(&query.sql)
153 .bind(bin)
154 .bind(minutes)
155 .bind(¶ms.name)
156 .bind(¶ms.space)
157 .bind(¶ms.version)
158 .fetch_all(pool)
159 .await?;
160
161 let feature_drift = records
162 .into_iter()
163 .map(|record| {
164 let feature = SpcDriftFeature {
165 created_at: record.created_at,
166 values: record.values,
167 };
168 (record.feature.clone(), feature)
169 })
170 .collect::<BTreeMap<String, SpcDriftFeature>>();
171
172 Ok(SpcDriftFeatures {
173 features: feature_drift,
174 })
175 }
176
177 fn merge_feature_results(
179 results: SpcDriftFeatures,
180 map: &mut SpcDriftFeatures,
181 ) -> Result<(), SqlError> {
182 for (feature_name, feature) in results.features {
183 let feature_clone = feature.clone();
184 map.features
185 .entry(feature_name)
186 .and_modify(|existing| {
187 existing.created_at.extend(feature_clone.created_at);
188 existing.values.extend(feature_clone.values);
189 })
190 .or_insert(feature);
191 }
192
193 Ok(())
194 }
195
196 async fn get_archived_records(
208 params: &DriftRequest,
209 begin: DateTime<Utc>,
210 end: DateTime<Utc>,
211 minutes: i32,
212 storage_settings: &ObjectStorageSettings,
213 ) -> Result<SpcDriftFeatures, SqlError> {
214 let path = format!("{}/{}/{}/spc", params.space, params.name, params.version);
215 let bin = minutes as f64 / params.max_data_points as f64;
216
217 let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
218 .get_binned_metrics(
219 &path,
220 &bin,
221 &begin,
222 &end,
223 ¶ms.space,
224 ¶ms.name,
225 ¶ms.version,
226 )
227 .await?;
228
229 Ok(dataframe_to_spc_drift_features(archived_df).await?)
230 }
231
232 #[instrument(skip_all)]
246 async fn get_binned_spc_drift_records(
247 pool: &Pool<Postgres>,
248 params: &DriftRequest,
249 retention_period: &i32,
250 storage_settings: &ObjectStorageSettings,
251 ) -> Result<SpcDriftFeatures, SqlError> {
252 debug!("Getting binned SPC drift records for {:?}", params);
253
254 if !params.has_custom_interval() {
255 debug!("No custom interval provided, using default");
256 let minutes = params.time_interval.to_minutes();
257 return Self::get_records(pool, params, minutes).await;
258 }
259
260 debug!("Custom interval provided, using custom interval");
261 let interval = params.clone().to_custom_interval().unwrap();
262 let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
263 let mut spc_feature_map = SpcDriftFeatures::default();
264
265 if let Some(minutes) = timestamps.current_minutes {
267 let current_results = Self::get_records(pool, params, minutes).await?;
268 Self::merge_feature_results(current_results, &mut spc_feature_map)?;
269 }
270
271 if let Some((archive_begin, archive_end)) = timestamps.archived_range {
273 if let Some(archived_minutes) = timestamps.archived_minutes {
274 let archived_results = Self::get_archived_records(
275 params,
276 archive_begin,
277 archive_end,
278 archived_minutes,
279 storage_settings,
280 )
281 .await?;
282
283 Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
284 }
285 }
286
287 Ok(spc_feature_map)
288 }
289}