scouter_sql/sql/traits/
profile.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::TaskRequest;
3
4use chrono::Utc;
5use cron::Schedule;
6
7use crate::sql::error::SqlError;
8use async_trait::async_trait;
9use scouter_types::{DriftProfile, DriftTaskInfo, GetProfileRequest, ProfileStatusRequest};
10use serde_json::Value;
11use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
12use std::result::Result::Ok;
13use std::str::FromStr;
14use tracing::{error, instrument};
15
16#[async_trait]
17pub trait ProfileSqlLogic {
18    /// Insert a drift profile into the database
19    ///
20    /// # Arguments
21    ///
22    /// * `drift_profile` - The drift profile to insert
23    ///
24    /// # Returns
25    ///
26    /// * `Result<PgQueryResult, SqlError>` - Result of the query
27    #[instrument(skip_all)]
28    async fn insert_drift_profile(
29        pool: &Pool<Postgres>,
30        drift_profile: &DriftProfile,
31    ) -> Result<PgQueryResult, SqlError> {
32        let query = Queries::InsertDriftProfile.get_query();
33        let base_args = drift_profile.get_base_args();
34
35        let current_time = Utc::now();
36
37        let schedule = Schedule::from_str(&base_args.schedule)?;
38
39        let next_run = match schedule.upcoming(Utc).take(1).next() {
40            Some(next_run) => next_run,
41            None => {
42                return Err(SqlError::GetNextRunError);
43            }
44        };
45
46        sqlx::query(&query.sql)
47            .bind(base_args.name)
48            .bind(base_args.space)
49            .bind(base_args.version)
50            .bind(base_args.scouter_version)
51            .bind(drift_profile.to_value())
52            .bind(base_args.drift_type.to_string())
53            .bind(false)
54            .bind(base_args.schedule)
55            .bind(next_run)
56            .bind(current_time)
57            .execute(pool)
58            .await
59            .map_err(SqlError::SqlxError)
60    }
61
62    /// Update a drift profile in the database
63    ///
64    /// # Arguments
65    ///
66    /// * `drift_profile` - The drift profile to update
67    ///
68    /// # Returns
69    ///
70    /// * `Result<PgQueryResult, SqlError>` - Result of the query
71    async fn update_drift_profile(
72        pool: &Pool<Postgres>,
73        drift_profile: &DriftProfile,
74    ) -> Result<PgQueryResult, SqlError> {
75        let query = Queries::UpdateDriftProfile.get_query();
76        let base_args = drift_profile.get_base_args();
77
78        sqlx::query(&query.sql)
79            .bind(drift_profile.to_value())
80            .bind(base_args.drift_type.to_string())
81            .bind(base_args.name)
82            .bind(base_args.space)
83            .bind(base_args.version)
84            .execute(pool)
85            .await
86            .map_err(SqlError::SqlxError)
87    }
88
89    /// Get a drift profile from the database
90    ///
91    /// # Arguments
92    ///
93    /// * `request` - The request to get the profile for
94    ///
95    /// # Returns
96    async fn get_drift_profile(
97        pool: &Pool<Postgres>,
98        request: &GetProfileRequest,
99    ) -> Result<Option<Value>, SqlError> {
100        let query = Queries::GetDriftProfile.get_query();
101
102        let result = sqlx::query(&query.sql)
103            .bind(&request.name)
104            .bind(&request.space)
105            .bind(&request.version)
106            .bind(request.drift_type.to_string())
107            .fetch_optional(pool)
108            .await
109            .map_err(SqlError::SqlxError)?;
110
111        match result {
112            Some(result) => {
113                let profile: Value = result.get("profile");
114                Ok(Some(profile))
115            }
116            None => Ok(None),
117        }
118    }
119
120    async fn get_drift_profile_task(
121        pool: &Pool<Postgres>,
122    ) -> Result<Option<TaskRequest>, SqlError> {
123        let query = Queries::GetDriftTask.get_query();
124        sqlx::query_as(&query.sql)
125            .fetch_optional(pool)
126            .await
127            .map_err(SqlError::SqlxError)
128    }
129
130    /// Update the drift profile run dates in the database
131    ///
132    /// # Arguments
133    ///
134    /// * `transaction` - The database transaction
135    /// * `service_info` - The service info to update the run dates for
136    /// * `schedule` - The schedule to update the run dates with
137    ///
138    /// # Returns
139    ///
140    /// * `Result<(), SqlError>` - Result of the query
141    #[instrument(skip_all)]
142    async fn update_drift_profile_run_dates(
143        pool: &Pool<Postgres>,
144        task_info: &DriftTaskInfo,
145        schedule: &str,
146    ) -> Result<(), SqlError> {
147        let query = Queries::UpdateDriftProfileRunDates.get_query();
148
149        let schedule = Schedule::from_str(schedule)?;
150
151        let next_run = match schedule.upcoming(Utc).take(1).next() {
152            Some(next_run) => next_run,
153            None => {
154                return Err(SqlError::GetNextRunError);
155            }
156        };
157
158        let query_result = sqlx::query(&query.sql)
159            .bind(next_run)
160            .bind(&task_info.uid)
161            .execute(pool)
162            .await
163            .map_err(SqlError::SqlxError);
164
165        match query_result {
166            Ok(_) => Ok(()),
167            Err(e) => Err(e),
168        }
169    }
170
171    async fn update_drift_profile_status(
172        pool: &Pool<Postgres>,
173        params: &ProfileStatusRequest,
174    ) -> Result<(), SqlError> {
175        let query = Queries::UpdateDriftProfileStatus.get_query();
176
177        // convert drift_type to string or None
178        let query_result = sqlx::query(&query.sql)
179            .bind(params.active)
180            .bind(&params.name)
181            .bind(&params.space)
182            .bind(&params.version)
183            .bind(params.drift_type.as_ref().map(|t| t.to_string()))
184            .execute(pool)
185            .await
186            .map_err(SqlError::SqlxError);
187
188        match query_result {
189            Ok(_) => {
190                if params.deactivate_others {
191                    let query = Queries::DeactivateDriftProfiles.get_query();
192
193                    let query_result = sqlx::query(&query.sql)
194                        .bind(&params.name)
195                        .bind(&params.space)
196                        .bind(&params.version)
197                        .bind(params.drift_type.as_ref().map(|t| t.to_string()))
198                        .execute(pool)
199                        .await
200                        .map_err(SqlError::SqlxError);
201
202                    match query_result {
203                        Ok(_) => Ok(()),
204                        Err(e) => {
205                            error!("Failed to deactivate other drift profiles: {:?}", e);
206                            Err(e)
207                        }
208                    }
209                } else {
210                    Ok(())
211                }
212            }
213            Err(e) => {
214                error!("Failed to update drift profile status: {:?}", e);
215                Err(e)
216            }
217        }
218    }
219}