scouter_sql/sql/traits/
custom.rs1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::BinnedMetricWrapper;
4use crate::sql::traits::EntitySqlLogic;
5use crate::sql::utils::split_custom_interval;
6use crate::PostgresClient;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use itertools::multiunzip;
10use scouter_dataframe::parquet::BinnedMetricsExtractor;
11use scouter_dataframe::parquet::ParquetDataFrame;
12use scouter_settings::ObjectStorageSettings;
13use scouter_types::contracts::DriftRequest;
14use scouter_types::{BinnedMetrics, CustomMetricRecord, RecordType};
15use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
16use std::collections::HashMap;
17use tracing::{debug, instrument};
18
19#[async_trait]
20impl EntitySqlLogic for PostgresClient {}
21
22#[async_trait]
23pub trait CustomMetricSqlLogic {
24 #[instrument(skip_all)]
27 async fn insert_custom_metric_values_batch(
28 pool: &Pool<Postgres>,
29 records: &[CustomMetricRecord],
30 entity_id: &i32,
31 ) -> Result<PgQueryResult, SqlError> {
32 if records.is_empty() {
33 return Err(SqlError::EmptyBatchError);
34 }
35
36 let query = Queries::InsertCustomMetricValuesBatch.get_query();
37
38 let (created_ats, metrics, values, entity_ids): (
39 Vec<DateTime<Utc>>,
40 Vec<&str>,
41 Vec<f64>,
42 Vec<&i32>,
43 ) = multiunzip(
44 records
45 .iter()
46 .map(|r| (r.created_at, r.metric.as_str(), r.value, entity_id)),
47 );
48
49 sqlx::query(query)
50 .bind(created_ats)
51 .bind(entity_ids)
52 .bind(metrics)
53 .bind(values)
54 .execute(pool)
55 .await
56 .map_err(SqlError::SqlxError)
57 }
58
59 async fn get_custom_metric_values(
60 pool: &Pool<Postgres>,
61 limit_datetime: &DateTime<Utc>,
62 metrics: &[String],
63 entity_id: &i32,
64 ) -> Result<HashMap<String, f64>, SqlError> {
65 let query = Queries::GetCustomMetricValues.get_query();
66
67 let records = sqlx::query(query)
68 .bind(limit_datetime)
69 .bind(entity_id)
70 .bind(metrics)
71 .fetch_all(pool)
72 .await
73 .map_err(SqlError::SqlxError)?;
74
75 let metric_map = records
76 .into_iter()
77 .map(|row| {
78 let metric = row.get("metric");
79 let value = row.get("value");
80 (metric, value)
81 })
82 .collect();
83
84 Ok(metric_map)
85 }
86
87 #[instrument(skip_all)]
97 async fn get_records(
98 pool: &Pool<Postgres>,
99 params: &DriftRequest,
100 start_dt: DateTime<Utc>,
101 end_dt: DateTime<Utc>,
102 entity_id: &i32,
103 ) -> Result<BinnedMetrics, SqlError> {
104 let minutes = end_dt.signed_duration_since(start_dt).num_minutes() as f64;
105 let bin = minutes / params.max_data_points as f64;
106 let query = Queries::GetBinnedMetricValues.get_query();
107 let records: Vec<BinnedMetricWrapper> = sqlx::query_as(query)
108 .bind(bin)
109 .bind(start_dt)
110 .bind(end_dt)
111 .bind(entity_id)
112 .fetch_all(pool)
113 .await
114 .map_err(SqlError::SqlxError)?;
115
116 Ok(BinnedMetrics::from_vec(
117 records.into_iter().map(|wrapper| wrapper.0).collect(),
118 ))
119 }
120
121 fn merge_feature_results(
123 results: BinnedMetrics,
124 map: &mut BinnedMetrics,
125 ) -> Result<(), SqlError> {
126 for (name, metric) in results.metrics {
127 let metric_clone = metric.clone();
128 map.metrics
129 .entry(name)
130 .and_modify(|existing| {
131 existing.created_at.extend(metric_clone.created_at);
132 existing.stats.extend(metric_clone.stats);
133 })
134 .or_insert(metric);
135 }
136
137 Ok(())
138 }
139
140 #[instrument(skip_all)]
152 async fn get_archived_records(
153 params: &DriftRequest,
154 begin: DateTime<Utc>,
155 end: DateTime<Utc>,
156 minutes: i32,
157 storage_settings: &ObjectStorageSettings,
158 entity_id: &i32,
159 ) -> Result<BinnedMetrics, SqlError> {
160 let path = format!("{}/custom", params.uid);
161 let bin = minutes as f64 / params.max_data_points as f64;
162 let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
163 .get_binned_metrics(&path, &bin, &begin, &end, entity_id)
164 .await?;
165
166 Ok(BinnedMetricsExtractor::dataframe_to_binned_metrics(archived_df).await?)
167 }
168
169 #[instrument(skip_all)]
179 async fn get_binned_custom_drift_records(
180 pool: &Pool<Postgres>,
181 params: &DriftRequest,
182 retention_period: &i32,
183 storage_settings: &ObjectStorageSettings,
184 entity_id: &i32,
185 ) -> Result<BinnedMetrics, SqlError> {
186 debug!("Getting binned Custom drift records for {:?}", params);
187
188 if !params.has_custom_interval() {
189 debug!("No custom interval provided, using default");
190 let (start_dt, end_dt) = params.time_interval.to_begin_end_times()?;
191 return Self::get_records(pool, params, start_dt, end_dt, entity_id).await;
192 }
193
194 debug!("Custom interval provided, using custom interval");
195 let interval = params.clone().to_custom_interval().unwrap();
196 let timestamps = split_custom_interval(interval.begin, interval.end, retention_period)?;
197 let mut custom_metric_map = BinnedMetrics::default();
198
199 if let Some((active_begin, active_end)) = timestamps.active_range {
201 let current_results =
202 Self::get_records(pool, params, active_begin, active_end, entity_id).await?;
203 Self::merge_feature_results(current_results, &mut custom_metric_map)?;
204 }
205
206 if let Some((archive_begin, archive_end)) = timestamps.archived_range {
209 if let Some(archived_minutes) = timestamps.archived_minutes {
210 let archived_results = Self::get_archived_records(
211 params,
212 archive_begin,
213 archive_end,
214 archived_minutes,
215 storage_settings,
216 entity_id,
217 )
218 .await?;
219 Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
220 }
221 }
222
223 Ok(custom_metric_map)
224 }
225}