1use crate::sql::query::{
2 GetBinnedFeatureValuesParams, GetFeatureValuesParams, GetFeaturesParams,
3 InsertMonitorProfileParams, InsertParams, Queries,
4};
5use crate::sql::schema::{DriftRecord, FeatureResult, MonitorProfile, QueryResult};
6use anyhow::*;
7use futures::future::join_all;
8use include_dir::{include_dir, Dir};
9use sqlx::{
10 postgres::{PgQueryResult, PgRow},
11 Pool, Postgres, QueryBuilder, Row,
12};
13
14use chrono::Utc;
15use cron::Schedule;
16use std::collections::BTreeMap;
17use std::result::Result::Ok;
18use std::str::FromStr;
19use tracing::error;
20
// Contents of the `migrations` directory, embedded through the `include_dir!` macro.
// NOTE(review): nothing in this part of the file reads it (hence the leading
// underscore) — confirm whether it is consumed elsewhere.
static _MIGRATIONS: Dir = include_dir!("migrations");
22
/// Fixed time windows, ranging from five minutes up to five days.
///
/// Use [`TimeInterval::to_minutes`] to obtain the window length in minutes and
/// [`TimeInterval::from_string`] to parse the textual form (e.g. `"15minute"`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TimeInterval {
    FiveMinutes,
    FifteenMinutes,
    ThirtyMinutes,
    OneHour,
    ThreeHours,
    SixHours,
    TwelveHours,
    TwentyFourHours,
    TwoDays,
    FiveDays,
}

impl TimeInterval {
    /// Returns the length of this interval expressed in minutes.
    pub fn to_minutes(&self) -> i32 {
        match self {
            TimeInterval::FiveMinutes => 5,
            TimeInterval::FifteenMinutes => 15,
            TimeInterval::ThirtyMinutes => 30,
            TimeInterval::OneHour => 60,
            TimeInterval::ThreeHours => 180,
            TimeInterval::SixHours => 360,
            TimeInterval::TwelveHours => 720,
            TimeInterval::TwentyFourHours => 1440,
            TimeInterval::TwoDays => 2880,
            TimeInterval::FiveDays => 7200,
        }
    }

