//! scouter_sql/sql/traits/custom.rs — custom-metric SQL logic for the Postgres client.
use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::BinnedMetricWrapper;
use crate::sql::traits::EntitySqlLogic;
use crate::sql::utils::split_custom_interval;
use crate::PostgresClient;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::multiunzip;
use scouter_dataframe::parquet::BinnedMetricsExtractor;
use scouter_dataframe::parquet::ParquetDataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::contracts::DriftRequest;
use scouter_types::{BinnedMetrics, CustomMetricRecord, RecordType};
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::HashMap;
use tracing::{debug, instrument};
18
// Blanket implementation: `PostgresClient` picks up the default methods of
// `EntitySqlLogic` (defined elsewhere in this crate) with no overrides.
#[async_trait]
impl EntitySqlLogic for PostgresClient {}
21
22#[async_trait]
23pub trait CustomMetricSqlLogic {
24    /// Inserts a batch of custom metric values into the database
25    /// - This is an event route, so we need to get the entity_id from the uid
26    #[instrument(skip_all)]
27    async fn insert_custom_metric_values_batch(
28        pool: &Pool<Postgres>,
29        records: &[CustomMetricRecord],
30        entity_id: &i32,
31    ) -> Result<PgQueryResult, SqlError> {
32        if records.is_empty() {
33            return Err(SqlError::EmptyBatchError);
34        }
35
36        let query = Queries::InsertCustomMetricValuesBatch.get_query();
37
38        let (created_ats, metrics, values, entity_ids): (
39            Vec<DateTime<Utc>>,
40            Vec<&str>,
41            Vec<f64>,
42            Vec<&i32>,
43        ) = multiunzip(
44            records
45                .iter()
46                .map(|r| (r.created_at, r.metric.as_str(), r.value, entity_id)),
47        );
48
49        sqlx::query(query)
50            .bind(created_ats)
51            .bind(entity_ids)
52            .bind(metrics)
53            .bind(values)
54            .execute(pool)
55            .await
56            .map_err(SqlError::SqlxError)
57    }
58
59    async fn get_custom_metric_values(
60        pool: &Pool<Postgres>,
61        limit_datetime: &DateTime<Utc>,
62        metrics: &[String],
63        entity_id: &i32,
64    ) -> Result<HashMap<String, f64>, SqlError> {
65        let query = Queries::GetCustomMetricValues.get_query();
66
67        let records = sqlx::query(query)
68            .bind(limit_datetime)
69            .bind(entity_id)
70            .bind(metrics)
71            .fetch_all(pool)
72            .await
73            .map_err(SqlError::SqlxError)?;
74
75        let metric_map = records
76            .into_iter()
77            .map(|row| {
78                let metric = row.get("metric");
79                let value = row.get("value");
80                (metric, value)
81            })
82            .collect();
83
84        Ok(metric_map)
85    }
86
87    // Queries the database for Custom drift records based on a time window
88    /// and aggregation.
89    ///
90    /// # Arguments
91    /// * `pool` - The database connection pool
92    /// * `params` - The drift request parameters
93    ///
94    /// # Returns
95    /// * BinnedMetrics
96    #[instrument(skip_all)]
97    async fn get_records(
98        pool: &Pool<Postgres>,
99        params: &DriftRequest,
100        start_dt: DateTime<Utc>,
101        end_dt: DateTime<Utc>,
102        entity_id: &i32,
103    ) -> Result<BinnedMetrics, SqlError> {
104        let minutes = end_dt.signed_duration_since(start_dt).num_minutes() as f64;
105        let bin = minutes / params.max_data_points as f64;
106        let query = Queries::GetBinnedMetricValues.get_query();
107        let records: Vec<BinnedMetricWrapper> = sqlx::query_as(query)
108            .bind(bin)
109            .bind(start_dt)
110            .bind(end_dt)
111            .bind(entity_id)
112            .fetch_all(pool)
113            .await
114            .map_err(SqlError::SqlxError)?;
115
116        Ok(BinnedMetrics::from_vec(
117            records.into_iter().map(|wrapper| wrapper.0).collect(),
118        ))
119    }
120
121    /// Helper for merging custom drift records
122    fn merge_feature_results(
123        results: BinnedMetrics,
124        map: &mut BinnedMetrics,
125    ) -> Result<(), SqlError> {
126        for (name, metric) in results.metrics {
127            let metric_clone = metric.clone();
128            map.metrics
129                .entry(name)
130                .and_modify(|existing| {
131                    existing.created_at.extend(metric_clone.created_at);
132                    existing.stats.extend(metric_clone.stats);
133                })
134                .or_insert(metric);
135        }
136
137        Ok(())
138    }
139
140    /// DataFusion implementation for getting custom drift records from archived data.
141    ///
142    /// # Arguments
143    /// * `params` - The drift request parameters
144    /// * `begin` - The start time of the time window
145    /// * `end` - The end time of the time window
146    /// * `minutes` - The number of minutes to bin the data
147    /// * `storage_settings` - The object storage settings
148    ///
149    /// # Returns
150    /// * A vector of drift records
151    #[instrument(skip_all)]
152    async fn get_archived_records(
153        params: &DriftRequest,
154        begin: DateTime<Utc>,
155        end: DateTime<Utc>,
156        minutes: i32,
157        storage_settings: &ObjectStorageSettings,
158        entity_id: &i32,
159    ) -> Result<BinnedMetrics, SqlError> {
160        let path = format!("{}/custom", params.uid);
161        let bin = minutes as f64 / params.max_data_points as f64;
162        let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Custom)?
163            .get_binned_metrics(&path, &bin, &begin, &end, entity_id)
164            .await?;
165
166        Ok(BinnedMetricsExtractor::dataframe_to_binned_metrics(archived_df).await?)
167    }
168
169    // Queries the database for drift records based on a time window and aggregation
170    //
171    // # Arguments
172    //
173    // * `name` - The name of the service to query drift records for
174    // * `params` - The drift request parameters
175    // # Returns
176    //
177    // * A vector of drift records
178    #[instrument(skip_all)]
179    async fn get_binned_custom_drift_records(
180        pool: &Pool<Postgres>,
181        params: &DriftRequest,
182        retention_period: &i32,
183        storage_settings: &ObjectStorageSettings,
184        entity_id: &i32,
185    ) -> Result<BinnedMetrics, SqlError> {
186        debug!("Getting binned Custom drift records for {:?}", params);
187
188        if !params.has_custom_interval() {
189            debug!("No custom interval provided, using default");
190            let (start_dt, end_dt) = params.time_interval.to_begin_end_times()?;
191            return Self::get_records(pool, params, start_dt, end_dt, entity_id).await;
192        }
193
194        debug!("Custom interval provided, using custom interval");
195        let interval = params.clone().to_custom_interval().unwrap();
196        let timestamps = split_custom_interval(interval.begin, interval.end, retention_period)?;
197        let mut custom_metric_map = BinnedMetrics::default();
198
199        // get data from postgres
200        if let Some((active_begin, active_end)) = timestamps.active_range {
201            let current_results =
202                Self::get_records(pool, params, active_begin, active_end, entity_id).await?;
203            Self::merge_feature_results(current_results, &mut custom_metric_map)?;
204        }
205
206        // get archived data
207
208        if let Some((archive_begin, archive_end)) = timestamps.archived_range {
209            if let Some(archived_minutes) = timestamps.archived_minutes {
210                let archived_results = Self::get_archived_records(
211                    params,
212                    archive_begin,
213                    archive_end,
214                    archived_minutes,
215                    storage_settings,
216                    entity_id,
217                )
218                .await?;
219                Self::merge_feature_results(archived_results, &mut custom_metric_map)?;
220            }
221        }
222
223        Ok(custom_metric_map)
224    }
225}