Skip to main content

nodedb_query/window/
aggregate.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Aggregate functions used as windows: sum, count, avg, min, max,
4//! first_value, last_value.
5//!
6//! Dispatch logic:
7//! - `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` → fast running path
8//!   (preserves SIMD accumulation).
9//! - All other frame combinations → per-row frame evaluator that computes the
10//!   concrete `[start_idx, end_idx]` slice for every row and aggregates over
11//!   it.
12
13use crate::expr::SqlExpr;
14
15use super::frame::{build_peer_groups, evaluate_frame_bounds};
16use super::helpers::{as_f64, get_field, set_window_col};
17use super::running::running_aggregate;
18use super::spec::{FrameBound, WindowFuncSpec};
19
20pub(super) fn apply_aggregate_window(
21    rows: &mut [(String, serde_json::Value)],
22    indices: &[usize],
23    spec: &WindowFuncSpec,
24) {
25    let field = spec
26        .args
27        .first()
28        .and_then(|e| match e {
29            SqlExpr::Column(c) => Some(c.as_str()),
30            _ => None,
31        })
32        .unwrap_or("*");
33
34    // Fast path: RANGE UNBOUNDED PRECEDING TO CURRENT ROW is the most common
35    // pattern (the PostgreSQL default for ordered windows). Use the running
36    // accumulator rather than re-aggregating the slice from scratch each row.
37    let use_running = spec.frame.mode == "range"
38        && matches!(spec.frame.start, FrameBound::UnboundedPreceding)
39        && matches!(spec.frame.end, FrameBound::CurrentRow);
40
41    if use_running {
42        running_aggregate(rows, indices, spec, field);
43        return;
44    }
45
46    per_row_aggregate(rows, indices, spec, field);
47}
48
49/// Per-row frame evaluator.
50///
51/// For each row position `pos` in the partition:
52/// 1. Resolve the concrete `[start_idx, end_idx]` frame slice via
53///    `evaluate_frame_bounds`.
54/// 2. Aggregate `field` over `indices[start_idx..=end_idx]`.
55/// 3. Write the result back under `spec.alias`.
56fn per_row_aggregate(
57    rows: &mut [(String, serde_json::Value)],
58    indices: &[usize],
59    spec: &WindowFuncSpec,
60    field: &str,
61) {
62    let len = indices.len();
63    if len == 0 {
64        return;
65    }
66
67    // Extract order-by values for RANGE numeric offsets.
68    let order_col = spec.order_by.first().map(|(col, _)| col.as_str());
69    let order_values: Vec<serde_json::Value> = indices
70        .iter()
71        .map(|&i| {
72            order_col
73                .map(|col| get_field(&rows[i].1, col))
74                .unwrap_or(serde_json::Value::Null)
75        })
76        .collect();
77
78    // Peer groups needed for GROUPS mode (and for RANGE CurrentRow peer
79    // awareness — reused from the frame module which handles both).
80    let peer_groups: Vec<usize> = if spec.frame.mode == "groups" {
81        build_peer_groups(&order_values)
82    } else {
83        Vec::new()
84    };
85
86    // Pre-collect all numeric values to avoid repeated borrow issues.
87    let all_vals: Vec<Option<f64>> = indices
88        .iter()
89        .map(|&i| as_f64(&get_field(&rows[i].1, field)))
90        .collect();
91
92    // We need to write into `rows` after computing each result; collect
93    // results first so we only borrow `rows` immutably during computation.
94    let results: Vec<serde_json::Value> = (0..len)
95        .map(|pos| {
96            let (start_idx, end_idx) =
97                evaluate_frame_bounds(&spec.frame, pos, len, &order_values, &peer_groups);
98
99            aggregate_slice(&all_vals, indices, rows, field, spec, start_idx, end_idx)
100        })
101        .collect();
102
103    for (pos, result) in results.into_iter().enumerate() {
104        let row_idx = indices[pos];
105        set_window_col(&mut rows[row_idx].1, &spec.alias, result);
106    }
107}
108
109/// Aggregate `field` over the slice `indices[start_idx..=end_idx]`.
110fn aggregate_slice(
111    all_vals: &[Option<f64>],
112    indices: &[usize],
113    rows: &[(String, serde_json::Value)],
114    field: &str,
115    spec: &WindowFuncSpec,
116    start_idx: usize,
117    end_idx: usize,
118) -> serde_json::Value {
119    let slice_vals: Vec<f64> = all_vals[start_idx..=end_idx]
120        .iter()
121        .filter_map(|v| *v)
122        .collect();
123    let slice_count = end_idx - start_idx + 1;
124
125    match spec.func_name.as_str() {
126        "sum" => {
127            let rt = crate::simd_agg::ts_runtime();
128            serde_json::json!((rt.sum_f64)(&slice_vals))
129        }
130        "count" => serde_json::json!(slice_count),
131        "avg" => {
132            if slice_vals.is_empty() {
133                serde_json::Value::Null
134            } else {
135                let rt = crate::simd_agg::ts_runtime();
136                serde_json::json!((rt.sum_f64)(&slice_vals) / slice_vals.len() as f64)
137            }
138        }
139        "min" => {
140            if slice_vals.is_empty() {
141                serde_json::Value::Null
142            } else {
143                let rt = crate::simd_agg::ts_runtime();
144                serde_json::json!((rt.min_f64)(&slice_vals))
145            }
146        }
147        "max" => {
148            if slice_vals.is_empty() {
149                serde_json::Value::Null
150            } else {
151                let rt = crate::simd_agg::ts_runtime();
152                serde_json::json!((rt.max_f64)(&slice_vals))
153            }
154        }
155        "first_value" => indices
156            .get(start_idx)
157            .map(|&i| get_field(&rows[i].1, field))
158            .unwrap_or(serde_json::Value::Null),
159        "last_value" => indices
160            .get(end_idx)
161            .map(|&i| get_field(&rows[i].1, field))
162            .unwrap_or(serde_json::Value::Null),
163        _ => serde_json::Value::Null,
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::super::spec::{FrameBound, WindowFrame, WindowFuncSpec};
    use super::apply_aggregate_window;
    use crate::expr::SqlExpr;
    use serde_json::json;

    /// Rows `"1".."n"`, each carrying `{"n": i}`.
    fn numbered(n: usize) -> Vec<(String, serde_json::Value)> {
        (1..=n)
            .map(|i| (i.to_string(), json!({ "n": i as i64 })))
            .collect()
    }

    /// Window spec ordered by `n`; `field == "*"` means no argument.
    fn make_spec(func: &str, field: &str, frame: WindowFrame) -> WindowFuncSpec {
        WindowFuncSpec {
            alias: "result".into(),
            func_name: func.into(),
            args: if field == "*" {
                vec![]
            } else {
                vec![SqlExpr::Column(field.into())]
            },
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame,
        }
    }

    fn rows_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
        WindowFrame {
            mode: "rows".into(),
            start,
            end,
        }
    }

    fn range_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
        WindowFrame {
            mode: "range".into(),
            start,
            end,
        }
    }

    fn groups_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
        WindowFrame {
            mode: "groups".into(),
            start,
            end,
        }
    }

    // ── ROWS ──────────────────────────────────────────────────────────────────

    #[test]
    fn rows_1_preceding_1_following_sum() {
        let mut rows = numbered(5);
        let indices: Vec<usize> = (0..5).collect();
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        // row 0 (n=1): sum of [1,2] = 3
        // row 1 (n=2): sum of [1,2,3] = 6
        // row 2 (n=3): sum of [2,3,4] = 9
        // row 3 (n=4): sum of [3,4,5] = 12
        // row 4 (n=5): sum of [4,5] = 9
        assert_eq!(rows[0].1["result"], json!(3.0));
        assert_eq!(rows[1].1["result"], json!(6.0));
        assert_eq!(rows[2].1["result"], json!(9.0));
        assert_eq!(rows[3].1["result"], json!(12.0));
        assert_eq!(rows[4].1["result"], json!(9.0));
    }

    #[test]
    fn rows_unbounded_preceding_current_sum() {
        let mut rows = numbered(5);
        let indices: Vec<usize> = (0..5).collect();
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        assert_eq!(rows[0].1["result"], json!(1.0));
        assert_eq!(rows[1].1["result"], json!(3.0));
        assert_eq!(rows[2].1["result"], json!(6.0));
        assert_eq!(rows[3].1["result"], json!(10.0));
        assert_eq!(rows[4].1["result"], json!(15.0));
    }

    #[test]
    fn rows_current_unbounded_following_sum() {
        let mut rows = numbered(5);
        let indices: Vec<usize> = (0..5).collect();
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::CurrentRow, FrameBound::UnboundedFollowing),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        // row 0: sum 1+2+3+4+5=15
        // row 1: sum 2+3+4+5=14
        // ...
        assert_eq!(rows[0].1["result"], json!(15.0));
        assert_eq!(rows[1].1["result"], json!(14.0));
        assert_eq!(rows[4].1["result"], json!(5.0));
    }

    // ── Other aggregates over a sliding ROWS frame ────────────────────────────

    #[test]
    fn rows_1_preceding_1_following_avg() {
        let mut rows = numbered(5);
        let indices: Vec<usize> = (0..5).collect();
        let spec = make_spec(
            "avg",
            "n",
            rows_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        // Frames: [1,2] [1,2,3] [2,3,4] [3,4,5] [4,5]
        for (row, want) in rows.iter().zip([1.5, 2.0, 3.0, 4.0, 4.5].iter()) {
            assert_eq!(row.1["result"], json!(*want));
        }
    }

    #[test]
    fn rows_1_preceding_1_following_min_max() {
        let indices: Vec<usize> = (0..5).collect();
        let frame = || rows_frame(FrameBound::Preceding(1), FrameBound::Following(1));

        let mut mins = numbered(5);
        apply_aggregate_window(&mut mins, &indices, &make_spec("min", "n", frame()));
        let mut maxs = numbered(5);
        apply_aggregate_window(&mut maxs, &indices, &make_spec("max", "n", frame()));

        for (row, want) in mins.iter().zip([1.0, 1.0, 2.0, 3.0, 4.0].iter()) {
            assert_eq!(row.1["result"], json!(*want));
        }
        for (row, want) in maxs.iter().zip([2.0, 3.0, 4.0, 5.0, 5.0].iter()) {
            assert_eq!(row.1["result"], json!(*want));
        }
    }

    #[test]
    fn rows_first_and_last_value_return_raw_field() {
        let indices: Vec<usize> = (0..5).collect();
        let frame = || rows_frame(FrameBound::Preceding(1), FrameBound::Following(1));

        let mut firsts = numbered(5);
        apply_aggregate_window(
            &mut firsts,
            &indices,
            &make_spec("first_value", "n", frame()),
        );
        let mut lasts = numbered(5);
        apply_aggregate_window(&mut lasts, &indices, &make_spec("last_value", "n", frame()));

        // The stored integers come back unchanged (not converted to f64).
        for (row, want) in firsts.iter().zip([1, 1, 2, 3, 4].iter()) {
            assert_eq!(row.1["result"], json!(*want));
        }
        for (row, want) in lasts.iter().zip([2, 3, 4, 5, 5].iter()) {
            assert_eq!(row.1["result"], json!(*want));
        }
    }

    #[test]
    fn rows_count_star_counts_frame_rows() {
        let mut rows = numbered(5);
        let indices: Vec<usize> = (0..5).collect();
        let spec = make_spec(
            "count",
            "*",
            rows_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        for (row, want) in rows.iter().zip([2, 3, 3, 3, 2].iter()) {
            assert_eq!(row.1["result"], json!(*want));
        }
    }

    #[test]
    fn avg_ignores_null_values() {
        let mut rows = vec![
            ("a".into(), json!({"n": 1i64})),
            ("b".into(), json!({"n": null})),
            ("c".into(), json!({"n": 3i64})),
        ];
        let indices: Vec<usize> = (0..3).collect();
        let spec = make_spec(
            "avg",
            "n",
            rows_frame(FrameBound::UnboundedPreceding, FrameBound::UnboundedFollowing),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        // (1 + 3) / 2 — the NULL row is skipped, not treated as 0.
        for row in &rows {
            assert_eq!(row.1["result"], json!(2.0));
        }
    }

    #[test]
    fn empty_partition_leaves_rows_untouched() {
        let mut rows = numbered(3);
        let spec = make_spec(
            "sum",
            "n",
            rows_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        apply_aggregate_window(&mut rows, &[], &spec);
        assert!(rows.iter().all(|(_, v)| v.get("result").is_none()));
    }

    // ── RANGE ─────────────────────────────────────────────────────────────────

    #[test]
    fn range_unbounded_preceding_current_row_with_ties() {
        // Values: n in [1, 1, 2, 3] — two rows with n=1 share same frame.
        let mut rows = vec![
            ("a".into(), json!({"n": 1i64})),
            ("b".into(), json!({"n": 1i64})),
            ("c".into(), json!({"n": 2i64})),
            ("d".into(), json!({"n": 3i64})),
        ];
        let indices: Vec<usize> = (0..4).collect();
        // Use the fast running path (RANGE UNBOUNDED PRECEDING TO CURRENT ROW)
        // This is the default frame; both rows a and b must see SUM=2 (both
        // peers are included up to CURRENT ROW which expands to the last peer).
        let spec = make_spec(
            "sum",
            "n",
            range_frame(FrameBound::UnboundedPreceding, FrameBound::CurrentRow),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        // Row a (n=1, pos=0): CURRENT ROW expands to last peer at pos=1, sum=1+1=2
        assert_eq!(rows[0].1["result"], json!(2.0));
        // Row b (n=1, pos=1): same
        assert_eq!(rows[1].1["result"], json!(2.0));
        // Row c (n=2): sum=1+1+2=4
        assert_eq!(rows[2].1["result"], json!(4.0));
        // Row d (n=3): sum=1+1+2+3=7
        assert_eq!(rows[3].1["result"], json!(7.0));
    }

    // ── GROUPS ────────────────────────────────────────────────────────────────

    #[test]
    fn groups_1_preceding_1_following_sum() {
        // Values: [1, 1, 2, 3, 3] — groups [0, 0, 1, 2, 2]
        let mut rows = vec![
            ("a".into(), json!({"n": 1i64})),
            ("b".into(), json!({"n": 1i64})),
            ("c".into(), json!({"n": 2i64})),
            ("d".into(), json!({"n": 3i64})),
            ("e".into(), json!({"n": 3i64})),
        ];
        let indices: Vec<usize> = (0..5).collect();
        let spec = make_spec(
            "sum",
            "n",
            groups_frame(FrameBound::Preceding(1), FrameBound::Following(1)),
        );
        apply_aggregate_window(&mut rows, &indices, &spec);
        // pos=0 (group 0): frame → groups 0..=1 → rows 0..=2 → sum=1+1+2=4
        assert_eq!(rows[0].1["result"], json!(4.0));
        // pos=1 (group 0): same frame
        assert_eq!(rows[1].1["result"], json!(4.0));
        // pos=2 (group 1): frame → groups 0..=2 → rows 0..=4 → sum=1+1+2+3+3=10
        assert_eq!(rows[2].1["result"], json!(10.0));
        // pos=3 (group 2): frame → groups 1..=2 → rows 2..=4 → sum=2+3+3=8
        assert_eq!(rows[3].1["result"], json!(8.0));
        // pos=4 (group 2): same
        assert_eq!(rows[4].1["result"], json!(8.0));
    }
}