scouter_sql/sql/traits/
profile.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use crate::sql::error::SqlError;
5use crate::sql::schema::VersionResult;
6use async_trait::async_trait;
7use chrono::Utc;
8use cron::Schedule;
9use scouter_semver::VersionArgs;
10use scouter_semver::VersionType;
11use scouter_semver::{VersionParser, VersionValidator};
12use scouter_types::{
13    DriftProfile, DriftTaskInfo, GetProfileRequest, ListProfilesRequest, ListedProfile,
14    ProfileArgs, ProfileStatusRequest,
15};
16use semver::Version;
17use serde_json::Value;
18use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
19use std::result::Result::Ok;
20use std::str::FromStr;
21use tracing::{error, instrument};
22
23pub fn add_version_bounds(builder: &mut String, version: &str) -> Result<(), SqlError> {
24    let version_bounds = VersionParser::get_version_to_search(version)?;
25
26    // construct lower bound (already validated)
27    builder.push_str(
28        format!(
29            " AND (major >= {} AND minor >= {} and patch >= {})",
30            version_bounds.lower_bound.major,
31            version_bounds.lower_bound.minor,
32            version_bounds.lower_bound.patch
33        )
34        .as_str(),
35    );
36
37    if !version_bounds.no_upper_bound {
38        // construct upper bound based on number of components
39        if version_bounds.num_parts == 1 {
40            builder
41                .push_str(format!(" AND (major < {})", version_bounds.upper_bound.major).as_str());
42        } else if version_bounds.num_parts == 2
43            || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Tilde
44            || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Caret
45        {
46            builder.push_str(
47                format!(
48                    " AND (major = {} AND minor < {})",
49                    version_bounds.upper_bound.major, version_bounds.upper_bound.minor
50                )
51                .as_str(),
52            );
53        } else {
54            builder.push_str(
55                format!(
56                    " AND (major = {} AND minor = {} AND patch < {})",
57                    version_bounds.upper_bound.major,
58                    version_bounds.upper_bound.minor,
59                    version_bounds.upper_bound.patch
60                )
61                .as_str(),
62            );
63        }
64    }
65    Ok(())
66}
67
68#[async_trait]
69pub trait ProfileSqlLogic {
70    /// Get profile versions
71    #[instrument(skip_all)]
72    async fn get_next_profile_version(
73        pool: &Pool<Postgres>,
74        args: &ProfileArgs,
75        version_type: VersionType,
76        pre_tag: Option<String>,
77        build_tag: Option<String>,
78    ) -> Result<Version, SqlError> {
79        let mut version_query = Queries::GetProfileVersions.get_query().sql;
80
81        if let Some(version) = &args.version {
82            add_version_bounds(&mut version_query, version)?;
83        }
84        version_query.push_str(" ORDER BY created_at DESC LIMIT 20;");
85
86        let cards: Vec<VersionResult> = sqlx::query_as(&version_query)
87            .bind(&args.space)
88            .bind(&args.name)
89            .fetch_all(pool)
90            .await?;
91
92        let versions = cards
93            .iter()
94            .map(|c| c.to_version())
95            .collect::<Result<Vec<Version>, SqlError>>()?;
96
97        // sort semvers
98        let versions = VersionValidator::sort_semver_versions(versions, true)?;
99
100        if versions.is_empty() {
101            return match &args.version {
102                Some(version_str) => Ok(VersionValidator::clean_version(version_str)?),
103                None => Ok(Version::new(0, 1, 0)),
104            };
105        }
106
107        let base_version = versions.first().unwrap().to_string();
108
109        let args = VersionArgs {
110            version: base_version,
111            version_type,
112            pre: pre_tag,
113            build: build_tag,
114        };
115
116        Ok(VersionValidator::bump_version(&args)?)
117    }
118    /// Insert a drift profile into the database
119    ///
120    /// # Arguments
121    ///
122    /// * `drift_profile` - The drift profile to insert
123    ///
124    /// # Returns
125    ///
126    /// * `Result<PgQueryResult, SqlError>` - Result of the query
127    #[instrument(skip_all)]
128    async fn insert_drift_profile(
129        pool: &Pool<Postgres>,
130        drift_profile: &DriftProfile,
131        base_args: &ProfileArgs,
132        version: &Version,
133        active: &bool,
134        deactivate_others: &bool,
135    ) -> Result<PgQueryResult, SqlError> {
136        let query = Queries::InsertDriftProfile.get_query();
137        let current_time = Utc::now();
138        let schedule = Schedule::from_str(&base_args.schedule)?;
139        let next_run = match schedule.upcoming(Utc).take(1).next() {
140            Some(next_run) => next_run,
141            None => {
142                return Err(SqlError::GetNextRunError);
143            }
144        };
145
146        // Need to convert version to postgres type
147        let major = version.major as i32;
148        let minor = version.minor as i32;
149        let patch = version.patch as i32;
150        let pre: Option<String> = version.pre.to_string().parse().ok();
151        let build: Option<String> = version.build.to_string().parse().ok();
152
153        let result = sqlx::query(&query.sql)
154            .bind(&base_args.space)
155            .bind(&base_args.name)
156            .bind(major)
157            .bind(minor)
158            .bind(patch)
159            .bind(pre)
160            .bind(build)
161            .bind(version.to_string())
162            .bind(&base_args.scouter_version)
163            .bind(drift_profile.to_value())
164            .bind(base_args.drift_type.to_string())
165            .bind(active)
166            .bind(&base_args.schedule)
167            .bind(next_run)
168            .bind(current_time)
169            .execute(pool)
170            .await
171            .map_err(SqlError::SqlxError)?;
172
173        // Only want to deactivate other profiles if this one is active and deactivate_others is true
174        if *active && *deactivate_others {
175            let query = Queries::DeactivateDriftProfiles.get_query();
176
177            let query_result = sqlx::query(&query.sql)
178                .bind(&base_args.name)
179                .bind(&base_args.space)
180                .bind(&base_args.version)
181                .bind(base_args.drift_type.to_string())
182                .execute(pool)
183                .await
184                .map_err(SqlError::SqlxError);
185
186            match query_result {
187                Ok(_) => Ok(result), // return original result
188                Err(e) => {
189                    error!("Failed to deactivate other drift profiles: {:?}", e);
190                    Err(e)
191                }
192            }
193        } else {
194            Ok(result)
195        }
196    }
197
198    /// Update a drift profile in the database
199    ///
200    /// # Arguments
201    ///
202    /// * `drift_profile` - The drift profile to update
203    ///
204    /// # Returns
205    ///
206    /// * `Result<PgQueryResult, SqlError>` - Result of the query
207    async fn update_drift_profile(
208        pool: &Pool<Postgres>,
209        drift_profile: &DriftProfile,
210    ) -> Result<PgQueryResult, SqlError> {
211        let query = Queries::UpdateDriftProfile.get_query();
212        let base_args = drift_profile.get_base_args();
213
214        sqlx::query(&query.sql)
215            .bind(drift_profile.to_value())
216            .bind(base_args.drift_type.to_string())
217            .bind(base_args.name)
218            .bind(base_args.space)
219            .bind(base_args.version)
220            .execute(pool)
221            .await
222            .map_err(SqlError::SqlxError)
223    }
224
225    /// Get a drift profile from the database
226    ///
227    /// # Arguments
228    ///
229    /// * `request` - The request to get the profile for
230    ///
231    /// # Returns
232    async fn get_drift_profile(
233        pool: &Pool<Postgres>,
234        request: &GetProfileRequest,
235    ) -> Result<Option<Value>, SqlError> {
236        let query = Queries::GetDriftProfile.get_query();
237
238        let result = sqlx::query(&query.sql)
239            .bind(&request.name)
240            .bind(&request.space)
241            .bind(&request.version)
242            .bind(request.drift_type.to_string())
243            .fetch_optional(pool)
244            .await
245            .map_err(SqlError::SqlxError)?;
246
247        match result {
248            Some(result) => {
249                let profile: Value = result.get("profile");
250                Ok(Some(profile))
251            }
252            None => Ok(None),
253        }
254    }
255
256    async fn get_drift_profile_task(
257        pool: &Pool<Postgres>,
258    ) -> Result<Option<TaskRequest>, SqlError> {
259        let query = Queries::GetDriftTask.get_query();
260        sqlx::query_as(&query.sql)
261            .fetch_optional(pool)
262            .await
263            .map_err(SqlError::SqlxError)
264    }
265
266    async fn list_drift_profiles(
267        pool: &Pool<Postgres>,
268        args: &ListProfilesRequest,
269    ) -> Result<Vec<ListedProfile>, SqlError> {
270        let profile_query = Queries::ListDriftProfiles.get_query().sql;
271
272        let records: Vec<(bool, Value)> = sqlx::query_as(&profile_query)
273            .bind(&args.space)
274            .bind(&args.name)
275            .bind(&args.version)
276            .fetch_all(pool)
277            .await
278            .map_err(SqlError::SqlxError)?;
279
280        let listed_profiles: Vec<ListedProfile> = records
281            .into_iter()
282            .map(|(active, value)| -> Result<ListedProfile, SqlError> {
283                Ok(ListedProfile {
284                    profile: DriftProfile::from_value(value)?,
285                    active,
286                })
287            })
288            .collect::<Result<Vec<_>, _>>()?;
289
290        Ok(listed_profiles)
291    }
292
293    /// Update the drift profile run dates in the database
294    ///
295    /// # Arguments
296    ///
297    /// * `transaction` - The database transaction
298    /// * `service_info` - The service info to update the run dates for
299    /// * `schedule` - The schedule to update the run dates with
300    ///
301    /// # Returns
302    ///
303    /// * `Result<(), SqlError>` - Result of the query
304    #[instrument(skip_all)]
305    async fn update_drift_profile_run_dates(
306        pool: &Pool<Postgres>,
307        task_info: &DriftTaskInfo,
308        schedule: &str,
309    ) -> Result<(), SqlError> {
310        let query = Queries::UpdateDriftProfileRunDates.get_query();
311
312        let schedule = Schedule::from_str(schedule)?;
313
314        let next_run = match schedule.upcoming(Utc).take(1).next() {
315            Some(next_run) => next_run,
316            None => {
317                return Err(SqlError::GetNextRunError);
318            }
319        };
320
321        let query_result = sqlx::query(&query.sql)
322            .bind(next_run)
323            .bind(&task_info.uid)
324            .execute(pool)
325            .await
326            .map_err(SqlError::SqlxError);
327
328        match query_result {
329            Ok(_) => Ok(()),
330            Err(e) => Err(e),
331        }
332    }
333
334    async fn update_drift_profile_status(
335        pool: &Pool<Postgres>,
336        params: &ProfileStatusRequest,
337    ) -> Result<(), SqlError> {
338        let query = Queries::UpdateDriftProfileStatus.get_query();
339
340        // convert drift_type to string or None
341        let query_result = sqlx::query(&query.sql)
342            .bind(params.active)
343            .bind(&params.name)
344            .bind(&params.space)
345            .bind(&params.version)
346            .bind(params.drift_type.as_ref().map(|t| t.to_string()))
347            .execute(pool)
348            .await
349            .map_err(SqlError::SqlxError);
350
351        match query_result {
352            Ok(_) => {
353                if params.deactivate_others {
354                    let query = Queries::DeactivateDriftProfiles.get_query();
355
356                    let query_result = sqlx::query(&query.sql)
357                        .bind(&params.name)
358                        .bind(&params.space)
359                        .bind(&params.version)
360                        .bind(params.drift_type.as_ref().map(|t| t.to_string()))
361                        .execute(pool)
362                        .await
363                        .map_err(SqlError::SqlxError);
364
365                    match query_result {
366                        Ok(_) => Ok(()),
367                        Err(e) => {
368                            error!("Failed to deactivate other drift profiles: {:?}", e);
369                            Err(e)
370                        }
371                    }
372                } else {
373                    Ok(())
374                }
375            }
376            Err(e) => {
377                error!("Failed to update drift profile status: {:?}", e);
378                Err(e)
379            }
380        }
381    }
382}