scouter_server/sql/
postgres.rs

1use crate::sql::query::{
2    GetBinnedFeatureValuesParams, GetFeatureValuesParams, GetFeaturesParams,
3    InsertMonitorProfileParams, InsertParams, Queries,
4};
5use crate::sql::schema::{DriftRecord, FeatureResult, MonitorProfile, QueryResult};
6use anyhow::*;
7use futures::future::join_all;
8use include_dir::{include_dir, Dir};
9use sqlx::{
10    postgres::{PgQueryResult, PgRow},
11    Pool, Postgres, QueryBuilder, Row,
12};
13
14use chrono::Utc;
15use cron::Schedule;
16use std::collections::BTreeMap;
17use std::result::Result::Ok;
18use std::str::FromStr;
19use tracing::error;
20
21static _MIGRATIONS: Dir = include_dir!("migrations");
22
/// Look-back windows that can be requested when querying drift data.
///
/// Each variant maps to a fixed number of minutes (see [`TimeInterval::to_minutes`])
/// and to a short textual label (see [`TimeInterval::from_string`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeInterval {
    FiveMinutes,
    FifteenMinutes,
    ThirtyMinutes,
    OneHour,
    ThreeHours,
    SixHours,
    TwelveHours,
    TwentyFourHours,
    TwoDays,
    FiveDays,
}

impl TimeInterval {
    /// Returns the length of the interval expressed in minutes.
    pub fn to_minutes(&self) -> i32 {
        match self {
            Self::FiveMinutes => 5,
            Self::FifteenMinutes => 15,
            Self::ThirtyMinutes => 30,
            Self::OneHour => 60,
            Self::ThreeHours => 180,
            Self::SixHours => 360,
            Self::TwelveHours => 720,
            Self::TwentyFourHours => 1440,
            Self::TwoDays => 2880,
            Self::FiveDays => 7200,
        }
    }

