use crate::sql::query::Queries;
use crate::sql::schema::TaskRequest;
use chrono::Utc;
use cron::Schedule;
use crate::sql::error::SqlError;
use async_trait::async_trait;
use scouter_types::{DriftProfile, DriftTaskInfo, GetProfileRequest, ProfileStatusRequest};
use serde_json::Value;
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::result::Result::Ok;
use std::str::FromStr;
use tracing::{error, instrument};
#[async_trait]
pub trait ProfileSqlLogic {
#[instrument(skip_all)]
async fn insert_drift_profile(
pool: &Pool<Postgres>,
drift_profile: &DriftProfile,
) -> Result<PgQueryResult, SqlError> {
let query = Queries::InsertDriftProfile.get_query();
let base_args = drift_profile.get_base_args();
let current_time = Utc::now();
let schedule = Schedule::from_str(&base_args.schedule)?;
let next_run = match schedule.upcoming(Utc).take(1).next() {
Some(next_run) => next_run,
None => {
return Err(SqlError::GetNextRunError);
}
};
sqlx::query(&query.sql)
.bind(base_args.name)
.bind(base_args.space)
.bind(base_args.version)
.bind(base_args.scouter_version)
.bind(drift_profile.to_value())
.bind(base_args.drift_type.to_string())
.bind(false)
.bind(base_args.schedule)
.bind(next_run)
.bind(current_time)
.execute(pool)
.await
.map_err(SqlError::SqlxError)
}
async fn update_drift_profile(
pool: &Pool<Postgres>,
drift_profile: &DriftProfile,
) -> Result<PgQueryResult, SqlError> {
let query = Queries::UpdateDriftProfile.get_query();
let base_args = drift_profile.get_base_args();
sqlx::query(&query.sql)
.bind(drift_profile.to_value())
.bind(base_args.drift_type.to_string())
.bind(base_args.name)
.bind(base_args.space)
.bind(base_args.version)
.execute(pool)
.await
.map_err(SqlError::SqlxError)
}
async fn get_drift_profile(
pool: &Pool<Postgres>,
request: &GetProfileRequest,
) -> Result<Option<Value>, SqlError> {
let query = Queries::GetDriftProfile.get_query();
let result = sqlx::query(&query.sql)
.bind(&request.name)
.bind(&request.space)
.bind(&request.version)
.bind(request.drift_type.to_string())
.fetch_optional(pool)
.await
.map_err(SqlError::SqlxError)?;
match result {
Some(result) => {
let profile: Value = result.get("profile");
Ok(Some(profile))
}
None => Ok(None),
}
}
async fn get_drift_profile_task(
pool: &Pool<Postgres>,
) -> Result<Option<TaskRequest>, SqlError> {
let query = Queries::GetDriftTask.get_query();
sqlx::query_as(&query.sql)
.fetch_optional(pool)
.await
.map_err(SqlError::SqlxError)
}
#[instrument(skip_all)]
async fn update_drift_profile_run_dates(
pool: &Pool<Postgres>,
task_info: &DriftTaskInfo,
schedule: &str,
) -> Result<(), SqlError> {
let query = Queries::UpdateDriftProfileRunDates.get_query();
let schedule = Schedule::from_str(schedule)?;
let next_run = match schedule.upcoming(Utc).take(1).next() {
Some(next_run) => next_run,
None => {
return Err(SqlError::GetNextRunError);
}
};
let query_result = sqlx::query(&query.sql)
.bind(next_run)
.bind(&task_info.uid)
.execute(pool)
.await
.map_err(SqlError::SqlxError);
match query_result {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
async fn update_drift_profile_status(
pool: &Pool<Postgres>,
params: &ProfileStatusRequest,
) -> Result<(), SqlError> {
let query = Queries::UpdateDriftProfileStatus.get_query();
let query_result = sqlx::query(&query.sql)
.bind(params.active)
.bind(¶ms.name)
.bind(¶ms.space)
.bind(¶ms.version)
.bind(params.drift_type.as_ref().map(|t| t.to_string()))
.execute(pool)
.await
.map_err(SqlError::SqlxError);
match query_result {
Ok(_) => {
if params.deactivate_others {
let query = Queries::DeactivateDriftProfiles.get_query();
let query_result = sqlx::query(&query.sql)
.bind(¶ms.name)
.bind(¶ms.space)
.bind(¶ms.version)
.bind(params.drift_type.as_ref().map(|t| t.to_string()))
.execute(pool)
.await
.map_err(SqlError::SqlxError);
match query_result {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to deactivate other drift profiles: {:?}", e);
Err(e)
}
}
} else {
Ok(())
}
}
Err(e) => {
error!("Failed to update drift profile status: {:?}", e);
Err(e)
}
}
}
}