1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
use crate::sql::query::Queries;
use crate::sql::schema::UpdateAlertResult;
use scouter_types::contracts::{
DriftAlertPaginationRequest, DriftAlertPaginationResponse, UpdateAlertStatus,
};
use crate::sql::error::SqlError;
use scouter_types::{alert::Alert, AlertMap, RecordCursor};
use async_trait::async_trait;
use sqlx::{postgres::PgQueryResult, Pool, Postgres};
use std::result::Result::Ok;
#[async_trait]
pub trait AlertSqlLogic {
/// Inserts a drift alert into the database
///
/// # Arguments
///
/// * `task_info` - The drift task info containing entity_id
/// * `entity_name` - The name of the entity
/// * `alert` - The alert to insert into the database
/// * `drift_type` - The type of drift alert
///
async fn insert_drift_alert(
pool: &Pool<Postgres>,
entity_id: &i32,
alert: &AlertMap,
) -> Result<PgQueryResult, SqlError> {
let query = Queries::InsertDriftAlert.get_query();
let query_result = sqlx::query(query)
.bind(entity_id)
.bind(alert.entity_name())
.bind(serde_json::to_value(alert).unwrap())
.execute(pool)
.await?;
Ok(query_result)
}
/// Get drift alerts from the database
///
/// # Arguments
///
/// * `params` - The drift alert request parameters
/// * `id` - The entity ID to filter alerts
///
/// # Returns
///
/// * `Result<Vec<Alert>, SqlError>` - Result of the query
async fn get_paginated_drift_alerts(
pool: &Pool<Postgres>,
params: &DriftAlertPaginationRequest,
entity_id: &i32,
) -> Result<DriftAlertPaginationResponse, SqlError> {
let query = Queries::GetPaginatedDriftAlerts.get_query();
let limit = params.limit.unwrap_or(50);
let direction = params.direction.as_deref().unwrap_or("next");
let mut items: Vec<Alert> = sqlx::query_as(query)
.bind(entity_id) // $1: entity_id
.bind(params.active) // $2: active filter
.bind(params.cursor_created_at) // $3: cursor created_at
.bind(direction) // $4: direction
.bind(params.cursor_id) // $5: cursor id
.bind(limit) // $6: limit
.bind(params.start_datetime) // $7: start_datetime
.bind(params.end_datetime) // $8: end_datetime
.fetch_all(pool)
.await
.map_err(SqlError::SqlxError)?;
let has_more = items.len() > limit as usize;
if has_more {
items.pop();
}
let (has_next, next_cursor, has_previous, previous_cursor) = match direction {
"previous" => {
// Backward pagination - reverse since we fetched in ASC order
items.reverse();
let previous_cursor = if has_more {
items.first().map(|first| RecordCursor {
created_at: first.created_at,
id: first.id as i64,
})
} else {
None
};
let next_cursor = items.last().map(|last| RecordCursor {
created_at: last.created_at,
id: last.id as i64,
});
(
params.cursor_created_at.is_some(), // has_next (we came from somewhere)
next_cursor,
has_more, // has_previous (more items before)
previous_cursor,
)
}
_ => {
// Forward pagination (default "next")
let next_cursor = if has_more {
items.last().map(|last| RecordCursor {
created_at: last.created_at,
id: last.id as i64,
})
} else {
None
};
// Always set previous_cursor to first item (like trace pagination)
let previous_cursor = items.first().map(|first| RecordCursor {
created_at: first.created_at,
id: first.id as i64,
});
(
has_more, // has_next (more items after)
next_cursor,
params.cursor_created_at.is_some(), // has_previous (we came from somewhere)
previous_cursor,
)
}
};
Ok(DriftAlertPaginationResponse {
items,
has_next,
next_cursor,
has_previous,
previous_cursor,
})
}
/// Update drift alert status in the database
////
/// # Arguments
///// * `params` - The update alert status parameters
/// # Returns
//// * `Result<UpdateAlertResult, SqlError>` - Result of the update operation
async fn update_drift_alert_status(
pool: &Pool<Postgres>,
params: &UpdateAlertStatus,
) -> Result<UpdateAlertResult, SqlError> {
let query = Queries::UpdateAlertStatus.get_query();
let result: Result<UpdateAlertResult, SqlError> = sqlx::query_as(query)
.bind(params.id)
.bind(params.active)
.fetch_one(pool)
.await
.map_err(SqlError::SqlxError);
result
}
}