    /// Converts a textual label such as `"5minute"`, `"1hour"` or `"2day"`
    /// into the matching interval.
    ///
    /// The match is exact and case-sensitive. Any label that is not
    /// recognised falls back to [`TimeInterval::SixHours`] instead of
    /// failing, so callers never receive an error from this function.
    pub fn from_string(time_window: &str) -> TimeInterval {
        match time_window {
            "5minute" => Self::FiveMinutes,
            "15minute" => Self::FifteenMinutes,
            "30minute" => Self::ThirtyMinutes,
            "1hour" => Self::OneHour,
            "3hour" => Self::ThreeHours,
            "6hour" => Self::SixHours,
            "12hour" => Self::TwelveHours,
            "24hour" => Self::TwentyFourHours,
            "2day" => Self::TwoDays,
            "5day" => Self::FiveDays,
            // Unknown labels deliberately resolve to the six-hour default.
            _ => Self::SixHours,
        }
    }
}
68
/// Client that runs the drift-monitoring queries against Postgres.
///
/// Holds a connection pool plus the names of the tables the queries target.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PostgresClient {
    /// Connection pool every query is executed on.
    pub pool: Pool<Postgres>,
    /// Table holding drift records (`new` sets it to `scouter.drift`).
    qualified_table_name: String,
    /// Queue table name (`new` sets it to `scouter.drift_queue`).
    queue_table_name: String,
    /// Profile table name (`new` sets it to `scouter.drift_profile`).
    profile_table_name: String,
}
77
78impl PostgresClient {
79    // Create a new instance of PostgresClient
80    pub fn new(pool: Pool<Postgres>) -> Result<Self, anyhow::Error> {
81        // get database url from env or use the provided one
82
83        Ok(Self {
84            pool,
85            qualified_table_name: "scouter.drift".to_string(),
86            queue_table_name: "scouter.drift_queue".to_string(),
87            profile_table_name: "scouter.drift_profile".to_string(),
88        })
89    }
90
91    // Inserts a drift record into the database
92    //
93    // # Arguments
94    //
95    // * `record` - A drift record to insert into the database
96    // * `table_name` - The name of the table to insert the record into
97    //
98    pub async fn insert_drift_record(
99        &self,
100        record: &DriftRecord,
101    ) -> Result<PgQueryResult, anyhow::Error> {
102        let query = Queries::InsertDriftRecord.get_query();
103
104        let params = InsertParams {
105            table: self.qualified_table_name.to_string(),
106            created_at: record.created_at,
107            name: record.name.clone(),
108            repository: record.repository.clone(),
109            feature: record.feature.clone(),
110            value: record.value.to_string(),
111            version: record.version.clone(),
112        };
113
114        let query_result: std::prelude::v1::Result<sqlx::postgres::PgQueryResult, sqlx::Error> =
115            sqlx::raw_sql(query.format(&params).as_str())
116                .execute(&self.pool)
117                .await;
118
119        //drop params
120        match query_result {
121            Ok(result) => Ok(result),
122            Err(e) => {
123                error!("Failed to insert record into database: {:?}", e);
124                Err(anyhow!("Failed to insert record into database: {:?}", e))
125            }
126        }
127    }
128
129    pub async fn insert_drift_profile(
130        &self,
131        monitor_profile: &MonitorProfile,
132    ) -> Result<PgQueryResult, anyhow::Error> {
133        let query = Queries::InsertMonitorProfile.get_query();
134
135        let cron = Schedule::from_str(&monitor_profile.config.cron).with_context(|| {
136            format!(
137                "Failed to parse cron expression: {}",
138                &monitor_profile.config.cron
139            )
140        })?;
141
142        let next_run = cron.upcoming(Utc).take(1).next().with_context(|| {
143            format!(
144                "Failed to get next run time for cron expression: {}",
145                &monitor_profile.config.cron
146            )
147        })?;
148
149        let params = InsertMonitorProfileParams {
150            table: "scouter.drift_profile".to_string(),
151            name: monitor_profile.config.name.clone(),
152            repository: monitor_profile.config.repository.clone(),
153            version: monitor_profile.config.version.clone(),
154            profile: serde_json::to_string(&monitor_profile).unwrap(),
155            cron: monitor_profile.config.cron.clone(),
156            next_run: next_run.naive_utc(),
157        };
158
159        let query_result: std::prelude::v1::Result<sqlx::postgres::PgQueryResult, sqlx::Error> =
160            sqlx::raw_sql(query.format(&params).as_str())
161                .execute(&self.pool)
162                .await;
163
164        match query_result {
165            Ok(result) => Ok(result),
166            Err(e) => {
167                error!("Failed to insert record into database: {:?}", e);
168                Err(anyhow!("Failed to insert record into database: {:?}", e))
169            }
170        }
171    }
172
173    //func batch insert drift records
174    #[allow(dead_code)]
175    pub async fn insert_drift_records(
176        &self,
177        records: &[DriftRecord],
178    ) -> Result<PgQueryResult, anyhow::Error> {
179        let insert_statement = format!(
180            "INSERT INTO {} (created_at, name, repository, version, feature, value)",
181            self.qualified_table_name
182        );
183
184        let mut query_builder = QueryBuilder::new(insert_statement);
185
186        query_builder.push_values(records.iter(), |mut b, record| {
187            b.push_bind(record.created_at)
188                .push_bind(&record.name)
189                .push_bind(&record.repository)
190                .push_bind(&record.version)
191                .push_bind(&record.feature)
192                .push_bind(record.value);
193        });
194
195        let query = query_builder.build();
196
197        let query_result = query.execute(&self.pool).await;
198
199        match query_result {
200            Ok(result) => Ok(result),
201            Err(e) => {
202                error!("Failed to insert record into database: {:?}", e);
203                Err(anyhow!("Failed to insert record into database: {:?}", e))
204            }
205        }
206    }
207
208    // Queries the database for all features under a service
209    // Private method that'll be used to run drift retrieval in parallel
210    async fn get_features(
211        &self,
212        name: &str,
213        repository: &str,
214        version: &str,
215    ) -> Result<Vec<String>, anyhow::Error> {
216        let query = Queries::GetFeatures.get_query();
217
218        let params = GetFeaturesParams {
219            table: self.qualified_table_name.to_string(),
220            name: name.to_string(),
221            repository: repository.to_string(),
222            version: version.to_string(),
223        };
224
225        let result = sqlx::raw_sql(query.format(&params).as_str())
226            .fetch_all(&self.pool)
227            .await?;
228
229        let mut features = Vec::new();
230
231        for row in result {
232            features.push(row.get("feature"));
233        }
234
235        Ok(features)
236    }
237
238    #[allow(dead_code)]
239    async fn run_feature_query(
240        &self,
241        feature: &str,
242        name: &str,
243        repository: &str,
244        version: &str,
245        limit_timestamp: &str,
246    ) -> Result<Vec<PgRow>, anyhow::Error> {
247        let query = Queries::GetFeatureValues.get_query();
248
249        let params = GetFeatureValuesParams {
250            table: self.qualified_table_name.to_string(),
251            name: name.to_string(),
252            repository: repository.to_string(),
253            version: version.to_string(),
254            feature: feature.to_string(),
255            limit_timestamp: limit_timestamp.to_string(),
256        };
257
258        let result = sqlx::raw_sql(query.format(&params).as_str())
259            .fetch_all(&self.pool)
260            .await;
261
262        match result {
263            Ok(result) => Ok(result),
264            Err(e) => {
265                error!("Failed to run query: {:?}", e);
266                Err(anyhow!("Failed to run query: {:?}", e))
267            }
268        }
269    }
270
271    async fn run_binned_feature_query(
272        &self,
273        bin: &f64,
274        feature: String,
275        version: &str,
276        time_window: &i32,
277        name: &str,
278        repository: &str,
279    ) -> Result<Vec<PgRow>, anyhow::Error> {
280        let query = Queries::GetBinnedFeatureValues.get_query();
281
282        let params = GetBinnedFeatureValuesParams {
283            table: self.qualified_table_name.to_string(),
284            name: name.to_string(),
285            repository: repository.to_string(),
286            feature,
287            version: version.to_string(),
288            time_window: time_window.to_string(),
289            bin: bin.to_string(),
290        };
291
292        let result = sqlx::raw_sql(query.format(&params).as_str())
293            .fetch_all(&self.pool)
294            .await;
295
296        match result {
297            Ok(result) => Ok(result),
298            Err(e) => {
299                error!("Failed to run query: {:?}", e);
300                Err(anyhow!("Failed to run query: {:?}", e))
301            }
302        }
303    }
304
305    // Queries the database for drift records based on a time window and aggregation
306    //
307    // # Arguments
308    //
309    // * `service_name` - The name of the service to query drift records for
310    // * `feature` - The name of the feature to query drift records for
311    // * `aggregation` - The aggregation to use for the query
312    // * `time_window` - The time window to query drift records for
313    //
314    // # Returns
315    //
316    // * A vector of drift records
317    pub async fn get_binned_drift_records(
318        &self,
319        name: &str,
320        repository: &str,
321        version: &str,
322        max_data_points: &i32,
323        time_window: &i32,
324    ) -> Result<QueryResult, anyhow::Error> {
325        // get features
326        let features = self.get_features(name, repository, version).await?;
327
328        let bin = *time_window as f64 / *max_data_points as f64;
329
330        let async_queries = features
331            .iter()
332            .map(|feature| {
333                self.run_binned_feature_query(
334                    &bin,
335                    feature.to_string(),
336                    version,
337                    time_window,
338                    name,
339                    repository,
340                )
341            })
342            .collect::<Vec<_>>();
343
344        let query_results = join_all(async_queries).await;
345
346        // parse results
347        let mut query_result = QueryResult {
348            features: BTreeMap::new(),
349        };
350
351        for data in query_results {
352            match data {
353                Ok(data) => {
354                    //check if data is empty
355                    if data.is_empty() {
356                        continue;
357                    }
358
359                    let feature_name = data[0].get("feature");
360                    let mut created_at = Vec::new();
361                    let mut values = Vec::new();
362
363                    for row in data {
364                        created_at.push(row.get("created_at"));
365                        values.push(row.get("value"));
366                    }
367
368                    query_result
369                        .features
370                        .insert(feature_name, FeatureResult { created_at, values });
371                }
372                Err(e) => {
373                    error!("Failed to run query: {:?}", e);
374                    return Err(anyhow!("Failed to run query: {:?}", e));
375                }
376            }
377        }
378
379        Ok(query_result)
380    }
381
382    #[allow(dead_code)]
383    pub async fn get_drift_records(
384        &self,
385        name: &str,
386        repository: &str,
387        version: &str,
388        limit_timestamp: &str,
389    ) -> Result<QueryResult, anyhow::Error> {
390        let features = self.get_features(name, repository, version).await?;
391
392        let async_queries = features
393            .iter()
394            .map(|feature| {
395                self.run_feature_query(feature, name, repository, version, limit_timestamp)
396            })
397            .collect::<Vec<_>>();
398
399        let query_results = join_all(async_queries).await;
400
401        let mut query_result = QueryResult {
402            features: BTreeMap::new(),
403        };
404
405        for data in query_results {
406            match data {
407                Ok(data) => {
408                    //check if data is empty
409                    if data.is_empty() {
410                        continue;
411                    }
412
413                    let feature_name = data[0].get("feature");
414                    let mut created_at = Vec::new();
415                    let mut values = Vec::new();
416
417                    for row in data {
418                        created_at.push(row.get("created_at"));
419                        values.push(row.get("value"));
420                    }
421
422                    query_result
423                        .features
424                        .insert(feature_name, FeatureResult { created_at, values });
425                }
426                Err(e) => {
427                    error!("Failed to run query: {:?}", e);
428                    return Err(anyhow!("Failed to run query: {:?}", e));
429                }
430            }
431        }
432        Ok(query_result)
433    }
434
435    #[allow(dead_code)]
436    pub async fn raw_query(&self, query: &str) -> Result<Vec<PgRow>, anyhow::Error> {
437        let result = sqlx::raw_sql(query).fetch_all(&self.pool).await;
438
439        match result {
440            Ok(result) => {
441                // pretty print
442                Ok(result)
443            }
444            Err(e) => {
445                error!("Failed to run query: {:?}", e);
446                Err(anyhow!("Failed to run query: {:?}", e))
447            }
448        }
449    }
450}
451
452// integration tests
#[cfg(test)]
mod tests {

