scouter_sql/sql/traits/
profile.rs1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use crate::sql::error::SqlError;
5use crate::sql::schema::VersionResult;
6use async_trait::async_trait;
7use chrono::DateTime;
8use chrono::Utc;
9use cron::Schedule;
10use potato_head::create_uuid7;
11use scouter_semver::VersionArgs;
12use scouter_semver::{VersionParser, VersionValidator};
13use scouter_types::VersionRequest;
14use scouter_types::{
15 DriftProfile, ListProfilesRequest, ListedProfile, ProfileArgs, ProfileStatusRequest,
16};
17use semver::Version;
18use serde_json::Value;
19use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
20use std::result::Result::Ok;
21use std::str::FromStr;
22use tracing::{error, instrument};
23
24pub fn add_version_bounds(builder: &mut String, version: &str) -> Result<(), SqlError> {
25 let version_bounds = VersionParser::get_version_to_search(version)?;
26
27 builder.push_str(
29 format!(
30 " AND (major >= {} AND minor >= {} and patch >= {})",
31 version_bounds.lower_bound.major,
32 version_bounds.lower_bound.minor,
33 version_bounds.lower_bound.patch
34 )
35 .as_str(),
36 );
37
38 if !version_bounds.no_upper_bound {
39 if version_bounds.num_parts == 1 {
41 builder
42 .push_str(format!(" AND (major < {})", version_bounds.upper_bound.major).as_str());
43 } else if version_bounds.num_parts == 2
44 || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Tilde
45 || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Caret
46 {
47 builder.push_str(
48 format!(
49 " AND (major = {} AND minor < {})",
50 version_bounds.upper_bound.major, version_bounds.upper_bound.minor
51 )
52 .as_str(),
53 );
54 } else {
55 builder.push_str(
56 format!(
57 " AND (major = {} AND minor = {} AND patch < {})",
58 version_bounds.upper_bound.major,
59 version_bounds.upper_bound.minor,
60 version_bounds.upper_bound.patch
61 )
62 .as_str(),
63 );
64 }
65 }
66 Ok(())
67}
68
69#[async_trait]
70pub trait ProfileSqlLogic {
71 #[instrument(skip_all)]
81 async fn get_next_profile_version(
82 pool: &Pool<Postgres>,
83 args: &ProfileArgs,
84 version_request: VersionRequest,
85 ) -> Result<Version, SqlError> {
86 let mut version_query = Queries::GetProfileVersions.get_query().to_string();
87
88 if let Some(version) = &version_request.version {
89 add_version_bounds(&mut version_query, version)?;
90 }
91 version_query.push_str(" ORDER BY created_at DESC LIMIT 20;");
92
93 let cards: Vec<VersionResult> = sqlx::query_as(&version_query)
94 .bind(&args.space)
95 .bind(&args.name)
96 .bind(args.drift_type.to_string())
97 .fetch_all(pool)
98 .await?;
99
100 let versions = cards
101 .iter()
102 .map(|c| c.to_version())
103 .collect::<Result<Vec<Version>, SqlError>>()?;
104
105 let versions = VersionValidator::sort_semver_versions(versions, true)?;
107
108 if versions.is_empty() {
109 return match &version_request.version {
110 Some(version_str) => Ok(VersionValidator::clean_version(version_str)?),
111 None => Ok(Version::new(0, 1, 0)),
112 };
113 }
114
115 let base_version = versions.first().unwrap().to_string();
116
117 let args = VersionArgs {
118 version: base_version,
119 version_type: version_request.version_type,
120 pre: version_request.pre_tag,
121 build: version_request.build_tag,
122 };
123
124 Ok(VersionValidator::bump_version(&args)?)
125 }
126 #[instrument(skip_all)]
136 async fn insert_drift_profile(
137 pool: &Pool<Postgres>,
138 drift_profile: &DriftProfile,
139 base_args: &ProfileArgs,
140 version: &Version,
141 active: &bool,
142 deactivate_others: &bool,
143 ) -> Result<String, SqlError> {
144 let query = Queries::InsertDriftProfile.get_query();
145 let current_time = Utc::now();
146 let schedule = Schedule::from_str(&base_args.schedule)?;
147 let next_run = match schedule.upcoming(Utc).take(1).next() {
148 Some(next_run) => next_run,
149 None => {
150 return Err(SqlError::GetNextRunError);
151 }
152 };
153
154 let major = version.major as i32;
156 let minor = version.minor as i32;
157 let patch = version.patch as i32;
158 let pre: Option<String> = version.pre.to_string().parse().ok();
159 let build: Option<String> = version.build.to_string().parse().ok();
160
161 let result = sqlx::query(query)
162 .bind(create_uuid7())
164 .bind(&base_args.space)
166 .bind(&base_args.name)
167 .bind(major)
169 .bind(minor)
170 .bind(patch)
171 .bind(pre)
172 .bind(build)
173 .bind(version.to_string()) .bind(&base_args.scouter_version)
176 .bind(drift_profile.to_value()) .bind(base_args.drift_type.to_string())
179 .bind(active)
181 .bind(&base_args.schedule)
183 .bind(next_run)
185 .bind(current_time)
187 .bind(deactivate_others)
189 .fetch_one(pool)
190 .await
191 .map_err(SqlError::SqlxError)?;
192
193 let entity_uid: String = result.get("entity_uid");
194 Ok(entity_uid)
195 }
196
197 async fn update_drift_profile(
207 pool: &Pool<Postgres>,
208 drift_profile: &DriftProfile,
209 entity_id: &i32,
210 ) -> Result<PgQueryResult, SqlError> {
211 let query = Queries::UpdateDriftProfile.get_query();
212
213 sqlx::query(query)
214 .bind(drift_profile.to_value())
215 .bind(drift_profile.drift_type().to_string())
216 .bind(entity_id)
217 .execute(pool)
218 .await
219 .map_err(SqlError::SqlxError)
220 }
221
222 async fn get_drift_profile(
230 pool: &Pool<Postgres>,
231 entity_id: &i32,
232 ) -> Result<Option<Value>, SqlError> {
233 let query = Queries::GetDriftProfile.get_query();
234
235 let result = sqlx::query(query)
236 .bind(entity_id)
237 .fetch_optional(pool)
238 .await
239 .map_err(SqlError::SqlxError)?;
240
241 match result {
242 Some(result) => {
243 let profile: Value = result.get("profile");
244 Ok(Some(profile))
245 }
246 None => Ok(None),
247 }
248 }
249
250 async fn get_drift_profile_task(
251 pool: &Pool<Postgres>,
252 ) -> Result<Option<TaskRequest>, SqlError> {
253 let query = Queries::GetDriftTask.get_query();
254 sqlx::query_as(query)
255 .fetch_optional(pool)
256 .await
257 .map_err(|e| {
258 error!("Failed to get drift profile task: {:?}", e);
259 SqlError::SqlxError(e)
260 })
261 }
262
263 async fn list_drift_profiles(
264 pool: &Pool<Postgres>,
265 args: &ListProfilesRequest,
266 ) -> Result<Vec<ListedProfile>, SqlError> {
267 let profile_query = Queries::ListDriftProfiles.get_query();
268
269 let records: Vec<(bool, Value)> = sqlx::query_as(profile_query)
270 .bind(&args.space)
271 .bind(&args.name)
272 .bind(&args.version)
273 .fetch_all(pool)
274 .await
275 .map_err(SqlError::SqlxError)?;
276
277 let listed_profiles: Vec<ListedProfile> = records
278 .into_iter()
279 .map(|(active, value)| -> Result<ListedProfile, SqlError> {
280 Ok(ListedProfile {
281 profile: DriftProfile::from_value(value)?,
282 active,
283 })
284 })
285 .collect::<Result<Vec<_>, _>>()?;
286
287 Ok(listed_profiles)
288 }
289
290 #[instrument(skip_all)]
304 async fn update_drift_profile_run_dates(
305 pool: &Pool<Postgres>,
306 entity_id: &i32,
307 schedule: &str,
308 previous_run: &DateTime<Utc>,
309 ) -> Result<(), SqlError> {
310 let query = Queries::UpdateDriftProfileRunDates.get_query();
311
312 let schedule = Schedule::from_str(schedule)?;
313
314 let next_run = match schedule.after(previous_run).next() {
315 Some(next_run) => next_run,
316 None => {
317 error!("Failed to calculate next run for schedule: {}", schedule);
318 return Err(SqlError::GetNextRunError);
319 }
320 };
321
322 let query_result = sqlx::query(query)
323 .bind(next_run)
324 .bind(entity_id)
325 .execute(pool)
326 .await
327 .map_err(SqlError::SqlxError);
328
329 match query_result {
330 Ok(_) => Ok(()),
331 Err(e) => Err(e),
332 }
333 }
334
335 async fn update_drift_profile_status(
336 pool: &Pool<Postgres>,
337 params: &ProfileStatusRequest,
338 ) -> Result<(), SqlError> {
339 let query = Queries::UpdateDriftProfileStatus.get_query();
340
341 let query_result = sqlx::query(query)
343 .bind(params.active)
344 .bind(¶ms.name)
345 .bind(¶ms.space)
346 .bind(¶ms.version)
347 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
348 .execute(pool)
349 .await
350 .map_err(SqlError::SqlxError);
351
352 match query_result {
353 Ok(_) => {
354 if params.deactivate_others {
355 let query = Queries::DeactivateDriftProfiles.get_query();
356
357 let query_result = sqlx::query(query)
358 .bind(¶ms.name)
359 .bind(¶ms.space)
360 .bind(¶ms.version)
361 .bind(params.drift_type.as_ref().map(|t| t.to_string()))
362 .execute(pool)
363 .await
364 .map_err(SqlError::SqlxError);
365
366 match query_result {
367 Ok(_) => Ok(()),
368 Err(e) => {
369 error!("Failed to deactivate other drift profiles: {:?}", e);
370 Err(e)
371 }
372 }
373 } else {
374 Ok(())
375 }
376 }
377 Err(e) => {
378 error!("Failed to update drift profile status: {:?}", e);
379 Err(e)
380 }
381 }
382 }
383}