    /// Parses the textual form of an interval (`"5minute"`, `"1hour"`, `"2day"`, ...).
    ///
    /// Matching is exact and case-sensitive. Any string that is not one of the
    /// known spellings yields [`TimeInterval::SixHours`] rather than an error.
    pub fn from_string(time_window: &str) -> TimeInterval {
        match time_window {
            "5minute" => TimeInterval::FiveMinutes,
            "15minute" => TimeInterval::FifteenMinutes,
            "30minute" => TimeInterval::ThirtyMinutes,
            "1hour" => TimeInterval::OneHour,
            "3hour" => TimeInterval::ThreeHours,
            "6hour" => TimeInterval::SixHours,
            "12hour" => TimeInterval::TwelveHours,
            "24hour" => TimeInterval::TwentyFourHours,
            "2day" => TimeInterval::TwoDays,
            "5day" => TimeInterval::FiveDays,
            // Unknown input falls back to the six-hour window.
            _ => TimeInterval::SixHours,
        }
    }
}
68
/// Postgres-backed client that wraps a connection pool together with the
/// names of the tables it works against.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PostgresClient {
    /// Connection pool every query in this client is executed on.
    pub pool: Pool<Postgres>,
    /// Name of the drift table that records are written to and read from.
    qualified_table_name: String,
    /// Name of the drift queue table.
    // NOTE(review): not referenced by any method visible here — confirm usage.
    queue_table_name: String,
    /// Name of the drift profile table.
    profile_table_name: String,
}
77
78impl PostgresClient {
79 pub fn new(pool: Pool<Postgres>) -> Result<Self, anyhow::Error> {
81 Ok(Self {
84 pool,
85 qualified_table_name: "scouter.drift".to_string(),
86 queue_table_name: "scouter.drift_queue".to_string(),
87 profile_table_name: "scouter.drift_profile".to_string(),
88 })
89 }
90
91 pub async fn insert_drift_record(
99 &self,
100 record: &DriftRecord,
101 ) -> Result<PgQueryResult, anyhow::Error> {
102 let query = Queries::InsertDriftRecord.get_query();
103
104 let params = InsertParams {
105 table: self.qualified_table_name.to_string(),
106 created_at: record.created_at,
107 name: record.name.clone(),
108 repository: record.repository.clone(),
109 feature: record.feature.clone(),
110 value: record.value.to_string(),
111 version: record.version.clone(),
112 };
113
114 let query_result: std::prelude::v1::Result<sqlx::postgres::PgQueryResult, sqlx::Error> =
115 sqlx::raw_sql(query.format(¶ms).as_str())
116 .execute(&self.pool)
117 .await;
118
119 match query_result {
121 Ok(result) => Ok(result),
122 Err(e) => {
123 error!("Failed to insert record into database: {:?}", e);
124 Err(anyhow!("Failed to insert record into database: {:?}", e))
125 }
126 }
127 }
128
129 pub async fn insert_drift_profile(
130 &self,
131 monitor_profile: &MonitorProfile,
132 ) -> Result<PgQueryResult, anyhow::Error> {
133 let query = Queries::InsertMonitorProfile.get_query();
134
135 let cron = Schedule::from_str(&monitor_profile.config.cron).with_context(|| {
136 format!(
137 "Failed to parse cron expression: {}",
138 &monitor_profile.config.cron
139 )
140 })?;
141
142 let next_run = cron.upcoming(Utc).take(1).next().with_context(|| {
143 format!(
144 "Failed to get next run time for cron expression: {}",
145 &monitor_profile.config.cron
146 )
147 })?;
148
149 let params = InsertMonitorProfileParams {
150 table: "scouter.drift_profile".to_string(),
151 name: monitor_profile.config.name.clone(),
152 repository: monitor_profile.config.repository.clone(),
153 version: monitor_profile.config.version.clone(),
154 profile: serde_json::to_string(&monitor_profile).unwrap(),
155 cron: monitor_profile.config.cron.clone(),
156 next_run: next_run.naive_utc(),
157 };
158
159 let query_result: std::prelude::v1::Result<sqlx::postgres::PgQueryResult, sqlx::Error> =
160 sqlx::raw_sql(query.format(¶ms).as_str())
161 .execute(&self.pool)
162 .await;
163
164 match query_result {
165 Ok(result) => Ok(result),
166 Err(e) => {
167 error!("Failed to insert record into database: {:?}", e);
168 Err(anyhow!("Failed to insert record into database: {:?}", e))
169 }
170 }
171 }
172
173 #[allow(dead_code)]
175 pub async fn insert_drift_records(
176 &self,
177 records: &[DriftRecord],
178 ) -> Result<PgQueryResult, anyhow::Error> {
179 let insert_statement = format!(
180 "INSERT INTO {} (created_at, name, repository, version, feature, value)",
181 self.qualified_table_name
182 );
183
184 let mut query_builder = QueryBuilder::new(insert_statement);
185
186 query_builder.push_values(records.iter(), |mut b, record| {
187 b.push_bind(record.created_at)
188 .push_bind(&record.name)
189 .push_bind(&record.repository)
190 .push_bind(&record.version)
191 .push_bind(&record.feature)
192 .push_bind(record.value);
193 });
194
195 let query = query_builder.build();
196
197 let query_result = query.execute(&self.pool).await;
198
199 match query_result {
200 Ok(result) => Ok(result),
201 Err(e) => {
202 error!("Failed to insert record into database: {:?}", e);
203 Err(anyhow!("Failed to insert record into database: {:?}", e))
204 }
205 }
206 }
207
208 async fn get_features(
211 &self,
212 name: &str,
213 repository: &str,
214 version: &str,
215 ) -> Result<Vec<String>, anyhow::Error> {
216 let query = Queries::GetFeatures.get_query();
217
218 let params = GetFeaturesParams {
219 table: self.qualified_table_name.to_string(),
220 name: name.to_string(),
221 repository: repository.to_string(),
222 version: version.to_string(),
223 };
224
225 let result = sqlx::raw_sql(query.format(¶ms).as_str())
226 .fetch_all(&self.pool)
227 .await?;
228
229 let mut features = Vec::new();
230
231 for row in result {
232 features.push(row.get("feature"));
233 }
234
235 Ok(features)
236 }
237
238 #[allow(dead_code)]
239 async fn run_feature_query(
240 &self,
241 feature: &str,
242 name: &str,
243 repository: &str,
244 version: &str,
245 limit_timestamp: &str,
246 ) -> Result<Vec<PgRow>, anyhow::Error> {
247 let query = Queries::GetFeatureValues.get_query();
248
249 let params = GetFeatureValuesParams {
250 table: self.qualified_table_name.to_string(),
251 name: name.to_string(),
252 repository: repository.to_string(),
253 version: version.to_string(),
254 feature: feature.to_string(),
255 limit_timestamp: limit_timestamp.to_string(),
256 };
257
258 let result = sqlx::raw_sql(query.format(¶ms).as_str())
259 .fetch_all(&self.pool)
260 .await;
261
262 match result {
263 Ok(result) => Ok(result),
264 Err(e) => {
265 error!("Failed to run query: {:?}", e);
266 Err(anyhow!("Failed to run query: {:?}", e))
267 }
268 }
269 }
270
271 async fn run_binned_feature_query(
272 &self,
273 bin: &f64,
274 feature: String,
275 version: &str,
276 time_window: &i32,
277 name: &str,
278 repository: &str,
279 ) -> Result<Vec<PgRow>, anyhow::Error> {
280 let query = Queries::GetBinnedFeatureValues.get_query();
281
282 let params = GetBinnedFeatureValuesParams {
283 table: self.qualified_table_name.to_string(),
284 name: name.to_string(),
285 repository: repository.to_string(),
286 feature,
287 version: version.to_string(),
288 time_window: time_window.to_string(),
289 bin: bin.to_string(),
290 };
291
292 let result = sqlx::raw_sql(query.format(¶ms).as_str())
293 .fetch_all(&self.pool)
294 .await;
295
296 match result {
297 Ok(result) => Ok(result),
298 Err(e) => {
299 error!("Failed to run query: {:?}", e);
300 Err(anyhow!("Failed to run query: {:?}", e))
301 }
302 }
303 }
304
305 pub async fn get_binned_drift_records(
318 &self,
319 name: &str,
320 repository: &str,
321 version: &str,
322 max_data_points: &i32,
323 time_window: &i32,
324 ) -> Result<QueryResult, anyhow::Error> {
325 let features = self.get_features(name, repository, version).await?;
327
328 let bin = *time_window as f64 / *max_data_points as f64;
329
330 let async_queries = features
331 .iter()
332 .map(|feature| {
333 self.run_binned_feature_query(
334 &bin,
335 feature.to_string(),
336 version,
337 time_window,
338 name,
339 repository,
340 )
341 })
342 .collect::<Vec<_>>();
343
344 let query_results = join_all(async_queries).await;
345
346 let mut query_result = QueryResult {
348 features: BTreeMap::new(),
349 };
350
351 for data in query_results {
352 match data {
353 Ok(data) => {
354 if data.is_empty() {
356 continue;
357 }
358
359 let feature_name = data[0].get("feature");
360 let mut created_at = Vec::new();
361 let mut values = Vec::new();
362
363 for row in data {
364 created_at.push(row.get("created_at"));
365 values.push(row.get("value"));
366 }
367
368 query_result
369 .features
370 .insert(feature_name, FeatureResult { created_at, values });
371 }
372 Err(e) => {
373 error!("Failed to run query: {:?}", e);
374 return Err(anyhow!("Failed to run query: {:?}", e));
375 }
376 }
377 }
378
379 Ok(query_result)
380 }
381
382 #[allow(dead_code)]
383 pub async fn get_drift_records(
384 &self,
385 name: &str,
386 repository: &str,
387 version: &str,
388 limit_timestamp: &str,
389 ) -> Result<QueryResult, anyhow::Error> {
390 let features = self.get_features(name, repository, version).await?;
391
392 let async_queries = features
393 .iter()
394 .map(|feature| {
395 self.run_feature_query(feature, name, repository, version, limit_timestamp)
396 })
397 .collect::<Vec<_>>();
398
399 let query_results = join_all(async_queries).await;
400
401 let mut query_result = QueryResult {
402 features: BTreeMap::new(),
403 };
404
405 for data in query_results {
406 match data {
407 Ok(data) => {
408 if data.is_empty() {
410 continue;
411 }
412
413 let feature_name = data[0].get("feature");
414 let mut created_at = Vec::new();
415 let mut values = Vec::new();
416
417 for row in data {
418 created_at.push(row.get("created_at"));
419 values.push(row.get("value"));
420 }
421
422 query_result
423 .features
424 .insert(feature_name, FeatureResult { created_at, values });
425 }
426 Err(e) => {
427 error!("Failed to run query: {:?}", e);
428 return Err(anyhow!("Failed to run query: {:?}", e));
429 }
430 }
431 }
432 Ok(query_result)
433 }
434
435 #[allow(dead_code)]
436 pub async fn raw_query(&self, query: &str) -> Result<Vec<PgRow>, anyhow::Error> {
437 let result = sqlx::raw_sql(query).fetch_all(&self.pool).await;
438
439 match result {
440 Ok(result) => {
441 Ok(result)
443 }
444 Err(e) => {
445 error!("Failed to run query: {:?}", e);
446 Err(anyhow!("Failed to run query: {:?}", e))
447 }
448 }
449 }
450}
451
#[cfg(test)]
mod tests {

