scouter_sql/sql/traits/
archive.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::Entity;
3
4use crate::sql::utils::pg_rows_to_server_records;
5use chrono::{DateTime, Utc};
6
7use crate::sql::error::SqlError;
8use scouter_types::{RecordType, ServerRecords};
9
10use sqlx::{Pool, Postgres};
11
12use std::result::Result::Ok;
13
14use async_trait::async_trait;
15
16#[async_trait]
17pub trait ArchiveSqlLogic {
18    /// Function to get entities for archival
19    ///
20    /// # Arguments
21    /// * `record_type` - The type of record to get entities for
22    /// * `retention_period` - The retention period to get entities for
23    ///
24    async fn get_entities_to_archive(
25        pool: &Pool<Postgres>,
26        record_type: &RecordType,
27        retention_period: &i32,
28    ) -> Result<Vec<Entity>, SqlError> {
29        let query = match record_type {
30            RecordType::Spc => Queries::GetSpcEntities.get_query(),
31            RecordType::Psi => Queries::GetBinCountEntities.get_query(),
32            RecordType::Custom => Queries::GetCustomEntities.get_query(),
33            RecordType::LLMDrift => Queries::GetLLMDriftRecordEntitiesForArchive.get_query(),
34            RecordType::LLMMetric => Queries::GetLLMMetricEntitiesForArchive.get_query(),
35            _ => {
36                return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
37            }
38        };
39
40        let entities: Vec<Entity> = sqlx::query_as(&query.sql)
41            .bind(retention_period)
42            .fetch_all(pool)
43            .await?;
44
45        Ok(entities)
46    }
47
48    /// Function to get data for archival
49    ///
50    /// # Arguments
51    /// * `record_type` - The type of record to get data for
52    /// * `days` - The number of days to get data for
53    ///
54    /// # Returns
55    /// * `Result<ServerRecords, SqlError>` - Result of the query
56    ///
57    /// # Errors
58    /// * `SqlError` - If the query fails
59    async fn get_data_to_archive(
60        space: &str,
61        name: &str,
62        version: &str,
63        begin_timestamp: &DateTime<Utc>,
64        end_timestamp: &DateTime<Utc>,
65        record_type: &RecordType,
66        db_pool: &Pool<Postgres>,
67    ) -> Result<ServerRecords, SqlError> {
68        let query = match record_type {
69            RecordType::Spc => Queries::GetSpcDataForArchive.get_query(),
70            RecordType::Psi => Queries::GetBinCountDataForArchive.get_query(),
71            RecordType::Custom => Queries::GetCustomDataForArchive.get_query(),
72            RecordType::LLMDrift => Queries::GetLLMDriftRecordDataForArchive.get_query(),
73            RecordType::LLMMetric => Queries::GetLLMMetricDataForArchive.get_query(),
74            _ => {
75                return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
76            }
77        };
78        let rows = sqlx::query(&query.sql)
79            .bind(begin_timestamp)
80            .bind(end_timestamp)
81            .bind(space)
82            .bind(name)
83            .bind(version)
84            .fetch_all(db_pool)
85            .await
86            .map_err(SqlError::SqlxError)?;
87
88        // need to convert the rows to server records (storage dataframe expects this)
89        pg_rows_to_server_records(&rows, record_type)
90    }
91
92    async fn update_data_to_archived(
93        space: &str,
94        name: &str,
95        version: &str,
96        begin_timestamp: &DateTime<Utc>,
97        end_timestamp: &DateTime<Utc>,
98        record_type: &RecordType,
99        db_pool: &Pool<Postgres>,
100    ) -> Result<(), SqlError> {
101        let query = match record_type {
102            RecordType::Spc => Queries::UpdateSpcEntities.get_query(),
103            RecordType::Psi => Queries::UpdateBinCountEntities.get_query(),
104            RecordType::Custom => Queries::UpdateCustomEntities.get_query(),
105            RecordType::LLMDrift => Queries::UpdateLLMDriftEntities.get_query(),
106            RecordType::LLMMetric => Queries::UpdateLLMMetricEntities.get_query(),
107            _ => {
108                return Err(SqlError::InvalidRecordTypeError(record_type.to_string()));
109            }
110        };
111        sqlx::query(&query.sql)
112            .bind(begin_timestamp)
113            .bind(end_timestamp)
114            .bind(space)
115            .bind(name)
116            .bind(version)
117            .execute(db_pool)
118            .await
119            .map_err(SqlError::SqlxError)?;
120
121        Ok(())
122    }
123}