Skip to main content

nodedb_query/window/
eval.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Top-level dispatch for window-function evaluation.
4
5use super::aggregate::apply_aggregate_window;
6use super::helpers::build_partitions;
7use super::offset::{apply_lag, apply_lead, apply_nth_value};
8use super::ranking::{
9    apply_cume_dist, apply_dense_rank, apply_ntile, apply_percent_rank, apply_rank,
10    apply_row_number,
11};
12use super::spec::WindowFuncSpec;
13
14/// Evaluate window functions over sorted, partitioned rows.
15///
16/// `rows` is the sorted result set. Each row is a `(doc_id, serde_json::Value)`.
17/// The same rows are mutated in place with window columns appended to each
18/// document.
19///
20/// Unknown window function names must be rejected by the planner before
21/// reaching this dispatcher; an unrecognised name here is an internal bug
22/// and panics rather than silently dropping the projection.
23pub fn evaluate_window_functions(
24    rows: &mut [(String, serde_json::Value)],
25    specs: &[WindowFuncSpec],
26) {
27    for spec in specs {
28        let partitions = build_partitions(rows, &spec.partition_by);
29
30        for partition_indices in &partitions {
31            match spec.func_name.as_str() {
32                "row_number" => apply_row_number(rows, partition_indices, &spec.alias),
33                "rank" => apply_rank(rows, partition_indices, &spec.alias, &spec.order_by),
34                "dense_rank" => {
35                    apply_dense_rank(rows, partition_indices, &spec.alias, &spec.order_by)
36                }
37                "ntile" => apply_ntile(rows, partition_indices, spec),
38                "percent_rank" => {
39                    apply_percent_rank(rows, partition_indices, &spec.alias, &spec.order_by)
40                }
41                "cume_dist" => {
42                    apply_cume_dist(rows, partition_indices, &spec.alias, &spec.order_by)
43                }
44                "lag" => apply_lag(rows, partition_indices, spec),
45                "lead" => apply_lead(rows, partition_indices, spec),
46                "nth_value" => apply_nth_value(rows, partition_indices, spec),
47                "sum" | "count" | "avg" | "min" | "max" | "first_value" | "last_value" => {
48                    apply_aggregate_window(rows, partition_indices, spec)
49                }
50                other => {
51                    unreachable!(
52                        "invariant: SQL planner validates window function names before dispatch; '{other}' is unrecognized and should have been rejected at planning time"
53                    )
54                }
55            }
56        }
57    }
58}
59
#[cfg(test)]
mod tests {
    use super::super::spec::{WindowFrame, WindowFuncSpec};
    use super::evaluate_window_functions;
    use crate::expr::SqlExpr;
    use serde_json::json;

    /// Five employees in two departments (`eng` rows first, then `sales`).
    /// Within `eng` the salaries are NOT in ascending order (100, 120, 90),
    /// so only use this for functions that do not depend on ORDER BY.
    fn make_rows() -> Vec<(String, serde_json::Value)> {
        vec![
            (
                "1".into(),
                json!({"dept": "eng", "salary": 100, "name": "Alice"}),
            ),
            (
                "2".into(),
                json!({"dept": "eng", "salary": 120, "name": "Bob"}),
            ),
            (
                "3".into(),
                json!({"dept": "eng", "salary": 90, "name": "Carol"}),
            ),
            (
                "4".into(),
                json!({"dept": "sales", "salary": 80, "name": "Dave"}),
            ),
            (
                "5".into(),
                json!({"dept": "sales", "salary": 110, "name": "Eve"}),
            ),
        ]
    }

    /// `n` rows with ids `"1"..="n"` and documents `{"n": i}`, in ascending order.
    fn numbered(n: usize) -> Vec<(String, serde_json::Value)> {
        (1..=n)
            .map(|i| (i.to_string(), json!({ "n": i })))
            .collect()
    }

    #[test]
    fn row_number_single_partition() {
        let mut rows = make_rows();
        let spec = WindowFuncSpec {
            alias: "rn".into(),
            func_name: "row_number".into(),
            args: vec![],
            partition_by: vec![],
            order_by: vec![],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["rn"], json!(1));
        assert_eq!(rows[4].1["rn"], json!(5));
    }

    #[test]
    fn row_number_partitioned() {
        let mut rows = make_rows();
        let spec = WindowFuncSpec {
            alias: "rn".into(),
            func_name: "row_number".into(),
            args: vec![],
            partition_by: vec!["dept".into()],
            order_by: vec![],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["rn"], json!(1));
        assert_eq!(rows[2].1["rn"], json!(3));
        assert_eq!(rows[3].1["rn"], json!(1));
        assert_eq!(rows[4].1["rn"], json!(2));
    }

