1use crate::sql::SqlFragment;
2use duckdb::types::Value;
3use veloq_core::WindowRef;
4
5pub fn point_filter(expr: &str, window: Option<(i64, i64)>) -> Option<SqlFragment> {
9 let (start, end) = WindowRef::from_option(window).bounds()?;
10 Some(SqlFragment::new(
11 format!("{expr} >= ? AND {expr} < ?"),
12 vec![Value::BigInt(start), Value::BigInt(end)],
13 ))
14}
15
16pub fn overlap_filter(alias: &str, window: Option<(i64, i64)>) -> Option<SqlFragment> {
20 let (start, end) = WindowRef::from_option(window).bounds()?;
21 Some(SqlFragment::new(
22 format!(r#"{alias}.start < ? AND {alias}."end" > ?"#),
23 vec![Value::BigInt(end), Value::BigInt(start)],
24 ))
25}
26
27pub fn overlap_filter_expr(
32 start_expr: &str,
33 end_expr: &str,
34 window: Option<(i64, i64)>,
35) -> Option<SqlFragment> {
36 let (start, end) = WindowRef::from_option(window).bounds()?;
37 Some(SqlFragment::new(
38 format!("{start_expr} < ? AND {end_expr} > ?"),
39 vec![Value::BigInt(end), Value::BigInt(start)],
40 ))
41}
42
43pub fn clipped_duration_expr(alias: &str, window: Option<(i64, i64)>) -> SqlFragment {
48 match WindowRef::from_option(window).bounds() {
49 Some((start, end)) => SqlFragment::new(
50 format!(r#"LEAST({alias}."end", ?) - GREATEST({alias}.start, ?)"#),
51 vec![Value::BigInt(end), Value::BigInt(start)],
52 ),
53 None => SqlFragment::new(format!(r#"({alias}."end" - {alias}.start)"#), Vec::new()),
54 }
55}
56
/// Builds a SQL expression for the part of an interval that falls inside a bucket.
///
/// The result is `LEAST(end_expr, bucket_end_expr) - GREATEST(start_expr, bucket_start_expr)`.
/// All four arguments are SQL expressions interpolated verbatim; nothing is bound as a
/// parameter, which is why a plain `String` is returned. The result is not parenthesised.
pub fn bucket_clipped_duration_expr(
    start_expr: &str,
    end_expr: &str,
    bucket_start_expr: &str,
    bucket_end_expr: &str,
) -> String {
    let clipped_end = format!("LEAST({end_expr}, {bucket_end_expr})");
    let clipped_start = format!("GREATEST({start_expr}, {bucket_start_expr})");
    format!("{clipped_end} - {clipped_start}")
}
66
/// SQL pieces describing a set of positive-length intervals, optionally clipped to a window.
///
/// Built by `positive_interval_sample_scope`; the fields are plain SQL text plus the values
/// for the `?` placeholders they contain.
#[derive(Debug, Clone, PartialEq)]
pub struct IntervalSampleScope {
    /// Interval start expression. When a window applies it is `GREATEST(<start>, ?)`,
    /// otherwise the caller's start expression unchanged.
    pub start_expr: String,
    /// Interval end expression. When a window applies it is `LEAST(<end>, ?)`,
    /// otherwise the caller's end expression unchanged.
    pub end_expr: String,
    /// Complete `WHERE ...` clause (including the `WHERE` keyword) with the predicates
    /// joined by `AND`.
    pub where_clause: String,
    /// Values for the `?` placeholders: empty without a window, otherwise
    /// `[start, end, start, end]`.
    pub params: Vec<Value>,
}
76
77pub fn positive_interval_sample_scope(
85 start_expr: &str,
86 end_expr: &str,
87 window: Option<(i64, i64)>,
88) -> IntervalSampleScope {
89 let mut predicates = vec![format!("{start_expr} >= 0 AND {end_expr} > {start_expr}")];
90 let mut params = Vec::new();
91
92 let (scoped_start_expr, scoped_end_expr) = match WindowRef::from_option(window).bounds() {
93 Some((start, end)) => {
94 params.push(Value::BigInt(start));
95 params.push(Value::BigInt(end));
96 predicates.push(format!("{end_expr} > ? AND {start_expr} < ?"));
97 params.push(Value::BigInt(start));
98 params.push(Value::BigInt(end));
99 (
100 format!("GREATEST({start_expr}, ?)"),
101 format!("LEAST({end_expr}, ?)"),
102 )
103 }
104 None => (start_expr.to_string(), end_expr.to_string()),
105 };
106
107 IntervalSampleScope {
108 start_expr: scoped_start_expr,
109 end_expr: scoped_end_expr,
110 where_clause: format!("WHERE {}", predicates.join(" AND ")),
111 params,
112 }
113}
114
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns a missing fragment into a test error instead of panicking.
    fn emitted(fragment: Option<SqlFragment>) -> anyhow::Result<SqlFragment> {
        fragment.ok_or_else(|| anyhow::anyhow!("window should emit fragment"))
    }

    /// Wraps each integer in `Value::BigInt`, preserving the given order.
    fn big_ints(values: &[i64]) -> Vec<Value> {
        values.iter().copied().map(Value::BigInt).collect()
    }

    // Point filters bind in window order: start, then end.
    #[test]
    fn point_filter_binds_start_then_end() -> anyhow::Result<()> {
        let filter = emitted(point_filter("m.timestamp", Some((10, 20))))?;
        assert_eq!(filter.sql, "m.timestamp >= ? AND m.timestamp < ?");
        assert_eq!(filter.params, big_ints(&[10, 20]));
        Ok(())
    }

    // Overlap filters bind in placeholder order: window end, then window start.
    #[test]
    fn overlap_filter_binds_end_then_start() -> anyhow::Result<()> {
        let filter = emitted(overlap_filter("t", Some((10, 20))))?;
        assert_eq!(filter.sql, r#"t.start < ? AND t."end" > ?"#);
        assert_eq!(filter.params, big_ints(&[20, 10]));
        Ok(())
    }

    #[test]
    fn overlap_expr_binds_end_then_start() -> anyhow::Result<()> {
        let filter = emitted(overlap_filter_expr("start_ns", "end_ns", Some((11, 22))))?;
        assert_eq!(filter.sql, "start_ns < ? AND end_ns > ?");
        assert_eq!(filter.params, big_ints(&[22, 11]));
        Ok(())
    }

    #[test]
    fn clipped_duration_binds_end_then_start() {
        let duration = clipped_duration_expr("t", Some((10, 20)));
        assert_eq!(duration.sql, r#"LEAST(t."end", ?) - GREATEST(t.start, ?)"#);
        assert_eq!(duration.params, big_ints(&[20, 10]));
    }

    #[test]
    fn absent_window_emits_plain_duration_without_params() {
        let duration = clipped_duration_expr("t", None);
        assert_eq!(duration.sql, r#"(t."end" - t.start)"#);
        assert!(duration.params.is_empty());
    }

    #[test]
    fn bucket_clipped_duration_uses_bucket_bounds_without_params() {
        let rendered = bucket_clipped_duration_expr("s", "e", "bucket_start", "bucket_end");
        assert_eq!(rendered, "LEAST(e, bucket_end) - GREATEST(s, bucket_start)");
    }

    // A windowed scope clips both expressions and binds the (start, end) pair twice.
    #[test]
    fn positive_interval_sample_scope_clips_and_binds_start_end_pairs() {
        let scope =
            positive_interval_sample_scope("m.timestamp", "m.timestamp + m.duration", Some((5, 9)));

        assert_eq!(scope.start_expr, "GREATEST(m.timestamp, ?)");
        assert_eq!(scope.end_expr, "LEAST(m.timestamp + m.duration, ?)");
        assert_eq!(
            scope.where_clause,
            "WHERE m.timestamp >= 0 AND m.timestamp + m.duration > m.timestamp AND m.timestamp + m.duration > ? AND m.timestamp < ?"
        );
        assert_eq!(scope.params, big_ints(&[5, 9, 5, 9]));
    }

    #[test]
    fn positive_interval_sample_scope_without_window_only_validates_interval() {
        let scope = positive_interval_sample_scope("start", "end", None);

        assert_eq!(scope.start_expr, "start");
        assert_eq!(scope.end_expr, "end");
        assert_eq!(scope.where_clause, "WHERE start >= 0 AND end > start");
        assert!(scope.params.is_empty());
    }
}