1use super::aggregate::apply_aggregate_window;
6use super::helpers::build_partitions;
7use super::offset::{apply_lag, apply_lead, apply_nth_value};
8use super::ranking::{
9 apply_cume_dist, apply_dense_rank, apply_ntile, apply_percent_rank, apply_rank,
10 apply_row_number,
11};
12use super::spec::WindowFuncSpec;
13
14pub fn evaluate_window_functions(
24 rows: &mut [(String, serde_json::Value)],
25 specs: &[WindowFuncSpec],
26) {
27 for spec in specs {
28 let partitions = build_partitions(rows, &spec.partition_by);
29
30 for partition_indices in &partitions {
31 match spec.func_name.as_str() {
32 "row_number" => apply_row_number(rows, partition_indices, &spec.alias),
33 "rank" => apply_rank(rows, partition_indices, &spec.alias, &spec.order_by),
34 "dense_rank" => {
35 apply_dense_rank(rows, partition_indices, &spec.alias, &spec.order_by)
36 }
37 "ntile" => apply_ntile(rows, partition_indices, spec),
38 "percent_rank" => {
39 apply_percent_rank(rows, partition_indices, &spec.alias, &spec.order_by)
40 }
41 "cume_dist" => {
42 apply_cume_dist(rows, partition_indices, &spec.alias, &spec.order_by)
43 }
44 "lag" => apply_lag(rows, partition_indices, spec),
45 "lead" => apply_lead(rows, partition_indices, spec),
46 "nth_value" => apply_nth_value(rows, partition_indices, spec),
47 "sum" | "count" | "avg" | "min" | "max" | "first_value" | "last_value" => {
48 apply_aggregate_window(rows, partition_indices, spec)
49 }
50 other => {
51 unreachable!(
52 "invariant: SQL planner validates window function names before dispatch; '{other}' is unrecognized and should have been rejected at planning time"
53 )
54 }
55 }
56 }
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use super::super::spec::{WindowFrame, WindowFuncSpec};
63 use super::evaluate_window_functions;
64 use crate::expr::SqlExpr;
65 use serde_json::json;
66
67 fn make_rows() -> Vec<(String, serde_json::Value)> {
68 vec![
69 (
70 "1".into(),
71 json!({"dept": "eng", "salary": 100, "name": "Alice"}),
72 ),
73 (
74 "2".into(),
75 json!({"dept": "eng", "salary": 120, "name": "Bob"}),
76 ),
77 (
78 "3".into(),
79 json!({"dept": "eng", "salary": 90, "name": "Carol"}),
80 ),
81 (
82 "4".into(),
83 json!({"dept": "sales", "salary": 80, "name": "Dave"}),
84 ),
85 (
86 "5".into(),
87 json!({"dept": "sales", "salary": 110, "name": "Eve"}),
88 ),
89 ]
90 }
91
92 fn numbered(n: usize) -> Vec<(String, serde_json::Value)> {
93 (1..=n)
94 .map(|i| (i.to_string(), json!({ "n": i })))
95 .collect()
96 }
97
98 #[test]
99 fn row_number_single_partition() {
100 let mut rows = make_rows();
101 let spec = WindowFuncSpec {
102 alias: "rn".into(),
103 func_name: "row_number".into(),
104 args: vec![],
105 partition_by: vec![],
106 order_by: vec![],
107 frame: WindowFrame::default(),
108 };
109 evaluate_window_functions(&mut rows, &[spec]);
110 assert_eq!(rows[0].1["rn"], json!(1));
111 assert_eq!(rows[4].1["rn"], json!(5));
112 }
113
114 #[test]
115 fn row_number_partitioned() {
116 let mut rows = make_rows();
117 let spec = WindowFuncSpec {
118 alias: "rn".into(),
119 func_name: "row_number".into(),
120 args: vec![],
121 partition_by: vec!["dept".into()],
122 order_by: vec![],
123 frame: WindowFrame::default(),
124 };
125 evaluate_window_functions(&mut rows, &[spec]);
126 assert_eq!(rows[0].1["rn"], json!(1));
127 assert_eq!(rows[2].1["rn"], json!(3));
128 assert_eq!(rows[3].1["rn"], json!(1));
129 assert_eq!(rows[4].1["rn"], json!(2));
130 }
131
132 #[test]
133 fn running_sum() {
134 let mut rows = make_rows();
135 let spec = WindowFuncSpec {
136 alias: "running_total".into(),
137 func_name: "sum".into(),
138 args: vec![SqlExpr::Column("salary".into())],
139 partition_by: vec!["dept".into()],
140 order_by: vec![("salary".into(), true)],
141 frame: WindowFrame::default(),
142 };
143 evaluate_window_functions(&mut rows, &[spec]);
144 assert_eq!(rows[0].1["running_total"], json!(100.0));
145 assert_eq!(rows[1].1["running_total"], json!(220.0));
146 assert_eq!(rows[2].1["running_total"], json!(310.0));
147 assert_eq!(rows[3].1["running_total"], json!(80.0));
148 assert_eq!(rows[4].1["running_total"], json!(190.0));
149 }
150
151 #[test]
152 fn percent_rank_distinct_keys() {
153 let mut rows = numbered(5);
154 let spec = WindowFuncSpec {
155 alias: "pr".into(),
156 func_name: "percent_rank".into(),
157 args: vec![],
158 partition_by: vec![],
159 order_by: vec![("n".into(), true)],
160 frame: WindowFrame::default(),
161 };
162 evaluate_window_functions(&mut rows, &[spec]);
163 assert_eq!(rows[0].1["pr"], json!(0.0));
164 assert_eq!(rows[1].1["pr"], json!(0.25));
165 assert_eq!(rows[2].1["pr"], json!(0.5));
166 assert_eq!(rows[3].1["pr"], json!(0.75));
167 assert_eq!(rows[4].1["pr"], json!(1.0));
168 }
169
170 #[test]
171 fn percent_rank_with_peers() {
172 let mut rows = vec![
175 ("a".into(), json!({"n": 1})),
176 ("b".into(), json!({"n": 1})),
177 ("c".into(), json!({"n": 2})),
178 ("d".into(), json!({"n": 3})),
179 ];
180 let spec = WindowFuncSpec {
181 alias: "pr".into(),
182 func_name: "percent_rank".into(),
183 args: vec![],
184 partition_by: vec![],
185 order_by: vec![("n".into(), true)],
186 frame: WindowFrame::default(),
187 };
188 evaluate_window_functions(&mut rows, &[spec]);
189 assert_eq!(rows[0].1["pr"], json!(0.0));
190 assert_eq!(rows[1].1["pr"], json!(0.0));
191 assert_eq!(rows[2].1["pr"], json!(2.0 / 3.0));
192 assert_eq!(rows[3].1["pr"], json!(1.0));
193 }
194
195 #[test]
196 fn cume_dist_distinct_keys() {
197 let mut rows = numbered(5);
198 let spec = WindowFuncSpec {
199 alias: "cd".into(),
200 func_name: "cume_dist".into(),
201 args: vec![],
202 partition_by: vec![],
203 order_by: vec![("n".into(), true)],
204 frame: WindowFrame::default(),
205 };
206 evaluate_window_functions(&mut rows, &[spec]);
207 assert_eq!(rows[0].1["cd"], json!(0.2));
208 assert_eq!(rows[1].1["cd"], json!(0.4));
209 assert_eq!(rows[2].1["cd"], json!(0.6));
210 assert_eq!(rows[3].1["cd"], json!(0.8));
211 assert_eq!(rows[4].1["cd"], json!(1.0));
212 }
213
214 #[test]
215 fn cume_dist_with_peers() {
216 let mut rows = vec![
217 ("a".into(), json!({"n": 1})),
218 ("b".into(), json!({"n": 1})),
219 ("c".into(), json!({"n": 2})),
220 ("d".into(), json!({"n": 3})),
221 ];
222 let spec = WindowFuncSpec {
223 alias: "cd".into(),
224 func_name: "cume_dist".into(),
225 args: vec![],
226 partition_by: vec![],
227 order_by: vec![("n".into(), true)],
228 frame: WindowFrame::default(),
229 };
230 evaluate_window_functions(&mut rows, &[spec]);
231 assert_eq!(rows[0].1["cd"], json!(0.5));
233 assert_eq!(rows[1].1["cd"], json!(0.5));
234 assert_eq!(rows[2].1["cd"], json!(0.75));
235 assert_eq!(rows[3].1["cd"], json!(1.0));
236 }
237
238 #[test]
239 fn nth_value_returns_nth_then_holds() {
240 let mut rows = numbered(5);
241 let spec = WindowFuncSpec {
242 alias: "nv".into(),
243 func_name: "nth_value".into(),
244 args: vec![
245 SqlExpr::Column("n".into()),
246 SqlExpr::Literal(nodedb_types::Value::Integer(2)),
247 ],
248 partition_by: vec![],
249 order_by: vec![("n".into(), true)],
250 frame: WindowFrame::default(),
251 };
252 evaluate_window_functions(&mut rows, &[spec]);
253 assert_eq!(rows[0].1["nv"], json!(null));
254 assert_eq!(rows[1].1["nv"], json!(2));
255 assert_eq!(rows[2].1["nv"], json!(2));
256 assert_eq!(rows[3].1["nv"], json!(2));
257 assert_eq!(rows[4].1["nv"], json!(2));
258 }
259
260 #[test]
261 #[should_panic(expected = "should have been rejected at planning time")]
262 fn unknown_function_panics_at_evaluator() {
263 let mut rows = numbered(2);
264 let spec = WindowFuncSpec {
265 alias: "x".into(),
266 func_name: "frobnicate".into(),
267 args: vec![],
268 partition_by: vec![],
269 order_by: vec![],
270 frame: WindowFrame::default(),
271 };
272 evaluate_window_functions(&mut rows, &[spec]);
273 }
274}