scouter_sql/sql/traits/custom.rs

use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::BinnedCustomMetricWrapper;
use crate::sql::utils::split_custom_interval;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::multiunzip;
use scouter_dataframe::parquet::{dataframe_to_custom_drift_metrics, ParquetDataFrame};
use scouter_settings::ObjectStorageSettings;
use scouter_types::contracts::{DriftRequest, ServiceInfo};
use scouter_types::{custom::BinnedCustomMetrics, CustomMetricServerRecord, RecordType};
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::HashMap;
use tracing::{debug, instrument};

#[async_trait]
pub trait CustomMetricSqlLogic {
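    /// Inserts a batch of custom metric records in a single multi-row
    /// statement. Returns `SqlError::EmptyBatchError` if `records` is empty.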
    async fn insert_custom_metric_values_batch(
        pool: &Pool<Postgres>,
        records: &[CustomMetricServerRecord],
    ) -> Result<PgQueryResult, SqlError> {
        if records.is_empty() {
            return Err(SqlError::EmptyBatchError);
        }

        let query = Queries::InsertCustomMetricValuesBatch.get_query();

        // Transpose the record structs into one column vector per field so
        // that each vector can be bound to the query as a single array parameter.
        let (created_ats, names, spaces, versions, metrics, values): (
            Vec<DateTime<Utc>>,
            Vec<&str>,
            Vec<&str>,
            Vec<&str>,
            Vec<&str>,
            Vec<f64>,
        ) = multiunzip(records.iter().map(|r| {
            (
                r.created_at,
                r.name.as_str(),
                r.space.as_str(),
                r.version.as_str(),
                r.metric.as_str(),
                r.value,
            )
        }));

        sqlx::query(&query.sql)
            .bind(created_ats)
            .bind(names)
            .bind(spaces)
            .bind(versions)
            .bind(metrics)
            .bind(values)
            .execute(pool)
            .await
            .map_err(SqlError::SqlxError)
    }

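    /// Fetches one value per requested metric for the given service, bounded
    /// by `limit_datetime`, returned as a map of metric name to value.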
    async fn get_custom_metric_values(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
        limit_datetime: &DateTime<Utc>,
        metrics: &[String],
    ) -> Result<HashMap<String, f64>, SqlError> {
        let query = Queries::GetCustomMetricValues.get_query();

        let records = sqlx::query(&query.sql)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .bind(limit_datetime)
            .bind(metrics)
            .fetch_all(pool)
            .await
            .map_err(SqlError::SqlxError)?;

        // Collapse the rows into a metric-name -> value map.
        let metric_map = records
            .into_iter()
            .map(|row| {
                let metric = row.get("metric");
                let value = row.get("value");
                (metric, value)
            })
            .collect();

        Ok(metric_map)
    }

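    /// Queries Postgres for binned custom metric values over the trailing
    /// `minutes` window, bucketing results to at most `params.max_data_points`.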
    #[instrument(skip_all)]
    async fn get_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        minutes: i32,
    ) -> Result<BinnedCustomMetrics, SqlError> {
        // Bin size (in minutes) that yields at most `max_data_points` buckets.
        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;

        let query = Queries::GetBinnedCustomMetricValues.get_query();

        let records: Vec<BinnedCustomMetricWrapper> = sqlx::query_as(&query.sql)
            .bind(bin)
            .bind(minutes)
            .bind(&params.name)
            .bind(&params.space)
            .bind(&params.version)
            .fetch_all(pool)
            .await
            .map_err(SqlError::SqlxError)?;

        Ok(BinnedCustomMetrics::from_vec(
            records.into_iter().map(|wrapper| wrapper.0).collect(),
        ))
    }

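    /// Merges `results` into `map`, extending the timestamp and stat vectors
    /// of metrics that are already present and inserting new metrics as-is.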
    fn merge_feature_results(
        results: BinnedCustomMetrics,
        map: &mut BinnedCustomMetrics,
    ) -> Result<(), SqlError> {
        for (name, metric) in results.metrics {
            let metric_clone = metric.clone();
            map.metrics
                .entry(name)
                .and_modify(|existing| {
                    existing.created_at.extend(metric_clone.created_at);
                    existing.stats.extend(metric_clone.stats);
                })
                .or_insert(metric);
        }

        Ok(())
    }

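    /// Reads binned custom metric values for the given window from archived
    /// parquet files in object storage rather than from Postgres.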
    #[instrument(skip_all)]
    async fn get_archived_records(
        params: &DriftRequest,
        begin: DateTime<Utc>,
        end: DateTime<Utc>,
        minutes: i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<BinnedCustomMetrics, SqlError> {
        // Archived records live under `{space}/{name}/{version}/custom`.
        let path = format!("{}/{}/{}/custom", params.space, params.name, params.version);
        let bin = minutes as f64 / params.max_data_points as f64;
        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
            .get_binned_metrics(
                &path,
                &bin,
                &begin,
                &end,
                &params.space,
                &params.name,
                &params.version,
            )
            .await?;

        Ok(dataframe_to_custom_drift_metrics(archived_df).await?)
    }

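    /// Entry point for binned custom drift queries. Uses the default time
    /// interval when no custom interval is given; otherwise splits the custom
    /// interval against the retention period and merges current (Postgres)
    /// and archived (object storage) results.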
    #[instrument(skip_all)]
    async fn get_binned_custom_drift_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        retention_period: &i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<BinnedCustomMetrics, SqlError> {
        debug!("Getting binned Custom drift records for {:?}", params);

        if !params.has_custom_interval() {
            debug!("No custom interval provided, using default");
            let minutes = params.time_interval.to_minutes();
            return Self::get_records(pool, params, minutes).await;
        }

        debug!("Custom interval provided, using custom interval");
        // `has_custom_interval` was checked above, so this unwrap is safe.
        let interval = params.clone().to_custom_interval().unwrap();
        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
        let mut custom_metric_map = BinnedCustomMetrics::default();

        // Query Postgres for the portion of the interval still within retention.
        if let Some(minutes) = timestamps.current_minutes {
            let current_results = Self::get_records(pool, params, minutes).await?;
            Self::merge_feature_results(current_results, &mut custom_metric_map)?;
        }

        // Query object storage for the portion that has already been archived.
        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
            if let Some(archived_minutes) = timestamps.archived_minutes {
                let archived_results = Self::get_archived_records(
                    params,
                    archive_begin,
                    archive_end,
                    archived_minutes,
                    storage_settings,
                )
                .await?;
                Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
            }
        }

        Ok(custom_metric_map)
    }
}
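
// A minimal usage sketch (hypothetical caller; not part of this file). Any
// type can opt in, since every method has a default implementation:
//
//     struct PostgresClient;
//     impl CustomMetricSqlLogic for PostgresClient {}
//
//     let values = PostgresClient::get_custom_metric_values(
//         &pool,         // sqlx Pool<Postgres>
//         &service_info, // ServiceInfo identifying name/space/version
//         &cutoff,       // DateTime<Utc> bound for the lookup
//         &["mae".to_string()],
//     )
//     .await?;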