Skip to main content

nodedb_query/window/
offset.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Offset window functions: lag, lead, nth_value.
4
5use crate::expr::SqlExpr;
6
7use super::helpers::{get_field, set_window_col};
8use super::spec::WindowFuncSpec;
9
10fn col_arg(spec: &WindowFuncSpec, idx: usize) -> &str {
11    spec.args
12        .get(idx)
13        .and_then(|e| match e {
14            SqlExpr::Column(c) => Some(c.as_str()),
15            _ => None,
16        })
17        .unwrap_or("*")
18}
19
20fn usize_arg(spec: &WindowFuncSpec, idx: usize, default: usize) -> usize {
21    spec.args
22        .get(idx)
23        .and_then(|e| match e {
24            SqlExpr::Literal(v) => v.as_f64().map(|n| n as usize),
25            _ => None,
26        })
27        .unwrap_or(default)
28}
29
30fn default_arg(spec: &WindowFuncSpec, idx: usize) -> serde_json::Value {
31    spec.args
32        .get(idx)
33        .and_then(|e| match e {
34            SqlExpr::Literal(v) => Some(serde_json::Value::from(v.clone())),
35            _ => None,
36        })
37        .unwrap_or(serde_json::Value::Null)
38}
39
40pub(super) fn apply_lag(
41    rows: &mut [(String, serde_json::Value)],
42    indices: &[usize],
43    spec: &WindowFuncSpec,
44) {
45    let field = col_arg(spec, 0);
46    let offset = usize_arg(spec, 1, 1);
47    let default = default_arg(spec, 2);
48
49    for (pos, &i) in indices.iter().enumerate() {
50        let val = if pos >= offset {
51            get_field(&rows[indices[pos - offset]].1, field)
52        } else {
53            default.clone()
54        };
55        set_window_col(&mut rows[i].1, &spec.alias, val);
56    }
57}
58
59pub(super) fn apply_lead(
60    rows: &mut [(String, serde_json::Value)],
61    indices: &[usize],
62    spec: &WindowFuncSpec,
63) {
64    let field = col_arg(spec, 0);
65    let offset = usize_arg(spec, 1, 1);
66    let default = default_arg(spec, 2);
67
68    for (pos, &i) in indices.iter().enumerate() {
69        let val = if pos + offset < indices.len() {
70            get_field(&rows[indices[pos + offset]].1, field)
71        } else {
72            default.clone()
73        };
74        set_window_col(&mut rows[i].1, &spec.alias, val);
75    }
76}
77
78/// PostgreSQL `nth_value(expr, n)` — value of `expr` at the n'th row of the
79/// window frame, NULL if the frame doesn't yet contain n rows. Default frame
80/// is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so the first n-1
81/// rows of each partition return NULL and rows from the n'th onward return
82/// the value of `expr` at the n'th row.
83pub(super) fn apply_nth_value(
84    rows: &mut [(String, serde_json::Value)],
85    indices: &[usize],
86    spec: &WindowFuncSpec,
87) {
88    let field = col_arg(spec, 0);
89    let n = usize_arg(spec, 1, 1).max(1);
90
91    for (pos, &i) in indices.iter().enumerate() {
92        let val = if pos + 1 >= n {
93            get_field(&rows[indices[n - 1]].1, field)
94        } else {
95            serde_json::Value::Null
96        };
97        set_window_col(&mut rows[i].1, &spec.alias, val);
98    }
99}