scouter_sql/sql/traits/
archive.rs1use crate::sql::schema::Entity;
2use crate::sql::{query::Queries, utils::parse_pg_rows};
3use chrono::{DateTime, Utc};
4
5use crate::sql::error::SqlError;
6use scouter_types::{RecordType, ServerRecords};
7use sqlx::{Pool, Postgres};
8
9use std::result::Result::Ok;
10
11use async_trait::async_trait;
12
13#[async_trait]
14pub trait ArchiveSqlLogic {
15 async fn get_entities_to_archive(
22 pool: &Pool<Postgres>,
23 record_type: &RecordType,
24 retention_period: &i32,
25 ) -> Result<Vec<Entity>, SqlError> {
26 let query = match record_type {
27 RecordType::Spc => Queries::GetSpcEntities.get_query(),
28 RecordType::Psi => Queries::GetBinCountEntities.get_query(),
29 RecordType::Custom => Queries::GetCustomEntities.get_query(),
30 RecordType::GenAIEval => Queries::GetGenAIEvalRecordEntitiesForArchive.get_query(),
31 RecordType::GenAITask => Queries::GetGenAIEvalTaskResultEntitiesForArchive.get_query(),
32 RecordType::GenAIWorkflow => {
33 Queries::GetGenAIEvalWorkflowEntitiesForArchive.get_query()
34 }
35 _ => {
36 return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
37 }
38 };
39
40 let entities: Vec<Entity> = sqlx::query_as(query)
41 .bind(retention_period)
42 .fetch_all(pool)
43 .await?;
44
45 Ok(entities)
46 }
47
48 async fn get_data_to_archive(
60 entity_id: &i32,
61 begin_timestamp: &DateTime<Utc>,
62 end_timestamp: &DateTime<Utc>,
63 record_type: &RecordType,
64 db_pool: &Pool<Postgres>,
65 ) -> Result<ServerRecords, SqlError> {
66 let query = match record_type {
67 RecordType::Spc => Queries::GetSpcDataForArchive.get_query(),
68 RecordType::Psi => Queries::GetBinCountDataForArchive.get_query(),
69 RecordType::Custom => Queries::GetCustomDataForArchive.get_query(),
70 RecordType::GenAIEval => Queries::GetGenAIEvalRecordDataForArchive.get_query(),
71 RecordType::GenAITask => Queries::GetGenAITaskResultDataForArchive.get_query(),
72 RecordType::GenAIWorkflow => Queries::GetGenAIWorkflowResultDataForArchive.get_query(),
73 _ => {
74 return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
75 }
76 };
77 let rows = sqlx::query(query)
78 .bind(begin_timestamp)
79 .bind(end_timestamp)
80 .bind(entity_id)
81 .fetch_all(db_pool)
82 .await
83 .map_err(SqlError::SqlxError)?;
84
85 parse_pg_rows(&rows, record_type)
87 }
88
89 async fn update_data_to_archived(
90 entity_id: &i32,
91 begin_timestamp: &DateTime<Utc>,
92 end_timestamp: &DateTime<Utc>,
93 record_type: &RecordType,
94 db_pool: &Pool<Postgres>,
95 ) -> Result<(), SqlError> {
96 let query = match record_type {
97 RecordType::Spc => Queries::UpdateSpcEntities.get_query(),
98 RecordType::Psi => Queries::UpdateBinCountEntities.get_query(),
99 RecordType::Custom => Queries::UpdateCustomEntities.get_query(),
100 RecordType::GenAIEval => Queries::UpdateGenAIEvalEntities.get_query(),
101 RecordType::GenAITask => Queries::UpdateGenAITaskEntities.get_query(),
102 RecordType::GenAIWorkflow => Queries::UpdateGenAIWorkflowEntities.get_query(),
103 _ => {
104 return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
105 }
106 };
107 sqlx::query(query)
108 .bind(begin_timestamp)
109 .bind(end_timestamp)
110 .bind(entity_id)
111 .execute(db_pool)
112 .await
113 .map_err(SqlError::SqlxError)?;
114
115 Ok(())
116 }
117}