1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::SpcFeatureResult;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use itertools::multiunzip;
8use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
9use scouter_settings::ObjectStorageSettings;
10use scouter_types::{
11 spc::{SpcDriftFeature, SpcDriftFeatures},
12 DriftRequest, RecordType, SpcRecord,
13};
14use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
15use std::collections::BTreeMap;
16use tracing::{debug, error, instrument};
17
18#[async_trait]
19pub trait SpcSqlLogic {
20 async fn insert_spc_drift_records_batch(
28 pool: &Pool<Postgres>,
29 records: &[SpcRecord],
30 entity_id: &i32,
31 ) -> Result<PgQueryResult, SqlError> {
32 if records.is_empty() {
33 return Err(SqlError::EmptyBatchError);
34 }
35
36 let query = Queries::InsertSpcDriftRecordBatch.get_query();
37
38 let (created_ats, entity_ids, features, values): (
39 Vec<DateTime<Utc>>,
40 Vec<i32>,
41 Vec<&str>,
42 Vec<f64>,
43 ) = multiunzip(
44 records
45 .iter()
46 .map(|r| (r.created_at, entity_id, r.feature.as_str(), r.value)),
47 );
48
49 sqlx::query(query)
50 .bind(created_ats)
51 .bind(entity_ids)
52 .bind(features)
53 .bind(values)
54 .execute(pool)
55 .await
56 .map_err(SqlError::SqlxError)
57 }
58
59 async fn get_spc_features(
66 pool: &Pool<Postgres>,
67 entity_id: &i32,
68 ) -> Result<Vec<String>, SqlError> {
69 let query = Queries::GetSpcFeatures.get_query();
70
71 Ok(sqlx::query(query)
72 .bind(entity_id)
73 .fetch_all(pool)
74 .await
75 .inspect_err(|e| {
76 error!("Error fetching SPC features: {:?}", e);
77 })
78 .map(|result| {
79 result
80 .iter()
81 .map(|row| row.get("feature"))
82 .collect::<Vec<String>>()
83 })?)
84 }
85
86 async fn get_spc_drift_records(
93 pool: &Pool<Postgres>,
94 limit_datetime: &DateTime<Utc>,
95 features_to_monitor: &[String],
96 entity_id: &i32,
97 ) -> Result<SpcDriftFeatures, SqlError> {
98 let mut features = Self::get_spc_features(pool, entity_id).await?;
99
100 if !features_to_monitor.is_empty() {
101 features.retain(|feature| features_to_monitor.contains(feature));
102 }
103
104 let query = Queries::GetSpcFeatureValues.get_query();
105
106 let records: Vec<SpcFeatureResult> = sqlx::query_as(query)
107 .bind(limit_datetime)
108 .bind(entity_id)
109 .bind(features)
110 .fetch_all(pool)
111 .await
112 .inspect_err(|e| {
113 error!("Error fetching SPC drift records: {:?}", e);
114 })?;
115
116 let feature_drift = records
117 .into_iter()
118 .map(|record| {
119 let feature = SpcDriftFeature {
120 created_at: record.created_at,
121 values: record.values,
122 };
123 (record.feature.clone(), feature)
124 })
125 .collect::<BTreeMap<String, SpcDriftFeature>>();
126
127 Ok(SpcDriftFeatures {
128 features: feature_drift,
129 })
130 }
131
132 async fn get_records(
144 pool: &Pool<Postgres>,
145 params: &DriftRequest,
146 start_dt: DateTime<Utc>,
147 end_dt: DateTime<Utc>,
148 entity_id: &i32,
149 ) -> Result<SpcDriftFeatures, SqlError> {
150 let minutes = end_dt.signed_duration_since(start_dt).num_minutes() as f64;
151 let bin = minutes / params.max_data_points as f64;
152
153 let query = Queries::GetBinnedSpcFeatureValues.get_query();
154
155 let records: Vec<SpcFeatureResult> = sqlx::query_as(query)
156 .bind(bin)
157 .bind(start_dt)
158 .bind(end_dt)
159 .bind(entity_id)
160 .fetch_all(pool)
161 .await?;
162
163 let feature_drift = records
164 .into_iter()
165 .map(|record| {
166 let feature = SpcDriftFeature {
167 created_at: record.created_at,
168 values: record.values,
169 };
170 (record.feature.clone(), feature)
171 })
172 .collect::<BTreeMap<String, SpcDriftFeature>>();
173
174 Ok(SpcDriftFeatures {
175 features: feature_drift,
176 })
177 }
178
179 fn merge_feature_results(
181 results: SpcDriftFeatures,
182 map: &mut SpcDriftFeatures,
183 ) -> Result<(), SqlError> {
184 for (feature_name, feature) in results.features {
185 let feature_clone = feature.clone();
186 map.features
187 .entry(feature_name)
188 .and_modify(|existing| {
189 existing.created_at.extend(feature_clone.created_at);
190 existing.values.extend(feature_clone.values);
191 })
192 .or_insert(feature);
193 }
194
195 Ok(())
196 }
197
198 async fn get_archived_records(
211 params: &DriftRequest,
212 begin: DateTime<Utc>,
213 end: DateTime<Utc>,
214 minutes: i32,
215 storage_settings: &ObjectStorageSettings,
216 entity_id: &i32,
217 ) -> Result<SpcDriftFeatures, SqlError> {
218 let path = format!("{}/spc", params.uid);
219 let bin = minutes as f64 / params.max_data_points as f64;
220
221 let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
222 .get_binned_metrics(&path, &bin, &begin, &end, entity_id)
223 .await?;
224
225 Ok(dataframe_to_spc_drift_features(archived_df).await?)
226 }
227
228 #[instrument(skip_all)]
242 async fn get_binned_spc_drift_records(
243 pool: &Pool<Postgres>,
244 params: &DriftRequest,
245 retention_period: &i32,
246 storage_settings: &ObjectStorageSettings,
247 entity_id: &i32,
248 ) -> Result<SpcDriftFeatures, SqlError> {
249 debug!("Getting binned SPC drift records for {:?}", params);
250
251 if !params.has_custom_interval() {
252 debug!("No custom interval provided, using default");
253 let (begin_utc, end_utc) = params.time_interval.to_begin_end_times()?;
254 return Self::get_records(pool, params, begin_utc, end_utc, entity_id).await;
255 }
256
257 debug!("Custom interval provided, using custom interval");
258 let interval = params.clone().to_custom_interval().unwrap();
259 let timestamps = split_custom_interval(interval.begin, interval.end, retention_period)?;
260 let mut spc_feature_map = SpcDriftFeatures::default();
261
262 if let Some((active_begin, active_end)) = timestamps.active_range {
264 let current_results =
265 Self::get_records(pool, params, active_begin, active_end, entity_id).await?;
266 Self::merge_feature_results(current_results, &mut spc_feature_map)?;
267 }
268
269 if let Some((archive_begin, archive_end)) = timestamps.archived_range {
271 if let Some(archived_minutes) = timestamps.archived_minutes {
272 let archived_results = Self::get_archived_records(
273 params,
274 archive_begin,
275 archive_end,
276 archived_minutes,
277 storage_settings,
278 entity_id,
279 )
280 .await?;
281
282 Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
283 }
284 }
285
286 Ok(spc_feature_map)
287 }
288}