scouter_sql/sql/traits/
custom.rs1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::BinnedMetricWrapper;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use itertools::multiunzip;
8use scouter_dataframe::parquet::BinnedMetricsExtractor;
9use scouter_dataframe::parquet::ParquetDataFrame;
10use scouter_settings::ObjectStorageSettings;
11use scouter_types::contracts::{DriftRequest, ServiceInfo};
12use scouter_types::{BinnedMetrics, CustomMetricServerRecord, RecordType};
13use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
14use std::collections::HashMap;
15use tracing::{debug, instrument};
16
17#[async_trait]
18pub trait CustomMetricSqlLogic {
19 async fn insert_custom_metric_values_batch(
20 pool: &Pool<Postgres>,
21 records: &[CustomMetricServerRecord],
22 ) -> Result<PgQueryResult, SqlError> {
23 if records.is_empty() {
24 return Err(SqlError::EmptyBatchError);
25 }
26
27 let query = Queries::InsertCustomMetricValuesBatch.get_query();
28
29 let (created_ats, names, spaces, versions, metrics, values): (
30 Vec<DateTime<Utc>>,
31 Vec<&str>,
32 Vec<&str>,
33 Vec<&str>,
34 Vec<&str>,
35 Vec<f64>,
36 ) = multiunzip(records.iter().map(|r| {
37 (
38 r.created_at,
39 r.name.as_str(),
40 r.space.as_str(),
41 r.version.as_str(),
42 r.metric.as_str(),
43 r.value,
44 )
45 }));
46
47 sqlx::query(&query.sql)
48 .bind(created_ats)
49 .bind(names)
50 .bind(spaces)
51 .bind(versions)
52 .bind(metrics)
53 .bind(values)
54 .execute(pool)
55 .await
56 .map_err(SqlError::SqlxError)
57 }
58
59 async fn get_custom_metric_values(
60 pool: &Pool<Postgres>,
61 service_info: &ServiceInfo,
62 limit_datetime: &DateTime<Utc>,
63 metrics: &[String],
64 ) -> Result<HashMap<String, f64>, SqlError> {
65 let query = Queries::GetCustomMetricValues.get_query();
66
67 let records = sqlx::query(&query.sql)
68 .bind(&service_info.name)
69 .bind(&service_info.space)
70 .bind(&service_info.version)
71 .bind(limit_datetime)
72 .bind(metrics)
73 .fetch_all(pool)
74 .await
75 .map_err(SqlError::SqlxError)?;
76
77 let metric_map = records
78 .into_iter()
79 .map(|row| {
80 let metric = row.get("metric");
81 let value = row.get("value");
82 (metric, value)
83 })
84 .collect();
85
86 Ok(metric_map)
87 }
88
89 #[instrument(skip_all)]
99 async fn get_records(
100 pool: &Pool<Postgres>,
101 params: &DriftRequest,
102 minutes: i32,
103 ) -> Result<BinnedMetrics, SqlError> {
104 let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;
105
106 let query = Queries::GetBinnedMetricValues.get_query();
107
108 let records: Vec<BinnedMetricWrapper> = sqlx::query_as(&query.sql)
109 .bind(bin)
110 .bind(minutes)
111 .bind(¶ms.name)
112 .bind(¶ms.space)
113 .bind(¶ms.version)
114 .fetch_all(pool)
115 .await
116 .map_err(SqlError::SqlxError)?;
117
118 Ok(BinnedMetrics::from_vec(
119 records.into_iter().map(|wrapper| wrapper.0).collect(),
120 ))
121 }
122
123 fn merge_feature_results(
125 results: BinnedMetrics,
126 map: &mut BinnedMetrics,
127 ) -> Result<(), SqlError> {
128 for (name, metric) in results.metrics {
129 let metric_clone = metric.clone();
130 map.metrics
131 .entry(name)
132 .and_modify(|existing| {
133 existing.created_at.extend(metric_clone.created_at);
134 existing.stats.extend(metric_clone.stats);
135 })
136 .or_insert(metric);
137 }
138
139 Ok(())
140 }
141
142 #[instrument(skip_all)]
154 async fn get_archived_records(
155 params: &DriftRequest,
156 begin: DateTime<Utc>,
157 end: DateTime<Utc>,
158 minutes: i32,
159 storage_settings: &ObjectStorageSettings,
160 ) -> Result<BinnedMetrics, SqlError> {
161 let path = format!("{}/{}/{}/custom", params.space, params.name, params.version);
162 let bin = minutes as f64 / params.max_data_points as f64;
163 let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
164 .get_binned_metrics(
165 &path,
166 &bin,
167 &begin,
168 &end,
169 ¶ms.space,
170 ¶ms.name,
171 ¶ms.version,
172 )
173 .await?;
174
175 Ok(BinnedMetricsExtractor::dataframe_to_binned_metrics(archived_df).await?)
176 }
177
178 #[instrument(skip_all)]
188 async fn get_binned_custom_drift_records(
189 pool: &Pool<Postgres>,
190 params: &DriftRequest,
191 retention_period: &i32,
192 storage_settings: &ObjectStorageSettings,
193 ) -> Result<BinnedMetrics, SqlError> {
194 debug!("Getting binned Custom drift records for {:?}", params);
195
196 if !params.has_custom_interval() {
197 debug!("No custom interval provided, using default");
198 let minutes = params.time_interval.to_minutes();
199 return Self::get_records(pool, params, minutes).await;
200 }
201
202 debug!("Custom interval provided, using custom interval");
203 let interval = params.clone().to_custom_interval().unwrap();
204 let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
205 let mut custom_metric_map = BinnedMetrics::default();
206
207 if let Some(minutes) = timestamps.current_minutes {
209 let current_results = Self::get_records(pool, params, minutes).await?;
210 Self::merge_feature_results(current_results, &mut custom_metric_map)?;
211 }
212
213 if let Some((archive_begin, archive_end)) = timestamps.archived_range {
216 if let Some(archived_minutes) = timestamps.archived_minutes {
217 let archived_results = Self::get_archived_records(
218 params,
219 archive_begin,
220 archive_end,
221 archived_minutes,
222 storage_settings,
223 )
224 .await?;
225 Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
226 }
227 }
228
229 Ok(custom_metric_map)
230 }
231}