scouter_sql/sql/traits/
profile.rs1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use chrono::Utc;
5use cron::Schedule;
6
7use crate::sql::error::SqlError;
8use async_trait::async_trait;
9use scouter_types::{DriftProfile, DriftTaskInfo, GetProfileRequest, ProfileStatusRequest};
10use serde_json::Value;
11use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
12use std::result::Result::Ok;
13use std::str::FromStr;
14use tracing::{error, instrument};
15
16#[async_trait]
17pub trait ProfileSqlLogic {
18 #[instrument(skip_all)]
28 async fn insert_drift_profile(
29 pool: &Pool<Postgres>,
30 drift_profile: &DriftProfile,
31 ) -> Result<PgQueryResult, SqlError> {
32 let query = Queries::InsertDriftProfile.get_query();
33 let base_args = drift_profile.get_base_args();
34
35 let current_time = Utc::now();
36
37 let schedule = Schedule::from_str(&base_args.schedule)?;
38
39 let next_run = match schedule.upcoming(Utc).take(1).next() {
40 Some(next_run) => next_run,
41 None => {
42 return Err(SqlError::GetNextRunError);
43 }
44 };
45
46 sqlx::query(&query.sql)
47 .bind(base_args.name)
48 .bind(base_args.space)
49 .bind(base_args.version)
50 .bind(base_args.scouter_version)
51 .bind(drift_profile.to_value())
52 .bind(base_args.drift_type.to_string())
53 .bind(false)
54 .bind(base_args.schedule)
55 .bind(next_run)
56 .bind(current_time)
57 .execute(pool)
58 .await
59 .map_err(SqlError::SqlxError)
60 }
61
62 async fn update_drift_profile(
72 pool: &Pool<Postgres>,
73 drift_profile: &DriftProfile,
74 ) -> Result<PgQueryResult, SqlError> {
75 let query = Queries::UpdateDriftProfile.get_query();
76 let base_args = drift_profile.get_base_args();
77
78 sqlx::query(&query.sql)
79 .bind(drift_profile.to_value())
80 .bind(base_args.drift_type.to_string())
81 .bind(base_args.name)
82 .bind(base_args.space)
83 .bind(base_args.version)
84 .execute(pool)
85 .await
86 .map_err(SqlError::SqlxError)
87 }
88
89 async fn get_drift_profile(
97 pool: &Pool<Postgres>,
98 request: &GetProfileRequest,
99 ) -> Result<Option<Value>, SqlError> {
100 let query = Queries::GetDriftProfile.get_query();
101
102 let result = sqlx::query(&query.sql)
103 .bind(&request.name)
104 .bind(&request.space)
105 .bind(&request.version)
106 .bind(request.drift_type.to_string())
107 .fetch_optional(pool)
108 .await
109 .map_err(SqlError::SqlxError)?;
110
111 match result {
112 Some(result) => {
113 let profile: Value = result.get("profile");
114 Ok(Some(profile))
115 }
116 None => Ok(None),
117 }
118 }
119
120 async fn get_drift_profile_task(
121 pool: &Pool<Postgres>,
122 ) -> Result<Option<TaskRequest>, SqlError> {
123 let query = Queries::GetDriftTask.get_query();
124 sqlx::query_as(&query.sql)
125 .fetch_optional(pool)
126 .await
127 .map_err(SqlError::SqlxError)
128 }
129
130 #[instrument(skip_all)]
142 async fn update_drift_profile_run_dates(
143 pool: &Pool<Postgres>,
144 task_info: &DriftTaskInfo,
145 schedule: &str,
146 ) -> Result<(), SqlError> {
147 let query = Queries::UpdateDriftProfileRunDates.get_query();
148
149 let schedule = Schedule::from_str(schedule)?;
150
151 let next_run = match schedule.upcoming(Utc).take(1).next() {
152 Some(next_run) => next_run,
153 None => {
154 return Err(SqlError::GetNextRunError);
155 }
156 };
157
158 let query_result = sqlx::query(&query.sql)
159 .bind(next_run)
160 .bind(&task_info.uid)
161 .execute(pool)
162 .await
163 .map_err(SqlError::SqlxError);
164
165 match query_result {
166 Ok(_) => Ok(()),
167 Err(e) => Err(e),
168 }
169 }
170
171 async fn update_drift_profile_status(
172 pool: &Pool<Postgres>,
173 params: &ProfileStatusRequest,
174 ) -> Result<(), SqlError> {
175 let query = Queries::UpdateDriftProfileStatus.get_query();
176
177 let query_result = sqlx::query(&query.sql)
179 .bind(params.active)
180 .bind(¶ms.name)
181 .bind(¶ms.space)
182 .bind(¶ms.version)
183 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
184 .execute(pool)
185 .await
186 .map_err(SqlError::SqlxError);
187
188 match query_result {
189 Ok(_) => {
190 if params.deactivate_others {
191 let query = Queries::DeactivateDriftProfiles.get_query();
192
193 let query_result = sqlx::query(&query.sql)
194 .bind(¶ms.name)
195 .bind(¶ms.space)
196 .bind(¶ms.version)
197 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
198 .execute(pool)
199 .await
200 .map_err(SqlError::SqlxError);
201
202 match query_result {
203 Ok(_) => Ok(()),
204 Err(e) => {
205 error!("Failed to deactivate other drift profiles: {:?}", e);
206 Err(e)
207 }
208 }
209 } else {
210 Ok(())
211 }
212 }
213 Err(e) => {
214 error!("Failed to update drift profile status: {:?}", e);
215 Err(e)
216 }
217 }
218 }
219}