Skip to main content

veloq_query/sql/
window.rs

1use crate::sql::SqlFragment;
2use duckdb::types::Value;
3use veloq_core::WindowRef;
4
5/// Half-open point-sample predicate: `expr >= window_start AND expr < window_end`.
6///
7/// Bind order is always `start, end`.
8pub fn point_filter(expr: &str, window: Option<(i64, i64)>) -> Option<SqlFragment> {
9    let (start, end) = WindowRef::from_option(window).bounds()?;
10    Some(SqlFragment::new(
11        format!("{expr} >= ? AND {expr} < ?"),
12        vec![Value::BigInt(start), Value::BigInt(end)],
13    ))
14}
15
16/// `alias.start < window_end AND alias."end" > window_start`.
17///
18/// Bind order is always `end, start`.
19pub fn overlap_filter(alias: &str, window: Option<(i64, i64)>) -> Option<SqlFragment> {
20    let (start, end) = WindowRef::from_option(window).bounds()?;
21    Some(SqlFragment::new(
22        format!(r#"{alias}.start < ? AND {alias}."end" > ?"#),
23        vec![Value::BigInt(end), Value::BigInt(start)],
24    ))
25}
26
27/// Overlap predicate for callers that already projected start/end
28/// expressions into an outer CTE.
29///
30/// Bind order is always `end, start`.
31pub fn overlap_filter_expr(
32    start_expr: &str,
33    end_expr: &str,
34    window: Option<(i64, i64)>,
35) -> Option<SqlFragment> {
36    let (start, end) = WindowRef::from_option(window).bounds()?;
37    Some(SqlFragment::new(
38        format!("{start_expr} < ? AND {end_expr} > ?"),
39        vec![Value::BigInt(end), Value::BigInt(start)],
40    ))
41}
42
43/// Clipped duration expression for a `t`-style start/end interval.
44///
45/// Bind order is `end, start`; when paired with [`overlap_filter`] and
46/// appended before it, the total order is `end, start, end, start`.
47pub fn clipped_duration_expr(alias: &str, window: Option<(i64, i64)>) -> SqlFragment {
48    match WindowRef::from_option(window).bounds() {
49        Some((start, end)) => SqlFragment::new(
50            format!(r#"LEAST({alias}."end", ?) - GREATEST({alias}.start, ?)"#),
51            vec![Value::BigInt(end), Value::BigInt(start)],
52        ),
53        None => SqlFragment::new(format!(r#"({alias}."end" - {alias}.start)"#), Vec::new()),
54    }
55}
56
/// Duration of a start/end interval clipped to a bucket interval:
/// `LEAST(end_expr, bucket_end_expr) - GREATEST(start_expr, bucket_start_expr)`.
///
/// All four arguments are SQL expressions interpolated verbatim; the result
/// is plain SQL text with no bind parameters. The expression is not wrapped
/// in parentheses.
pub fn bucket_clipped_duration_expr(
    start_expr: &str,
    end_expr: &str,
    bucket_start_expr: &str,
    bucket_end_expr: &str,
) -> String {
    let clipped_end = format!("LEAST({end_expr}, {bucket_end_expr})");
    let clipped_start = format!("GREATEST({start_expr}, {bucket_start_expr})");
    format!("{clipped_end} - {clipped_start}")
}
66
67/// Scope for positive interval samples that may be clipped to a user
68/// time window before aggregation.
69#[derive(Debug, Clone, PartialEq)]
70pub struct IntervalSampleScope {
71    pub start_expr: String,
72    pub end_expr: String,
73    pub where_clause: String,
74    pub params: Vec<Value>,
75}
76
77/// Positive interval sample scope.
78///
79/// Always rejects invalid intervals with
80/// `start_expr >= 0 AND end_expr > start_expr`. When a window is present,
81/// projected start/end expressions are clipped before downstream bucket
82/// math. Bind order is `start, end` for clipped expressions, then
83/// `start, end` for the overlap predicate.
84pub fn positive_interval_sample_scope(
85    start_expr: &str,
86    end_expr: &str,
87    window: Option<(i64, i64)>,
88) -> IntervalSampleScope {
89    let mut predicates = vec![format!("{start_expr} >= 0 AND {end_expr} > {start_expr}")];
90    let mut params = Vec::new();
91
92    let (scoped_start_expr, scoped_end_expr) = match WindowRef::from_option(window).bounds() {
93        Some((start, end)) => {
94            params.push(Value::BigInt(start));
95            params.push(Value::BigInt(end));
96            predicates.push(format!("{end_expr} > ? AND {start_expr} < ?"));
97            params.push(Value::BigInt(start));
98            params.push(Value::BigInt(end));
99            (
100                format!("GREATEST({start_expr}, ?)"),
101                format!("LEAST({end_expr}, ?)"),
102            )
103        }
104        None => (start_expr.to_string(), end_expr.to_string()),
105    };
106
107    IntervalSampleScope {
108        start_expr: scoped_start_expr,
109        end_expr: scoped_end_expr,
110        where_clause: format!("WHERE {}", predicates.join(" AND ")),
111        params,
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwraps a fragment that must be present because a window was supplied.
    fn emitted(fragment: Option<SqlFragment>) -> anyhow::Result<SqlFragment> {
        fragment.ok_or_else(|| anyhow::anyhow!("window should emit fragment"))
    }

    /// Expected bind list: each integer wrapped as `Value::BigInt`, in order.
    fn big_ints(values: &[i64]) -> Vec<Value> {
        values.iter().copied().map(Value::BigInt).collect()
    }

    #[test]
    fn point_filter_binds_start_then_end() -> anyhow::Result<()> {
        let got = emitted(point_filter("m.timestamp", Some((10, 20))))?;

        assert_eq!(got.sql, "m.timestamp >= ? AND m.timestamp < ?");
        assert_eq!(got.params, big_ints(&[10, 20]));
        Ok(())
    }

    #[test]
    fn overlap_filter_binds_end_then_start() -> anyhow::Result<()> {
        let got = emitted(overlap_filter("t", Some((10, 20))))?;

        assert_eq!(got.sql, r#"t.start < ? AND t."end" > ?"#);
        assert_eq!(got.params, big_ints(&[20, 10]));
        Ok(())
    }

    #[test]
    fn overlap_expr_binds_end_then_start() -> anyhow::Result<()> {
        let got = emitted(overlap_filter_expr("start_ns", "end_ns", Some((11, 22))))?;

        assert_eq!(got.sql, "start_ns < ? AND end_ns > ?");
        assert_eq!(got.params, big_ints(&[22, 11]));
        Ok(())
    }

    #[test]
    fn clipped_duration_binds_end_then_start() {
        let got = clipped_duration_expr("t", Some((10, 20)));

        assert_eq!(got.sql, r#"LEAST(t."end", ?) - GREATEST(t.start, ?)"#);
        assert_eq!(got.params, big_ints(&[20, 10]));
    }

    #[test]
    fn absent_window_emits_plain_duration_without_params() {
        let got = clipped_duration_expr("t", None);

        assert_eq!(got.sql, r#"(t."end" - t.start)"#);
        assert!(got.params.is_empty());
    }

    #[test]
    fn bucket_clipped_duration_uses_bucket_bounds_without_params() {
        let sql = bucket_clipped_duration_expr("s", "e", "bucket_start", "bucket_end");

        assert_eq!(sql, "LEAST(e, bucket_end) - GREATEST(s, bucket_start)");
    }

    #[test]
    fn positive_interval_sample_scope_clips_and_binds_start_end_pairs() {
        let got =
            positive_interval_sample_scope("m.timestamp", "m.timestamp + m.duration", Some((5, 9)));

        assert_eq!(got.start_expr, "GREATEST(m.timestamp, ?)");
        assert_eq!(got.end_expr, "LEAST(m.timestamp + m.duration, ?)");
        assert_eq!(
            got.where_clause,
            "WHERE m.timestamp >= 0 AND m.timestamp + m.duration > m.timestamp AND m.timestamp + m.duration > ? AND m.timestamp < ?"
        );
        // Two `start, end` pairs: clipped expressions first, overlap predicate second.
        assert_eq!(got.params, big_ints(&[5, 9, 5, 9]));
    }

    #[test]
    fn positive_interval_sample_scope_without_window_only_validates_interval() {
        let got = positive_interval_sample_scope("start", "end", None);

        assert_eq!(got.start_expr, "start");
        assert_eq!(got.end_expr, "end");
        assert_eq!(got.where_clause, "WHERE start >= 0 AND end > start");
        assert!(got.params.is_empty());
    }
}
199}