scouter_sql/sql/traits/
custom.rs

use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::BinnedCustomMetricWrapper;
use crate::sql::utils::split_custom_interval;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use scouter_dataframe::parquet::{dataframe_to_custom_drift_metrics, ParquetDataFrame};
use scouter_settings::ObjectStorageSettings;
use scouter_types::contracts::{DriftRequest, ServiceInfo};
use scouter_types::{custom::BinnedCustomMetrics, CustomMetricServerRecord, RecordType};
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::HashMap;
use tracing::{debug, instrument};

/// Shared query logic for custom drift metrics, covering both live records in
/// Postgres and records archived to object storage. Every method has a default
/// implementation, so implementors only need to adopt the trait.
#[async_trait]
pub trait CustomMetricSqlLogic {
    /// Inserts a custom metric value into the database.
    async fn insert_custom_metric_value(
        pool: &Pool<Postgres>,
        record: &CustomMetricServerRecord,
    ) -> Result<PgQueryResult, SqlError> {
        let query = Queries::InsertCustomMetricValues.get_query();

        sqlx::query(&query.sql)
            .bind(record.created_at)
            .bind(&record.name)
            .bind(&record.space)
            .bind(&record.version)
            .bind(&record.metric)
            .bind(record.value)
            .execute(pool)
            .await
            .map_err(SqlError::SqlxError)
    }

    /// Retrieves custom metric values for a service, filtered by `limit_datetime`,
    /// returned as a map of metric name to value.
    async fn get_custom_metric_values(
        pool: &Pool<Postgres>,
        service_info: &ServiceInfo,
        limit_datetime: &DateTime<Utc>,
        metrics: &[String],
    ) -> Result<HashMap<String, f64>, SqlError> {
        let query = Queries::GetCustomMetricValues.get_query();

        let records = sqlx::query(&query.sql)
            .bind(&service_info.name)
            .bind(&service_info.space)
            .bind(&service_info.version)
            .bind(limit_datetime)
            .bind(metrics)
            .fetch_all(pool)
            .await
            .map_err(SqlError::SqlxError)?;

        let metric_map = records
            .into_iter()
            .map(|row| {
                let metric = row.get("metric");
                let value = row.get("value");
                (metric, value)
            })
            .collect();

        Ok(metric_map)
    }

    /// Queries the database for custom drift records based on a time window
    /// and aggregation.
    ///
    /// # Arguments
    /// * `pool` - The database connection pool
    /// * `params` - The drift request parameters
    /// * `minutes` - The size of the time window to query, in minutes
    ///
    /// # Returns
    /// * BinnedCustomMetrics
    #[instrument(skip_all)]
    async fn get_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        minutes: i32,
    ) -> Result<BinnedCustomMetrics, SqlError> {
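        // Bin width in minutes: the requested interval divided by the number of
        // data points the caller asked for (assumes `max_data_points` is non-zero).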
        let bin = params.time_interval.to_minutes() as f64 / params.max_data_points as f64;

        let query = Queries::GetBinnedCustomMetricValues.get_query();

        let records: Vec<BinnedCustomMetricWrapper> = sqlx::query_as(&query.sql)
            .bind(bin)
            .bind(minutes)
            .bind(&params.name)
            .bind(&params.space)
            .bind(&params.version)
            .fetch_all(pool)
            .await
            .map_err(SqlError::SqlxError)?;

        Ok(BinnedCustomMetrics::from_vec(
            records.into_iter().map(|wrapper| wrapper.0).collect(),
        ))
    }

    /// Merges binned custom metric results into the running map, appending
    /// timestamps and stats for metrics that are already present.
    fn merge_feature_results(
        results: BinnedCustomMetrics,
        map: &mut BinnedCustomMetrics,
    ) -> Result<(), SqlError> {
        for (name, metric) in results.metrics {
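            // Clone so the `and_modify` closure can consume a copy while the
            // original `metric` stays available for `or_insert`.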
            let metric_clone = metric.clone();
            map.metrics
                .entry(name)
                .and_modify(|existing| {
                    existing.created_at.extend(metric_clone.created_at);
                    existing.stats.extend(metric_clone.stats);
                })
                .or_insert(metric);
        }

        Ok(())
    }

    /// DataFusion implementation for getting custom drift records from archived data.
    ///
    /// # Arguments
    /// * `params` - The drift request parameters
    /// * `begin` - The start time of the time window
    /// * `end` - The end time of the time window
    /// * `minutes` - The number of minutes to bin the data
    /// * `storage_settings` - The object storage settings
    ///
    /// # Returns
    /// * BinnedCustomMetrics
    #[instrument(skip_all)]
    async fn get_archived_records(
        params: &DriftRequest,
        begin: DateTime<Utc>,
        end: DateTime<Utc>,
        minutes: i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<BinnedCustomMetrics, SqlError> {
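        // Archived records are laid out under `<space>/<name>/<version>/custom` in
        // object storage; the bin width follows the same
        // interval-divided-by-`max_data_points` rule used for the live Postgres query.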
        let path = format!("{}/{}/{}/custom", params.space, params.name, params.version);
        let bin = minutes as f64 / params.max_data_points as f64;
        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
            .get_binned_metrics(
                &path,
                &bin,
                &begin,
                &end,
                &params.space,
                &params.name,
                &params.version,
            )
            .await?;

        Ok(dataframe_to_custom_drift_metrics(archived_df).await?)
    }

    /// Queries Postgres and, when the requested window extends past the retention
    /// period, archived object storage for drift records based on a time window
    /// and aggregation.
    ///
    /// # Arguments
    /// * `pool` - The database connection pool
    /// * `params` - The drift request parameters
    /// * `retention_period` - The retention period used to decide which part of the window is still in Postgres
    /// * `storage_settings` - The object storage settings
    ///
    /// # Returns
    /// * BinnedCustomMetrics
    #[instrument(skip_all)]
    async fn get_binned_custom_drift_records(
        pool: &Pool<Postgres>,
        params: &DriftRequest,
        retention_period: &i32,
        storage_settings: &ObjectStorageSettings,
    ) -> Result<BinnedCustomMetrics, SqlError> {
        debug!("Getting binned Custom drift records for {:?}", params);

        if !params.has_custom_interval() {
            debug!("No custom interval provided, using default");
            let minutes = params.time_interval.to_minutes();
            return Self::get_records(pool, params, minutes).await;
        }

        debug!("Custom interval provided, using custom interval");
        let interval = params.clone().to_custom_interval().unwrap();
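        // Split the requested range into the recent portion still held in Postgres
        // (within the retention period) and the older portion that has been
        // archived to object storage.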
        let timestamps = split_custom_interval(interval.start, interval.end, retention_period)?;
        let mut custom_metric_map = BinnedCustomMetrics::default();

        // get data from postgres
        if let Some(minutes) = timestamps.current_minutes {
            let current_results = Self::get_records(pool, params, minutes).await?;
            Self::merge_feature_results(current_results, &mut custom_metric_map)?;
        }

        // get archived data
        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
            if let Some(archived_minutes) = timestamps.archived_minutes {
                let archived_results = Self::get_archived_records(
                    params,
                    archive_begin,
                    archive_end,
                    archived_minutes,
                    storage_settings,
                )
                .await?;
                Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
            }
        }

        Ok(custom_metric_map)
    }
}
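
// Usage sketch (illustrative only, not part of the original file): every method
// above has a default body, so a unit struct is enough to adopt the trait. The
// names `usage_sketch` and `CustomMetricStore` are hypothetical.
#[cfg(test)]
mod usage_sketch {
    use super::*;

    struct CustomMetricStore;

    impl CustomMetricSqlLogic for CustomMetricStore {}

    // With a live `Pool<Postgres>` and a `CustomMetricServerRecord`, the default
    // implementations are invoked as associated functions, e.g.
    // `CustomMetricStore::insert_custom_metric_value(&pool, &record).await?`.
}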