scouter_dataframe/sql/
helper.rs

// Helper functions that build the SQL queries used to retrieve binned data from DataFusion.
2use chrono::Datelike;
3use chrono::Timelike;
4use chrono::{DateTime, Utc};
5pub fn get_binned_custom_metric_values_query(
6    bin: &f64,
7    start_time: &DateTime<Utc>,
8    end_time: &DateTime<Utc>,
9    entity_id: &i32,
10) -> String {
11    format!(
12        r#"WITH subquery1 AS (
13    SELECT
14        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
15        metric,
16        value
17    FROM binned_custom_metric
18    WHERE
19        1=1
20        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
21        AND entity_id = {}
22    ),
23
24subquery2 AS (
25    SELECT
26        created_at,
27        metric,
28        avg(value) as average,
29        stddev(value) as standard_dev
30    FROM subquery1
31    GROUP BY
32        created_at,
33        metric
34),
35
36subquery3 AS (
37    SELECT
38        created_at,
39        metric,
40        struct(
41            average as avg,
42            average - COALESCE(standard_dev, 0) as lower_bound,
43            average + COALESCE(standard_dev, 0) as upper_bound
44        ) as stats
45    FROM subquery2
46)
47
48SELECT
49    metric,
50    array_agg(created_at) as created_at,
51    array_agg(stats) as stats
52FROM subquery3
53GROUP BY metric;"#,
54        bin,
55        start_time.to_rfc3339(),
56        end_time.to_rfc3339(),
57        entity_id
58    )
59}
60
61pub fn get_binned_genai_task_values_query(
62    bin: &f64,
63    start_time: &DateTime<Utc>,
64    end_time: &DateTime<Utc>,
65    entity_id: &i32,
66) -> String {
67    format!(
68        r#"WITH subquery1 AS (
69    SELECT
70        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
71        task_id as metric,
72        value
73    FROM binned_genai_task
74    WHERE
75        1=1
76        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
77        AND entity_id = {}
78    ),
79
80subquery2 AS (
81    SELECT
82        created_at,
83        metric,
84        avg(value) as average,
85        stddev(value) as standard_dev
86    FROM subquery1
87    GROUP BY
88        created_at,
89        metric
90),
91
92subquery3 AS (
93    SELECT
94        created_at,
95        metric,
96        struct(
97            average as avg,
98            average - COALESCE(standard_dev, 0) as lower_bound,
99            average + COALESCE(standard_dev, 0) as upper_bound
100        ) as stats
101    FROM subquery2
102)
103
104SELECT
105    metric,
106    array_agg(created_at) as created_at,
107    array_agg(stats) as stats
108FROM subquery3
109GROUP BY metric;"#,
110        bin,
111        start_time.to_rfc3339(),
112        end_time.to_rfc3339(),
113        entity_id
114    )
115}
116
117pub fn get_binned_genai_workflow_values_query(
118    bin: &f64,
119    start_time: &DateTime<Utc>,
120    end_time: &DateTime<Utc>,
121    entity_id: &i32,
122) -> String {
123    format!(
124        r#"WITH subquery1 AS (
125    SELECT
126        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
127        metric,
128        pass_rate as value
129    FROM binned_genai_workflow
130    WHERE
131        1=1
132        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
133        AND entity_id = {}
134    ),
135
136subquery2 AS (
137    SELECT
138        created_at,
139        metric,
140        avg(value) as average,
141        stddev(value) as standard_dev
142    FROM subquery1
143    GROUP BY
144        created_at,
145        metric
146),
147
148subquery3 AS (
149    SELECT
150        created_at,
151        metric,
152        struct(
153            average as avg,
154            average - COALESCE(standard_dev, 0) as lower_bound,
155            average + COALESCE(standard_dev, 0) as upper_bound
156        ) as stats
157    FROM subquery2
158)
159
160SELECT
161    metric,
162    array_agg(created_at) as created_at,
163    array_agg(stats) as stats
164FROM subquery3
165GROUP BY metric;"#,
166        bin,
167        start_time.to_rfc3339(),
168        end_time.to_rfc3339(),
169        entity_id
170    )
171}
172
173pub fn get_binned_psi_drift_records_query(
174    bin: &f64,
175    start_time: &DateTime<Utc>,
176    end_time: &DateTime<Utc>,
177    entity_id: &i32,
178) -> String {
179    format!(
180        r#"WITH feature_bin_total AS (
181        SELECT
182            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
183            entity_id,
184            feature,
185            bin_id,
186            SUM(bin_count) AS bin_total_count
187        FROM binned_psi
188        WHERE
189            1=1
190            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
191            AND entity_id = {}
192        GROUP BY 1, 2, 3, 4
193    ),
194
195    feature_total AS (
196        SELECT
197            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
198            entity_id,
199            feature,
200            cast(SUM(bin_count) as float) AS feature_total_count
201        FROM binned_psi
202        WHERE
203            1=1
204            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
205            AND entity_id = {}
206        GROUP BY 1, 2, 3
207    ),
208
209    feature_bin_proportions AS (
210        SELECT
211            b.created_at,
212            b.feature,
213            f.feature_total_count,
214            b.bin_id,
215            cast(b.bin_total_count as float) / f.feature_total_count AS proportion
216        FROM feature_bin_total b
217        JOIN feature_total f
218            ON f.feature = b.feature
219            AND f.entity_id = b.entity_id
220            AND f.created_at = b.created_at
221    ),
222
223    overall_agg as (
224        SELECT
225            feature,
226            struct(
227                array_agg(bin_id) as bin_id,
228                array_agg(proportion) as proportion
229            ) as bins
230        FROM feature_bin_proportions
231        WHERE feature_total_count > 1
232        GROUP BY feature
233    ),
234
235    bin_agg as (
236        SELECT
237            feature,
238            created_at,
239            struct(
240                array_agg(bin_id) as bin_id,
241                array_agg(proportion) as proportion
242            ) AS bin_proportions
243        FROM feature_bin_proportions
244        WHERE feature_total_count > 1
245        GROUP BY
246            feature,
247            created_at
248    ),
249
250    feature_agg as (
251    select
252    feature,
253    array_agg(created_at order by created_at desc) as created_at,
254    array_agg(bin_proportions order by created_at desc) as bin_proportions
255    FROM bin_agg
256    WHERE 1=1
257    GROUP BY feature
258    )
259
260    SELECT
261        feature_agg.feature,
262        created_at,
263        bin_proportions,
264        bins as overall_proportions
265    FROM feature_agg
266    JOIN overall_agg
267        ON overall_agg.feature = feature_agg.feature;"#,
268        bin,
269        start_time.to_rfc3339(),
270        end_time.to_rfc3339(),
271        entity_id,
272        bin,
273        start_time.to_rfc3339(),
274        end_time.to_rfc3339(),
275        entity_id,
276    )
277}
278
279pub fn get_binned_spc_drift_records_query(
280    bin: &f64,
281    start_time: &DateTime<Utc>,
282    end_time: &DateTime<Utc>,
283    entity_id: &i32,
284) -> String {
285    let start_year = start_time.year();
286    let start_month = start_time.month();
287    let start_day = start_time.day();
288    let start_hour = start_time.hour();
289
290    let end_year = end_time.year();
291    let end_month = end_time.month();
292    let end_day = end_time.day();
293    let end_hour = end_time.hour();
294
295    format!(
296        r#"WITH subquery1 AS (
297        SELECT
298            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
299            entity_id,
300            feature,
301            value
302        FROM binned_spc
303        WHERE
304            -- Partition pruning predicates (inclusive)
305            (year >= {start_year}) AND
306            (year > {start_year} OR month >= {start_month}) AND
307            (year > {start_year} OR month > {start_month} OR day >= {start_day}) AND
308            (year > {start_year} OR month > {start_month} OR day > {start_day} OR hour >= {start_hour})
309            AND
310            (year <= {end_year}) AND
311            (year < {end_year} OR month <= {end_month}) AND
312            (year < {end_year} OR month < {end_month} OR day <= {end_day}) AND
313            (year < {end_year} OR month < {end_month} OR day < {end_day} OR hour <= {end_hour})
314            -- Regular filters (inclusive)
315            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
316            AND entity_id = {}
317        ),
318
319        subquery2 AS (
320            SELECT
321                created_at,
322                entity_id,
323                feature,
324                avg(value) as value
325            FROM subquery1
326            GROUP BY
327                created_at,
328                entity_id,
329                feature
330        )
331
332        SELECT
333        feature,
334        array_agg(created_at ORDER BY created_at DESC) as created_at,
335        array_agg(value ORDER BY created_at DESC) as values
336        FROM subquery2
337        GROUP BY
338        feature;"#,
339        bin,
340        start_time.to_rfc3339(),
341        end_time.to_rfc3339(),
342        entity_id
343    )
344}