scouter_sql/sql/traits/custom.rs

use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::BinnedMetricWrapper;
use crate::sql::utils::split_custom_interval;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::multiunzip;
use scouter_dataframe::parquet::BinnedMetricsExtractor;
use scouter_dataframe::parquet::ParquetDataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::contracts::{DriftRequest, ServiceInfo};
use scouter_types::{BinnedMetrics, CustomMetricServerRecord, RecordType};
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::HashMap;
use tracing::{debug, instrument};

#[async_trait]
pub trait CustomMetricSqlLogic {
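    /// Inserts a batch of custom metric records in a single statement by
    /// unzipping the records into parallel column vectors, each bound once as
    /// a Postgres array.
    ///
    /// Returns `SqlError::EmptyBatchError` if `records` is empty.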
    async fn insert_custom_metric_values_batch(
        pool: &Pool<Postgres>,
        records: &[CustomMetricServerRecord],
    ) -> Result<PgQueryResult, SqlError> {
        if records.is_empty() {
            return Err(SqlError::EmptyBatchError);
        }

        let query = Queries::InsertCustomMetricValuesBatch.get_query();

        let (created_ats, names, spaces, versions, metrics, values): (
            Vec<DateTime<Utc>>,
            Vec<&str>,
            Vec<&str>,
            Vec<&str>,
            Vec<&str>,
            Vec<f64>,
        ) = multiunzip(records.iter().map(|r| {
            (
                r.created_at,
                r.name.as_str(),
                r.space.as_str(),
                r.version.as_str(),
                r.metric.as_str(),
                r.value,
            )
        }));

        sqlx::query(&query.sql)
            .bind(created_ats)
            .bind(names)
            .bind(spaces)
            .bind(versions)
            .bind(metrics)
            .bind(values)
            .execute(pool)
            .await
            .map_err(SqlError::SqlxError)
    }

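    /// Fetches one value per requested metric for the given service, filtered
    /// by `limit_datetime`; the aggregation itself is defined by the
    /// `GetCustomMetricValues` query.
    ///
    /// # Returns
    /// * A map of metric name to metric value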
    async fn get_custom_metric_values(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
        limit_datetime: &DateTime<Utc>,
        metrics: &[String],
    ) -> Result<HashMap<String, f64>, SqlError> {
        let query = Queries::GetCustomMetricValues.get_query();

        let records = sqlx::query(&query.sql)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .bind(limit_datetime)
            .bind(metrics)
            .fetch_all(pool)
            .await
            .map_err(SqlError::SqlxError)?;

        let metric_map = records
            .into_iter()
            .map(|row| {
                let metric = row.get("metric");
                let value = row.get("value");
                (metric, value)
            })
            .collect();

        Ok(metric_map)
    }

    /// Queries the database for custom drift records based on a time window
    /// and aggregation.
    ///
    /// # Arguments
    /// * `pool` - The database connection pool
    /// * `params` - The drift request parameters
    /// * `minutes` - The size of the time window, in minutes
    ///
    /// # Returns
    /// * `BinnedMetrics` - The binned metric values
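    ///
    /// # Example
    /// A minimal sketch (not compiled as a doctest); `PgCustomMetricSql`,
    /// `pool`, and `params` are illustrative placeholders.
    ///
    /// ```ignore
    /// // Hypothetical implementor; the default trait methods do the work.
    /// struct PgCustomMetricSql;
    /// #[async_trait]
    /// impl CustomMetricSqlLogic for PgCustomMetricSql {}
    ///
    /// let minutes = params.time_interval.to_minutes();
    /// let binned = PgCustomMetricSql::get_records(&pool, &params, minutes).await?;
    /// ```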
    #[instrument(skip_all)]
    async fn get_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        minutes: i32,
    ) -> Result<BinnedMetrics, SqlError> {
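        // Bin width in minutes: the interval's total span divided by the
        // requested number of data points.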
        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;

        let query = Queries::GetBinnedMetricValues.get_query();

        let records: Vec<BinnedMetricWrapper> = sqlx::query_as(&query.sql)
            .bind(bin)
            .bind(minutes)
            .bind(&params.name)
            .bind(&params.space)
            .bind(&params.version)
            .fetch_all(pool)
            .await
            .map_err(SqlError::SqlxError)?;

        Ok(BinnedMetrics::from_vec(
            records.into_iter().map(|wrapper| wrapper.0).collect(),
        ))
    }

    /// Merges one set of binned metric results into an accumulator map,
    /// extending the timestamps and stats of any metric already present.
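    ///
    /// # Example
    /// A minimal sketch (not compiled as a doctest), mirroring how live and
    /// archived results are merged below:
    ///
    /// ```ignore
    /// let mut merged = BinnedMetrics::default();
    /// Self::merge_feature_results(live_results, &mut merged)?;
    /// Self::merge_feature_results(archived_results, &mut merged)?;
    /// ```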
    fn merge_feature_results(
        results: BinnedMetrics,
        map: &mut BinnedMetrics,
    ) -> Result<(), SqlError> {
        for (name, metric) in results.metrics {
            let metric_clone = metric.clone();
            map.metrics
                .entry(name)
                .and_modify(|existing| {
                    existing.created_at.extend(metric_clone.created_at);
                    existing.stats.extend(metric_clone.stats);
                })
                .or_insert(metric);
        }

        Ok(())
    }

    /// DataFusion implementation for getting custom drift records from archived data.
    ///
    /// # Arguments
    /// * `params` - The drift request parameters
    /// * `begin` - The start time of the time window
    /// * `end` - The end time of the time window
    /// * `minutes` - The size of the time window, in minutes, used to compute the bin width
    /// * `storage_settings` - The object storage settings
    ///
    /// # Returns
    /// * `BinnedMetrics` - The binned metric values read from archived data
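    ///
    /// # Example
    /// A minimal sketch (not compiled as a doctest); archived records are read
    /// from the `{space}/{name}/{version}/custom` path in object storage.
    ///
    /// ```ignore
    /// let binned = PgCustomMetricSql::get_archived_records(
    ///     &params,
    ///     begin,
    ///     end,
    ///     minutes,
    ///     &storage_settings,
    /// )
    /// .await?;
    /// ```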
    #[instrument(skip_all)]
    async fn get_archived_records(
        params: &DriftRequest,
        begin: DateTime<Utc>,
        end: DateTime<Utc>,
        minutes: i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<BinnedMetrics, SqlError> {
        let path = format!("{}/{}/{}/custom", params.space, params.name, params.version);
        let bin = minutes as f64 / params.max_data_points as f64;
        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
            .get_binned_metrics(
                &path,
                &bin,
                &begin,
                &end,
                &params.space,
                &params.name,
                &params.version,
            )
            .await?;

        Ok(BinnedMetricsExtractor::dataframe_to_binned_metrics(archived_df).await?)
    }

    /// Queries the database, and archived object storage where needed, for
    /// drift records based on a time window and aggregation.
    ///
    /// # Arguments
    /// * `pool` - The database connection pool
    /// * `params` - The drift request parameters
    /// * `retention_period` - The retention period used to split the interval
    ///   between live and archived data
    /// * `storage_settings` - The object storage settings
    ///
    /// # Returns
    /// * `BinnedMetrics` - The merged binned metric values
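    ///
    /// # Example
    /// A minimal sketch (not compiled as a doctest); `PgCustomMetricSql` is
    /// the hypothetical implementor from the `get_records` example.
    ///
    /// ```ignore
    /// let binned = PgCustomMetricSql::get_binned_custom_drift_records(
    ///     &pool,
    ///     &params,
    ///     &retention_period,
    ///     &storage_settings,
    /// )
    /// .await?;
    /// for (name, metric) in &binned.metrics {
    ///     println!("{name}: {} bins", metric.created_at.len());
    /// }
    /// ```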
    #[instrument(skip_all)]
    async fn get_binned_custom_drift_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        retention_period: &i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<BinnedMetrics, SqlError> {
        debug!("Getting binned Custom drift records for {:?}", params);

        if !params.has_custom_interval() {
            debug!("No custom interval provided, using default");
            let minutes = params.time_interval.to_minutes();
            return Self::get_records(pool, params, minutes).await;
        }

        debug!("Custom interval provided, using custom interval");
        // Safe to unwrap: guarded by has_custom_interval() above.
        let interval = params.clone().to_custom_interval().unwrap();
        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
        let mut custom_metric_map = BinnedMetrics::default();

        // Get current data from Postgres.
        if let Some(minutes) = timestamps.current_minutes {
            let current_results = Self::get_records(pool, params, minutes).await?;
            Self::merge_feature_results(current_results, &mut custom_metric_map)?;
        }

        // Get archived data from object storage.
        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
            if let Some(archived_minutes) = timestamps.archived_minutes {
                let archived_results = Self::get_archived_records(
                    params,
                    archive_begin,
                    archive_end,
                    archived_minutes,
                    storage_settings,
                )
                .await?;
                Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
            }
        }

        Ok(custom_metric_map)
    }
}