Skip to main content

nodedb_query/window/
ranking.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Ranking and distribution window functions: row_number, rank, dense_rank,
4//! ntile, percent_rank, cume_dist.
5
6use crate::expr::SqlExpr;
7
8use super::helpers::{order_keys_equal, set_window_col};
9use super::spec::WindowFuncSpec;
10
11pub(super) fn apply_row_number(
12    rows: &mut [(String, serde_json::Value)],
13    indices: &[usize],
14    alias: &str,
15) {
16    for (rank, &i) in indices.iter().enumerate() {
17        set_window_col(&mut rows[i].1, alias, serde_json::json!(rank + 1));
18    }
19}
20
21pub(super) fn apply_rank(
22    rows: &mut [(String, serde_json::Value)],
23    indices: &[usize],
24    alias: &str,
25    order_by: &[(String, bool)],
26) {
27    if indices.is_empty() {
28        return;
29    }
30    let mut current_rank = 1;
31    set_window_col(&mut rows[indices[0]].1, alias, serde_json::json!(1));
32
33    for pos in 1..indices.len() {
34        if !order_keys_equal(rows, indices[pos - 1], indices[pos], order_by) {
35            current_rank = pos + 1;
36        }
37        set_window_col(
38            &mut rows[indices[pos]].1,
39            alias,
40            serde_json::json!(current_rank),
41        );
42    }
43}
44
45pub(super) fn apply_dense_rank(
46    rows: &mut [(String, serde_json::Value)],
47    indices: &[usize],
48    alias: &str,
49    order_by: &[(String, bool)],
50) {
51    if indices.is_empty() {
52        return;
53    }
54    let mut current_rank = 1;
55    set_window_col(&mut rows[indices[0]].1, alias, serde_json::json!(1));
56
57    for pos in 1..indices.len() {
58        if !order_keys_equal(rows, indices[pos - 1], indices[pos], order_by) {
59            current_rank += 1;
60        }
61        set_window_col(
62            &mut rows[indices[pos]].1,
63            alias,
64            serde_json::json!(current_rank),
65        );
66    }
67}
68
69pub(super) fn apply_ntile(
70    rows: &mut [(String, serde_json::Value)],
71    indices: &[usize],
72    spec: &WindowFuncSpec,
73) {
74    let n = spec
75        .args
76        .first()
77        .and_then(|e| {
78            if let SqlExpr::Literal(v) = e {
79                v.as_f64().map(|x| x as usize)
80            } else {
81                None
82            }
83        })
84        .unwrap_or(1)
85        .max(1);
86    let total = indices.len();
87    if total == 0 {
88        return;
89    }
90    for (pos, &i) in indices.iter().enumerate() {
91        // Integer division distributes rows as evenly as possible (PostgreSQL semantics).
92        let bucket = (pos * n / total) + 1;
93        set_window_col(&mut rows[i].1, &spec.alias, serde_json::json!(bucket));
94    }
95}
96
97/// PostgreSQL `percent_rank()` — `(rank - 1) / (partition_rows - 1)`. Single-
98/// row partitions return 0. Peer rows share their leader's value.
99pub(super) fn apply_percent_rank(
100    rows: &mut [(String, serde_json::Value)],
101    indices: &[usize],
102    alias: &str,
103    order_by: &[(String, bool)],
104) {
105    let total = indices.len();
106    if total == 0 {
107        return;
108    }
109    if total == 1 {
110        set_window_col(&mut rows[indices[0]].1, alias, serde_json::json!(0.0));
111        return;
112    }
113    let denom = (total - 1) as f64;
114    let mut current_rank = 1usize;
115    set_window_col(&mut rows[indices[0]].1, alias, serde_json::json!(0.0));
116
117    for pos in 1..total {
118        if !order_keys_equal(rows, indices[pos - 1], indices[pos], order_by) {
119            current_rank = pos + 1;
120        }
121        let pr = (current_rank - 1) as f64 / denom;
122        set_window_col(&mut rows[indices[pos]].1, alias, serde_json::json!(pr));
123    }
124}
125
126/// PostgreSQL `cume_dist()` — `rows_at_or_before_current_peer / partition_rows`.
127/// Peer rows (equal ORDER BY keys) share the same value, taken from the last
128/// peer's position.
129pub(super) fn apply_cume_dist(
130    rows: &mut [(String, serde_json::Value)],
131    indices: &[usize],
132    alias: &str,
133    order_by: &[(String, bool)],
134) {
135    let total = indices.len();
136    if total == 0 {
137        return;
138    }
139    let denom = total as f64;
140
141    let mut group_start = 0;
142    while group_start < total {
143        let mut group_end = group_start + 1;
144        while group_end < total
145            && order_keys_equal(rows, indices[group_start], indices[group_end], order_by)
146        {
147            group_end += 1;
148        }
149        let cd = group_end as f64 / denom;
150        for pos in group_start..group_end {
151            set_window_col(&mut rows[indices[pos]].1, alias, serde_json::json!(cd));
152        }
153        group_start = group_end;
154    }
155}