1use super::helpers::as_f64;
11use super::spec::{FrameBound, WindowFrame};
12
13pub(super) fn evaluate_frame_bounds(
30 frame: &WindowFrame,
31 pos: usize,
32 len: usize,
33 order_values: &[serde_json::Value],
34 peer_groups: &[usize],
35) -> (usize, usize) {
36 match frame.mode.as_str() {
37 "rows" => rows_bounds(&frame.start, &frame.end, pos, len),
38 "range" => range_bounds(&frame.start, &frame.end, pos, len, order_values),
39 "groups" => groups_bounds(&frame.start, &frame.end, pos, len, peer_groups),
40 _ => (0, len.saturating_sub(1)),
43 }
44}
45
46pub(super) fn build_peer_groups(order_values: &[serde_json::Value]) -> Vec<usize> {
52 let mut groups = Vec::with_capacity(order_values.len());
53 let mut current_group = 0usize;
54 for (i, val) in order_values.iter().enumerate() {
55 if i > 0 && val != &order_values[i - 1] {
56 current_group += 1;
57 }
58 groups.push(current_group);
59 }
60 groups
61}
62
63fn rows_bounds(start: &FrameBound, end: &FrameBound, pos: usize, len: usize) -> (usize, usize) {
66 let start_idx = rows_bound_to_idx(start, pos, len, true);
67 let end_idx = rows_bound_to_idx(end, pos, len, false);
68 (start_idx.min(end_idx), start_idx.max(end_idx))
69}
70
71fn rows_bound_to_idx(bound: &FrameBound, pos: usize, len: usize, _is_start: bool) -> usize {
72 match bound {
73 FrameBound::UnboundedPreceding => 0,
74 FrameBound::Preceding(n) => pos.saturating_sub(*n as usize),
75 FrameBound::CurrentRow => pos,
76 FrameBound::Following(n) => (pos + *n as usize).min(len.saturating_sub(1)),
77 FrameBound::UnboundedFollowing => len.saturating_sub(1),
78 }
79}
80
81fn range_bounds(
84 start: &FrameBound,
85 end: &FrameBound,
86 pos: usize,
87 len: usize,
88 order_values: &[serde_json::Value],
89) -> (usize, usize) {
90 let current_val = order_values.get(pos).and_then(as_f64);
91
92 let start_idx = range_bound_to_idx(start, pos, len, order_values, current_val, true);
93 let end_idx = range_bound_to_idx(end, pos, len, order_values, current_val, false);
94 (start_idx.min(end_idx), start_idx.max(end_idx))
95}
96
97fn range_bound_to_idx(
98 bound: &FrameBound,
99 pos: usize,
100 len: usize,
101 order_values: &[serde_json::Value],
102 current_val: Option<f64>,
103 is_start: bool,
104) -> usize {
105 match bound {
106 FrameBound::UnboundedPreceding => 0,
107 FrameBound::UnboundedFollowing => len.saturating_sub(1),
108 FrameBound::CurrentRow => {
109 if is_start {
112 let mut idx = pos;
114 while idx > 0 && order_values.get(idx - 1) == order_values.get(pos) {
115 idx -= 1;
116 }
117 idx
118 } else {
119 let mut idx = pos;
121 while idx + 1 < len && order_values.get(idx + 1) == order_values.get(pos) {
122 idx += 1;
123 }
124 idx
125 }
126 }
127 FrameBound::Preceding(n) => {
128 let threshold = match current_val {
129 Some(cv) => cv - *n as f64,
130 None => return pos,
132 };
133 let mut idx = 0;
135 for (i, v) in order_values.iter().enumerate() {
136 if as_f64(v).is_some_and(|fv| fv >= threshold) {
137 idx = i;
138 break;
139 }
140 idx = i + 1;
141 }
142 idx.min(len.saturating_sub(1))
143 }
144 FrameBound::Following(n) => {
145 let threshold = match current_val {
146 Some(cv) => cv + *n as f64,
147 None => return pos,
148 };
149 let mut idx = pos;
151 for (i, v) in order_values.iter().enumerate().skip(pos) {
152 if as_f64(v).is_none_or(|fv| fv > threshold) {
153 break;
154 }
155 idx = i;
156 }
157 idx.min(len.saturating_sub(1))
158 }
159 }
160}
161
162fn groups_bounds(
165 start: &FrameBound,
166 end: &FrameBound,
167 pos: usize,
168 len: usize,
169 peer_groups: &[usize],
170) -> (usize, usize) {
171 let current_group = peer_groups.get(pos).copied().unwrap_or(0);
172 let max_group = peer_groups.last().copied().unwrap_or(0);
173
174 let start_group = groups_bound_to_group(start, current_group, max_group, true);
175 let end_group = groups_bound_to_group(end, current_group, max_group, false);
176
177 let start_idx = peer_groups
179 .iter()
180 .position(|&g| g == start_group)
181 .unwrap_or(0);
182 let end_idx = peer_groups
183 .iter()
184 .rposition(|&g| g == end_group)
185 .unwrap_or(len.saturating_sub(1));
186
187 (start_idx, end_idx)
188}
189
190fn groups_bound_to_group(
191 bound: &FrameBound,
192 current_group: usize,
193 max_group: usize,
194 _is_start: bool,
195) -> usize {
196 match bound {
197 FrameBound::UnboundedPreceding => 0,
198 FrameBound::UnboundedFollowing => max_group,
199 FrameBound::CurrentRow => current_group,
200 FrameBound::Preceding(n) => current_group.saturating_sub(*n as usize),
201 FrameBound::Following(n) => (current_group + *n as usize).min(max_group),
202 }
203}
204
205#[cfg(test)]
208mod tests {
209 use super::*;
210 use crate::window::spec::{FrameBound, WindowFrame};
211 use serde_json::json;
212
213 fn range_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
214 WindowFrame {
215 mode: "range".into(),
216 start,
217 end,
218 }
219 }
220
221 fn rows_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
222 WindowFrame {
223 mode: "rows".into(),
224 start,
225 end,
226 }
227 }
228
229 fn groups_frame(start: FrameBound, end: FrameBound) -> WindowFrame {
230 WindowFrame {
231 mode: "groups".into(),
232 start,
233 end,
234 }
235 }
236
237 fn num_vals(ns: &[i64]) -> Vec<serde_json::Value> {
238 ns.iter().map(|&n| json!(n)).collect()
239 }
240
241 #[test]
244 fn rows_1_preceding_1_following() {
245 let frame = rows_frame(FrameBound::Preceding(1), FrameBound::Following(1));
246 assert_eq!(evaluate_frame_bounds(&frame, 0, 5, &[], &[]), (0, 1));
247 assert_eq!(evaluate_frame_bounds(&frame, 2, 5, &[], &[]), (1, 3));
248 assert_eq!(evaluate_frame_bounds(&frame, 4, 5, &[], &[]), (3, 4));
249 }
250
251 #[test]
252 fn rows_unbounded_preceding_current() {
253 let frame = rows_frame(FrameBound::UnboundedPreceding, FrameBound::CurrentRow);
254 assert_eq!(evaluate_frame_bounds(&frame, 0, 5, &[], &[]), (0, 0));
255 assert_eq!(evaluate_frame_bounds(&frame, 3, 5, &[], &[]), (0, 3));
256 }
257
258 #[test]
259 fn rows_current_unbounded_following() {
260 let frame = rows_frame(FrameBound::CurrentRow, FrameBound::UnboundedFollowing);
261 assert_eq!(evaluate_frame_bounds(&frame, 0, 5, &[], &[]), (0, 4));
262 assert_eq!(evaluate_frame_bounds(&frame, 4, 5, &[], &[]), (4, 4));
263 }
264
265 #[test]
268 fn range_unbounded_preceding_current_row_with_ties() {
269 let vals = num_vals(&[1, 1, 2, 3]);
271 let frame = range_frame(FrameBound::UnboundedPreceding, FrameBound::CurrentRow);
272 assert_eq!(evaluate_frame_bounds(&frame, 0, 4, &vals, &[]), (0, 1));
274 assert_eq!(evaluate_frame_bounds(&frame, 1, 4, &vals, &[]), (0, 1));
276 assert_eq!(evaluate_frame_bounds(&frame, 2, 4, &vals, &[]), (0, 2));
278 }
279
280 #[test]
281 fn range_numeric_preceding_following() {
282 let vals = num_vals(&[10, 20, 30, 40, 50]);
284 let frame = range_frame(FrameBound::Preceding(10), FrameBound::Following(10));
286 let (s, e) = evaluate_frame_bounds(&frame, 1, 5, &vals, &[]);
288 assert!(s == 0 && e == 2, "got ({s},{e})");
289 }
290
291 #[test]
294 fn build_peer_groups_basic() {
295 let vals = num_vals(&[1, 1, 2, 3, 3]);
296 let pg = build_peer_groups(&vals);
297 assert_eq!(pg, vec![0, 0, 1, 2, 2]);
298 }
299
300 #[test]
301 fn groups_1_preceding_1_following() {
302 let vals = num_vals(&[1, 1, 2, 3, 3]);
304 let pg = build_peer_groups(&vals);
305 let frame = groups_frame(FrameBound::Preceding(1), FrameBound::Following(1));
306
307 let (s, e) = evaluate_frame_bounds(&frame, 0, 5, &vals, &pg);
310 assert_eq!((s, e), (0, 2), "pos=0");
311
312 let (s, e) = evaluate_frame_bounds(&frame, 2, 5, &vals, &pg);
314 assert_eq!((s, e), (0, 4), "pos=2");
315
316 let (s, e) = evaluate_frame_bounds(&frame, 4, 5, &vals, &pg);
318 assert_eq!((s, e), (2, 4), "pos=4");
319 }
320}