1use chrono::Datelike;
3use chrono::Timelike;
4use chrono::{DateTime, Utc};
5pub fn get_binned_custom_metric_values_query(
6 bin: &f64,
7 start_time: &DateTime<Utc>,
8 end_time: &DateTime<Utc>,
9 space: &str,
10 name: &str,
11 version: &str,
12) -> String {
13 format!(
14 r#"WITH subquery1 AS (
15 SELECT
16 date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
17 metric,
18 value
19 FROM binned_custom_metric
20 WHERE
21 1=1
22 AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
23 AND space = '{}'
24 AND name = '{}'
25 AND version = '{}'
26 ),
27
28subquery2 AS (
29 SELECT
30 created_at,
31 metric,
32 avg(value) as average,
33 stddev(value) as standard_dev
34 FROM subquery1
35 GROUP BY
36 created_at,
37 metric
38),
39
40subquery3 AS (
41 SELECT
42 created_at,
43 metric,
44 struct(
45 average as avg,
46 average - COALESCE(standard_dev, 0) as lower_bound,
47 average + COALESCE(standard_dev, 0) as upper_bound
48 ) as stats
49 FROM subquery2
50)
51
52SELECT
53 metric,
54 array_agg(created_at) as created_at,
55 array_agg(stats) as stats
56FROM subquery3
57GROUP BY metric;"#,
58 bin,
59 start_time.to_rfc3339(),
60 end_time.to_rfc3339(),
61 space,
62 name,
63 version
64 )
65}
66
67pub fn get_binned_psi_drift_records_query(
68 bin: &f64,
69 start_time: &DateTime<Utc>,
70 end_time: &DateTime<Utc>,
71 space: &str,
72 name: &str,
73 version: &str,
74) -> String {
75 format!(
76 r#"WITH feature_bin_total AS (
77 SELECT
78 date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
79 name,
80 space,
81 version,
82 feature,
83 bin_id,
84 SUM(bin_count) AS bin_total_count
85 FROM binned_psi
86 WHERE
87 1=1
88 AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
89 AND space = '{}'
90 AND name = '{}'
91 AND version = '{}'
92 GROUP BY 1, 2, 3, 4, 5, 6
93 ),
94
95 feature_total AS (
96 SELECT
97 date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
98 name,
99 space,
100 version,
101 feature,
102 cast(SUM(bin_count) as float) AS feature_total_count
103 FROM binned_psi
104 WHERE
105 1=1
106 AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
107 AND space = '{}'
108 AND name = '{}'
109 AND version = '{}'
110 GROUP BY 1, 2, 3, 4, 5
111 ),
112
113 feature_bin_proportions AS (
114 SELECT
115 b.created_at,
116 b.feature,
117 f.feature_total_count,
118 b.bin_id,
119 cast(b.bin_total_count as float) / f.feature_total_count AS proportion
120 FROM feature_bin_total b
121 JOIN feature_total f
122 ON f.feature = b.feature
123 AND f.version = b.version
124 AND f.space = b.space
125 AND f.name = b.name
126 AND f.created_at = b.created_at
127 ),
128
129 overall_agg as (
130 SELECT
131 feature,
132 struct(
133 array_agg(bin_id) as bin_id,
134 array_agg(proportion) as proportion
135 ) as bins
136 FROM feature_bin_proportions
137 WHERE feature_total_count > 1
138 GROUP BY feature
139 ),
140
141 bin_agg as (
142 SELECT
143 feature,
144 created_at,
145 struct(
146 array_agg(bin_id) as bin_id,
147 array_agg(proportion) as proportion
148 ) AS bin_proportions
149 FROM feature_bin_proportions
150 WHERE feature_total_count > 1
151 GROUP BY
152 feature,
153 created_at
154 ),
155
156 feature_agg as (
157 select
158 feature,
159 array_agg(created_at order by created_at desc) as created_at,
160 array_agg(bin_proportions order by created_at desc) as bin_proportions
161 FROM bin_agg
162 WHERE 1=1
163 GROUP BY feature
164 )
165
166 SELECT
167 feature_agg.feature,
168 created_at,
169 bin_proportions,
170 bins as overall_proportions
171 FROM feature_agg
172 JOIN overall_agg
173 ON overall_agg.feature = feature_agg.feature;"#,
174 bin,
175 start_time.to_rfc3339(),
176 end_time.to_rfc3339(),
177 space,
178 name,
179 version,
180 bin,
181 start_time.to_rfc3339(),
182 end_time.to_rfc3339(),
183 space,
184 name,
185 version
186 )
187}
188
189pub fn get_binned_spc_drift_records_query(
190 bin: &f64,
191 start_time: &DateTime<Utc>,
192 end_time: &DateTime<Utc>,
193 space: &str,
194 name: &str,
195 version: &str,
196) -> String {
197 let start_year = start_time.year();
198 let start_month = start_time.month();
199 let start_day = start_time.day();
200 let start_hour = start_time.hour();
201
202 let end_year = end_time.year();
203 let end_month = end_time.month();
204 let end_day = end_time.day();
205 let end_hour = end_time.hour();
206
207 format!(
208 r#"WITH subquery1 AS (
209 SELECT
210 date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
211 name,
212 space,
213 feature,
214 version,
215 value
216 FROM binned_spc
217 WHERE
218 -- Partition pruning predicates (inclusive)
219 (year >= {start_year}) AND
220 (year > {start_year} OR month >= {start_month}) AND
221 (year > {start_year} OR month > {start_month} OR day >= {start_day}) AND
222 (year > {start_year} OR month > {start_month} OR day > {start_day} OR hour >= {start_hour})
223 AND
224 (year <= {end_year}) AND
225 (year < {end_year} OR month <= {end_month}) AND
226 (year < {end_year} OR month < {end_month} OR day <= {end_day}) AND
227 (year < {end_year} OR month < {end_month} OR day < {end_day} OR hour <= {end_hour})
228 -- Regular filters (inclusive)
229 AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
230 AND space = '{}'
231 AND name = '{}'
232 AND version = '{}'
233 ),
234
235 subquery2 AS (
236 SELECT
237 created_at,
238 name,
239 space,
240 feature,
241 version,
242 avg(value) as value
243 FROM subquery1
244 GROUP BY
245 created_at,
246 name,
247 space,
248 feature,
249 version
250 )
251
252 SELECT
253 feature,
254 array_agg(created_at ORDER BY created_at DESC) as created_at,
255 array_agg(value ORDER BY created_at DESC) as values
256 FROM subquery2
257 GROUP BY
258 feature;"#,
259 bin,
260 start_time.to_rfc3339(),
261 end_time.to_rfc3339(),
262 space,
263 name,
264 version
265 )
266}