nodedb_query/window/
offset.rs1use crate::expr::SqlExpr;
6
7use super::helpers::{get_field, set_window_col};
8use super::spec::WindowFuncSpec;
9
10fn col_arg(spec: &WindowFuncSpec, idx: usize) -> &str {
11 spec.args
12 .get(idx)
13 .and_then(|e| match e {
14 SqlExpr::Column(c) => Some(c.as_str()),
15 _ => None,
16 })
17 .unwrap_or("*")
18}
19
20fn usize_arg(spec: &WindowFuncSpec, idx: usize, default: usize) -> usize {
21 spec.args
22 .get(idx)
23 .and_then(|e| match e {
24 SqlExpr::Literal(v) => v.as_f64().map(|n| n as usize),
25 _ => None,
26 })
27 .unwrap_or(default)
28}
29
30fn default_arg(spec: &WindowFuncSpec, idx: usize) -> serde_json::Value {
31 spec.args
32 .get(idx)
33 .and_then(|e| match e {
34 SqlExpr::Literal(v) => Some(serde_json::Value::from(v.clone())),
35 _ => None,
36 })
37 .unwrap_or(serde_json::Value::Null)
38}
39
40pub(super) fn apply_lag(
41 rows: &mut [(String, serde_json::Value)],
42 indices: &[usize],
43 spec: &WindowFuncSpec,
44) {
45 let field = col_arg(spec, 0);
46 let offset = usize_arg(spec, 1, 1);
47 let default = default_arg(spec, 2);
48
49 for (pos, &i) in indices.iter().enumerate() {
50 let val = if pos >= offset {
51 get_field(&rows[indices[pos - offset]].1, field)
52 } else {
53 default.clone()
54 };
55 set_window_col(&mut rows[i].1, &spec.alias, val);
56 }
57}
58
59pub(super) fn apply_lead(
60 rows: &mut [(String, serde_json::Value)],
61 indices: &[usize],
62 spec: &WindowFuncSpec,
63) {
64 let field = col_arg(spec, 0);
65 let offset = usize_arg(spec, 1, 1);
66 let default = default_arg(spec, 2);
67
68 for (pos, &i) in indices.iter().enumerate() {
69 let val = if pos + offset < indices.len() {
70 get_field(&rows[indices[pos + offset]].1, field)
71 } else {
72 default.clone()
73 };
74 set_window_col(&mut rows[i].1, &spec.alias, val);
75 }
76}
77
78pub(super) fn apply_nth_value(
84 rows: &mut [(String, serde_json::Value)],
85 indices: &[usize],
86 spec: &WindowFuncSpec,
87) {
88 let field = col_arg(spec, 0);
89 let n = usize_arg(spec, 1, 1).max(1);
90
91 for (pos, &i) in indices.iter().enumerate() {
92 let val = if pos + 1 >= n {
93 get_field(&rows[indices[n - 1]].1, field)
94 } else {
95 serde_json::Value::Null
96 };
97 set_window_col(&mut rows[i].1, &spec.alias, val);
98 }
99}