    use super::*;
    use crate::api::setup::create_db_pool;
    use std::env;
    use tokio;

    /// Builds a pool from `DATABASE_URL` and checks that a client can be
    /// constructed on top of it.
    #[tokio::test]
    async fn test_client() {
        env::set_var(
            "DATABASE_URL",
            "postgresql://postgres:admin@localhost:5432/monitor?",
        );

        let pool_result = create_db_pool(None).await;
        let pool = pool_result
            .with_context(|| "Failed to create Postgres client")
            .unwrap();

        PostgresClient::new(pool).unwrap();
    }

    /// Every interval must report its documented length in minutes.
    #[test]
    fn test_time_interval() {
        let expectations = [
            (TimeInterval::FiveMinutes, 5),
            (TimeInterval::FifteenMinutes, 15),
            (TimeInterval::ThirtyMinutes, 30),
            (TimeInterval::OneHour, 60),
            (TimeInterval::ThreeHours, 180),
            (TimeInterval::SixHours, 360),
            (TimeInterval::TwelveHours, 720),
            (TimeInterval::TwentyFourHours, 1440),
            (TimeInterval::TwoDays, 2880),
            (TimeInterval::FiveDays, 7200),
        ];

        for (interval, minutes) in expectations.iter() {
            assert_eq!(interval.to_minutes(), *minutes);
        }
    }
}