scouter_sql/sql/traits/
profile.rs1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use crate::sql::error::SqlError;
5use crate::sql::schema::VersionResult;
6use async_trait::async_trait;
7use chrono::Utc;
8use cron::Schedule;
9use scouter_semver::VersionArgs;
10use scouter_semver::VersionType;
11use scouter_semver::{VersionParser, VersionValidator};
12use scouter_types::{
13 DriftProfile, DriftTaskInfo, GetProfileRequest, ProfileArgs, ProfileStatusRequest,
14};
15use semver::Version;
16use serde_json::Value;
17use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
18use std::result::Result::Ok;
19use std::str::FromStr;
20use tracing::{error, instrument};
21
22pub fn add_version_bounds(builder: &mut String, version: &str) -> Result<(), SqlError> {
23 let version_bounds = VersionParser::get_version_to_search(version)?;
24
25 builder.push_str(
27 format!(
28 " AND (major >= {} AND minor >= {} and patch >= {})",
29 version_bounds.lower_bound.major,
30 version_bounds.lower_bound.minor,
31 version_bounds.lower_bound.patch
32 )
33 .as_str(),
34 );
35
36 if !version_bounds.no_upper_bound {
37 if version_bounds.num_parts == 1 {
39 builder
40 .push_str(format!(" AND (major < {})", version_bounds.upper_bound.major).as_str());
41 } else if version_bounds.num_parts == 2
42 || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Tilde
43 || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Caret
44 {
45 builder.push_str(
46 format!(
47 " AND (major = {} AND minor < {})",
48 version_bounds.upper_bound.major, version_bounds.upper_bound.minor
49 )
50 .as_str(),
51 );
52 } else {
53 builder.push_str(
54 format!(
55 " AND (major = {} AND minor = {} AND patch < {})",
56 version_bounds.upper_bound.major,
57 version_bounds.upper_bound.minor,
58 version_bounds.upper_bound.patch
59 )
60 .as_str(),
61 );
62 }
63 }
64 Ok(())
65}
66
67#[async_trait]
68pub trait ProfileSqlLogic {
69 #[instrument(skip_all)]
71 async fn get_next_profile_version(
72 pool: &Pool<Postgres>,
73 args: &ProfileArgs,
74 version_type: VersionType,
75 pre_tag: Option<String>,
76 build_tag: Option<String>,
77 ) -> Result<Version, SqlError> {
78 let mut version_query = Queries::GetProfileVersions.get_query().sql;
79
80 if let Some(version) = &args.version {
81 add_version_bounds(&mut version_query, version)?;
82 }
83 version_query.push_str(" ORDER BY created_at DESC LIMIT 20;");
84
85 let cards: Vec<VersionResult> = sqlx::query_as(&version_query)
86 .bind(&args.space)
87 .bind(&args.name)
88 .fetch_all(pool)
89 .await?;
90
91 let versions = cards
92 .iter()
93 .map(|c| c.to_version())
94 .collect::<Result<Vec<Version>, SqlError>>()?;
95
96 let versions = VersionValidator::sort_semver_versions(versions, true)?;
98
99 if versions.is_empty() {
100 return match &args.version {
101 Some(version_str) => Ok(VersionValidator::clean_version(version_str)?),
102 None => Ok(Version::new(0, 1, 0)),
103 };
104 }
105
106 let base_version = versions.first().unwrap().to_string();
107
108 let args = VersionArgs {
109 version: base_version,
110 version_type,
111 pre: pre_tag,
112 build: build_tag,
113 };
114
115 Ok(VersionValidator::bump_version(&args)?)
116 }
117 #[instrument(skip_all)]
127 async fn insert_drift_profile(
128 pool: &Pool<Postgres>,
129 drift_profile: &DriftProfile,
130 base_args: &ProfileArgs,
131 version: &Version,
132 ) -> Result<PgQueryResult, SqlError> {
133 let query = Queries::InsertDriftProfile.get_query();
134 let current_time = Utc::now();
135 let schedule = Schedule::from_str(&base_args.schedule)?;
136 let next_run = match schedule.upcoming(Utc).take(1).next() {
137 Some(next_run) => next_run,
138 None => {
139 return Err(SqlError::GetNextRunError);
140 }
141 };
142
143 let major = version.major as i32;
145 let minor = version.minor as i32;
146 let patch = version.patch as i32;
147 let pre: Option<String> = version.pre.to_string().parse().ok();
148 let build: Option<String> = version.build.to_string().parse().ok();
149
150 sqlx::query(&query.sql)
151 .bind(&base_args.space)
152 .bind(&base_args.name)
153 .bind(major)
154 .bind(minor)
155 .bind(patch)
156 .bind(pre)
157 .bind(build)
158 .bind(version.to_string())
159 .bind(&base_args.scouter_version)
160 .bind(drift_profile.to_value())
161 .bind(base_args.drift_type.to_string())
162 .bind(false)
163 .bind(&base_args.schedule)
164 .bind(next_run)
165 .bind(current_time)
166 .execute(pool)
167 .await
168 .map_err(SqlError::SqlxError)
169 }
170
171 async fn update_drift_profile(
181 pool: &Pool<Postgres>,
182 drift_profile: &DriftProfile,
183 ) -> Result<PgQueryResult, SqlError> {
184 let query = Queries::UpdateDriftProfile.get_query();
185 let base_args = drift_profile.get_base_args();
186
187 sqlx::query(&query.sql)
188 .bind(drift_profile.to_value())
189 .bind(base_args.drift_type.to_string())
190 .bind(base_args.name)
191 .bind(base_args.space)
192 .bind(base_args.version)
193 .execute(pool)
194 .await
195 .map_err(SqlError::SqlxError)
196 }
197
198 async fn get_drift_profile(
206 pool: &Pool<Postgres>,
207 request: &GetProfileRequest,
208 ) -> Result<Option<Value>, SqlError> {
209 let query = Queries::GetDriftProfile.get_query();
210
211 let result = sqlx::query(&query.sql)
212 .bind(&request.name)
213 .bind(&request.space)
214 .bind(&request.version)
215 .bind(request.drift_type.to_string())
216 .fetch_optional(pool)
217 .await
218 .map_err(SqlError::SqlxError)?;
219
220 match result {
221 Some(result) => {
222 let profile: Value = result.get("profile");
223 Ok(Some(profile))
224 }
225 None => Ok(None),
226 }
227 }
228
229 async fn get_drift_profile_task(
230 pool: &Pool<Postgres>,
231 ) -> Result<Option<TaskRequest>, SqlError> {
232 let query = Queries::GetDriftTask.get_query();
233 sqlx::query_as(&query.sql)
234 .fetch_optional(pool)
235 .await
236 .map_err(SqlError::SqlxError)
237 }
238
239 #[instrument(skip_all)]
251 async fn update_drift_profile_run_dates(
252 pool: &Pool<Postgres>,
253 task_info: &DriftTaskInfo,
254 schedule: &str,
255 ) -> Result<(), SqlError> {
256 let query = Queries::UpdateDriftProfileRunDates.get_query();
257
258 let schedule = Schedule::from_str(schedule)?;
259
260 let next_run = match schedule.upcoming(Utc).take(1).next() {
261 Some(next_run) => next_run,
262 None => {
263 return Err(SqlError::GetNextRunError);
264 }
265 };
266
267 let query_result = sqlx::query(&query.sql)
268 .bind(next_run)
269 .bind(&task_info.uid)
270 .execute(pool)
271 .await
272 .map_err(SqlError::SqlxError);
273
274 match query_result {
275 Ok(_) => Ok(()),
276 Err(e) => Err(e),
277 }
278 }
279
280 async fn update_drift_profile_status(
281 pool: &Pool<Postgres>,
282 params: &ProfileStatusRequest,
283 ) -> Result<(), SqlError> {
284 let query = Queries::UpdateDriftProfileStatus.get_query();
285
286 let query_result = sqlx::query(&query.sql)
288 .bind(params.active)
289 .bind(¶ms.name)
290 .bind(¶ms.space)
291 .bind(¶ms.version)
292 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
293 .execute(pool)
294 .await
295 .map_err(SqlError::SqlxError);
296
297 match query_result {
298 Ok(_) => {
299 if params.deactivate_others {
300 let query = Queries::DeactivateDriftProfiles.get_query();
301
302 let query_result = sqlx::query(&query.sql)
303 .bind(¶ms.name)
304 .bind(¶ms.space)
305 .bind(¶ms.version)
306 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
307 .execute(pool)
308 .await
309 .map_err(SqlError::SqlxError);
310
311 match query_result {
312 Ok(_) => Ok(()),
313 Err(e) => {
314 error!("Failed to deactivate other drift profiles: {:?}", e);
315 Err(e)
316 }
317 }
318 } else {
319 Ok(())
320 }
321 }
322 Err(e) => {
323 error!("Failed to update drift profile status: {:?}", e);
324 Err(e)
325 }
326 }
327 }
328}