scouter_sql/sql/traits/
archive.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::Entity;
3
4use crate::sql::utils::pg_rows_to_server_records;
5use chrono::{DateTime, Utc};
6
7use crate::sql::error::SqlError;
8use scouter_types::{RecordType, ServerRecords};
9
10use sqlx::{Pool, Postgres};
11
12use std::result::Result::Ok;
13
14use async_trait::async_trait;
15
16#[async_trait]
17pub trait ArchiveSqlLogic {
18    /// Function to get entities for archival
19    ///
20    /// # Arguments
21    /// * `record_type` - The type of record to get entities for
22    /// * `retention_period` - The retention period to get entities for
23    ///
24    async fn get_entities_to_archive(
25        pool: &Pool<Postgres>,
26        record_type: &RecordType,
27        retention_period: &i32,
28    ) -> Result<Vec<Entity>, SqlError> {
29        let query = match record_type {
30            RecordType::Spc => Queries::GetSpcEntities.get_query(),
31            RecordType::Psi => Queries::GetBinCountEntities.get_query(),
32            RecordType::Custom => Queries::GetCustomEntities.get_query(),
33            _ => {
34                return Err(SqlError::InvalidRecordTypeError);
35            }
36        };
37
38        let entities: Vec<Entity> = sqlx::query_as(&query.sql)
39            .bind(retention_period)
40            .fetch_all(pool)
41            .await?;
42
43        Ok(entities)
44    }
45
46    /// Function to get data for archival
47    ///
48    /// # Arguments
49    /// * `record_type` - The type of record to get data for
50    /// * `days` - The number of days to get data for
51    ///
52    /// # Returns
53    /// * `Result<ServerRecords, SqlError>` - Result of the query
54    ///
55    /// # Errors
56    /// * `SqlError` - If the query fails
57    async fn get_data_to_archive(
58        space: &str,
59        name: &str,
60        version: &str,
61        begin_timestamp: &DateTime<Utc>,
62        end_timestamp: &DateTime<Utc>,
63        record_type: &RecordType,
64        db_pool: &Pool<Postgres>,
65    ) -> Result<ServerRecords, SqlError> {
66        let query = match record_type {
67            RecordType::Spc => Queries::GetSpcDataForArchive.get_query(),
68            RecordType::Psi => Queries::GetBinCountDataForArchive.get_query(),
69            RecordType::Custom => Queries::GetCustomDataForArchive.get_query(),
70            _ => {
71                return Err(SqlError::InvalidRecordTypeError);
72            }
73        };
74        let rows = sqlx::query(&query.sql)
75            .bind(begin_timestamp)
76            .bind(end_timestamp)
77            .bind(space)
78            .bind(name)
79            .bind(version)
80            .fetch_all(db_pool)
81            .await
82            .map_err(SqlError::SqlxError)?;
83
84        // need to convert the rows to server records (storage dataframe expects this)
85        pg_rows_to_server_records(&rows, record_type)
86    }
87
88    async fn update_data_to_archived(
89        space: &str,
90        name: &str,
91        version: &str,
92        begin_timestamp: &DateTime<Utc>,
93        end_timestamp: &DateTime<Utc>,
94        record_type: &RecordType,
95        db_pool: &Pool<Postgres>,
96    ) -> Result<(), SqlError> {
97        let query = match record_type {
98            RecordType::Spc => Queries::UpdateSpcEntities.get_query(),
99            RecordType::Psi => Queries::UpdateBinCountEntities.get_query(),
100            RecordType::Custom => Queries::UpdateCustomEntities.get_query(),
101            _ => {
102                return Err(SqlError::InvalidRecordTypeError);
103            }
104        };
105        sqlx::query(&query.sql)
106            .bind(begin_timestamp)
107            .bind(end_timestamp)
108            .bind(space)
109            .bind(name)
110            .bind(version)
111            .execute(db_pool)
112            .await
113            .map_err(SqlError::SqlxError)?;
114
115        Ok(())
116    }
117}