// scouter_dataframe/sql/helper.rs

// Helper functions for generating SQL queries that retrieve binned data from DataFusion.
2use chrono::Datelike;
3use chrono::Timelike;
4use chrono::{DateTime, Utc};
5pub fn get_binned_custom_metric_values_query(
6    bin: &f64,
7    start_time: &DateTime<Utc>,
8    end_time: &DateTime<Utc>,
9    space: &str,
10    name: &str,
11    version: &str,
12) -> String {
13    format!(
14        r#"WITH subquery1 AS (
15    SELECT
16        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
17        metric,
18        value
19    FROM binned_custom_metric
20    WHERE 
21        1=1
22        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
23        AND space = '{}'
24        AND name = '{}'
25        AND version = '{}'
26    ),
27
28subquery2 AS (
29    SELECT
30        created_at,
31        metric,
32        avg(value) as average,
33        stddev(value) as standard_dev
34    FROM subquery1
35    GROUP BY 
36        created_at,
37        metric
38),
39
40subquery3 AS (
41    SELECT
42        created_at,
43        metric,
44        struct(
45            average as avg,
46            average - COALESCE(standard_dev, 0) as lower_bound,
47            average + COALESCE(standard_dev, 0) as upper_bound
48        ) as stats
49    FROM subquery2
50)
51
52SELECT 
53    metric,
54    array_agg(created_at) as created_at,
55    array_agg(stats) as stats
56FROM subquery3
57GROUP BY metric;"#,
58        bin,
59        start_time.to_rfc3339(),
60        end_time.to_rfc3339(),
61        space,
62        name,
63        version
64    )
65}
66
67pub fn get_binned_llm_metric_values_query(
68    bin: &f64,
69    start_time: &DateTime<Utc>,
70    end_time: &DateTime<Utc>,
71    space: &str,
72    name: &str,
73    version: &str,
74) -> String {
75    format!(
76        r#"WITH subquery1 AS (
77    SELECT
78        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
79        metric,
80        value
81    FROM binned_llm_metric
82    WHERE 
83        1=1
84        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
85        AND space = '{}'
86        AND name = '{}'
87        AND version = '{}'
88    ),
89
90subquery2 AS (
91    SELECT
92        created_at,
93        metric,
94        avg(value) as average,
95        stddev(value) as standard_dev
96    FROM subquery1
97    GROUP BY 
98        created_at,
99        metric
100),
101
102subquery3 AS (
103    SELECT
104        created_at,
105        metric,
106        struct(
107            average as avg,
108            average - COALESCE(standard_dev, 0) as lower_bound,
109            average + COALESCE(standard_dev, 0) as upper_bound
110        ) as stats
111    FROM subquery2
112)
113
114SELECT 
115    metric,
116    array_agg(created_at) as created_at,
117    array_agg(stats) as stats
118FROM subquery3
119GROUP BY metric;"#,
120        bin,
121        start_time.to_rfc3339(),
122        end_time.to_rfc3339(),
123        space,
124        name,
125        version
126    )
127}
128
129pub fn get_binned_psi_drift_records_query(
130    bin: &f64,
131    start_time: &DateTime<Utc>,
132    end_time: &DateTime<Utc>,
133    space: &str,
134    name: &str,
135    version: &str,
136) -> String {
137    format!(
138        r#"WITH feature_bin_total AS (
139        SELECT 
140            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
141            name,
142            space,
143            version,
144            feature,
145            bin_id,
146            SUM(bin_count) AS bin_total_count
147        FROM binned_psi
148        WHERE 
149            1=1
150            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
151            AND space = '{}'
152            AND name = '{}'
153            AND version = '{}'
154        GROUP BY 1, 2, 3, 4, 5, 6
155    ),
156
157    feature_total AS (
158        SELECT 
159            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
160            name,
161            space,
162            version,
163            feature,
164            cast(SUM(bin_count) as float) AS feature_total_count
165        FROM binned_psi
166        WHERE 
167            1=1
168            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
169            AND space = '{}'
170            AND name = '{}'
171            AND version = '{}'
172        GROUP BY 1, 2, 3, 4, 5
173    ),
174
175    feature_bin_proportions AS (
176        SELECT 
177            b.created_at,
178            b.feature,
179            f.feature_total_count,
180            b.bin_id,
181            cast(b.bin_total_count as float) / f.feature_total_count AS proportion
182        FROM feature_bin_total b
183        JOIN feature_total f
184            ON f.feature = b.feature 
185            AND f.version = b.version 
186            AND f.space = b.space
187            AND f.name = b.name
188            AND f.created_at = b.created_at
189    ),
190
191    overall_agg as (
192        SELECT 
193            feature,
194            struct(
195                array_agg(bin_id) as bin_id, 
196                array_agg(proportion) as proportion
197            ) as bins
198        FROM feature_bin_proportions
199        WHERE feature_total_count > 1
200        GROUP BY feature
201    ),
202
203    bin_agg as (
204        SELECT 
205            feature,
206            created_at,
207            struct(
208                array_agg(bin_id) as bin_id, 
209                array_agg(proportion) as proportion
210            ) AS bin_proportions
211        FROM feature_bin_proportions
212        WHERE feature_total_count > 1
213        GROUP BY 
214            feature, 
215            created_at
216    ),
217
218    feature_agg as (
219    select
220    feature,
221    array_agg(created_at order by created_at desc) as created_at,
222    array_agg(bin_proportions order by created_at desc) as bin_proportions
223    FROM bin_agg
224    WHERE 1=1
225    GROUP BY feature
226    )
227
228    SELECT 
229        feature_agg.feature,
230        created_at,
231        bin_proportions,
232        bins as overall_proportions
233    FROM feature_agg
234    JOIN overall_agg
235        ON overall_agg.feature = feature_agg.feature;"#,
236        bin,
237        start_time.to_rfc3339(),
238        end_time.to_rfc3339(),
239        space,
240        name,
241        version,
242        bin,
243        start_time.to_rfc3339(),
244        end_time.to_rfc3339(),
245        space,
246        name,
247        version
248    )
249}
250
251pub fn get_binned_spc_drift_records_query(
252    bin: &f64,
253    start_time: &DateTime<Utc>,
254    end_time: &DateTime<Utc>,
255    space: &str,
256    name: &str,
257    version: &str,
258) -> String {
259    let start_year = start_time.year();
260    let start_month = start_time.month();
261    let start_day = start_time.day();
262    let start_hour = start_time.hour();
263
264    let end_year = end_time.year();
265    let end_month = end_time.month();
266    let end_day = end_time.day();
267    let end_hour = end_time.hour();
268
269    format!(
270        r#"WITH subquery1 AS (
271        SELECT
272            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
273            name,
274            space,
275            feature,
276            version,
277            value
278        FROM binned_spc
279        WHERE
280            -- Partition pruning predicates (inclusive)
281            (year >= {start_year}) AND
282            (year > {start_year} OR month >= {start_month}) AND
283            (year > {start_year} OR month > {start_month} OR day >= {start_day}) AND
284            (year > {start_year} OR month > {start_month} OR day > {start_day} OR hour >= {start_hour})
285            AND
286            (year <= {end_year}) AND
287            (year < {end_year} OR month <= {end_month}) AND
288            (year < {end_year} OR month < {end_month} OR day <= {end_day}) AND
289            (year < {end_year} OR month < {end_month} OR day < {end_day} OR hour <= {end_hour})
290            -- Regular filters (inclusive)
291            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
292            AND space = '{}'
293            AND name = '{}'
294            AND version = '{}'
295        ),
296
297        subquery2 AS (
298            SELECT
299                created_at,
300                name,
301                space,
302                feature,
303                version,
304                avg(value) as value
305            FROM subquery1
306            GROUP BY 
307                created_at,
308                name,
309                space,
310                feature,
311                version
312        )
313
314        SELECT
315        feature,
316        array_agg(created_at ORDER BY created_at DESC) as created_at,
317        array_agg(value ORDER BY created_at DESC) as values
318        FROM subquery2
319        GROUP BY 
320        feature;"#,
321        bin,
322        start_time.to_rfc3339(),
323        end_time.to_rfc3339(),
324        space,
325        name,
326        version
327    )
328}