1use crate::expr::SqlExpr;
14
15use super::frame::{build_peer_groups, evaluate_frame_bounds};
16use super::helpers::{as_f64, get_field, set_window_col};
17use super::running::running_aggregate;
18use super::spec::{FrameBound, WindowFuncSpec};
19
20pub(super) fn apply_aggregate_window(
21 rows: &mut [(String, serde_json::Value)],
22 indices: &[usize],
23 spec: &WindowFuncSpec,
24) {
25 let field = spec
26 .args
27 .first()
28 .and_then(|e| match e {
29 SqlExpr::Column(c) => Some(c.as_str()),
30 _ => None,
31 })
32 .unwrap_or("*");
33
34 let use_running = spec.frame.mode == "range"
38 && matches!(spec.frame.start, FrameBound::UnboundedPreceding)
39 && matches!(spec.frame.end, FrameBound::CurrentRow);
40
41 if use_running {
42 running_aggregate(rows, indices, spec, field);
43 return;
44 }
45
46 per_row_aggregate(rows, indices, spec, field);
47}
48
49fn per_row_aggregate(
57 rows: &mut [(String, serde_json::Value)],
58 indices: &[usize],
59 spec: &WindowFuncSpec,
60 field: &str,
61) {
62 let len = indices.len();
63 if len == 0 {
64 return;
65 }
66
67 let order_col = spec.order_by.first().map(|(col, _)| col.as_str());
69 let order_values: Vec<serde_json::Value> = indices
70 .iter()
71 .map(|&i| {
72 order_col
73 .map(|col| get_field(&rows[i].1, col))
74 .unwrap_or(serde_json::Value::Null)
75 })
76 .collect();
77
78 let peer_groups: Vec<usize> = if spec.frame.mode == "groups" {
81 build_peer_groups(&order_values)
82 } else {
83 Vec::new()
84 };
85
86 let all_vals: Vec<Option<f64>> = indices
88 .iter()
89 .map(|&i| as_f64(&get_field(&rows[i].1, field)))
90 .collect();
91
92 let results: Vec<serde_json::Value> = (0..len)
95 .map(|pos| {
96 let (start_idx, end_idx) =
97 evaluate_frame_bounds(&spec.frame, pos, len, &order_values, &peer_groups);
98
99 aggregate_slice(&all_vals, indices, rows, field, spec, start_idx, end_idx)
100 })
101 .collect();
102
103 for (pos, result) in results.into_iter().enumerate() {
104 let row_idx = indices[pos];
105 set_window_col(&mut rows[row_idx].1, &spec.alias, result);
106 }
107}
108
109fn aggregate_slice(
111 all_vals: &[Option<f64>],
112 indices: &[usize],
113 rows: &[(String, serde_json::Value)],
114 field: &str,
115 spec: &WindowFuncSpec,
116 start_idx: usize,
117 end_idx: usize,
118) -> serde_json::Value {
119 let slice_vals: Vec<f64> = all_vals[start_idx..=end_idx]
120 .iter()
121 .filter_map(|v| *v)
122 .collect();
123 let slice_count = end_idx - start_idx + 1;
124
125 match spec.func_name.as_str() {
126 "sum" => {
127 let rt = crate::simd_agg::ts_runtime();
128 serde_json::json!((rt.sum_f64)(&slice_vals))
129 }
130 "count" => serde_json::json!(slice_count),
131 "avg" => {
132 if slice_vals.is_empty() {
133 serde_json::Value::Null
134 } else {
135 let rt = crate::simd_agg::ts_runtime();
136 serde_json::json!((rt.sum_f64)(&slice_vals) / slice_vals.len() as f64)
137 }
138 }
139 "min" => {
140 if slice_vals.is_empty() {
141 serde_json::Value::Null
142 } else {
143 let rt = crate::simd_agg::ts_runtime();
144 serde_json::json!((rt.min_f64)(&slice_vals))
145 }
146 }
147 "max" => {
148 if slice_vals.is_empty() {
149 serde_json::Value::Null
150 } else {
151 let rt = crate::simd_agg::ts_runtime();
152 serde_json::json!((rt.max_f64)(&slice_vals))
153 }
154 }
155 "first_value" => indices
156 .get(start_idx)
157 .map(|&i| get_field(&rows[i].1, field))
158 .unwrap_or(serde_json::Value::Null),
159 "last_value" => indices
160 .get(end_idx)
161 .map(|&i| get_field(&rows[i].1, field))
162 .unwrap_or(serde_json::Value::Null),
163 _ => serde_json::Value::Null,
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::super::spec::{FrameBound, WindowFrame, WindowFuncSpec};
    use super::apply_aggregate_window;
    use crate::expr::SqlExpr;
    use serde_json::json;

    type Row = (String, serde_json::Value);

    /// Rows with ids `"1"..="n"` whose field `n` holds `1..=n`.
    fn numbered(n: usize) -> Vec<Row> {
        let mut out = Vec::with_capacity(n);
        for i in 1..=n {
            out.push((i.to_string(), json!({ "n": i as i64 })));
        }
        out
    }

    /// Rows built from explicit `(id, n)` pairs; used for fixtures with ties.
    fn docs(entries: &[(&str, i64)]) -> Vec<Row> {
        entries
            .iter()
            .map(|&(id, n)| (id.to_string(), json!({ "n": n })))
            .collect()
    }

    /// Window spec ordered by `n` that writes into the `result` column.
    /// A `field` of `"*"` produces a spec without arguments.
    fn make_spec(func: &str, field: &str, frame: WindowFrame) -> WindowFuncSpec {
        let args = match field {
            "*" => Vec::new(),
            column => vec![SqlExpr::Column(column.into())],
        };
        WindowFuncSpec {
            alias: "result".into(),
            func_name: func.into(),
            args,
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame,
        }
    }

    fn frame(mode: &'static str, start: FrameBound, end: FrameBound) -> WindowFrame {
        WindowFrame {
            mode: mode.into(),
            start,
            end,
        }
    }

    fn rows_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
        frame("rows", start, end)
    }

    fn range_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
        frame("range", start, end)
    }

    fn groups_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
        frame("groups", start, end)
    }

    /// Applies `spec` to `rows` as a single partition covering every row.
    fn run(mut rows: Vec<Row>, spec: &WindowFuncSpec) -> Vec<Row> {
        let indices: Vec<usize> = (0..rows.len()).collect();
        apply_aggregate_window(&mut rows, &indices, spec);
        rows
    }

    /// Asserts the `result` column of the row at `pos`.
    #[track_caller]
    fn assert_result(rows: &[Row], pos: usize, expected: f64) {
        assert_eq!(rows[pos].1["result"], json!(expected));
    }

    #[test]
    fn rows_1_preceding_1_following_sum() {
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        let rows = run(numbered(5), &spec);
        // Sliding three-row window, truncated at both partition edges.
        for (pos, expected) in [3.0, 6.0, 9.0, 12.0, 9.0].into_iter().enumerate() {
            assert_result(&rows, pos, expected);
        }
    }

    #[test]
    fn rows_unbounded_preceding_current_sum() {
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
        );
        let rows = run(numbered(5), &spec);
        // Plain running total.
        for (pos, expected) in [1.0, 3.0, 6.0, 10.0, 15.0].into_iter().enumerate() {
            assert_result(&rows, pos, expected);
        }
    }

    #[test]
    fn rows_current_unbounded_following_sum() {
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::CurrentRow, FrameBound::UnboundedFollowing),
        );
        let rows = run(numbered(5), &spec);
        // Suffix sums: the whole partition, everything but the first row, and
        // the last row alone.
        assert_result(&rows, 0, 15.0);
        assert_result(&rows, 1, 14.0);
        assert_result(&rows, 4, 5.0);
    }

    #[test]
    fn range_unbounded_preceding_current_row_with_ties() {
        let spec = make_spec(
            "sum",
            "n",
            range_frame(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
        );
        let rows = run(docs(&[("a", 1), ("b", 1), ("c", 2), ("d", 3)]), &spec);
        // Rows with equal ORDER BY keys are peers and share one frame end.
        for (pos, expected) in [2.0, 2.0, 4.0, 7.0].into_iter().enumerate() {
            assert_result(&rows, pos, expected);
        }
    }

    #[test]
    fn groups_1_preceding_1_following_sum() {
        let spec = make_spec(
            "sum",
            "n",
            groups_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        let rows = run(
            docs(&[("a", 1), ("b", 1), ("c", 2), ("d", 3), ("e", 3)]),
            &spec,
        );
        // Peer groups are {1,1}, {2}, {3,3}; each frame covers the neighbouring
        // groups on either side.
        for (pos, expected) in [4.0, 4.0, 10.0, 8.0, 8.0].into_iter().enumerate() {
            assert_result(&rows, pos, expected);
        }
    }
}