scouter_sql/sql/traits/
custom.rs1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::BinnedCustomMetricWrapper;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use scouter_dataframe::parquet::{dataframe_to_custom_drift_metrics, ParquetDataFrame};
8use scouter_settings::ObjectStorageSettings;
9use scouter_types::contracts::{DriftRequest, ServiceInfo};
10use scouter_types::{custom::BinnedCustomMetrics, CustomMetricServerRecord, RecordType};
11use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
12use std::collections::HashMap;
13use tracing::{debug, instrument};
14
15#[async_trait]
16pub trait CustomMetricSqlLogic {
17 async fn insert_custom_metric_value(
19 pool: &Pool<Postgres>,
20 record: &CustomMetricServerRecord,
21 ) -> Result<PgQueryResult, SqlError> {
22 let query = Queries::InsertCustomMetricValues.get_query();
23
24 sqlx::query(&query.sql)
25 .bind(record.created_at)
26 .bind(&record.name)
27 .bind(&record.space)
28 .bind(&record.version)
29 .bind(&record.metric)
30 .bind(record.value)
31 .execute(pool)
32 .await
33 .map_err(SqlError::SqlxError)
34 }
35
36 async fn get_custom_metric_values(
37 pool: &Pool<Postgres>,
38 service_info: &ServiceInfo,
39 limit_datetime: &DateTime<Utc>,
40 metrics: &[String],
41 ) -> Result<HashMap<String, f64>, SqlError> {
42 let query = Queries::GetCustomMetricValues.get_query();
43
44 let records = sqlx::query(&query.sql)
45 .bind(&service_info.name)
46 .bind(&service_info.space)
47 .bind(&service_info.version)
48 .bind(limit_datetime)
49 .bind(metrics)
50 .fetch_all(pool)
51 .await
52 .map_err(SqlError::SqlxError)?;
53
54 let metric_map = records
55 .into_iter()
56 .map(|row| {
57 let metric = row.get("metric");
58 let value = row.get("value");
59 (metric, value)
60 })
61 .collect();
62
63 Ok(metric_map)
64 }
65
66 #[instrument(skip_all)]
76 async fn get_records(
77 pool: &Pool<Postgres>,
78 params: &DriftRequest,
79 minutes: i32,
80 ) -> Result<BinnedCustomMetrics, SqlError> {
81 let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;
82
83 let query = Queries::GetBinnedCustomMetricValues.get_query();
84
85 let records: Vec<BinnedCustomMetricWrapper> = sqlx::query_as(&query.sql)
86 .bind(bin)
87 .bind(minutes)
88 .bind(¶ms.name)
89 .bind(¶ms.space)
90 .bind(¶ms.version)
91 .fetch_all(pool)
92 .await
93 .map_err(SqlError::SqlxError)?;
94
95 Ok(BinnedCustomMetrics::from_vec(
96 records.into_iter().map(|wrapper| wrapper.0).collect(),
97 ))
98 }
99
100 fn merge_feature_results(
102 results: BinnedCustomMetrics,
103 map: &mut BinnedCustomMetrics,
104 ) -> Result<(), SqlError> {
105 for (name, metric) in results.metrics {
106 let metric_clone = metric.clone();
107 map.metrics
108 .entry(name)
109 .and_modify(|existing| {
110 existing.created_at.extend(metric_clone.created_at);
111 existing.stats.extend(metric_clone.stats);
112 })
113 .or_insert(metric);
114 }
115
116 Ok(())
117 }
118
119 #[instrument(skip_all)]
131 async fn get_archived_records(
132 params: &DriftRequest,
133 begin: DateTime<Utc>,
134 end: DateTime<Utc>,
135 minutes: i32,
136 storage_settings: &ObjectStorageSettings,
137 ) -> Result<BinnedCustomMetrics, SqlError> {
138 let path = format!("{}/{}/{}/custom", params.space, params.name, params.version);
139 let bin = minutes as f64 / params.max_data_points as f64;
140 let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
141 .get_binned_metrics(
142 &path,
143 &bin,
144 &begin,
145 &end,
146 ¶ms.space,
147 ¶ms.name,
148 ¶ms.version,
149 )
150 .await?;
151
152 Ok(dataframe_to_custom_drift_metrics(archived_df).await?)
153 }
154
155 #[instrument(skip_all)]
165 async fn get_binned_custom_drift_records(
166 pool: &Pool<Postgres>,
167 params: &DriftRequest,
168 retention_period: &i32,
169 storage_settings: &ObjectStorageSettings,
170 ) -> Result<BinnedCustomMetrics, SqlError> {
171 debug!("Getting binned Custom drift records for {:?}", params);
172
173 if !params.has_custom_interval() {
174 debug!("No custom interval provided, using default");
175 let minutes = params.time_interval.to_minutes();
176 return Self::get_records(pool, params, minutes).await;
177 }
178
179 debug!("Custom interval provided, using custom interval");
180 let interval = params.clone().to_custom_interval().unwrap();
181 let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
182 let mut custom_metric_map = BinnedCustomMetrics::default();
183
184 if let Some(minutes) = timestamps.current_minutes {
186 let current_results = Self::get_records(pool, params, minutes).await?;
187 Self::merge_feature_results(current_results, &mut custom_metric_map)?;
188 }
189
190 if let Some((archive_begin, archive_end)) = timestamps.archived_range {
193 if let Some(archived_minutes) = timestamps.archived_minutes {
194 let archived_results = Self::get_archived_records(
195 params,
196 archive_begin,
197 archive_end,
198 archived_minutes,
199 storage_settings,
200 )
201 .await?;
202 Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
203 }
204 }
205
206 Ok(custom_metric_map)
207 }
208}