scouter_dataframe/sql/
helper.rs

1// this is a helper file for generating sql queries to retrieve binned data from datafusion.
2use chrono::Datelike;
3use chrono::Timelike;
4use chrono::{DateTime, Utc};
5pub fn get_binned_custom_metric_values_query(
6    bin: &f64,
7    start_time: &DateTime<Utc>,
8    end_time: &DateTime<Utc>,
9    space: &str,
10    name: &str,
11    version: &str,
12) -> String {
13    format!(
14        r#"WITH subquery1 AS (
15    SELECT
16        date_bin(INTERVAL '{} minute', created_at, TIMESTAMP '1970-01-01') as created_at,
17        metric,
18        value
19    FROM binned_custom_metric
20    WHERE 
21        1=1
22        AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
23        AND space = '{}'
24        AND name = '{}'
25        AND version = '{}'
26    ),
27
28subquery2 AS (
29    SELECT
30        created_at,
31        metric,
32        avg(value) as average,
33        stddev(value) as standard_dev
34    FROM subquery1
35    GROUP BY 
36        created_at,
37        metric
38),
39
40subquery3 AS (
41    SELECT
42        created_at,
43        metric,
44        struct(
45            average as avg,
46            average - COALESCE(standard_dev, 0) as lower_bound,
47            average + COALESCE(standard_dev, 0) as upper_bound
48        ) as stats
49    FROM subquery2
50)
51
52SELECT 
53    metric,
54    array_agg(created_at) as created_at,
55    array_agg(stats) as stats
56FROM subquery3
57GROUP BY metric;"#,
58        bin,
59        start_time.to_rfc3339(),
60        end_time.to_rfc3339(),
61        space,
62        name,
63        version
64    )
65}
66
67pub fn get_binned_psi_drift_records_query(
68    bin: &f64,
69    start_time: &DateTime<Utc>,
70    end_time: &DateTime<Utc>,
71    space: &str,
72    name: &str,
73    version: &str,
74) -> String {
75    format!(
76        r#"WITH feature_bin_total AS (
77        SELECT 
78            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
79            name,
80            space,
81            version,
82            feature,
83            bin_id,
84            SUM(bin_count) AS bin_total_count
85        FROM binned_psi
86        WHERE 
87            1=1
88            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
89            AND space = '{}'
90            AND name = '{}'
91            AND version = '{}'
92        GROUP BY 1, 2, 3, 4, 5, 6
93    ),
94
95    feature_total AS (
96        SELECT 
97            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
98            name,
99            space,
100            version,
101            feature,
102            cast(SUM(bin_count) as float) AS feature_total_count
103        FROM binned_psi
104        WHERE 
105            1=1
106            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
107            AND space = '{}'
108            AND name = '{}'
109            AND version = '{}'
110        GROUP BY 1, 2, 3, 4, 5
111    ),
112
113    feature_bin_proportions AS (
114        SELECT 
115            b.created_at,
116            b.feature,
117            f.feature_total_count,
118            b.bin_id,
119            cast(b.bin_total_count as float) / f.feature_total_count AS proportion
120        FROM feature_bin_total b
121        JOIN feature_total f
122            ON f.feature = b.feature 
123            AND f.version = b.version 
124            AND f.space = b.space
125            AND f.name = b.name
126            AND f.created_at = b.created_at
127    ),
128
129    overall_agg as (
130        SELECT 
131            feature,
132            struct(
133                array_agg(bin_id) as bin_id, 
134                array_agg(proportion) as proportion
135            ) as bins
136        FROM feature_bin_proportions
137        WHERE feature_total_count > 1
138        GROUP BY feature
139    ),
140
141    bin_agg as (
142        SELECT 
143            feature,
144            created_at,
145            struct(
146                array_agg(bin_id) as bin_id, 
147                array_agg(proportion) as proportion
148            ) AS bin_proportions
149        FROM feature_bin_proportions
150        WHERE feature_total_count > 1
151        GROUP BY 
152            feature, 
153            created_at
154    ),
155
156    feature_agg as (
157    select
158    feature,
159    array_agg(created_at order by created_at desc) as created_at,
160    array_agg(bin_proportions order by created_at desc) as bin_proportions
161    FROM bin_agg
162    WHERE 1=1
163    GROUP BY feature
164    )
165
166    SELECT 
167        feature_agg.feature,
168        created_at,
169        bin_proportions,
170        bins as overall_proportions
171    FROM feature_agg
172    JOIN overall_agg
173        ON overall_agg.feature = feature_agg.feature;"#,
174        bin,
175        start_time.to_rfc3339(),
176        end_time.to_rfc3339(),
177        space,
178        name,
179        version,
180        bin,
181        start_time.to_rfc3339(),
182        end_time.to_rfc3339(),
183        space,
184        name,
185        version
186    )
187}
188
189pub fn get_binned_spc_drift_records_query(
190    bin: &f64,
191    start_time: &DateTime<Utc>,
192    end_time: &DateTime<Utc>,
193    space: &str,
194    name: &str,
195    version: &str,
196) -> String {
197    let start_year = start_time.year();
198    let start_month = start_time.month();
199    let start_day = start_time.day();
200    let start_hour = start_time.hour();
201
202    let end_year = end_time.year();
203    let end_month = end_time.month();
204    let end_day = end_time.day();
205    let end_hour = end_time.hour();
206
207    format!(
208        r#"WITH subquery1 AS (
209        SELECT
210            date_bin('{} minutes', created_at, TIMESTAMP '1970-01-01') as created_at,
211            name,
212            space,
213            feature,
214            version,
215            value
216        FROM binned_spc
217        WHERE
218            -- Partition pruning predicates (inclusive)
219            (year >= {start_year}) AND
220            (year > {start_year} OR month >= {start_month}) AND
221            (year > {start_year} OR month > {start_month} OR day >= {start_day}) AND
222            (year > {start_year} OR month > {start_month} OR day > {start_day} OR hour >= {start_hour})
223            AND
224            (year <= {end_year}) AND
225            (year < {end_year} OR month <= {end_month}) AND
226            (year < {end_year} OR month < {end_month} OR day <= {end_day}) AND
227            (year < {end_year} OR month < {end_month} OR day < {end_day} OR hour <= {end_hour})
228            -- Regular filters (inclusive)
229            AND created_at between TIMESTAMP '{}' AND TIMESTAMP '{}'
230            AND space = '{}'
231            AND name = '{}'
232            AND version = '{}'
233        ),
234
235        subquery2 AS (
236            SELECT
237                created_at,
238                name,
239                space,
240                feature,
241                version,
242                avg(value) as value
243            FROM subquery1
244            GROUP BY 
245                created_at,
246                name,
247                space,
248                feature,
249                version
250        )
251
252        SELECT
253        feature,
254        array_agg(created_at ORDER BY created_at DESC) as created_at,
255        array_agg(value ORDER BY created_at DESC) as values
256        FROM subquery2
257        GROUP BY 
258        feature;"#,
259        bin,
260        start_time.to_rfc3339(),
261        end_time.to_rfc3339(),
262        space,
263        name,
264        version
265    )
266}