scouter_sql/sql/traits/
archive.rs1use crate::sql::query::Queries;
2use crate::sql::schema::Entity;
3
4use crate::sql::utils::pg_rows_to_server_records;
5use chrono::{DateTime, Utc};
6
7use crate::sql::error::SqlError;
8use scouter_types::{RecordType, ServerRecords};
9
10use sqlx::{Pool, Postgres};
11
12use std::result::Result::Ok;
13
14use async_trait::async_trait;
15
16#[async_trait]
17pub trait ArchiveSqlLogic {
18 async fn get_entities_to_archive(
25 pool: &Pool<Postgres>,
26 record_type: &RecordType,
27 retention_period: &i32,
28 ) -> Result<Vec<Entity>, SqlError> {
29 let query = match record_type {
30 RecordType::Spc => Queries::GetSpcEntities.get_query(),
31 RecordType::Psi => Queries::GetBinCountEntities.get_query(),
32 RecordType::Custom => Queries::GetCustomEntities.get_query(),
33 RecordType::LLMDrift => Queries::GetLLMDriftRecordEntitiesForArchive.get_query(),
34 RecordType::LLMMetric => Queries::GetLLMMetricEntitiesForArchive.get_query(),
35 _ => {
36 return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
37 }
38 };
39
40 let entities: Vec<Entity> = sqlx::query_as(&query.sql)
41 .bind(retention_period)
42 .fetch_all(pool)
43 .await?;
44
45 Ok(entities)
46 }
47
48 async fn get_data_to_archive(
60 space: &str,
61 name: &str,
62 version: &str,
63 begin_timestamp: &DateTime<Utc>,
64 end_timestamp: &DateTime<Utc>,
65 record_type: &RecordType,
66 db_pool: &Pool<Postgres>,
67 ) -> Result<ServerRecords, SqlError> {
68 let query = match record_type {
69 RecordType::Spc => Queries::GetSpcDataForArchive.get_query(),
70 RecordType::Psi => Queries::GetBinCountDataForArchive.get_query(),
71 RecordType::Custom => Queries::GetCustomDataForArchive.get_query(),
72 RecordType::LLMDrift => Queries::GetLLMDriftRecordDataForArchive.get_query(),
73 RecordType::LLMMetric => Queries::GetLLMMetricDataForArchive.get_query(),
74 _ => {
75 return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
76 }
77 };
78 let rows = sqlx::query(&query.sql)
79 .bind(begin_timestamp)
80 .bind(end_timestamp)
81 .bind(space)
82 .bind(name)
83 .bind(version)
84 .fetch_all(db_pool)
85 .await
86 .map_err(SqlError::SqlxError)?;
87
88 pg_rows_to_server_records(&rows, record_type)
90 }
91
92 async fn update_data_to_archived(
93 space: &str,
94 name: &str,
95 version: &str,
96 begin_timestamp: &DateTime<Utc>,
97 end_timestamp: &DateTime<Utc>,
98 record_type: &RecordType,
99 db_pool: &Pool<Postgres>,
100 ) -> Result<(), SqlError> {
101 let query = match record_type {
102 RecordType::Spc => Queries::UpdateSpcEntities.get_query(),
103 RecordType::Psi => Queries::UpdateBinCountEntities.get_query(),
104 RecordType::Custom => Queries::UpdateCustomEntities.get_query(),
105 RecordType::LLMDrift => Queries::UpdateLLMDriftEntities.get_query(),
106 RecordType::LLMMetric => Queries::UpdateLLMMetricEntities.get_query(),
107 _ => {
108 return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
109 }
110 };
111 sqlx::query(&query.sql)
112 .bind(begin_timestamp)
113 .bind(end_timestamp)
114 .bind(space)
115 .bind(name)
116 .bind(version)
117 .execute(db_pool)
118 .await
119 .map_err(SqlError::SqlxError)?;
120
121 Ok(())
122 }
123}