Skip to main content

nodedb_query/window/
frame.rs

// SPDX-License-Identifier: Apache-2.0

//! Window frame bound resolution.
//!
//! Given the current row's position, the partition length, the frame spec,
//! and the order-by values (RANGE) or the peer groups derived from them
//! (GROUPS), this module resolves a frame to a concrete inclusive
//! `[start_idx, end_idx]` range of indices into the partition's sorted rows.
9
10use super::helpers::as_f64;
11use super::spec::{FrameBound, WindowFrame};
12
13/// Resolve the frame for row at position `pos` in a partition of `len` rows.
14///
15/// Returns `(start_idx, end_idx)` — both are inclusive indices into the
16/// sorted partition slice. Guaranteed: `start_idx <= end_idx`, both within
17/// `0..len`.
18///
19/// Arguments:
20/// - `frame`: the `WindowFrame` from the spec.
21/// - `pos`: current row's position within the partition (0-based).
22/// - `len`: total number of rows in the partition.
23/// - `order_values`: the ORDER BY column value for each row in partition order
24///   (needed for RANGE numeric offsets). Empty slice is fine when no numeric
25///   offsets are used.
26/// - `peer_groups`: for each row, its zero-based peer-group index (needed for
27///   GROUPS mode). Computed once per partition by `build_peer_groups`.
28///   Empty slice is fine when mode != "groups".
29pub(super) fn evaluate_frame_bounds(
30    frame: &WindowFrame,
31    pos: usize,
32    len: usize,
33    order_values: &[serde_json::Value],
34    peer_groups: &[usize],
35) -> (usize, usize) {
36    match frame.mode.as_str() {
37        "rows" => rows_bounds(&frame.start, &frame.end, pos, len),
38        "range" => range_bounds(&frame.start, &frame.end, pos, len, order_values),
39        "groups" => groups_bounds(&frame.start, &frame.end, pos, len, peer_groups),
40        // Unrecognised mode treated as full-partition — the planner must have
41        // validated the mode; reaching here with an unknown value is a bug.
42        _ => (0, len.saturating_sub(1)),
43    }
44}
45
46/// Build a per-row peer-group index array for a partition.
47///
48/// Two rows are in the same peer group when they share the same order-by
49/// value. The returned vec has the same length as the partition; each element
50/// is the zero-based group index of that row.
51pub(super) fn build_peer_groups(order_values: &[serde_json::Value]) -> Vec<usize> {
52    let mut groups = Vec::with_capacity(order_values.len());
53    let mut current_group = 0usize;
54    for (i, val) in order_values.iter().enumerate() {
55        if i > 0 && val != &order_values[i - 1] {
56            current_group += 1;
57        }
58        groups.push(current_group);
59    }
60    groups
61}
62
63// ── ROWS ─────────────────────────────────────────────────────────────────────
64
65fn rows_bounds(start: &FrameBound, end: &FrameBound, pos: usize, len: usize) -> (usize, usize) {
66    let start_idx = rows_bound_to_idx(start, pos, len, true);
67    let end_idx = rows_bound_to_idx(end, pos, len, false);
68    (start_idx.min(end_idx), start_idx.max(end_idx))
69}
70
71fn rows_bound_to_idx(bound: &FrameBound, pos: usize, len: usize, _is_start: bool) -> usize {
72    match bound {
73        FrameBound::UnboundedPreceding => 0,
74        FrameBound::Preceding(n) => pos.saturating_sub(*n as usize),
75        FrameBound::CurrentRow => pos,
76        FrameBound::Following(n) => (pos + *n as usize).min(len.saturating_sub(1)),
77        FrameBound::UnboundedFollowing => len.saturating_sub(1),
78    }
79}
80
81// ── RANGE ─────────────────────────────────────────────────────────────────────
82
83fn range_bounds(
84    start: &FrameBound,
85    end: &FrameBound,
86    pos: usize,
87    len: usize,
88    order_values: &[serde_json::Value],
89) -> (usize, usize) {
90    let current_val = order_values.get(pos).and_then(as_f64);
91
92    let start_idx = range_bound_to_idx(start, pos, len, order_values, current_val, true);
93    let end_idx = range_bound_to_idx(end, pos, len, order_values, current_val, false);
94    (start_idx.min(end_idx), start_idx.max(end_idx))
95}
96
97fn range_bound_to_idx(
98    bound: &FrameBound,
99    pos: usize,
100    len: usize,
101    order_values: &[serde_json::Value],
102    current_val: Option<f64>,
103    is_start: bool,
104) -> usize {
105    match bound {
106        FrameBound::UnboundedPreceding => 0,
107        FrameBound::UnboundedFollowing => len.saturating_sub(1),
108        FrameBound::CurrentRow => {
109            // Peer-aware: for start bound, go back to first peer;
110            // for end bound, advance to last peer.
111            if is_start {
112                // Scan backward to find the first row with the same value.
113                let mut idx = pos;
114                while idx > 0 && order_values.get(idx - 1) == order_values.get(pos) {
115                    idx -= 1;
116                }
117                idx
118            } else {
119                // Scan forward to find the last row with the same value.
120                let mut idx = pos;
121                while idx + 1 < len && order_values.get(idx + 1) == order_values.get(pos) {
122                    idx += 1;
123                }
124                idx
125            }
126        }
127        FrameBound::Preceding(n) => {
128            let threshold = match current_val {
129                Some(cv) => cv - *n as f64,
130                // No numeric order value: fall back to current row position.
131                None => return pos,
132            };
133            // First row whose value >= threshold.
134            let mut idx = 0;
135            for (i, v) in order_values.iter().enumerate() {
136                if as_f64(v).is_some_and(|fv| fv >= threshold) {
137                    idx = i;
138                    break;
139                }
140                idx = i + 1;
141            }
142            idx.min(len.saturating_sub(1))
143        }
144        FrameBound::Following(n) => {
145            let threshold = match current_val {
146                Some(cv) => cv + *n as f64,
147                None => return pos,
148            };
149            // Last row whose value <= threshold.
150            let mut idx = pos;
151            for (i, v) in order_values.iter().enumerate().skip(pos) {
152                if as_f64(v).is_none_or(|fv| fv > threshold) {
153                    break;
154                }
155                idx = i;
156            }
157            idx.min(len.saturating_sub(1))
158        }
159    }
160}
161
162// ── GROUPS ────────────────────────────────────────────────────────────────────
163
164fn groups_bounds(
165    start: &FrameBound,
166    end: &FrameBound,
167    pos: usize,
168    len: usize,
169    peer_groups: &[usize],
170) -> (usize, usize) {
171    let current_group = peer_groups.get(pos).copied().unwrap_or(0);
172    let max_group = peer_groups.last().copied().unwrap_or(0);
173
174    let start_group = groups_bound_to_group(start, current_group, max_group, true);
175    let end_group = groups_bound_to_group(end, current_group, max_group, false);
176
177    // Convert group numbers to row indices.
178    let start_idx = peer_groups
179        .iter()
180        .position(|&g| g == start_group)
181        .unwrap_or(0);
182    let end_idx = peer_groups
183        .iter()
184        .rposition(|&g| g == end_group)
185        .unwrap_or(len.saturating_sub(1));
186
187    (start_idx, end_idx)
188}
189
190fn groups_bound_to_group(
191    bound: &FrameBound,
192    current_group: usize,
193    max_group: usize,
194    _is_start: bool,
195) -> usize {
196    match bound {
197        FrameBound::UnboundedPreceding => 0,
198        FrameBound::UnboundedFollowing => max_group,
199        FrameBound::CurrentRow => current_group,
200        FrameBound::Preceding(n) => current_group.saturating_sub(*n as usize),
201        FrameBound::Following(n) => (current_group + *n as usize).min(max_group),
202    }
203}
204
205// ── Unit tests ────────────────────────────────────────────────────────────────
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::window::spec::{FrameBound, WindowFrame};
    use serde_json::json;

    /// Build a frame in the given mode (`"rows"`, `"range"` or `"groups"`).
    fn frame(mode: &'static str, start: FrameBound, end: FrameBound) -> WindowFrame {
        WindowFrame {
            mode: mode.into(),
            start,
            end,
        }
    }

    /// Wrap integers as JSON order-by values.
    fn nums(ns: &[i64]) -> Vec<serde_json::Value> {
        ns.iter().copied().map(|n| json!(n)).collect()
    }

    // ROWS ───────────────────────────────────────────────────────────────────

    #[test]
    fn rows_1_preceding_1_following() {
        let f = frame("rows", FrameBound::Preceding(1), FrameBound::Following(1));
        for (pos, want) in [(0, (0, 1)), (2, (1, 3)), (4, (3, 4))] {
            assert_eq!(evaluate_frame_bounds(&f, pos, 5, &[], &[]), want, "pos={pos}");
        }
    }

    #[test]
    fn rows_unbounded_preceding_current() {
        let f = frame("rows", FrameBound::UnboundedPreceding, FrameBound::CurrentRow);
        for (pos, want) in [(0, (0, 0)), (3, (0, 3))] {
            assert_eq!(evaluate_frame_bounds(&f, pos, 5, &[], &[]), want, "pos={pos}");
        }
    }

    #[test]
    fn rows_current_unbounded_following() {
        let f = frame("rows", FrameBound::CurrentRow, FrameBound::UnboundedFollowing);
        for (pos, want) in [(0, (0, 4)), (4, (4, 4))] {
            assert_eq!(evaluate_frame_bounds(&f, pos, 5, &[], &[]), want, "pos={pos}");
        }
    }

    // RANGE ──────────────────────────────────────────────────────────────────

    #[test]
    fn range_unbounded_preceding_current_row_with_ties() {
        // Order values [1, 1, 2, 3]: rows 0 and 1 are peers, so CURRENT ROW
        // as the end bound extends to the last peer.
        let vals = nums(&[1, 1, 2, 3]);
        let f = frame("range", FrameBound::UnboundedPreceding, FrameBound::CurrentRow);
        for (pos, want) in [(0, (0, 1)), (1, (0, 1)), (2, (0, 2))] {
            assert_eq!(evaluate_frame_bounds(&f, pos, 4, &vals, &[]), want, "pos={pos}");
        }
    }

    #[test]
    fn range_numeric_preceding_following() {
        // RANGE BETWEEN 10 PRECEDING AND 10 FOLLOWING around value 20 covers
        // values 10..=30, i.e. rows 0..=2.
        let vals = nums(&[10, 20, 30, 40, 50]);
        let f = frame("range", FrameBound::Preceding(10), FrameBound::Following(10));
        assert_eq!(evaluate_frame_bounds(&f, 1, 5, &vals, &[]), (0, 2));
    }

    // GROUPS ─────────────────────────────────────────────────────────────────

    #[test]
    fn build_peer_groups_basic() {
        let vals = nums(&[1, 1, 2, 3, 3]);
        assert_eq!(build_peer_groups(&vals), vec![0, 0, 1, 2, 2]);
    }

    #[test]
    fn groups_1_preceding_1_following() {
        // Order values [1, 1, 2, 3, 3] form peer groups [0, 0, 1, 2, 2].
        let vals = nums(&[1, 1, 2, 3, 3]);
        let pg = build_peer_groups(&vals);
        let f = frame("groups", FrameBound::Preceding(1), FrameBound::Following(1));
        // Group 0 spans groups 0..=1, group 1 spans 0..=2 and group 2 spans
        // 1..=2, each expanded to the covering row indices.
        for (pos, want) in [(0, (0, 2)), (2, (0, 4)), (4, (2, 4))] {
            assert_eq!(evaluate_frame_bounds(&f, pos, 5, &vals, &pg), want, "pos={pos}");
        }
    }
}