scouter_sql/sql/traits/
alert.rs1use crate::sql::query::Queries;
2use crate::sql::schema::{AlertWrapper, UpdateAlertResult};
3
4use scouter_types::contracts::{DriftAlertRequest, UpdateAlertStatus};
5
6use crate::sql::error::SqlError;
7use scouter_types::alert::Alert;
8use scouter_types::{DriftTaskInfo, DriftType};
9
10use sqlx::{postgres::PgQueryResult, Pool, Postgres};
11use std::collections::BTreeMap;
12use std::result::Result::Ok;
13
14use async_trait::async_trait;
15
16#[async_trait]
17pub trait AlertSqlLogic {
18 async fn insert_drift_alert(
28 pool: &Pool<Postgres>,
29 task_info: &DriftTaskInfo,
30 entity_name: &str,
31 alert: &BTreeMap<String, String>,
32 drift_type: &DriftType,
33 ) -> Result<PgQueryResult, SqlError> {
34 let query = Queries::InsertDriftAlert.get_query();
35
36 let query_result = sqlx::query(&query.sql)
37 .bind(&task_info.name)
38 .bind(&task_info.space)
39 .bind(&task_info.version)
40 .bind(entity_name)
41 .bind(serde_json::to_value(alert).unwrap())
42 .bind(drift_type.to_string())
43 .execute(pool)
44 .await?;
45
46 Ok(query_result)
47 }
48
49 async fn get_drift_alerts(
59 pool: &Pool<Postgres>,
60 params: &DriftAlertRequest,
61 ) -> Result<Vec<Alert>, SqlError> {
62 let mut query = Queries::GetDriftAlerts.get_query().sql;
63
64 if params.active.unwrap_or(false) {
65 query.push_str(" AND active = true");
66 }
67
68 query.push_str(" ORDER BY created_at DESC");
69
70 if let Some(limit) = params.limit {
71 query.push_str(&format!(" LIMIT {limit}"));
72 }
73
74 let result: Result<Vec<AlertWrapper>, SqlError> = sqlx::query_as(&query)
77 .bind(¶ms.version)
78 .bind(¶ms.name)
79 .bind(¶ms.space)
80 .bind(params.limit_datetime)
81 .fetch_all(pool)
82 .await
83 .map_err(SqlError::SqlxError);
84
85 result.map(|result| result.into_iter().map(|wrapper| wrapper.0).collect())
86 }
87
88 async fn update_drift_alert_status(
89 pool: &Pool<Postgres>,
90 params: &UpdateAlertStatus,
91 ) -> Result<UpdateAlertResult, SqlError> {
92 let query = Queries::UpdateAlertStatus.get_query();
93
94 let result: Result<UpdateAlertResult, SqlError> = sqlx::query_as(&query.sql)
95 .bind(params.id)
96 .bind(params.active)
97 .fetch_one(pool)
98 .await
99 .map_err(SqlError::SqlxError);
100
101 result
102 }
103}