Skip to main content

scouter_dataframe/sql/
helper.rs

// Helper functions that generate the SQL queries used to retrieve binned data from DataFusion.
2use chrono::Datelike;
3use chrono::Timelike;
4use chrono::{DateTime, Utc};
5pub fn get_binned_custom_metric_values_query(
6    bin: &f64,
7    start_time: &DateTime<Utc>,
8    end_time: &DateTime<Utc>,
9    entity_id: &i32,
10) -> String {
11    format!(
12        r#"WITH subquery1 AS (
13    SELECT
14        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
15        metric,
16        value
17    FROM binned_custom_metric
18    WHERE
19        1=1
20        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
21        AND entity_id = {}
22    ),
23
24subquery2 AS (
25    SELECT
26        created_at,
27        metric,
28        avg(value) as average,
29        stddev(value) as standard_dev
30    FROM subquery1
31    GROUP BY
32        created_at,
33        metric
34),
35
36subquery3 AS (
37    SELECT
38        created_at,
39        metric,
40        struct(
41            average as avg,
42            average - COALESCE(standard_dev, 0) as lower_bound,
43            average + COALESCE(standard_dev, 0) as upper_bound
44        ) as stats
45    FROM subquery2
46)
47
48SELECT
49    metric,
50    array_agg(created_at) as created_at,
51    array_agg(stats) as stats
52FROM subquery3
53GROUP BY metric;"#,
54        bin,
55        start_time.to_rfc3339(),
56        end_time.to_rfc3339(),
57        entity_id
58    )
59}
60
61pub fn get_binned_genai_task_values_query(
62    bin: &f64,
63    start_time: &DateTime<Utc>,
64    end_time: &DateTime<Utc>,
65    entity_id: &i32,
66) -> String {
67    format!(
68        r#"WITH subquery1 AS (
69    SELECT
70        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
71        task_id as metric,
72        value,
73        passed
74    FROM binned_genai_task
75    WHERE
76        created_at BETWEEN TIMESTAMP '{}' AND TIMESTAMP '{}'
77        AND entity_id = {}
78),
79
80subquery2 AS (
81    SELECT
82        created_at,
83        metric,
84        avg(value) as average,
85        stddev(value) as standard_dev,
86        sum(CASE WHEN passed THEN 1 ELSE 0 END)::FLOAT / count(*)::FLOAT as pass_rate
87    FROM subquery1
88    GROUP BY
89        created_at,
90        metric
91),
92
93subquery3 AS (
94    SELECT
95        created_at,
96        metric,
97        struct(
98            average as avg,
99            average - COALESCE(standard_dev, 0) as lower_bound,
100            average + COALESCE(standard_dev, 0) as upper_bound,
101            pass_rate
102        ) as stats
103    FROM subquery2
104)
105
106SELECT
107    metric,
108    array_agg(created_at ORDER BY created_at) as created_at,
109    array_agg(stats ORDER BY created_at) as stats
110FROM subquery3
111GROUP BY metric;"#,
112        bin,
113        start_time.to_rfc3339(),
114        end_time.to_rfc3339(),
115        entity_id
116    )
117}
118
119pub fn get_binned_genai_workflow_values_query(
120    bin: &f64,
121    start_time: &DateTime<Utc>,
122    end_time: &DateTime<Utc>,
123    entity_id: &i32,
124) -> String {
125    format!(
126        r#"WITH subquery1 AS (
127    SELECT
128        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
129        metric,
130        pass_rate as value
131    FROM binned_genai_workflow
132    WHERE
133        1=1
134        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
135        AND entity_id = {}
136    ),
137
138subquery2 AS (
139    SELECT
140        created_at,
141        metric,
142        avg(value) as average,
143        stddev(value) as standard_dev
144    FROM subquery1
145    GROUP BY
146        created_at,
147        metric
148),
149
150subquery3 AS (
151    SELECT
152        created_at,
153        metric,
154        struct(
155            average as avg,
156            average - COALESCE(standard_dev, 0) as lower_bound,
157            average + COALESCE(standard_dev, 0) as upper_bound
158        ) as stats
159    FROM subquery2
160)
161
162SELECT
163    metric,
164    array_agg(created_at) as created_at,
165    array_agg(stats) as stats
166FROM subquery3
167GROUP BY metric;"#,
168        bin,
169        start_time.to_rfc3339(),
170        end_time.to_rfc3339(),
171        entity_id
172    )
173}
174
175pub fn get_binned_psi_drift_records_query(
176    bin: &f64,
177    start_time: &DateTime<Utc>,
178    end_time: &DateTime<Utc>,
179    entity_id: &i32,
180) -> String {
181    format!(
182        r#"WITH feature_bin_total AS (
183        SELECT
184            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
185            entity_id,
186            feature,
187            bin_id,
188            SUM(bin_count) AS bin_total_count
189        FROM binned_psi
190        WHERE
191            1=1
192            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
193            AND entity_id = {}
194        GROUP BY 1, 2, 3, 4
195    ),
196
197    feature_total AS (
198        SELECT
199            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
200            entity_id,
201            feature,
202            cast(SUM(bin_count) as float) AS feature_total_count
203        FROM binned_psi
204        WHERE
205            1=1
206            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
207            AND entity_id = {}
208        GROUP BY 1, 2, 3
209    ),
210
211    feature_bin_proportions AS (
212        SELECT
213            b.created_at,
214            b.feature,
215            f.feature_total_count,
216            b.bin_id,
217            cast(b.bin_total_count as float) / f.feature_total_count AS proportion
218        FROM feature_bin_total b
219        JOIN feature_total f
220            ON f.feature = b.feature
221            AND f.entity_id = b.entity_id
222            AND f.created_at = b.created_at
223    ),
224
225    overall_agg as (
226        SELECT
227            feature,
228            struct(
229                array_agg(bin_id) as bin_id,
230                array_agg(proportion) as proportion
231            ) as bins
232        FROM feature_bin_proportions
233        WHERE feature_total_count > 1
234        GROUP BY feature
235    ),
236
237    bin_agg as (
238        SELECT
239            feature,
240            created_at,
241            struct(
242                array_agg(bin_id) as bin_id,
243                array_agg(proportion) as proportion
244            ) AS bin_proportions
245        FROM feature_bin_proportions
246        WHERE feature_total_count > 1
247        GROUP BY
248            feature,
249            created_at
250    ),
251
252    feature_agg as (
253    select
254    feature,
255    array_agg(created_at order by created_at desc) as created_at,
256    array_agg(bin_proportions order by created_at desc) as bin_proportions
257    FROM bin_agg
258    WHERE 1=1
259    GROUP BY feature
260    )
261
262    SELECT
263        feature_agg.feature,
264        created_at,
265        bin_proportions,
266        bins as overall_proportions
267    FROM feature_agg
268    JOIN overall_agg
269        ON overall_agg.feature = feature_agg.feature;"#,
270        bin,
271        start_time.to_rfc3339(),
272        end_time.to_rfc3339(),
273        entity_id,
274        bin,
275        start_time.to_rfc3339(),
276        end_time.to_rfc3339(),
277        entity_id,
278    )
279}
280
281pub fn get_binned_spc_drift_records_query(
282    bin: &f64,
283    start_time: &DateTime<Utc>,
284    end_time: &DateTime<Utc>,
285    entity_id: &i32,
286) -> String {
287    let start_year = start_time.year();
288    let start_month = start_time.month();
289    let start_day = start_time.day();
290    let start_hour = start_time.hour();
291
292    let end_year = end_time.year();
293    let end_month = end_time.month();
294    let end_day = end_time.day();
295    let end_hour = end_time.hour();
296
297    format!(
298        r#"WITH subquery1 AS (
299        SELECT
300            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
301            entity_id,
302            feature,
303            value
304        FROM binned_spc
305        WHERE
306            -- Partition pruning predicates (inclusive)
307            (year >= {start_year}) AND
308            (year > {start_year} OR month >= {start_month}) AND
309            (year > {start_year} OR month > {start_month} OR day >= {start_day}) AND
310            (year > {start_year} OR month > {start_month} OR day > {start_day} OR hour >= {start_hour})
311            AND
312            (year <= {end_year}) AND
313            (year < {end_year} OR month <= {end_month}) AND
314            (year < {end_year} OR month < {end_month} OR day <= {end_day}) AND
315            (year < {end_year} OR month < {end_month} OR day < {end_day} OR hour <= {end_hour})
316            -- Regular filters (inclusive)
317            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
318            AND entity_id = {}
319        ),
320
321        subquery2 AS (
322            SELECT
323                created_at,
324                entity_id,
325                feature,
326                avg(value) as value
327            FROM subquery1
328            GROUP BY
329                created_at,
330                entity_id,
331                feature
332        )
333
334        SELECT
335        feature,
336        array_agg(created_at ORDER BY created_at DESC) as created_at,
337        array_agg(value ORDER BY created_at DESC) as values
338        FROM subquery2
339        GROUP BY
340        feature;"#,
341        bin,
342        start_time.to_rfc3339(),
343        end_time.to_rfc3339(),
344        entity_id
345    )
346}