Skip to main content

nodedb_query/window/
running.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Peer-aware running-aggregate for RANGE BETWEEN UNBOUNDED PRECEDING AND
4//! CURRENT ROW.
5//!
6//! This is the specialised fast path for the most common ordered-window
7//! pattern (the PostgreSQL default frame for windows with ORDER BY).
8//!
9//! Peer semantics: two rows with equal ORDER BY values are in the same peer
10//! group. All peers in a group see the same aggregate result: the value
11//! computed at the *last* peer in the group (i.e., they all include each
12//! other). This matches PostgreSQL behaviour for RANGE CURRENT ROW.
13
14use super::helpers::{as_f64, get_field, order_keys_equal, set_window_col};
15use super::spec::WindowFuncSpec;
16
17/// Apply a peer-aware running aggregate over a sorted partition.
18///
19/// `indices` is the sorted slice of row indices within the partition.
20/// `order_by` is the `(column, ascending)` list from the window spec — used
21/// to detect peer groups.
22pub(super) fn running_aggregate(
23    rows: &mut [(String, serde_json::Value)],
24    indices: &[usize],
25    spec: &WindowFuncSpec,
26    field: &str,
27) {
28    let len = indices.len();
29    if len == 0 {
30        return;
31    }
32
33    // Accumulate state incrementally row-by-row, but defer writing results
34    // until the end of each peer group (so all peers see the group's final
35    // value). We track where the current peer group started.
36    let mut running_sum = 0.0f64;
37    let mut running_count = 0u64;
38    let mut running_min: Option<f64> = None;
39    let mut running_max: Option<f64> = None;
40
41    // Indices of rows belonging to the *current* peer group (deferred write).
42    let mut peer_start = 0usize;
43
44    for pos in 0..len {
45        let i = indices[pos];
46        let val = get_field(&rows[i].1, field);
47        if let Some(n) = as_f64(&val) {
48            running_sum += n;
49            running_count += 1;
50            running_min = Some(running_min.map_or(n, |m: f64| m.min(n)));
51            running_max = Some(running_max.map_or(n, |m: f64| m.max(n)));
52        } else if spec.func_name == "count" {
53            running_count += 1;
54        }
55
56        // Check if the *next* row starts a new peer group (or we're at the end).
57        let is_last_in_group =
58            pos + 1 == len || !order_keys_equal(rows, i, indices[pos + 1], &spec.order_by);
59
60        if is_last_in_group {
61            // Compute the result at the end of this peer group.
62            let result = match spec.func_name.as_str() {
63                "sum" => serde_json::json!(running_sum),
64                "count" => serde_json::json!(running_count),
65                "avg" => {
66                    if running_count > 0 {
67                        serde_json::json!(running_sum / running_count as f64)
68                    } else {
69                        serde_json::Value::Null
70                    }
71                }
72                "min" => running_min
73                    .map(|m| serde_json::json!(m))
74                    .unwrap_or(serde_json::Value::Null),
75                "max" => running_max
76                    .map(|m| serde_json::json!(m))
77                    .unwrap_or(serde_json::Value::Null),
78                "first_value" => get_field(&rows[indices[0]].1, field),
79                "last_value" => get_field(&rows[indices[pos]].1, field),
80                _ => serde_json::Value::Null,
81            };
82
83            // Write the *same* result to every row in the peer group.
84            for &peer_idx in &indices[peer_start..=pos] {
85                set_window_col(&mut rows[peer_idx].1, &spec.alias, result.clone());
86            }
87
88            peer_start = pos + 1;
89        }
90    }
91}