scouter_sql/sql/traits/
profile.rs1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use crate::sql::error::SqlError;
5use crate::sql::schema::VersionResult;
6use async_trait::async_trait;
7use chrono::Utc;
8use cron::Schedule;
9use scouter_semver::VersionArgs;
10use scouter_semver::VersionType;
11use scouter_semver::{VersionParser, VersionValidator};
12use scouter_types::{
13 DriftProfile, DriftTaskInfo, GetProfileRequest, ListProfilesRequest, ListedProfile,
14 ProfileArgs, ProfileStatusRequest,
15};
16use semver::Version;
17use serde_json::Value;
18use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
19use std::result::Result::Ok;
20use std::str::FromStr;
21use tracing::{error, instrument};
22
23pub fn add_version_bounds(builder: &mut String, version: &str) -> Result<(), SqlError> {
24 let version_bounds = VersionParser::get_version_to_search(version)?;
25
26 builder.push_str(
28 format!(
29 " AND (major >= {} AND minor >= {} and patch >= {})",
30 version_bounds.lower_bound.major,
31 version_bounds.lower_bound.minor,
32 version_bounds.lower_bound.patch
33 )
34 .as_str(),
35 );
36
37 if !version_bounds.no_upper_bound {
38 if version_bounds.num_parts == 1 {
40 builder
41 .push_str(format!(" AND (major < {})", version_bounds.upper_bound.major).as_str());
42 } else if version_bounds.num_parts == 2
43 || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Tilde
44 || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Caret
45 {
46 builder.push_str(
47 format!(
48 " AND (major = {} AND minor < {})",
49 version_bounds.upper_bound.major, version_bounds.upper_bound.minor
50 )
51 .as_str(),
52 );
53 } else {
54 builder.push_str(
55 format!(
56 " AND (major = {} AND minor = {} AND patch < {})",
57 version_bounds.upper_bound.major,
58 version_bounds.upper_bound.minor,
59 version_bounds.upper_bound.patch
60 )
61 .as_str(),
62 );
63 }
64 }
65 Ok(())
66}
67
68#[async_trait]
69pub trait ProfileSqlLogic {
70 #[instrument(skip_all)]
72 async fn get_next_profile_version(
73 pool: &Pool<Postgres>,
74 args: &ProfileArgs,
75 version_type: VersionType,
76 pre_tag: Option<String>,
77 build_tag: Option<String>,
78 ) -> Result<Version, SqlError> {
79 let mut version_query = Queries::GetProfileVersions.get_query().sql;
80
81 if let Some(version) = &args.version {
82 add_version_bounds(&mut version_query, version)?;
83 }
84 version_query.push_str(" ORDER BY created_at DESC LIMIT 20;");
85
86 let cards: Vec<VersionResult> = sqlx::query_as(&version_query)
87 .bind(&args.space)
88 .bind(&args.name)
89 .fetch_all(pool)
90 .await?;
91
92 let versions = cards
93 .iter()
94 .map(|c| c.to_version())
95 .collect::<Result<Vec<Version>, SqlError>>()?;
96
97 let versions = VersionValidator::sort_semver_versions(versions, true)?;
99
100 if versions.is_empty() {
101 return match &args.version {
102 Some(version_str) => Ok(VersionValidator::clean_version(version_str)?),
103 None => Ok(Version::new(0, 1, 0)),
104 };
105 }
106
107 let base_version = versions.first().unwrap().to_string();
108
109 let args = VersionArgs {
110 version: base_version,
111 version_type,
112 pre: pre_tag,
113 build: build_tag,
114 };
115
116 Ok(VersionValidator::bump_version(&args)?)
117 }
118 #[instrument(skip_all)]
128 async fn insert_drift_profile(
129 pool: &Pool<Postgres>,
130 drift_profile: &DriftProfile,
131 base_args: &ProfileArgs,
132 version: &Version,
133 active: &bool,
134 deactivate_others: &bool,
135 ) -> Result<PgQueryResult, SqlError> {
136 let query = Queries::InsertDriftProfile.get_query();
137 let current_time = Utc::now();
138 let schedule = Schedule::from_str(&base_args.schedule)?;
139 let next_run = match schedule.upcoming(Utc).take(1).next() {
140 Some(next_run) => next_run,
141 None => {
142 return Err(SqlError::GetNextRunError);
143 }
144 };
145
146 let major = version.major as i32;
148 let minor = version.minor as i32;
149 let patch = version.patch as i32;
150 let pre: Option<String> = version.pre.to_string().parse().ok();
151 let build: Option<String> = version.build.to_string().parse().ok();
152
153 let result = sqlx::query(&query.sql)
154 .bind(&base_args.space)
155 .bind(&base_args.name)
156 .bind(major)
157 .bind(minor)
158 .bind(patch)
159 .bind(pre)
160 .bind(build)
161 .bind(version.to_string())
162 .bind(&base_args.scouter_version)
163 .bind(drift_profile.to_value())
164 .bind(base_args.drift_type.to_string())
165 .bind(active)
166 .bind(&base_args.schedule)
167 .bind(next_run)
168 .bind(current_time)
169 .execute(pool)
170 .await
171 .map_err(SqlError::SqlxError)?;
172
173 if *active && *deactivate_others {
175 let query = Queries::DeactivateDriftProfiles.get_query();
176
177 let query_result = sqlx::query(&query.sql)
178 .bind(&base_args.name)
179 .bind(&base_args.space)
180 .bind(&base_args.version)
181 .bind(base_args.drift_type.to_string())
182 .execute(pool)
183 .await
184 .map_err(SqlError::SqlxError);
185
186 match query_result {
187 Ok(_) => Ok(result), Err(e) => {
189 error!("Failed to deactivate other drift profiles: {:?}", e);
190 Err(e)
191 }
192 }
193 } else {
194 Ok(result)
195 }
196 }
197
198 async fn update_drift_profile(
208 pool: &Pool<Postgres>,
209 drift_profile: &DriftProfile,
210 ) -> Result<PgQueryResult, SqlError> {
211 let query = Queries::UpdateDriftProfile.get_query();
212 let base_args = drift_profile.get_base_args();
213
214 sqlx::query(&query.sql)
215 .bind(drift_profile.to_value())
216 .bind(base_args.drift_type.to_string())
217 .bind(base_args.name)
218 .bind(base_args.space)
219 .bind(base_args.version)
220 .execute(pool)
221 .await
222 .map_err(SqlError::SqlxError)
223 }
224
225 async fn get_drift_profile(
233 pool: &Pool<Postgres>,
234 request: &GetProfileRequest,
235 ) -> Result<Option<Value>, SqlError> {
236 let query = Queries::GetDriftProfile.get_query();
237
238 let result = sqlx::query(&query.sql)
239 .bind(&request.name)
240 .bind(&request.space)
241 .bind(&request.version)
242 .bind(request.drift_type.to_string())
243 .fetch_optional(pool)
244 .await
245 .map_err(SqlError::SqlxError)?;
246
247 match result {
248 Some(result) => {
249 let profile: Value = result.get("profile");
250 Ok(Some(profile))
251 }
252 None => Ok(None),
253 }
254 }
255
256 async fn get_drift_profile_task(
257 pool: &Pool<Postgres>,
258 ) -> Result<Option<TaskRequest>, SqlError> {
259 let query = Queries::GetDriftTask.get_query();
260 sqlx::query_as(&query.sql)
261 .fetch_optional(pool)
262 .await
263 .map_err(SqlError::SqlxError)
264 }
265
266 async fn list_drift_profiles(
267 pool: &Pool<Postgres>,
268 args: &ListProfilesRequest,
269 ) -> Result<Vec<ListedProfile>, SqlError> {
270 let profile_query = Queries::ListDriftProfiles.get_query().sql;
271
272 let records: Vec<(bool, Value)> = sqlx::query_as(&profile_query)
273 .bind(&args.space)
274 .bind(&args.name)
275 .bind(&args.version)
276 .fetch_all(pool)
277 .await
278 .map_err(SqlError::SqlxError)?;
279
280 let listed_profiles: Vec<ListedProfile> = records
281 .into_iter()
282 .map(|(active, value)| -> Result<ListedProfile, SqlError> {
283 Ok(ListedProfile {
284 profile: DriftProfile::from_value(value)?,
285 active,
286 })
287 })
288 .collect::<Result<Vec<_>, _>>()?;
289
290 Ok(listed_profiles)
291 }
292
293 #[instrument(skip_all)]
305 async fn update_drift_profile_run_dates(
306 pool: &Pool<Postgres>,
307 task_info: &DriftTaskInfo,
308 schedule: &str,
309 ) -> Result<(), SqlError> {
310 let query = Queries::UpdateDriftProfileRunDates.get_query();
311
312 let schedule = Schedule::from_str(schedule)?;
313
314 let next_run = match schedule.upcoming(Utc).take(1).next() {
315 Some(next_run) => next_run,
316 None => {
317 return Err(SqlError::GetNextRunError);
318 }
319 };
320
321 let query_result = sqlx::query(&query.sql)
322 .bind(next_run)
323 .bind(&task_info.uid)
324 .execute(pool)
325 .await
326 .map_err(SqlError::SqlxError);
327
328 match query_result {
329 Ok(_) => Ok(()),
330 Err(e) => Err(e),
331 }
332 }
333
334 async fn update_drift_profile_status(
335 pool: &Pool<Postgres>,
336 params: &ProfileStatusRequest,
337 ) -> Result<(), SqlError> {
338 let query = Queries::UpdateDriftProfileStatus.get_query();
339
340 let query_result = sqlx::query(&query.sql)
342 .bind(params.active)
343 .bind(¶ms.name)
344 .bind(¶ms.space)
345 .bind(¶ms.version)
346 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
347 .execute(pool)
348 .await
349 .map_err(SqlError::SqlxError);
350
351 match query_result {
352 Ok(_) => {
353 if params.deactivate_others {
354 let query = Queries::DeactivateDriftProfiles.get_query();
355
356 let query_result = sqlx::query(&query.sql)
357 .bind(¶ms.name)
358 .bind(¶ms.space)
359 .bind(¶ms.version)
360 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
361 .execute(pool)
362 .await
363 .map_err(SqlError::SqlxError);
364
365 match query_result {
366 Ok(_) => Ok(()),
367 Err(e) => {
368 error!("Failed to deactivate other drift profiles: {:?}", e);
369 Err(e)
370 }
371 }
372 } else {
373 Ok(())
374 }
375 }
376 Err(e) => {
377 error!("Failed to update drift profile status: {:?}", e);
378 Err(e)
379 }
380 }
381 }
382}