nodedb_query/window/running.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! Peer-aware running-aggregate for RANGE BETWEEN UNBOUNDED PRECEDING AND
4//! CURRENT ROW.
5//!
6//! This is the specialised fast path for the most common ordered-window
7//! pattern (the PostgreSQL default frame for windows with ORDER BY).
8//!
9//! Peer semantics: two rows with equal ORDER BY values are in the same peer
10//! group. All peers in a group see the same aggregate result: the value
11//! computed at the *last* peer in the group (i.e., they all include each
12//! other). This matches PostgreSQL behaviour for RANGE CURRENT ROW.
13
14use super::helpers::{as_f64, get_field, order_keys_equal, set_window_col};
15use super::spec::WindowFuncSpec;
16
17/// Apply a peer-aware running aggregate over a sorted partition.
18///
19/// `indices` is the sorted slice of row indices within the partition.
20/// `order_by` is the `(column, ascending)` list from the window spec — used
21/// to detect peer groups.
22pub(super) fn running_aggregate(
23 rows: &mut [(String, serde_json::Value)],
24 indices: &[usize],
25 spec: &WindowFuncSpec,
26 field: &str,
27) {
28 let len = indices.len();
29 if len == 0 {
30 return;
31 }
32
33 // Accumulate state incrementally row-by-row, but defer writing results
34 // until the end of each peer group (so all peers see the group's final
35 // value). We track where the current peer group started.
36 let mut running_sum = 0.0f64;
37 let mut running_count = 0u64;
38 let mut running_min: Option<f64> = None;
39 let mut running_max: Option<f64> = None;
40
41 // Indices of rows belonging to the *current* peer group (deferred write).
42 let mut peer_start = 0usize;
43
44 for pos in 0..len {
45 let i = indices[pos];
46 let val = get_field(&rows[i].1, field);
47 if let Some(n) = as_f64(&val) {
48 running_sum += n;
49 running_count += 1;
50 running_min = Some(running_min.map_or(n, |m: f64| m.min(n)));
51 running_max = Some(running_max.map_or(n, |m: f64| m.max(n)));
52 } else if spec.func_name == "count" {
53 running_count += 1;
54 }
55
56 // Check if the *next* row starts a new peer group (or we're at the end).
57 let is_last_in_group =
58 pos + 1 == len || !order_keys_equal(rows, i, indices[pos + 1], &spec.order_by);
59
60 if is_last_in_group {
61 // Compute the result at the end of this peer group.
62 let result = match spec.func_name.as_str() {
63 "sum" => serde_json::json!(running_sum),
64 "count" => serde_json::json!(running_count),
65 "avg" => {
66 if running_count > 0 {
67 serde_json::json!(running_sum / running_count as f64)
68 } else {
69 serde_json::Value::Null
70 }
71 }
72 "min" => running_min
73 .map(|m| serde_json::json!(m))
74 .unwrap_or(serde_json::Value::Null),
75 "max" => running_max
76 .map(|m| serde_json::json!(m))
77 .unwrap_or(serde_json::Value::Null),
78 "first_value" => get_field(&rows[indices[0]].1, field),
79 "last_value" => get_field(&rows[indices[pos]].1, field),
80 _ => serde_json::Value::Null,
81 };
82
83 // Write the *same* result to every row in the peer group.
84 for &peer_idx in &indices[peer_start..=pos] {
85 set_window_col(&mut rows[peer_idx].1, &spec.alias, result.clone());
86 }
87
88 peer_start = pos + 1;
89 }
90 }
91}