1use chrono::NaiveDateTime;
2use std::collections::BTreeMap;
3
4const INSERT_DRIFT_RECORD: &str = include_str!("scripts/insert_drift_record.sql");
7const GET_FEATURES: &str = include_str!("scripts/unique_features.sql");
8const GET_BINNED_FEATURE_VALUES: &str = include_str!("scripts/binned_feature_values.sql");
9const GET_FEATURE_VALUES: &str = include_str!("scripts/feature_values.sql");
10const INSERT_DRIFT_PROFILE: &str = include_str!("scripts/insert_drift_profile.sql");
11const INSERT_INTO_QUEUE: &str = include_str!("scripts/insert_into_queue.sql");
12const DELETE_FROM_QUEUE: &str = include_str!("scripts/delete_from_queue.sql");
13
/// Conversion of a parameter set into the key/value pairs consumed by
/// `SqlQuery::format`.
pub trait ToMap {
    /// Returns every parameter as a `placeholder name -> value` entry.
    ///
    /// `SqlQuery::format` looks for each key prefixed with `$` in the SQL
    /// template, so keys are written without the leading `$`.
    fn to_map(&self) -> BTreeMap<String, String>;
}
17
18pub struct QueueParams {
19 pub table: String,
20 pub name: String,
21 pub repository: String,
22 pub version: String,
23 pub next_run: NaiveDateTime,
24}
25
26impl ToMap for QueueParams {
27 fn to_map(&self) -> BTreeMap<String, String> {
28 let mut params = BTreeMap::new();
29 params.insert("table".to_string(), self.table.clone());
30 params.insert("name".to_string(), self.name.clone());
31 params.insert("repository".to_string(), self.repository.clone());
32 params.insert("version".to_string(), self.version.clone());
33 params.insert("next_run".to_string(), self.next_run.to_string());
34 params
35 }
36}
37
38pub struct InsertParams {
39 pub table: String,
40 pub name: String,
41 pub repository: String,
42 pub version: String,
43 pub feature: String,
44 pub value: String,
45 pub created_at: NaiveDateTime,
46}
47
48impl ToMap for InsertParams {
49 fn to_map(&self) -> BTreeMap<String, String> {
50 let mut params = BTreeMap::new();
51 params.insert("table".to_string(), self.table.clone());
52 params.insert("name".to_string(), self.name.clone());
53 params.insert("repository".to_string(), self.repository.clone());
54 params.insert("feature".to_string(), self.feature.clone());
55 params.insert("value".to_string(), self.value.clone());
56 params.insert("version".to_string(), self.version.clone());
57 params.insert("created_at".to_string(), self.created_at.to_string());
58
59 params
60 }
61}
62
63pub struct GetFeaturesParams {
64 pub table: String,
65 pub name: String,
66 pub repository: String,
67 pub version: String,
68}
69
70impl ToMap for GetFeaturesParams {
71 fn to_map(&self) -> BTreeMap<String, String> {
72 let mut params = BTreeMap::new();
73 params.insert("table".to_string(), self.table.clone());
74 params.insert("name".to_string(), self.name.clone());
75 params.insert("repository".to_string(), self.repository.clone());
76 params.insert("version".to_string(), self.version.clone());
77
78 params
79 }
80}
81
82pub struct GetFeatureValuesParams {
83 pub table: String,
84 pub name: String,
85 pub repository: String,
86 pub version: String,
87 pub feature: String,
88 pub limit_timestamp: String,
89}
90
91impl ToMap for GetFeatureValuesParams {
92 fn to_map(&self) -> BTreeMap<String, String> {
93 let mut params = BTreeMap::new();
94 params.insert("table".to_string(), self.table.clone());
95 params.insert("name".to_string(), self.name.clone());
96 params.insert("repository".to_string(), self.repository.clone());
97 params.insert("version".to_string(), self.version.clone());
98 params.insert("feature".to_string(), self.feature.clone());
99 params.insert("limit_timestamp".to_string(), self.limit_timestamp.clone());
100
101 params
102 }
103}
104
105pub struct InsertMonitorProfileParams {
106 pub table: String,
107 pub name: String,
108 pub repository: String,
109 pub version: String,
110 pub profile: String,
111 pub cron: String,
112 pub next_run: NaiveDateTime,
113}
114
115impl ToMap for InsertMonitorProfileParams {
116 fn to_map(&self) -> BTreeMap<String, String> {
117 let mut params = BTreeMap::new();
118 params.insert("table".to_string(), self.table.clone());
119 params.insert("name".to_string(), self.name.clone());
120 params.insert("repository".to_string(), self.repository.clone());
121 params.insert("version".to_string(), self.version.clone());
122 params.insert("profile".to_string(), self.profile.clone());
123 params.insert("cron".to_string(), self.cron.clone());
124 params.insert("next_run".to_string(), self.next_run.to_string());
125
126 params
127 }
128}
129
130pub struct GetBinnedFeatureValuesParams {
131 pub table: String,
132 pub name: String,
133 pub repository: String,
134 pub feature: String,
135 pub version: String,
136 pub time_window: String,
137 pub bin: String,
138}
139
140impl ToMap for GetBinnedFeatureValuesParams {
141 fn to_map(&self) -> BTreeMap<String, String> {
142 let mut params = BTreeMap::new();
143 params.insert("table".to_string(), self.table.clone());
144 params.insert("name".to_string(), self.name.clone());
145 params.insert("repository".to_string(), self.repository.clone());
146 params.insert("feature".to_string(), self.feature.clone());
147 params.insert("version".to_string(), self.version.clone());
148 params.insert("time_window".to_string(), self.time_window.clone());
149 params.insert("bin".to_string(), self.bin.clone());
150
151 params
152 }
153}
154
155#[allow(dead_code)]
156pub enum Queries {
157 GetFeatures,
158 InsertDriftRecord,
159 InsertMonitorProfile,
160 GetBinnedFeatureValues,
161 GetFeatureValues,
162 InsertIntoQueue,
163 DeleteFromQueue,
164}
165
166impl Queries {
167 pub fn get_query(&self) -> SqlQuery {
168 match self {
169 Queries::GetFeatures => SqlQuery::new(GET_FEATURES),
171 Queries::InsertDriftRecord => SqlQuery::new(INSERT_DRIFT_RECORD),
172 Queries::GetBinnedFeatureValues => SqlQuery::new(GET_BINNED_FEATURE_VALUES),
173 Queries::GetFeatureValues => SqlQuery::new(GET_FEATURE_VALUES),
174 Queries::InsertMonitorProfile => SqlQuery::new(INSERT_DRIFT_PROFILE),
175 Queries::InsertIntoQueue => SqlQuery::new(INSERT_INTO_QUEUE),
176 Queries::DeleteFromQueue => SqlQuery::new(DELETE_FROM_QUEUE),
177 }
178 }
179}
180
181pub struct SqlQuery {
182 sql: String,
183}
184
185impl SqlQuery {
186 fn new(sql: &str) -> Self {
187 Self {
188 sql: sql.to_string(),
189 }
190 }
191
192 pub fn format<T>(&self, params: &T) -> String
193 where
194 T: ToMap,
195 {
196 let mut formatted_sql = self.sql.clone();
197 let params = params.to_map();
198
199 for (key, value) in params {
200 formatted_sql = formatted_sql.replace(&format!("${}", key), &value);
201 }
202
203 formatted_sql
204 }
205}
206
#[cfg(test)]
mod tests {

