Skip to main content

scouter_sql/sql/traits/
archive.rs

1use crate::sql::schema::Entity;
2use crate::sql::{query::Queries, utils::parse_pg_rows};
3use chrono::{DateTime, Utc};
4
5use crate::sql::error::SqlError;
6use scouter_types::{RecordType, ServerRecords};
7use sqlx::{Pool, Postgres};
8
9use std::result::Result::Ok;
10
11use async_trait::async_trait;
12
13#[async_trait]
14pub trait ArchiveSqlLogic {
15    /// Function to get entities for archival
16    ///
17    /// # Arguments
18    /// * `record_type` - The type of record to get entities for
19    /// * `retention_period` - The retention period to get entities for
20    ///
21    async fn get_entities_to_archive(
22        pool: &Pool<Postgres>,
23        record_type: &RecordType,
24        retention_period: &i32,
25    ) -> Result<Vec<Entity>, SqlError> {
26        let query = match record_type {
27            RecordType::Spc => Queries::GetSpcEntities.get_query(),
28            RecordType::Psi => Queries::GetBinCountEntities.get_query(),
29            RecordType::Custom => Queries::GetCustomEntities.get_query(),
30            RecordType::GenAIEval => Queries::GetGenAIEvalRecordEntitiesForArchive.get_query(),
31            RecordType::GenAITask => Queries::GetGenAIEvalTaskResultEntitiesForArchive.get_query(),
32            RecordType::GenAIWorkflow => {
33                Queries::GetGenAIEvalWorkflowEntitiesForArchive.get_query()
34            }
35            _ => {
36                return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
37            }
38        };
39
40        let entities: Vec<Entity> = sqlx::query_as(query)
41            .bind(retention_period)
42            .fetch_all(pool)
43            .await?;
44
45        Ok(entities)
46    }
47
48    /// Function to get data for archival
49    ///
50    /// # Arguments
51    /// * `record_type` - The type of record to get data for
52    /// * `days` - The number of days to get data for
53    ///
54    /// # Returns
55    /// * `Result<ServerRecords, SqlError>` - Result of the query
56    ///
57    /// # Errors
58    /// * `SqlError` - If the query fails
59    async fn get_data_to_archive(
60        entity_id: &i32,
61        begin_timestamp: &DateTime<Utc>,
62        end_timestamp: &DateTime<Utc>,
63        record_type: &RecordType,
64        db_pool: &Pool<Postgres>,
65    ) -> Result<ServerRecords, SqlError> {
66        let query = match record_type {
67            RecordType::Spc => Queries::GetSpcDataForArchive.get_query(),
68            RecordType::Psi => Queries::GetBinCountDataForArchive.get_query(),
69            RecordType::Custom => Queries::GetCustomDataForArchive.get_query(),
70            RecordType::GenAIEval => Queries::GetGenAIEvalRecordDataForArchive.get_query(),
71            RecordType::GenAITask => Queries::GetGenAITaskResultDataForArchive.get_query(),
72            RecordType::GenAIWorkflow => Queries::GetGenAIWorkflowResultDataForArchive.get_query(),
73            _ => {
74                return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
75            }
76        };
77        let rows = sqlx::query(query)
78            .bind(begin_timestamp)
79            .bind(end_timestamp)
80            .bind(entity_id)
81            .fetch_all(db_pool)
82            .await
83            .map_err(SqlError::SqlxError)?;
84
85        // need to convert the rows to server records (storage dataframe expects this)
86        parse_pg_rows(&rows, record_type)
87    }
88
89    async fn update_data_to_archived(
90        entity_id: &i32,
91        begin_timestamp: &DateTime<Utc>,
92        end_timestamp: &DateTime<Utc>,
93        record_type: &RecordType,
94        db_pool: &Pool<Postgres>,
95    ) -> Result<(), SqlError> {
96        let query = match record_type {
97            RecordType::Spc => Queries::UpdateSpcEntities.get_query(),
98            RecordType::Psi => Queries::UpdateBinCountEntities.get_query(),
99            RecordType::Custom => Queries::UpdateCustomEntities.get_query(),
100            RecordType::GenAIEval => Queries::UpdateGenAIEvalEntities.get_query(),
101            RecordType::GenAITask => Queries::UpdateGenAITaskEntities.get_query(),
102            RecordType::GenAIWorkflow => Queries::UpdateGenAIWorkflowEntities.get_query(),
103            _ => {
104                return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
105            }
106        };
107        sqlx::query(query)
108            .bind(begin_timestamp)
109            .bind(end_timestamp)
110            .bind(entity_id)
111            .execute(db_pool)
112            .await
113            .map_err(SqlError::SqlxError)?;
114
115        Ok(())
116    }
117}