    use crate::api::setup::create_db_pool;

    use super::*;
    use std::env;
    use tokio;

    // Needs a Postgres instance reachable at the URL set below; it only
    // checks that a pool can be created and wrapped in a client.
    #[tokio::test]
    async fn test_client() {
        env::set_var(
            "DATABASE_URL",
            "postgresql://postgres:admin@localhost:5432/monitor?",
        );

        let pool = create_db_pool(None)
            .await
            .with_context(|| "Failed to create Postgres client")
            .unwrap();
        PostgresClient::new(pool).unwrap();
    }

    #[test]
    fn test_time_interval() {
        assert_eq!(TimeInterval::FiveMinutes.to_minutes(), 5);
        assert_eq!(TimeInterval::FifteenMinutes.to_minutes(), 15);
        assert_eq!(TimeInterval::ThirtyMinutes.to_minutes(), 30);
        assert_eq!(TimeInterval::OneHour.to_minutes(), 60);
        assert_eq!(TimeInterval::ThreeHours.to_minutes(), 180);
        assert_eq!(TimeInterval::SixHours.to_minutes(), 360);
        assert_eq!(TimeInterval::TwelveHours.to_minutes(), 720);
        assert_eq!(TimeInterval::TwentyFourHours.to_minutes(), 1440);
        assert_eq!(TimeInterval::TwoDays.to_minutes(), 2880);
        assert_eq!(TimeInterval::FiveDays.to_minutes(), 7200);
    }

    // Every label must resolve to the interval with the matching length, and
    // anything unrecognised must fall back to six hours (360 minutes).
    #[test]
    fn test_time_interval_from_string() {
        let cases = [
            ("5minute", 5),
            ("15minute", 15),
            ("30minute", 30),
            ("1hour", 60),
            ("3hour", 180),
            ("6hour", 360),
            ("12hour", 720),
            ("24hour", 1440),
            ("2day", 2880),
            ("5day", 7200),
        ];

        for (label, minutes) in cases.iter() {
            assert_eq!(TimeInterval::from_string(label).to_minutes(), *minutes);
        }

        assert_eq!(TimeInterval::from_string("not-a-window").to_minutes(), 360);
        assert_eq!(TimeInterval::from_string("").to_minutes(), 360);
    }
}