Skip to main content

scouter_sql/sql/traits/
alert.rs

1use crate::sql::query::Queries;
2use crate::sql::schema::UpdateAlertResult;
3
4use scouter_types::contracts::{
5    DriftAlertPaginationRequest, DriftAlertPaginationResponse, UpdateAlertStatus,
6};
7
8use crate::sql::error::SqlError;
9use scouter_types::{alert::Alert, AlertMap, RecordCursor};
10
11use async_trait::async_trait;
12use sqlx::{postgres::PgQueryResult, Pool, Postgres};
13use std::result::Result::Ok;
14
15#[async_trait]
16pub trait AlertSqlLogic {
17    /// Inserts a drift alert into the database
18    ///
19    /// # Arguments
20    ///
21    /// * `task_info` - The drift task info containing entity_id
22    /// * `entity_name` - The name of the entity
23    /// * `alert` - The alert to insert into the database
24    /// * `drift_type` - The type of drift alert
25    ///
26    async fn insert_drift_alert(
27        pool: &Pool<Postgres>,
28        entity_id: &i32,
29        alert: &AlertMap,
30    ) -> Result<PgQueryResult, SqlError> {
31        let query = Queries::InsertDriftAlert.get_query();
32
33        let query_result = sqlx::query(query)
34            .bind(entity_id)
35            .bind(alert.entity_name())
36            .bind(serde_json::to_value(alert).unwrap())
37            .execute(pool)
38            .await?;
39
40        Ok(query_result)
41    }
42
43    /// Get drift alerts from the database
44    ///
45    /// # Arguments
46    ///
47    /// * `params` - The drift alert request parameters
48    /// * `id` - The entity ID to filter alerts
49    ///
50    /// # Returns
51    ///
52    /// * `Result<Vec<Alert>, SqlError>` - Result of the query
53    async fn get_paginated_drift_alerts(
54        pool: &Pool<Postgres>,
55        params: &DriftAlertPaginationRequest,
56        entity_id: &i32,
57    ) -> Result<DriftAlertPaginationResponse, SqlError> {
58        let query = Queries::GetPaginatedDriftAlerts.get_query();
59        let limit = params.limit.unwrap_or(50);
60        let direction = params.direction.as_deref().unwrap_or("next");
61
62        let mut items: Vec<Alert> = sqlx::query_as(query)
63            .bind(entity_id) // $1: entity_id
64            .bind(params.active) // $2: active filter
65            .bind(params.cursor_created_at) // $3: cursor created_at
66            .bind(direction) // $4: direction
67            .bind(params.cursor_id) // $5: cursor id
68            .bind(limit) // $6: limit
69            .bind(params.start_datetime) // $7: start_datetime
70            .bind(params.end_datetime) // $8: end_datetime
71            .fetch_all(pool)
72            .await
73            .map_err(SqlError::SqlxError)?;
74
75        let has_more = items.len() > limit as usize;
76
77        if has_more {
78            items.pop();
79        }
80
81        let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
82            "previous" => {
83                // Backward pagination - reverse since we fetched in ASC order
84                items.reverse();
85
86                let previous_cursor = if has_more {
87                    items.first().map(|first| RecordCursor {
88                        created_at: first.created_at,
89                        id: first.id as i64,
90                    })
91                } else {
92                    None
93                };
94
95                let next_cursor = items.last().map(|last| RecordCursor {
96                    created_at: last.created_at,
97                    id: last.id as i64,
98                });
99
100                (
101                    params.cursor_created_at.is_some(), // has_next (we came from somewhere)
102                    next_cursor,
103                    has_more, // has_previous (more items before)
104                    previous_cursor,
105                )
106            }
107            _ => {
108                // Forward pagination (default "next")
109                let next_cursor = if has_more {
110                    items.last().map(|last| RecordCursor {
111                        created_at: last.created_at,
112                        id: last.id as i64,
113                    })
114                } else {
115                    None
116                };
117
118                // Always set previous_cursor to first item (like trace pagination)
119                let previous_cursor = items.first().map(|first| RecordCursor {
120                    created_at: first.created_at,
121                    id: first.id as i64,
122                });
123
124                (
125                    has_more, // has_next (more items after)
126                    next_cursor,
127                    params.cursor_created_at.is_some(), // has_previous (we came from somewhere)
128                    previous_cursor,
129                )
130            }
131        };
132
133        Ok(DriftAlertPaginationResponse {
134            items,
135            has_next,
136            next_cursor,
137            has_previous,
138            previous_cursor,
139        })
140    }
141
142    /// Update drift alert status in the database
143    ////
144    /// # Arguments
145    ///// * `params` - The update alert status parameters
146    /// # Returns
147    //// * `Result<UpdateAlertResult, SqlError>` - Result of the update operation
148    async fn update_drift_alert_status(
149        pool: &Pool<Postgres>,
150        params: &UpdateAlertStatus,
151    ) -> Result<UpdateAlertResult, SqlError> {
152        let query = Queries::UpdateAlertStatus.get_query();
153
154        let result: Result<UpdateAlertResult, SqlError> = sqlx::query_as(query)
155            .bind(params.id)
156            .bind(params.active)
157            .fetch_one(pool)
158            .await
159            .map_err(SqlError::SqlxError);
160
161        result
162    }
163}