scouter_sql/sql/traits/
custom.rs

1use crate::sql::error::SqlError;
2use crate::sql::query::Queries;
3use crate::sql::schema::BinnedCustomMetricWrapper;
4use crate::sql::utils::split_custom_interval;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use itertools::multiunzip;
8use scouter_dataframe::parquet::{dataframe_to_custom_drift_metrics, ParquetDataFrame};
9use scouter_settings::ObjectStorageSettings;
10use scouter_types::contracts::{DriftRequest, ServiceInfo};
11use scouter_types::{custom::BinnedCustomMetrics, CustomMetricServerRecord, RecordType};
12use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
13use std::collections::HashMap;
14use tracing::{debug, instrument};
15#[async_trait]
16pub trait CustomMetricSqlLogic {
17    async fn insert_custom_metric_values_batch(
18        pool: &Pool<Postgres>,
19        records: &[CustomMetricServerRecord],
20    ) -> Result<PgQueryResult, SqlError> {
21        if records.is_empty() {
22            return Err(SqlError::EmptyBatchError);
23        }
24
25        let query = Queries::InsertCustomMetricValuesBatch.get_query();
26
27        let (created_ats, names, spaces, versions, metrics, values): (
28            Vec<DateTime<Utc>>,
29            Vec<&str>,
30            Vec<&str>,
31            Vec<&str>,
32            Vec<&str>,
33            Vec<f64>,
34        ) = multiunzip(records.iter().map(|r| {
35            (
36                r.created_at,
37                r.name.as_str(),
38                r.space.as_str(),
39                r.version.as_str(),
40                r.metric.as_str(),
41                r.value,
42            )
43        }));
44
45        sqlx::query(&query.sql)
46            .bind(created_ats)
47            .bind(names)
48            .bind(spaces)
49            .bind(versions)
50            .bind(metrics)
51            .bind(values)
52            .execute(pool)
53            .await
54            .map_err(SqlError::SqlxError)
55    }
56
57    async fn get_custom_metric_values(
58        pool: &Pool<Postgres>,
59        service_info: &ServiceInfo,
60        limit_datetime: &DateTime<Utc>,
61        metrics: &[String],
62    ) -> Result<HashMap<String, f64>, SqlError> {
63        let query = Queries::GetCustomMetricValues.get_query();
64
65        let records = sqlx::query(&query.sql)
66            .bind(&service_info.name)
67            .bind(&service_info.space)
68            .bind(&service_info.version)
69            .bind(limit_datetime)
70            .bind(metrics)
71            .fetch_all(pool)
72            .await
73            .map_err(SqlError::SqlxError)?;
74
75        let metric_map = records
76            .into_iter()
77            .map(|row| {
78                let metric = row.get("metric");
79                let value = row.get("value");
80                (metric, value)
81            })
82            .collect();
83
84        Ok(metric_map)
85    }
86
87    // Queries the database for Custom drift records based on a time window
88    /// and aggregation.
89    ///
90    /// # Arguments
91    /// * `pool` - The database connection pool
92    /// * `params` - The drift request parameters
93    ///
94    /// # Returns
95    /// * BinnedCustomMetrics
96    #[instrument(skip_all)]
97    async fn get_records(
98        pool: &Pool<Postgres>,
99        params: &DriftRequest,
100        minutes: i32,
101    ) -> Result<BinnedCustomMetrics, SqlError> {
102        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;
103
104        let query = Queries::GetBinnedCustomMetricValues.get_query();
105
106        let records: Vec<BinnedCustomMetricWrapper> = sqlx::query_as(&query.sql)
107            .bind(bin)
108            .bind(minutes)
109            .bind(&params.name)
110            .bind(&params.space)
111            .bind(&params.version)
112            .fetch_all(pool)
113            .await
114            .map_err(SqlError::SqlxError)?;
115
116        Ok(BinnedCustomMetrics::from_vec(
117            records.into_iter().map(|wrapper| wrapper.0).collect(),
118        ))
119    }
120
121    /// Helper for merging custom drift records
122    fn merge_feature_results(
123        results: BinnedCustomMetrics,
124        map: &mut BinnedCustomMetrics,
125    ) -> Result<(), SqlError> {
126        for (name, metric) in results.metrics {
127            let metric_clone = metric.clone();
128            map.metrics
129                .entry(name)
130                .and_modify(|existing| {
131                    existing.created_at.extend(metric_clone.created_at);
132                    existing.stats.extend(metric_clone.stats);
133                })
134                .or_insert(metric);
135        }
136
137        Ok(())
138    }
139
140    /// DataFusion implementation for getting custom drift records from archived data.
141    ///
142    /// # Arguments
143    /// * `params` - The drift request parameters
144    /// * `begin` - The start time of the time window
145    /// * `end` - The end time of the time window
146    /// * `minutes` - The number of minutes to bin the data
147    /// * `storage_settings` - The object storage settings
148    ///
149    /// # Returns
150    /// * A vector of drift records
151    #[instrument(skip_all)]
152    async fn get_archived_records(
153        params: &DriftRequest,
154        begin: DateTime<Utc>,
155        end: DateTime<Utc>,
156        minutes: i32,
157        storage_settings: &ObjectStorageSettings,
158    ) -> Result<BinnedCustomMetrics, SqlError> {
159        let path = format!("{}/{}/{}/custom", params.space, params.name, params.version);
160        let bin = minutes as f64 / params.max_data_points as f64;
161        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
162            .get_binned_metrics(
163                &path,
164                &bin,
165                &begin,
166                &end,
167                &params.space,
168                &params.name,
169                &params.version,
170            )
171            .await?;
172
173        Ok(dataframe_to_custom_drift_metrics(archived_df).await?)
174    }
175
176    // Queries the database for drift records based on a time window and aggregation
177    //
178    // # Arguments
179    //
180    // * `name` - The name of the service to query drift records for
181    // * `params` - The drift request parameters
182    // # Returns
183    //
184    // * A vector of drift records
185    #[instrument(skip_all)]
186    async fn get_binned_custom_drift_records(
187        pool: &Pool<Postgres>,
188        params: &DriftRequest,
189        retention_period: &i32,
190        storage_settings: &ObjectStorageSettings,
191    ) -> Result<BinnedCustomMetrics, SqlError> {
192        debug!("Getting binned Custom drift records for {:?}", params);
193
194        if !params.has_custom_interval() {
195            debug!("No custom interval provided, using default");
196            let minutes = params.time_interval.to_minutes();
197            return Self::get_records(pool, params, minutes).await;
198        }
199
200        debug!("Custom interval provided, using custom interval");
201        let interval = params.clone().to_custom_interval().unwrap();
202        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
203        let mut custom_metric_map = BinnedCustomMetrics::default();
204
205        // get data from postgres
206        if let Some(minutes) = timestamps.current_minutes {
207            let current_results = Self::get_records(pool, params, minutes).await?;
208            Self::merge_feature_results(current_results, &mut custom_metric_map)?;
209        }
210
211        // get archived data
212
213        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
214            if let Some(archived_minutes) = timestamps.archived_minutes {
215                let archived_results = Self::get_archived_records(
216                    params,
217                    archive_begin,
218                    archive_end,
219                    archived_minutes,
220                    storage_settings,
221                )
222                .await?;
223                Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
224            }
225        }
226
227        Ok(custom_metric_map)
228    }
229}