Skip to main content

scouter_sql/sql/traits/
profile.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use crate::sql::error::SqlError;
5use crate::sql::schema::VersionResult;
6use async_trait::async_trait;
7use chrono::DateTime;
8use chrono::Utc;
9use cron::Schedule;
10use potato_head::create_uuid7;
11use scouter_semver::VersionArgs;
12use scouter_semver::{VersionParser, VersionValidator};
13use scouter_types::VersionRequest;
14use scouter_types::{
15    DriftProfile, ListProfilesRequest, ListedProfile, ProfileArgs, ProfileStatusRequest,
16};
17use semver::Version;
18use serde_json::Value;
19use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
20use std::result::Result::Ok;
21use std::str::FromStr;
22use tracing::{error, instrument};
23
24pub fn add_version_bounds(builder: &mut String, version: &str) -> Result<(), SqlError> {
25    let version_bounds = VersionParser::get_version_to_search(version)?;
26
27    // construct lower bound (already validated)
28    builder.push_str(
29        format!(
30            " AND (major >= {} AND minor >= {} and patch >= {})",
31            version_bounds.lower_bound.major,
32            version_bounds.lower_bound.minor,
33            version_bounds.lower_bound.patch
34        )
35        .as_str(),
36    );
37
38    if !version_bounds.no_upper_bound {
39        // construct upper bound based on number of components
40        if version_bounds.num_parts == 1 {
41            builder
42                .push_str(format!(" AND (major < {})", version_bounds.upper_bound.major).as_str());
43        } else if version_bounds.num_parts == 2
44            || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Tilde
45            || version_bounds.num_parts == 3 && version_bounds.parser_type == VersionParser::Caret
46        {
47            builder.push_str(
48                format!(
49                    " AND (major = {} AND minor < {})",
50                    version_bounds.upper_bound.major, version_bounds.upper_bound.minor
51                )
52                .as_str(),
53            );
54        } else {
55            builder.push_str(
56                format!(
57                    " AND (major = {} AND minor = {} AND patch < {})",
58                    version_bounds.upper_bound.major,
59                    version_bounds.upper_bound.minor,
60                    version_bounds.upper_bound.patch
61                )
62                .as_str(),
63            );
64        }
65    }
66    Ok(())
67}
68
69#[async_trait]
70pub trait ProfileSqlLogic {
71    /// Get profile versions
72    /// Determines the next version based on existing versions in the database
73    /// # Arguments
74    /// * `args` - The profile arguments containing space, name, and version
75    /// * `version_type` - The type of version bump (major, minor, patch)
76    /// * `pre_tag` - Optional pre-release tag
77    /// * `build_tag` - Optional build metadata
78    /// # Returns
79    /// * `Result<Version, SqlError>` - Result of the query returning
80    #[instrument(skip_all)]
81    async fn get_next_profile_version(
82        pool: &Pool<Postgres>,
83        args: &ProfileArgs,
84        version_request: VersionRequest,
85    ) -> Result<Version, SqlError> {
86        let mut version_query = Queries::GetProfileVersions.get_query().to_string();
87
88        if let Some(version) = &version_request.version {
89            add_version_bounds(&mut version_query, version)?;
90        }
91        version_query.push_str(" ORDER BY created_at DESC LIMIT 20;");
92
93        let cards: Vec<VersionResult> = sqlx::query_as(&version_query)
94            .bind(&args.space)
95            .bind(&args.name)
96            .bind(args.drift_type.to_string())
97            .fetch_all(pool)
98            .await?;
99
100        let versions = cards
101            .iter()
102            .map(|c| c.to_version())
103            .collect::<Result<Vec<Version>, SqlError>>()?;
104
105        // sort semvers
106        let versions = VersionValidator::sort_semver_versions(versions, true)?;
107
108        if versions.is_empty() {
109            return match &version_request.version {
110                Some(version_str) => Ok(VersionValidator::clean_version(version_str)?),
111                None => Ok(Version::new(0, 1, 0)),
112            };
113        }
114
115        let base_version = versions.first().unwrap().to_string();
116
117        let args = VersionArgs {
118            version: base_version,
119            version_type: version_request.version_type,
120            pre: version_request.pre_tag,
121            build: version_request.build_tag,
122        };
123
124        Ok(VersionValidator::bump_version(&args)?)
125    }
126    /// Insert a drift profile into the database
127    ///
128    /// # Arguments
129    ///
130    /// * `drift_profile` - The drift profile to insert
131    ///
132    /// # Returns
133    ///
134    /// * `Result<String, SqlError>` - Result of the query returning the entity_uid
135    #[instrument(skip_all)]
136    async fn insert_drift_profile(
137        pool: &Pool<Postgres>,
138        drift_profile: &DriftProfile,
139        base_args: &ProfileArgs,
140        version: &Version,
141        active: &bool,
142        deactivate_others: &bool,
143    ) -> Result<String, SqlError> {
144        let query = Queries::InsertDriftProfile.get_query();
145        let current_time = Utc::now();
146        let schedule = Schedule::from_str(&base_args.schedule)?;
147        let next_run = match schedule.upcoming(Utc).take(1).next() {
148            Some(next_run) => next_run,
149            None => {
150                return Err(SqlError::GetNextRunError);
151            }
152        };
153
154        // Need to convert version to postgres type
155        let major = version.major as i32;
156        let minor = version.minor as i32;
157        let patch = version.patch as i32;
158        let pre: Option<String> = version.pre.to_string().parse().ok();
159        let build: Option<String> = version.build.to_string().parse().ok();
160
161        let result = sqlx::query(query)
162            // 1. Entity UID (for entity_insert) -> $1
163            .bind(create_uuid7())
164            // 2-3. Entity Identity (for entity_insert & deactivation logic) -> $2, $3
165            .bind(&base_args.space)
166            .bind(&base_args.name)
167            // 4-8. Version Components (for drift_profile insert) -> $4 to $8
168            .bind(major)
169            .bind(minor)
170            .bind(patch)
171            .bind(pre)
172            .bind(build)
173            // 9-11. String/JSON values -> $9 to $11
174            .bind(version.to_string()) // Full Version String
175            .bind(&base_args.scouter_version)
176            .bind(drift_profile.to_value()) // Profile JSON
177            // 12. Drift Type (for entity_insert & deactivation logic) -> $12
178            .bind(base_args.drift_type.to_string())
179            // 13. Active Flag (for deactivation logic & drift_profile insert) -> $13
180            .bind(active)
181            // 14. Schedule -> $14
182            .bind(&base_args.schedule)
183            // 15. Next Run -> $15
184            .bind(next_run)
185            // 16. Current Time (for previous_run/timestamps) -> $16
186            .bind(current_time)
187            // 17. Deactivate Others Flag (for deactivation logic) -> $17
188            .bind(deactivate_others)
189            .fetch_one(pool)
190            .await
191            .map_err(SqlError::SqlxError)?;
192
193        let entity_uid: String = result.get("entity_uid");
194        Ok(entity_uid)
195    }
196
197    /// Update a drift profile in the database
198    ///
199    /// # Arguments
200    ///
201    /// * `drift_profile` - The drift profile to update
202    ///
203    /// # Returns
204    ///
205    /// * `Result<PgQueryResult, SqlError>` - Result of the query
206    async fn update_drift_profile(
207        pool: &Pool<Postgres>,
208        drift_profile: &DriftProfile,
209        entity_id: &i32,
210    ) -> Result<PgQueryResult, SqlError> {
211        let query = Queries::UpdateDriftProfile.get_query();
212
213        sqlx::query(query)
214            .bind(drift_profile.to_value())
215            .bind(drift_profile.drift_type().to_string())
216            .bind(entity_id)
217            .execute(pool)
218            .await
219            .map_err(SqlError::SqlxError)
220    }
221
222    /// Get a drift profile from the database
223    ///
224    /// # Arguments
225    ///
226    /// * `request` - The request to get the profile for
227    ///
228    /// # Returns
229    async fn get_drift_profile(
230        pool: &Pool<Postgres>,
231        entity_id: &i32,
232    ) -> Result<Option<Value>, SqlError> {
233        let query = Queries::GetDriftProfile.get_query();
234
235        let result = sqlx::query(query)
236            .bind(entity_id)
237            .fetch_optional(pool)
238            .await
239            .map_err(SqlError::SqlxError)?;
240
241        match result {
242            Some(result) => {
243                let profile: Value = result.get("profile");
244                Ok(Some(profile))
245            }
246            None => Ok(None),
247        }
248    }
249
250    async fn get_drift_profile_task(
251        pool: &Pool<Postgres>,
252    ) -> Result<Option<TaskRequest>, SqlError> {
253        let query = Queries::GetDriftTask.get_query();
254        sqlx::query_as(query)
255            .fetch_optional(pool)
256            .await
257            .map_err(|e| {
258                error!("Failed to get drift profile task: {:?}", e);
259                SqlError::SqlxError(e)
260            })
261    }
262
263    async fn list_drift_profiles(
264        pool: &Pool<Postgres>,
265        args: &ListProfilesRequest,
266    ) -> Result<Vec<ListedProfile>, SqlError> {
267        let profile_query = Queries::ListDriftProfiles.get_query();
268
269        let records: Vec<(bool, Value)> = sqlx::query_as(profile_query)
270            .bind(&args.space)
271            .bind(&args.name)
272            .bind(&args.version)
273            .fetch_all(pool)
274            .await
275            .map_err(SqlError::SqlxError)?;
276
277        let listed_profiles: Vec<ListedProfile> = records
278            .into_iter()
279            .map(|(active, value)| -> Result<ListedProfile, SqlError> {
280                Ok(ListedProfile {
281                    profile: DriftProfile::from_value(value)?,
282                    active,
283                })
284            })
285            .collect::<Result<Vec<_>, _>>()?;
286
287        Ok(listed_profiles)
288    }
289
290    /// Update the drift profile run dates in the database
291    /// Next run is calculated based on the provided schedule and previous run date
292    ///
293    /// # Arguments
294    ///
295    /// * `transaction` - The database transaction
296    /// * `service_info` - The service info to update the run dates for
297    /// * `schedule` - The schedule to update the run dates with
298    /// * `previous_run` - The previous run datetime
299    ///
300    /// # Returns
301    ///
302    /// * `Result<(), SqlError>` - Result of the query
303    #[instrument(skip_all)]
304    async fn update_drift_profile_run_dates(
305        pool: &Pool<Postgres>,
306        entity_id: &i32,
307        schedule: &str,
308        previous_run: &DateTime<Utc>,
309    ) -> Result<(), SqlError> {
310        let query = Queries::UpdateDriftProfileRunDates.get_query();
311
312        let schedule = Schedule::from_str(schedule)?;
313
314        let next_run = match schedule.after(previous_run).next() {
315            Some(next_run) => next_run,
316            None => {
317                error!("Failed to calculate next run for schedule: {}", schedule);
318                return Err(SqlError::GetNextRunError);
319            }
320        };
321
322        let query_result = sqlx::query(query)
323            .bind(next_run)
324            .bind(entity_id)
325            .execute(pool)
326            .await
327            .map_err(SqlError::SqlxError);
328
329        match query_result {
330            Ok(_) => Ok(()),
331            Err(e) => Err(e),
332        }
333    }
334
335    async fn update_drift_profile_status(
336        pool: &Pool<Postgres>,
337        params: &ProfileStatusRequest,
338    ) -> Result<(), SqlError> {
339        let query = Queries::UpdateDriftProfileStatus.get_query();
340
341        // convert drift_type to string or None
342        let query_result = sqlx::query(query)
343            .bind(params.active)
344            .bind(&params.name)
345            .bind(&params.space)
346            .bind(&params.version)
347            .bind(params.drift_type.as_ref().map(|t| t.to_string()))
348            .execute(pool)
349            .await
350            .map_err(SqlError::SqlxError);
351
352        match query_result {
353            Ok(_) => {
354                if params.deactivate_others {
355                    let query = Queries::DeactivateDriftProfiles.get_query();
356
357                    let query_result = sqlx::query(query)
358                        .bind(&params.name)
359                        .bind(&params.space)
360                        .bind(&params.version)
361                        .bind(params.drift_type.as_ref().map(|t| t.to_string()))
362                        .execute(pool)
363                        .await
364                        .map_err(SqlError::SqlxError);
365
366                    match query_result {
367                        Ok(_) => Ok(()),
368                        Err(e) => {
369                            error!("Failed to deactivate other drift profiles: {:?}", e);
370                            Err(e)
371                        }
372                    }
373                } else {
374                    Ok(())
375                }
376            }
377            Err(e) => {
378                error!("Failed to update drift profile status: {:?}", e);
379                Err(e)
380            }
381        }
382    }
383}