    #[test]
    fn running_sum() {
        // The evaluator requires pre-sorted input, so order each department
        // by salary to match the spec's ORDER BY (`true` taken as ascending).
        let mut rows: Vec<(String, serde_json::Value)> = vec![
            (
                "3".into(),
                json!({"dept": "eng", "salary": 90, "name": "Carol"}),
            ),
            (
                "1".into(),
                json!({"dept": "eng", "salary": 100, "name": "Alice"}),
            ),
            (
                "2".into(),
                json!({"dept": "eng", "salary": 120, "name": "Bob"}),
            ),
            (
                "4".into(),
                json!({"dept": "sales", "salary": 80, "name": "Dave"}),
            ),
            (
                "5".into(),
                json!({"dept": "sales", "salary": 110, "name": "Eve"}),
            ),
        ];
        let spec = WindowFuncSpec {
            alias: "running_total".into(),
            func_name: "sum".into(),
            args: vec![SqlExpr::Column("salary".into())],
            partition_by: vec!["dept".into()],
            order_by: vec![("salary".into(), true)],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["running_total"], json!(90.0));
        assert_eq!(rows[1].1["running_total"], json!(190.0));
        assert_eq!(rows[2].1["running_total"], json!(310.0));
        assert_eq!(rows[3].1["running_total"], json!(80.0));
        assert_eq!(rows[4].1["running_total"], json!(190.0));
    }

    #[test]
    fn percent_rank_distinct_keys() {
        let mut rows = numbered(5);
        let spec = WindowFuncSpec {
            alias: "pr".into(),
            func_name: "percent_rank".into(),
            args: vec![],
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["pr"], json!(0.0));
        assert_eq!(rows[1].1["pr"], json!(0.25));
        assert_eq!(rows[2].1["pr"], json!(0.5));
        assert_eq!(rows[3].1["pr"], json!(0.75));
        assert_eq!(rows[4].1["pr"], json!(1.0));
    }

    #[test]
    fn percent_rank_with_peers() {
        // Peers share the leader's rank, so [1, 1, 2, 3] yields ranks
        // 1, 1, 3, 4 → percent_rank = 0, 0, 2/3, 3/3.
        let mut rows = vec![
            ("a".into(), json!({"n": 1})),
            ("b".into(), json!({"n": 1})),
            ("c".into(), json!({"n": 2})),
            ("d".into(), json!({"n": 3})),
        ];
        let spec = WindowFuncSpec {
            alias: "pr".into(),
            func_name: "percent_rank".into(),
            args: vec![],
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["pr"], json!(0.0));
        assert_eq!(rows[1].1["pr"], json!(0.0));
        assert_eq!(rows[2].1["pr"], json!(2.0 / 3.0));
        assert_eq!(rows[3].1["pr"], json!(1.0));
    }

    #[test]
    fn cume_dist_distinct_keys() {
        let mut rows = numbered(5);
        let spec = WindowFuncSpec {
            alias: "cd".into(),
            func_name: "cume_dist".into(),
            args: vec![],
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["cd"], json!(0.2));
        assert_eq!(rows[1].1["cd"], json!(0.4));
        assert_eq!(rows[2].1["cd"], json!(0.6));
        assert_eq!(rows[3].1["cd"], json!(0.8));
        assert_eq!(rows[4].1["cd"], json!(1.0));
    }

    #[test]
    fn cume_dist_with_peers() {
        let mut rows = vec![
            ("a".into(), json!({"n": 1})),
            ("b".into(), json!({"n": 1})),
            ("c".into(), json!({"n": 2})),
            ("d".into(), json!({"n": 3})),
        ];
        let spec = WindowFuncSpec {
            alias: "cd".into(),
            func_name: "cume_dist".into(),
            args: vec![],
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        // Peers share value of last peer's position / N.
        assert_eq!(rows[0].1["cd"], json!(0.5));
        assert_eq!(rows[1].1["cd"], json!(0.5));
        assert_eq!(rows[2].1["cd"], json!(0.75));
        assert_eq!(rows[3].1["cd"], json!(1.0));
    }

    #[test]
    fn nth_value_returns_nth_then_holds() {
        let mut rows = numbered(5);
        let spec = WindowFuncSpec {
            alias: "nv".into(),
            func_name: "nth_value".into(),
            args: vec![
                SqlExpr::Column("n".into()),
                SqlExpr::Literal(nodedb_types::Value::Integer(2)),
            ],
            partition_by: vec![],
            order_by: vec![("n".into(), true)],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
        assert_eq!(rows[0].1["nv"], json!(null));
        assert_eq!(rows[1].1["nv"], json!(2));
        assert_eq!(rows[2].1["nv"], json!(2));
        assert_eq!(rows[3].1["nv"], json!(2));
        assert_eq!(rows[4].1["nv"], json!(2));
    }

    #[test]
    #[should_panic(expected = "should have been rejected at planning time")]
    fn unknown_function_panics_at_evaluator() {
        let mut rows = numbered(2);
        let spec = WindowFuncSpec {
            alias: "x".into(),
            func_name: "frobnicate".into(),
            args: vec![],
            partition_by: vec![],
            order_by: vec![],
            frame: WindowFrame::default(),
        };
        evaluate_window_functions(&mut rows, &[spec]);
    }
}