    use super::*;
    use chrono;

    /// Collapses every run of whitespace (spaces, tabs, newlines) into one
    /// space.
    ///
    /// The assertions below compare SQL modulo layout, so they do not depend
    /// on how the embedded script files happen to be indented.
    fn squash(sql: &str) -> String {
        sql.split_whitespace().collect::<Vec<_>>().join(" ")
    }

    /// `InsertDriftRecord` renders every `InsertParams` field into the
    /// statement.
    #[test]
    fn test_insert_query() {
        let query = Queries::InsertDriftRecord.get_query();

        let params = InsertParams {
            table: "features".to_string(),
            name: "test".to_string(),
            repository: "test".to_string(),
            feature: "test".to_string(),
            value: "test".to_string(),
            version: "test".to_string(),
            created_at: chrono::Utc::now().naive_utc(),
        };

        let formatted_sql = query.format(&params);

        let expected = format!(
            "INSERT INTO features (created_at, name, repository, version, feature, value) \nVALUES ('{}', 'test', 'test', 'test', 'test', 'test')\nON CONFLICT DO NOTHING;",
            params.created_at
        );

        assert_eq!(squash(&formatted_sql), squash(&expected));
    }

    /// `GetFeatures` renders the table and the name/repository/version filter.
    #[test]
    fn test_get_features_query() {
        let query = Queries::GetFeatures.get_query();

        let params = GetFeaturesParams {
            table: "schema.table".to_string(),
            name: "test".to_string(),
            repository: "test".to_string(),
            version: "test".to_string(),
        };

        let formatted_sql = query.format(&params);

        let expected = "SELECT
DISTINCT feature
FROM schema.table
WHERE
 name = 'test'
 AND repository = 'test'
 AND version = 'test';";

        assert_eq!(squash(&formatted_sql), squash(expected));
    }

    /// `GetBinnedFeatureValues` renders the bin width and the look-back
    /// window in addition to the usual filters.
    #[test]
    fn test_get_features_values_query() {
        let query = Queries::GetBinnedFeatureValues.get_query();

        let params = GetBinnedFeatureValuesParams {
            table: "schema.table".to_string(),
            name: "test".to_string(),
            repository: "test".to_string(),
            feature: "test".to_string(),
            version: "test".to_string(),
            time_window: "10".to_string(),
            bin: "1".to_string(),
        };

        let formatted_sql = query.format(&params);

        let expected = "with subquery as (
 SELECT
 date_bin('1 minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
 name,
 repository,
 feature,
 version,
 value
 from schema.table
 WHERE
 created_at > timezone('utc', now()) - interval '10' minute
 AND version = 'test'
 AND name = 'test'
 AND repository = 'test'
 AND feature = 'test'
)

SELECT
created_at,
name,
repository,
feature,
version,
avg(value) as value
FROM subquery
GROUP BY
 created_at,
 name,
 repository,
 feature,
 version
ORDER BY
 created_at DESC;";

        assert_eq!(squash(&formatted_sql), squash(expected));
    }

    /// `GetFeatureValues` renders the timestamp lower bound in addition to
    /// the usual filters.
    #[test]
    fn test_get_feature_values_query() {
        let query = Queries::GetFeatureValues.get_query();

        let params = GetFeatureValuesParams {
            table: "schema.table".to_string(),
            name: "test".to_string(),
            repository: "test".to_string(),
            feature: "test".to_string(),
            version: "test".to_string(),
            limit_timestamp: "2024-01-01 00:00:00".to_string(),
        };

        let formatted_sql = query.format(&params);

        let expected = "SELECT
created_at,
feature,
value,
version
FROM schema.table
WHERE
 created_at > '2024-01-01 00:00:00'
 AND version = 'test'
 AND name = 'test'
 AND repository = 'test'
 AND feature = 'test';";

        assert_eq!(squash(&formatted_sql), squash(expected));
    }
}