scouter_sql/sql/traits/
profile.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use crate::sql::error::SqlError;
5use crate::sql::schema::VersionResult;
6use async_trait::async_trait;
7use chrono::Utc;
8use cron::Schedule;
9use scouter_semver::VersionArgs;
10use scouter_semver::VersionType;
11use scouter_semver::{VersionParser, VersionValidator};
12use scouter_types::{
13    DriftProfile, DriftTaskInfo, GetProfileRequest, ProfileArgs, ProfileStatusRequest,
14};
15use semver::Version;
16use serde_json::Value;
17use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
18use std::result::Result::Ok;
19use std::str::FromStr;
20use tracing::{error, instrument};
21
22pub fn add_version_bounds(builder: &mut String, version: &str) -> Result<(), SqlError> {
23    let version_bounds = VersionParser::get_version_to_search(version)?;
24
25    // construct lower bound (already validated)
26    builder.push_str(
27        format!(
28            " AND (major >= {} AND minor >= {} and patch >= {})",
29            version_bounds.lower_bound.major,
30            version_bounds.lower_bound.minor,
31            version_bounds.lower_bound.patch
32        )
33        .as_str(),
34    );
35
36    if !version_bounds.no_upper_bound {
37        // construct upper bound based on number of components
38        if version_bounds.num_parts == 1 {
39            builder
40                .push_str(format!(" AND (major < {})", version_bounds.upper_bound.major).as_str());
41        } else if version_bounds.num_parts == 2
42            || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Tilde
43            || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Caret
44        {
45            builder.push_str(
46                format!(
47                    " AND (major = {} AND minor < {})",
48                    version_bounds.upper_bound.major, version_bounds.upper_bound.minor
49                )
50                .as_str(),
51            );
52        } else {
53            builder.push_str(
54                format!(
55                    " AND (major = {} AND minor = {} AND patch < {})",
56                    version_bounds.upper_bound.major,
57                    version_bounds.upper_bound.minor,
58                    version_bounds.upper_bound.patch
59                )
60                .as_str(),
61            );
62        }
63    }
64    Ok(())
65}
66
67#[async_trait]
68pub trait ProfileSqlLogic {
69    /// Get profile versions
70    #[instrument(skip_all)]
71    async fn get_next_profile_version(
72        pool: &Pool<Postgres>,
73        args: &ProfileArgs,
74        version_type: VersionType,
75        pre_tag: Option<String>,
76        build_tag: Option<String>,
77    ) -> Result<Version, SqlError> {
78        let mut version_query = Queries::GetProfileVersions.get_query().sql;
79
80        if let Some(version) = &args.version {
81            add_version_bounds(&mut version_query, version)?;
82        }
83        version_query.push_str(" ORDER BY created_at DESC LIMIT 20;");
84
85        let cards: Vec<VersionResult> = sqlx::query_as(&version_query)
86            .bind(&args.space)
87            .bind(&args.name)
88            .fetch_all(pool)
89            .await?;
90
91        let versions = cards
92            .iter()
93            .map(|c| c.to_version())
94            .collect::<Result<Vec<Version>, SqlError>>()?;
95
96        // sort semvers
97        let versions = VersionValidator::sort_semver_versions(versions, true)?;
98
99        if versions.is_empty() {
100            return match &args.version {
101                Some(version_str) => Ok(VersionValidator::clean_version(version_str)?),
102                None => Ok(Version::new(0, 1, 0)),
103            };
104        }
105
106        let base_version = versions.first().unwrap().to_string();
107
108        let args = VersionArgs {
109            version: base_version,
110            version_type,
111            pre: pre_tag,
112            build: build_tag,
113        };
114
115        Ok(VersionValidator::bump_version(&args)?)
116    }
117    /// Insert a drift profile into the database
118    ///
119    /// # Arguments
120    ///
121    /// * `drift_profile` - The drift profile to insert
122    ///
123    /// # Returns
124    ///
125    /// * `Result<PgQueryResult, SqlError>` - Result of the query
126    #[instrument(skip_all)]
127    async fn insert_drift_profile(
128        pool: &Pool<Postgres>,
129        drift_profile: &DriftProfile,
130        base_args: &ProfileArgs,
131        version: &Version,
132    ) -> Result<PgQueryResult, SqlError> {
133        let query = Queries::InsertDriftProfile.get_query();
134        let current_time = Utc::now();
135        let schedule = Schedule::from_str(&base_args.schedule)?;
136        let next_run = match schedule.upcoming(Utc).take(1).next() {
137            Some(next_run) => next_run,
138            None => {
139                return Err(SqlError::GetNextRunError);
140            }
141        };
142
143        // Need to convert version to postgres type
144        let major = version.major as i32;
145        let minor = version.minor as i32;
146        let patch = version.patch as i32;
147        let pre: Option<String> = version.pre.to_string().parse().ok();
148        let build: Option<String> = version.build.to_string().parse().ok();
149
150        sqlx::query(&query.sql)
151            .bind(&base_args.space)
152            .bind(&base_args.name)
153            .bind(major)
154            .bind(minor)
155            .bind(patch)
156            .bind(pre)
157            .bind(build)
158            .bind(version.to_string())
159            .bind(&base_args.scouter_version)
160            .bind(drift_profile.to_value())
161            .bind(base_args.drift_type.to_string())
162            .bind(false)
163            .bind(&base_args.schedule)
164            .bind(next_run)
165            .bind(current_time)
166            .execute(pool)
167            .await
168            .map_err(SqlError::SqlxError)
169    }
170
171    /// Update a drift profile in the database
172    ///
173    /// # Arguments
174    ///
175    /// * `drift_profile` - The drift profile to update
176    ///
177    /// # Returns
178    ///
179    /// * `Result<PgQueryResult, SqlError>` - Result of the query
180    async fn update_drift_profile(
181        pool: &Pool<Postgres>,
182        drift_profile: &DriftProfile,
183    ) -> Result<PgQueryResult, SqlError> {
184        let query = Queries::UpdateDriftProfile.get_query();
185        let base_args = drift_profile.get_base_args();
186
187        sqlx::query(&query.sql)
188            .bind(drift_profile.to_value())
189            .bind(base_args.drift_type.to_string())
190            .bind(base_args.name)
191            .bind(base_args.space)
192            .bind(base_args.version)
193            .execute(pool)
194            .await
195            .map_err(SqlError::SqlxError)
196    }
197
198    /// Get a drift profile from the database
199    ///
200    /// # Arguments
201    ///
202    /// * `request` - The request to get the profile for
203    ///
204    /// # Returns
205    async fn get_drift_profile(
206        pool: &Pool<Postgres>,
207        request: &GetProfileRequest,
208    ) -> Result<Option<Value>, SqlError> {
209        let query = Queries::GetDriftProfile.get_query();
210
211        let result = sqlx::query(&query.sql)
212            .bind(&request.name)
213            .bind(&request.space)
214            .bind(&request.version)
215            .bind(request.drift_type.to_string())
216            .fetch_optional(pool)
217            .await
218            .map_err(SqlError::SqlxError)?;
219
220        match result {
221            Some(result) => {
222                let profile: Value = result.get("profile");
223                Ok(Some(profile))
224            }
225            None => Ok(None),
226        }
227    }
228
229    async fn get_drift_profile_task(
230        pool: &Pool<Postgres>,
231    ) -> Result<Option<TaskRequest>, SqlError> {
232        let query = Queries::GetDriftTask.get_query();
233        sqlx::query_as(&query.sql)
234            .fetch_optional(pool)
235            .await
236            .map_err(SqlError::SqlxError)
237    }
238
239    /// Update the drift profile run dates in the database
240    ///
241    /// # Arguments
242    ///
243    /// * `transaction` - The database transaction
244    /// * `service_info` - The service info to update the run dates for
245    /// * `schedule` - The schedule to update the run dates with
246    ///
247    /// # Returns
248    ///
249    /// * `Result<(), SqlError>` - Result of the query
250    #[instrument(skip_all)]
251    async fn update_drift_profile_run_dates(
252        pool: &Pool<Postgres>,
253        task_info: &DriftTaskInfo,
254        schedule: &str,
255    ) -> Result<(), SqlError> {
256        let query = Queries::UpdateDriftProfileRunDates.get_query();
257
258        let schedule = Schedule::from_str(schedule)?;
259
260        let next_run = match schedule.upcoming(Utc).take(1).next() {
261            Some(next_run) => next_run,
262            None => {
263                return Err(SqlError::GetNextRunError);
264            }
265        };
266
267        let query_result = sqlx::query(&query.sql)
268            .bind(next_run)
269            .bind(&task_info.uid)
270            .execute(pool)
271            .await
272            .map_err(SqlError::SqlxError);
273
274        match query_result {
275            Ok(_) => Ok(()),
276            Err(e) => Err(e),
277        }
278    }
279
280    async fn update_drift_profile_status(
281        pool: &Pool<Postgres>,
282        params: &ProfileStatusRequest,
283    ) -> Result<(), SqlError> {
284        let query = Queries::UpdateDriftProfileStatus.get_query();
285
286        // convert drift_type to string or None
287        let query_result = sqlx::query(&query.sql)
288            .bind(params.active)
289            .bind(&params.name)
290            .bind(&params.space)
291            .bind(&params.version)
292            .bind(params.drift_type.as_ref().map(|t| t.to_string()))
293            .execute(pool)
294            .await
295            .map_err(SqlError::SqlxError);
296
297        match query_result {
298            Ok(_) => {
299                if params.deactivate_others {
300                    let query = Queries::DeactivateDriftProfiles.get_query();
301
302                    let query_result = sqlx::query(&query.sql)
303                        .bind(&params.name)
304                        .bind(&params.space)
305                        .bind(&params.version)
306                        .bind(params.drift_type.as_ref().map(|t| t.to_string()))
307                        .execute(pool)
308                        .await
309                        .map_err(SqlError::SqlxError);
310
311                    match query_result {
312                        Ok(_) => Ok(()),
313                        Err(e) => {
314                            error!("Failed to deactivate other drift profiles: {:?}", e);
315                            Err(e)
316                        }
317                    }
318                } else {
319                    Ok(())
320                }
321            }
322            Err(e) => {
323                error!("Failed to update drift profile status: {:?}", e);
324                Err(e)
325            }
326        }
327    }
328}