use crate::sql::query::{
GetBinnedFeatureValuesParams, GetFeatureValuesParams, GetFeaturesParams,
InsertMonitorProfileParams, InsertParams, Queries,
};
use crate::sql::schema::{DriftRecord, FeatureResult, MonitorProfile, QueryResult};
use anyhow::*;
use futures::future::join_all;
use include_dir::{include_dir, Dir};
use sqlx::{
postgres::{PgQueryResult, PgRow},
Pool, Postgres, QueryBuilder, Row,
};
use chrono::Utc;
use cron::Schedule;
use std::collections::BTreeMap;
use std::result::Result::Ok;
use std::str::FromStr;
use tracing::error;
static _MIGRATIONS: Dir = include_dir!("migrations");
pub enum TimeInterval {
FiveMinutes,
FifteenMinutes,
ThirtyMinutes,
OneHour,
ThreeHours,
SixHours,
TwelveHours,
TwentyFourHours,
TwoDays,
FiveDays,
}
impl TimeInterval {
pub fn to_minutes(&self) -> i32 {
match self {
TimeInterval::FiveMinutes => 5,
TimeInterval::FifteenMinutes => 15,
TimeInterval::ThirtyMinutes => 30,
TimeInterval::OneHour => 60,
TimeInterval::ThreeHours => 180,
TimeInterval::SixHours => 360,
TimeInterval::TwelveHours => 720,
TimeInterval::TwentyFourHours => 1440,
TimeInterval::TwoDays => 2880,
TimeInterval::FiveDays => 7200,
}
}
pub fn from_string(time_window: &str) -> TimeInterval {
match time_window {
"5minute" => TimeInterval::FiveMinutes,
"15minute" => TimeInterval::FifteenMinutes,
"30minute" => TimeInterval::ThirtyMinutes,
"1hour" => TimeInterval::OneHour,
"3hour" => TimeInterval::ThreeHours,
"6hour" => TimeInterval::SixHours,
"12hour" => TimeInterval::TwelveHours,
"24hour" => TimeInterval::TwentyFourHours,
"2day" => TimeInterval::TwoDays,
"5day" => TimeInterval::FiveDays,
_ => TimeInterval::SixHours,
}
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct PostgresClient {
pub pool: Pool<Postgres>,
qualified_table_name: String,
queue_table_name: String,
profile_table_name: String,
}
impl PostgresClient {
pub fn new(pool: Pool<Postgres>) -> Result<Self, anyhow::Error> {
Ok(Self {
pool,
qualified_table_name: "scouter.drift".to_string(),
queue_table_name: "scouter.drift_queue".to_string(),
profile_table_name: "scouter.drift_profile".to_string(),
})
}
pub async fn insert_drift_record(
&self,
record: &DriftRecord,
) -> Result<PgQueryResult, anyhow::Error> {
let query = Queries::InsertDriftRecord.get_query();
let params = InsertParams {
table: self.qualified_table_name.to_string(),
created_at: record.created_at,
name: record.name.clone(),
repository: record.repository.clone(),
feature: record.feature.clone(),
value: record.value.to_string(),
version: record.version.clone(),
};
let query_result: std::prelude::v1::Result<sqlx::postgres::PgQueryResult, sqlx::Error> =
sqlx::raw_sql(query.format(¶ms).as_str())
.execute(&self.pool)
.await;
match query_result {
Ok(result) => Ok(result),
Err(e) => {
error!("Failed to insert record into database: {:?}", e);
Err(anyhow!("Failed to insert record into database: {:?}", e))
}
}
}
pub async fn insert_drift_profile(
&self,
monitor_profile: &MonitorProfile,
) -> Result<PgQueryResult, anyhow::Error> {
let query = Queries::InsertMonitorProfile.get_query();
let cron = Schedule::from_str(&monitor_profile.config.cron).with_context(|| {
format!(
"Failed to parse cron expression: {}",
&monitor_profile.config.cron
)
})?;
let next_run = cron.upcoming(Utc).take(1).next().with_context(|| {
format!(
"Failed to get next run time for cron expression: {}",
&monitor_profile.config.cron
)
})?;
let params = InsertMonitorProfileParams {
table: "scouter.drift_profile".to_string(),
name: monitor_profile.config.name.clone(),
repository: monitor_profile.config.repository.clone(),
version: monitor_profile.config.version.clone(),
profile: serde_json::to_string(&monitor_profile).unwrap(),
cron: monitor_profile.config.cron.clone(),
next_run: next_run.naive_utc(),
};
let query_result: std::prelude::v1::Result<sqlx::postgres::PgQueryResult, sqlx::Error> =
sqlx::raw_sql(query.format(¶ms).as_str())
.execute(&self.pool)
.await;
match query_result {
Ok(result) => Ok(result),
Err(e) => {
error!("Failed to insert record into database: {:?}", e);
Err(anyhow!("Failed to insert record into database: {:?}", e))
}
}
}
#[allow(dead_code)]
pub async fn insert_drift_records(
&self,
records: &[DriftRecord],
) -> Result<PgQueryResult, anyhow::Error> {
let insert_statement = format!(
"INSERT INTO {} (created_at, name, repository, version, feature, value)",
self.qualified_table_name
);
let mut query_builder = QueryBuilder::new(insert_statement);
query_builder.push_values(records.iter(), |mut b, record| {
b.push_bind(record.created_at)
.push_bind(&record.name)
.push_bind(&record.repository)
.push_bind(&record.version)
.push_bind(&record.feature)
.push_bind(record.value);
});
let query = query_builder.build();
let query_result = query.execute(&self.pool).await;
match query_result {
Ok(result) => Ok(result),
Err(e) => {
error!("Failed to insert record into database: {:?}", e);
Err(anyhow!("Failed to insert record into database: {:?}", e))
}
}
}
async fn get_features(
&self,
name: &str,
repository: &str,
version: &str,
) -> Result<Vec<String>, anyhow::Error> {
let query = Queries::GetFeatures.get_query();
let params = GetFeaturesParams {
table: self.qualified_table_name.to_string(),
name: name.to_string(),
repository: repository.to_string(),
version: version.to_string(),
};
let result = sqlx::raw_sql(query.format(¶ms).as_str())
.fetch_all(&self.pool)
.await?;
let mut features = Vec::new();
for row in result {
features.push(row.get("feature"));
}
Ok(features)
}
#[allow(dead_code)]
async fn run_feature_query(
&self,
feature: &str,
name: &str,
repository: &str,
version: &str,
limit_timestamp: &str,
) -> Result<Vec<PgRow>, anyhow::Error> {
let query = Queries::GetFeatureValues.get_query();
let params = GetFeatureValuesParams {
table: self.qualified_table_name.to_string(),
name: name.to_string(),
repository: repository.to_string(),
version: version.to_string(),
feature: feature.to_string(),
limit_timestamp: limit_timestamp.to_string(),
};
let result = sqlx::raw_sql(query.format(¶ms).as_str())
.fetch_all(&self.pool)
.await;
match result {
Ok(result) => Ok(result),
Err(e) => {
error!("Failed to run query: {:?}", e);
Err(anyhow!("Failed to run query: {:?}", e))
}
}
}
async fn run_binned_feature_query(
&self,
bin: &f64,
feature: String,
version: &str,
time_window: &i32,
name: &str,
repository: &str,
) -> Result<Vec<PgRow>, anyhow::Error> {
let query = Queries::GetBinnedFeatureValues.get_query();
let params = GetBinnedFeatureValuesParams {
table: self.qualified_table_name.to_string(),
name: name.to_string(),
repository: repository.to_string(),
feature,
version: version.to_string(),
time_window: time_window.to_string(),
bin: bin.to_string(),
};
let result = sqlx::raw_sql(query.format(¶ms).as_str())
.fetch_all(&self.pool)
.await;
match result {
Ok(result) => Ok(result),
Err(e) => {
error!("Failed to run query: {:?}", e);
Err(anyhow!("Failed to run query: {:?}", e))
}
}
}
pub async fn get_binned_drift_records(
&self,
name: &str,
repository: &str,
version: &str,
max_data_points: &i32,
time_window: &i32,
) -> Result<QueryResult, anyhow::Error> {
let features = self.get_features(name, repository, version).await?;
let bin = *time_window as f64 / *max_data_points as f64;
let async_queries = features
.iter()
.map(|feature| {
self.run_binned_feature_query(
&bin,
feature.to_string(),
version,
time_window,
name,
repository,
)
})
.collect::<Vec<_>>();
let query_results = join_all(async_queries).await;
let mut query_result = QueryResult {
features: BTreeMap::new(),
};
for data in query_results {
match data {
Ok(data) => {
if data.is_empty() {
continue;
}
let feature_name = data[0].get("feature");
let mut created_at = Vec::new();
let mut values = Vec::new();
for row in data {
created_at.push(row.get("created_at"));
values.push(row.get("value"));
}
query_result
.features
.insert(feature_name, FeatureResult { created_at, values });
}
Err(e) => {
error!("Failed to run query: {:?}", e);
return Err(anyhow!("Failed to run query: {:?}", e));
}
}
}
Ok(query_result)
}
#[allow(dead_code)]
pub async fn get_drift_records(
&self,
name: &str,
repository: &str,
version: &str,
limit_timestamp: &str,
) -> Result<QueryResult, anyhow::Error> {
let features = self.get_features(name, repository, version).await?;
let async_queries = features
.iter()
.map(|feature| {
self.run_feature_query(feature, name, repository, version, limit_timestamp)
})
.collect::<Vec<_>>();
let query_results = join_all(async_queries).await;
let mut query_result = QueryResult {
features: BTreeMap::new(),
};
for data in query_results {
match data {
Ok(data) => {
if data.is_empty() {
continue;
}
let feature_name = data[0].get("feature");
let mut created_at = Vec::new();
let mut values = Vec::new();
for row in data {
created_at.push(row.get("created_at"));
values.push(row.get("value"));
}
query_result
.features
.insert(feature_name, FeatureResult { created_at, values });
}
Err(e) => {
error!("Failed to run query: {:?}", e);
return Err(anyhow!("Failed to run query: {:?}", e));
}
}
}
Ok(query_result)
}
#[allow(dead_code)]
pub async fn raw_query(&self, query: &str) -> Result<Vec<PgRow>, anyhow::Error> {
let result = sqlx::raw_sql(query).fetch_all(&self.pool).await;
match result {
Ok(result) => {
Ok(result)
}
Err(e) => {
error!("Failed to run query: {:?}", e);
Err(anyhow!("Failed to run query: {:?}", e))
}
}
}
}
#[cfg(test)]
mod tests {
use crate::api::setup::create_db_pool;
use super::*;
use std::env;
use tokio;
#[tokio::test]
async fn test_client() {
env::set_var(
"DATABASE_URL",
"postgresql://postgres:admin@localhost:5432/monitor?",
);
let pool = create_db_pool(None)
.await
.with_context(|| "Failed to create Postgres client")
.unwrap();
PostgresClient::new(pool).unwrap();
}
#[test]
fn test_time_interval() {
assert_eq!(TimeInterval::FiveMinutes.to_minutes(), 5);
assert_eq!(TimeInterval::FifteenMinutes.to_minutes(), 15);
assert_eq!(TimeInterval::ThirtyMinutes.to_minutes(), 30);
assert_eq!(TimeInterval::OneHour.to_minutes(), 60);
assert_eq!(TimeInterval::ThreeHours.to_minutes(), 180);
assert_eq!(TimeInterval::SixHours.to_minutes(), 360);
assert_eq!(TimeInterval::TwelveHours.to_minutes(), 720);
assert_eq!(TimeInterval::TwentyFourHours.to_minutes(), 1440);
assert_eq!(TimeInterval::TwoDays.to_minutes(), 2880);
assert_eq!(TimeInterval::FiveDays.to_minutes(), 7200);
}
}