Skip to main content

nodedb_query/window/
helpers.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Shared helpers for window-function evaluation.
4
5use std::collections::HashMap;
6
7use crate::expr::types::SqlExpr;
8
9/// Group row indices by partition key, preserving first-seen partition order.
10pub(super) fn build_partitions(
11    rows: &[(String, serde_json::Value)],
12    partition_by: &[SqlExpr],
13) -> Vec<Vec<usize>> {
14    if partition_by.is_empty() {
15        return vec![(0..rows.len()).collect()];
16    }
17
18    let mut groups: HashMap<String, Vec<usize>> = HashMap::new();
19    let mut order = Vec::new();
20
21    for (i, (_id, doc)) in rows.iter().enumerate() {
22        let key: String = partition_by
23            .iter()
24            .map(|expr| eval_expr_on_json(expr, doc).to_string())
25            .collect::<Vec<_>>()
26            .join("\x00");
27        let entry = groups.entry(key.clone()).or_default();
28        if entry.is_empty() {
29            order.push(key);
30        }
31        entry.push(i);
32    }
33
34    order.iter().filter_map(|k| groups.remove(k)).collect()
35}
36
37pub(super) fn set_window_col(row: &mut serde_json::Value, alias: &str, val: serde_json::Value) {
38    if let serde_json::Value::Object(map) = row {
39        map.insert(alias.to_string(), val);
40    }
41}
42
43pub(super) fn get_field(doc: &serde_json::Value, field: &str) -> serde_json::Value {
44    doc.get(field).cloned().unwrap_or(serde_json::Value::Null)
45}
46
47/// Evaluate a `SqlExpr` against a serde_json document, returning a serde_json value.
48pub(super) fn eval_expr_on_json(expr: &SqlExpr, doc: &serde_json::Value) -> serde_json::Value {
49    match expr {
50        SqlExpr::Column(name) => get_field(doc, name),
51        SqlExpr::Literal(v) => serde_json::Value::from(v.clone()),
52        other => {
53            let ndb_doc = nodedb_types::Value::from(doc.clone());
54            let result = other.eval(&ndb_doc);
55            serde_json::Value::from(result)
56        }
57    }
58}
59
60pub(super) fn as_f64(v: &serde_json::Value) -> Option<f64> {
61    match v {
62        serde_json::Value::Number(n) => n.as_f64(),
63        serde_json::Value::String(s) => s.parse().ok(),
64        _ => None,
65    }
66}
67
68/// Returns true when row at index `b` has the same ORDER BY key as row at
69/// index `a` (used by peer-aware ranking like RANK and PERCENT_RANK).
70pub(super) fn order_keys_equal(
71    rows: &[(String, serde_json::Value)],
72    a: usize,
73    b: usize,
74    order_by: &[(SqlExpr, bool)],
75) -> bool {
76    order_by.iter().all(|(expr, _)| {
77        let va = eval_expr_on_json(expr, &rows[a].1);
78        let vb = eval_expr_on_json(expr, &rows[b].1);
79        va == vb
80    })
81}